"""Flask service that caches QQ Music chart data and serves song lyrics.

The chart pages are scraped on demand (``/star``) and once a day by a
background scheduler; the scraped lists are kept in module-level caches and
served by the ``/fetch_music_*`` routes. ``/get_lyrics`` proxies the QQ Music
lyric endpoint and converts the LRC text into a JSON list.
"""

import json
import os
import re
from datetime import datetime

import requests
from apscheduler.schedulers.background import BackgroundScheduler
from bs4 import BeautifulSoup
from flask import Flask, request, jsonify
from flask_cors import CORS

app = Flask(__name__)
CORS(app)

# Cache data and update time
cached_results_soaring = []
cached_results_hot = []
cached_results_newSongs = []
cached_results_popular = []
last_update_time = None

# Seconds to wait for an upstream HTTP response before giving up; without a
# timeout a stalled connection would block the request/scheduler thread forever.
REQUEST_TIMEOUT = 10

# Timestamp format used in every JSON response.
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

# One LRC line: "[mm:ss]", "[mm:ss.x]", "[mm:ss.xx]" or "[mm:ss.xxx]" + text.
_LYRIC_LINE_RE = re.compile(r'\[(\d{2}:\d{2}(?:\.\d{1,3})?)\](.*)')

_NOT_READY_MESSAGE = "Data not ready, please visit /star first to update data"


def parse_time(time_str):
    """Normalize an LRC timestamp to ``mm:ss.mmm``.

    Args:
        time_str: Timestamp such as ``"01:02.34"``; surrounding square
            brackets are tolerated and stripped.

    Returns:
        The timestamp with a fractional part padded to at least three digits
        (``"01:02"`` -> ``"01:02.000"``, ``"01:02.34"`` -> ``"01:02.340"``).
        ``"00:00.000"`` is returned for empty input or when the value does
        not consist of exactly two ``:``-separated parts.
    """
    if not time_str:
        return "00:00.000"

    time_parts = time_str.strip('[]').split(':')
    if len(time_parts) != 2:
        return "00:00.000"

    minutes, seconds = time_parts
    whole, dot, fraction = seconds.partition('.')
    if not dot:
        return f"{minutes}:{seconds}.000"
    # Right-pad so 1- and 2-digit fractions become milliseconds.
    return f"{minutes}:{whole}.{fraction.ljust(3, '0')}"


def convert_lyrics(input_str):
    """Convert LRC-formatted lyrics text into a JSON-friendly structure.

    Lines that are blank or do not start with a ``[mm:ss(.fraction)]``
    timestamp (for example metadata tags) are skipped.

    Args:
        input_str: Lyrics text with one timestamped entry per line.

    Returns:
        ``{"lyric": [{"name": <text>, "time": <mm:ss.mmm>}, ...]}`` in the
        order the lines appear.
    """
    lyrics = []
    for line in input_str.split('\n'):
        if not line.strip():
            continue
        match = _LYRIC_LINE_RE.match(line)
        if match is None:
            continue
        time_str, content = match.groups()
        lyrics.append({
            "name": content.strip(),
            "time": parse_time(time_str),
        })
    return {"lyric": lyrics}


def _parse_song_item(item):
    """Build the result dict for one ``<li>`` of the chart list.

    Returns ``None`` when the song name or artist text is empty. Raises
    ``ValueError`` when the expected name/artist elements are missing so the
    caller can log and skip the item.
    """
    song_name_span = item.select_one("span.songlist__songname_txt")
    artist_div = item.select_one("div.songlist__artist")
    if song_name_span is None or artist_div is None:
        raise ValueError("song name or artist element not found")

    song_name = song_name_span.text.strip()
    artist_name = artist_div.text.strip()

    mid = "default_mid"
    song_link = song_name_span.find('a', class_='')
    if song_link and song_link.get('href'):
        mid = song_link['href'].split('/')[-1]

    # Reset per item: previously a song without a cover link either crashed
    # (first item) or silently inherited the previous song's cover URL.
    img_url = ""
    img_link = song_name_span.find('a', class_='songlist__cover')
    if img_link and img_link.get('href'):
        imgsrc = img_link['href'].split('/')[-1]
        img_url = f"https://y.qq.com/music/photo_new/T002R300x300M000{imgsrc}_1.jpg?max_age=2592000"

    if not (song_name and artist_name):
        return None

    return {
        "muName": song_name,
        "cover": img_url,
        "mid": mid,
        "id": mid,
        "song": song_name,
        "singer": artist_name,
        "muLink": 'qqdg/?word',
        "uname": 'QQ音乐',
        "muId": 1,
    }


def song(url):
    """Crawl song information from QQ Music rankings page.

    Args:
        url: URL of the chart page to scrape.

    Returns:
        A list with up to ten song dicts (first ten list items of the page).
        An empty list is returned when the request or parsing fails; errors
        are printed, never raised. Individual items that cannot be parsed are
        logged and skipped.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Referer": "https://y.qq.com/",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
    }
    results = []

    try:
        response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
        # Treat HTTP error pages as failures instead of parsing them as charts.
        response.raise_for_status()
        response.encoding = 'utf-8'
        soup = BeautifulSoup(response.text, 'html.parser')

        for item in soup.select("ul.songlist__list li")[:10]:
            try:
                entry = _parse_song_item(item)
            except Exception as e:
                # Best effort: one malformed item must not drop the whole chart.
                print(f"Error processing song item: {e}")
                continue
            if entry is not None:
                results.append(entry)
    except Exception as e:
        print(f"Error during crawling: {e}")
        return []

    if results:
        # The global last_update_time is only set after this function
        # returns, so report what was fetched rather than a stale timestamp.
        print(f"Data updated: {len(results)} songs from {url}")
    return results


def _refresh_all_charts():
    """Re-crawl all four charts and publish them to the module caches.

    Returns:
        The ``datetime`` recorded as the new ``last_update_time``.
    """
    global cached_results_soaring, cached_results_hot, cached_results_newSongs, cached_results_popular, last_update_time

    soaring = song("https://y.qq.com/n/ryqq/toplist/62")
    hot = song("https://y.qq.com/n/ryqq/toplist/26")
    new_songs = song("https://y.qq.com/n/ryqq/toplist/27")
    popular = song("https://y.qq.com/n/ryqq/toplist/4")

    # Publish the timestamp before the data so a concurrent reader never sees
    # a non-empty cache together with last_update_time still being None.
    last_update_time = datetime.now()
    cached_results_soaring = soaring
    cached_results_hot = hot
    cached_results_newSongs = new_songs
    cached_results_popular = popular
    return last_update_time


def schedule_music_fetch():
    """Schedule daily task at 8 AM.

    Starts a ``BackgroundScheduler`` with a cron job (hour=8, minute=0) that
    refreshes every chart cache.

    Returns:
        The started scheduler, so callers may shut it down if they wish.
    """
    scheduler = BackgroundScheduler()
    scheduler.add_job(_refresh_all_charts, 'cron', hour=8, minute=0)
    scheduler.start()
    print("Scheduled task started, crawling time is 8 AM daily.")
    return scheduler


def _chart_response(results):
    """Return the JSON response for one cached chart, or 503 if not loaded."""
    updated_at = last_update_time
    if not results or updated_at is None:
        return jsonify({"message": _NOT_READY_MESSAGE}), 503
    return jsonify({
        "last_update_time": updated_at.strftime(TIME_FORMAT),
        "data": results,
    })


@app.route('/star')
def star():
    """Refresh all chart caches immediately and report the update time."""
    updated_at = _refresh_all_charts()
    return jsonify({
        "data": "Data updated",
        # Formatted explicitly: jsonify would render the naive local datetime
        # as an HTTP date labelled GMT, unlike the other routes.
        "update_time": updated_at.strftime(TIME_FORMAT),
    })


@app.route('/fetch_music_soaring', methods=['GET'])
def fetch_music_route_soaring():
    """Serve the cached chart scraped from toplist/62."""
    return _chart_response(cached_results_soaring)


@app.route('/fetch_music_hot', methods=['GET'])
def fetch_music_route_hot():
    """Serve the cached chart scraped from toplist/26."""
    return _chart_response(cached_results_hot)


@app.route('/fetch_music_newSongs', methods=['GET'])
def fetch_music_route_newSongs():
    """Serve the cached chart scraped from toplist/27."""
    return _chart_response(cached_results_newSongs)


@app.route('/fetch_music_popular', methods=['GET'])
def fetch_music_route_popular():
    """Serve the cached chart scraped from toplist/4."""
    return _chart_response(cached_results_popular)


@app.route('/get_lyrics', methods=['GET'])
def get_lyrics():
    """Fetch lyrics for the song given by the ``mid`` query parameter.

    Responses:
        200: ``{"lyric": [...]}`` as produced by ``convert_lyrics``.
        400: ``mid`` is missing.
        404: the upstream response contains no lyrics.
        500: the upstream request failed or returned unexpected data.
    """
    mid = request.args.get('mid')
    if not mid:
        return jsonify({"error": "Missing mid parameter"}), 400

    target_url = "https://c.y.qq.com/lyric/fcgi-bin/fcg_query_lyric_new.fcg"
    # Passed via params so the user-supplied mid is URL-encoded and cannot
    # inject extra query parameters.
    params = {"songmid": mid, "format": "json", "nobase64": 1}
    headers = {
        "Referer": "https://y.qq.com/portal/player.html"
    }

    try:
        response = requests.get(
            target_url, params=params, headers=headers, timeout=REQUEST_TIMEOUT
        )
        response.raise_for_status()
        data = response.json()

        matched_lyrics = data.get('lyric')
        if not matched_lyrics:
            return jsonify({"error": f"No lyrics found for mid: {mid}"}), 404

        result_json = convert_lyrics(matched_lyrics)
        return jsonify(result_json)
    except requests.RequestException as e:
        return jsonify({"error": "Failed to get lyrics", "details": str(e)}), 500
    except (KeyError, ValueError, AttributeError, TypeError):
        # AttributeError/TypeError cover a JSON payload that is not an object
        # or a 'lyric' value that is not text.
        return jsonify({"error": "Unexpected data format from server"}), 500


if __name__ == '__main__':
    schedule_music_fetch()
    # The interactive debugger allows arbitrary code execution, and this
    # server binds to all interfaces, so debug mode is opt-in via FLASK_DEBUG.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true", "yes")
    # The reloader would spawn a second process that starts its own
    # scheduler, so it stays off even when debugging.
    app.run(debug=debug_enabled, use_reloader=False, host='0.0.0.0', port=7860)