import html
import mimetypes
import os
import shutil
import tempfile
from datetime import datetime
from urllib.parse import urlparse

import requests
import yt_dlp
from asgiref.wsgi import WsgiToAsgi
from flask import Flask, Response, make_response, request, send_file
from gallery_dl import job


# WSGI application object; the route handlers in this module register on it.
app = Flask(__name__)

# ASGI-callable wrapper around the same Flask app, for ASGI servers.
asgi_app = WsgiToAsgi(app)


class MediaDownloader:
    """Download the media behind a URL into a private temporary directory.

    Each instance creates its own temp directory (with ``images`` and
    ``videos`` sub-directories) and the caller is responsible for calling
    :meth:`cleanup` once the downloaded files are no longer needed.
    """

    # Browser-like User-Agent sent with both the yt-dlp and the image requests.
    _USER_AGENT = (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
        '(KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36'
    )

    def __init__(self):
        """Create the temporary working directory and its sub-directories."""
        self.temp_dir = tempfile.mkdtemp()
        self.create_directories()

    def create_directories(self):
        """Create ``images`` and ``videos`` folders inside ``self.temp_dir``."""
        self.image_path = os.path.join(self.temp_dir, "images")
        self.video_path = os.path.join(self.temp_dir, "videos")
        os.makedirs(self.image_path, exist_ok=True)
        os.makedirs(self.video_path, exist_ok=True)

    def cleanup(self):
        """Remove the temporary directory; safe to call more than once.

        Removal is best-effort: a failure is printed, never raised.
        """
        # Already removed (e.g. cleanup called twice): nothing to do, and no
        # misleading "Cleanup error" message.
        if not os.path.isdir(self.temp_dir):
            return
        try:
            shutil.rmtree(self.temp_dir)
        except Exception as e:
            print(f"Cleanup error: {str(e)}")

    @staticmethod
    def _find_downloaded_file(ydl, info):
        """Return the first existing file yt-dlp wrote for *info*, or None.

        *info* is the dict returned by ``extract_info``. When it carries an
        ``entries`` list (playlist-style result) each entry is inspected
        instead of the container, whose own prepared filename never exists.
        """
        for entry in info.get('entries') or [info]:
            # NOTE(review): entries may presumably be None for items that
            # failed under 'ignoreerrors' - skipped defensively.
            if not entry:
                continue
            # Prefer the final paths yt-dlp reports (they reflect any
            # post-processing rename); fall back to the templated filename.
            candidates = [
                item.get('filepath')
                for item in entry.get('requested_downloads') or []
            ]
            candidates.append(ydl.prepare_filename(entry))
            for path in candidates:
                if path and os.path.exists(path):
                    return path
        return None

    @staticmethod
    def _image_extension(img_url, content_type):
        """Pick a file extension for an image.

        Uses the extension of the URL path when present, otherwise the one
        implied by the response ``Content-Type``, otherwise ``.jpg``.
        """
        ext = os.path.splitext(urlparse(img_url).path)[1]
        if ext:
            return ext
        mime = (content_type or '').split(';')[0].strip()
        return mimetypes.guess_extension(mime) or '.jpg'

    def download_video(self, url):
        """Download the video at *url* with yt-dlp.

        Returns:
            Path of a downloaded file inside ``self.video_path``, or ``None``
            when nothing was downloaded or an error occurred (the error is
            printed, not raised).
        """
        output_template = os.path.join(self.video_path, '%(title)s.%(ext)s')
        ydl_opts = {
            'format': 'best',
            'outtmpl': output_template,
            'ignoreerrors': True,
            'no_warnings': True,
            'quiet': True,
            'extract_flat': False,
            'socket_timeout': 30,
            'http_headers': {
                'User-Agent': self._USER_AGENT,
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.5',
                'Accept-Encoding': 'gzip, deflate, br',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
            },
        }

        # Site-specific overrides on top of the defaults above.
        if 'tiktok.com' in url:
            ydl_opts.update({
                'format': 'best',
                'quiet': False,
                'no_warnings': False,
                'extractor_args': {'TikTok': {'download_api': True}},
                'force_generic_extractor': False,
                'cookiesfrombrowser': None,
            })
        elif 'instagram.com' in url:
            ydl_opts.update({
                'format': 'best',
                'extract_flat': True,
                'quiet': False,
                'no_warnings': False,
                'socket_timeout': 30,
                'retries': 10,
            })

        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=True)
                # With 'ignoreerrors' a failed extraction yields no info.
                if not info:
                    return None
                return self._find_downloaded_file(ydl, info)
        except Exception as e:
            print(f"Video download error: {str(e)}")
            return None

    def download_images(self, url):
        """Collect image URLs for *url* via gallery-dl and fetch each one.

        Returns:
            List of saved file paths inside ``self.image_path``, or ``None``
            when no URL was found, every fetch failed, or an error occurred
            (errors are printed, not raised).
        """
        try:
            class UrlDl(job.Job):
                """gallery-dl job that only records the URLs it is handed."""

                def __init__(self, url, parent=None):
                    job.Job.__init__(self, url, parent)
                    self.urls = []

                def handle_url(self, url, _):
                    self.urls.append(url)

            j = UrlDl(url)
            j.run()

            if not j.urls:
                return None

            headers = {
                'User-Agent': self._USER_AGENT,
                'Accept': 'image/webp,*/*',
                'Accept-Language': 'en-US,en;q=0.5',
                'Accept-Encoding': 'gzip, deflate, br',
                'Connection': 'keep-alive',
            }

            downloaded_files = []
            for img_url in j.urls:
                # One bad image must not abort the rest of the batch.
                try:
                    response = requests.get(
                        img_url,
                        headers=headers,
                        timeout=30,
                        allow_redirects=True,
                    )
                    response.raise_for_status()

                    # Microsecond timestamp keeps names unique within a run.
                    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
                    ext = self._image_extension(
                        img_url, response.headers.get('Content-Type', '')
                    )
                    filename = f"image_{timestamp}{ext}"
                    filepath = os.path.join(self.image_path, filename)

                    with open(filepath, 'wb') as f:
                        f.write(response.content)
                    downloaded_files.append(filepath)
                except Exception as e:
                    print(f"Error downloading image {img_url}: {str(e)}")
                    continue

            return downloaded_files if downloaded_files else None

        except Exception as e:
            print(f"Error in download_images: {str(e)}")
            return None

    def download_media(self, url):
        """Download *url* as a video, falling back to images.

        A missing ``http://`` / ``https://`` scheme is replaced by
        ``https://``.

        Returns:
            A list of downloaded file paths, or ``None`` if neither the video
            nor the image download produced a file.
        """
        url = url.strip()
        # Case-insensitive check so "HTTP://..." is not prefixed a second time.
        if not url.lower().startswith(('http://', 'https://')):
            url = 'https://' + url

        video_path = self.download_video(url)
        if video_path:
            return [video_path]

        image_paths = self.download_images(url)
        if image_paths:
            return image_paths

        return None


@app.route('/')
def home():
    """Serve a short HTML landing page describing how to call the API."""
    landing_page = """
    <h1>Social Media Downloader API</h1>
    <p>Use: /download?url=YOUR_URL_HERE</p>
    """
    return landing_page


def _iter_multipart(files, boundary, chunk_size=64 * 1024):
    """Yield the bytes of a multipart body holding every path in *files*.

    Each part is introduced by ``--boundary`` and carries its own
    Content-Disposition, Content-Type and Content-Length headers; the body is
    terminated by the closing ``--boundary--`` delimiter. Files are streamed
    in *chunk_size* pieces rather than loaded whole.
    """
    for file_path in files:
        # Escape quotes/backslashes and drop line breaks so the file name
        # cannot terminate the quoted string or inject extra headers.
        safe_name = (
            os.path.basename(file_path)
            .replace('\\', '\\\\')
            .replace('"', '\\"')
            .replace('\r', '')
            .replace('\n', '')
        )
        # guess_type returns None for unknown types; never send "None".
        content_type = (
            mimetypes.guess_type(file_path)[0] or 'application/octet-stream'
        )
        yield (
            f'--{boundary}\r\n'
            f'Content-Disposition: attachment; filename="{safe_name}"\r\n'
            f'Content-Type: {content_type}\r\n'
            f'Content-Length: {os.path.getsize(file_path)}\r\n'
            '\r\n'
        ).encode()
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(chunk_size), b''):
                yield chunk
        yield b'\r\n'
    yield f'--{boundary}--\r\n'.encode()


@app.route('/download')
def download():
    """Download the media behind ``?url=`` and return it to the client.

    Responses:
        400 - no ``url`` query parameter, or nothing could be downloaded.
        200 - one file: sent as an attachment; several files: streamed as a
              ``multipart/x-mixed-replace`` body with boundary ``boundary``.
        500 - unexpected error (message is HTML-escaped).

    The downloader's temporary directory is removed when the response is
    closed, or immediately on any failure path.
    """
    url = request.args.get('url')
    if not url:
        return "No URL provided", 400

    # NOTE(review): the URL is user-controlled and fetched server-side;
    # consider restricting target hosts if this is exposed publicly.
    try:
        downloader = MediaDownloader()
    except Exception as e:
        return f"Error: {html.escape(str(e))}", 500

    try:
        files = downloader.download_media(url)
        if not files:
            downloader.cleanup()
            return "Failed to download media", 400

        if len(files) == 1:
            response = make_response(send_file(files[0], as_attachment=True))
        else:
            response = Response(
                _iter_multipart(files, 'boundary'),
                content_type='multipart/x-mixed-replace; boundary=boundary',
                direct_passthrough=True,
            )

        # Runs when the server closes the response, so the temp files are
        # removed even if the client disconnects before the body is consumed.
        response.call_on_close(downloader.cleanup)
        return response

    except Exception as e:
        downloader.cleanup()
        # The message may echo the user-supplied URL; escape it because the
        # response is served as HTML.
        return f"Error: {html.escape(str(e))}", 500


if __name__ == '__main__':
    # Listen on all interfaces. The port defaults to 7860 and can be
    # overridden with the PORT environment variable.
    app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 7860)))