from flask import Flask, send_file, request, make_response, Response import yt_dlp import os from urllib.parse import urlparse, unquote from gallery_dl import job import requests from datetime import datetime import tempfile import shutil import mimetypes import uuid import re app = Flask(__name__) class MediaDownloader: def __init__(self): self.temp_dir = tempfile.mkdtemp() self.create_directories() def create_directories(self): self.image_path = os.path.join(self.temp_dir, "images") self.video_path = os.path.join(self.temp_dir, "videos") os.makedirs(self.image_path, exist_ok=True) os.makedirs(self.video_path, exist_ok=True) def cleanup(self): try: if os.path.exists(self.temp_dir): shutil.rmtree(self.temp_dir) except Exception as e: print(f"Cleanup error: {str(e)}") def is_valid_url(self, url): try: result = urlparse(url) return all([result.scheme, result.netloc]) except: return False def sanitize_filename(self, filename): return re.sub(r'[<>:"/\\|?*]', '_', filename) def download_video(self, url): if not self.is_valid_url(url): return None unique_id = str(uuid.uuid4()) output_template = os.path.join(self.video_path, f'{unique_id}.%(ext)s') ydl_opts = { 'format': 'best', 'outtmpl': output_template, 'ignoreerrors': True, 'no_warnings': True, 'quiet': True, 'extract_flat': False, 'http_headers': { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' } } try: with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(url, download=True) if info: filename = ydl.prepare_filename(info) return filename if os.path.exists(filename) else None except Exception as e: print(f"Video download error: {str(e)}") return None return None def download_images(self, url): if not self.is_valid_url(url): return None try: class UrlDl(job.Job): def __init__(self, url, parent=None): job.Job.__init__(self, url, parent) self.urls = [] def handle_url(self, url, _): self.urls.append(url) j = UrlDl(url) j.run() if not j.urls: return None downloaded_files = [] for img_url in j.urls: try: response = requests.get(img_url, headers={ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' }, timeout=30) response.raise_for_status() unique_id = str(uuid.uuid4()) ext = os.path.splitext(urlparse(img_url).path)[1] if not ext or ext.lower() not in ['.jpg', '.jpeg', '.png', '.gif', '.webp']: ext = '.jpg' filename = self.sanitize_filename(f"{unique_id}{ext}") filepath = os.path.join(self.image_path, filename) with open(filepath, 'wb') as f: f.write(response.content) downloaded_files.append(filepath) except Exception as e: print(f"Error downloading image {img_url}: {str(e)}") continue return downloaded_files if downloaded_files else None except Exception as e: print(f"Error in download_images: {str(e)}") return None def download_media(self, url): if not url or not self.is_valid_url(url): return None video_path = self.download_video(url) if video_path: return [video_path] image_paths = self.download_images(url) if image_paths: return image_paths return None @app.route('/health') def health_check(): return 'OK', 200 @app.route('/') def home(): return """
Use: /download?url=url
""" @app.route('/download') def download(): try: url = request.args.get('url') if not url: return "No URL", 400 url = unquote(url) downloader = MediaDownloader() try: files = downloader.download_media(url) if not files: downloader.cleanup() return "Download failed", 400 if len(files) == 1: response = make_response(send_file( files[0], as_attachment=True, download_name=os.path.basename(files[0]) )) @response.call_on_close def cleanup(): downloader.cleanup() return response else: def generate(): try: for file_path in files: if os.path.exists(file_path): with open(file_path, 'rb') as f: content = f.read() filename = os.path.basename(file_path) yield f'--boundary\n'.encode() yield f'Content-Disposition: attachment; filename="{filename}"\n'.encode() yield f'Content-Type: {mimetypes.guess_type(file_path)[0]}\n'.encode() yield f'Content-Length: {len(content)}\n\n'.encode() yield content yield b'\n' yield b'--boundary--\n' finally: downloader.cleanup() response = Response( generate(), mimetype='multipart/x-mixed-replace; boundary=boundary', direct_passthrough=True ) response.headers['Content-Type'] = 'multipart/x-mixed-replace; boundary=boundary' return response except Exception as e: downloader.cleanup() return f"Download error: {str(e)}", 500 except Exception as e: return f"Server error: {str(e)}", 500 if __name__ == '__main__': app.run(host='0.0.0.0', port=7860, debug=False)