download / app.py
coollsd's picture
Update app.py
1168724 verified
raw
history blame
7.31 kB
from flask import Flask, send_file, request, make_response, Response
import yt_dlp
import os
from urllib.parse import urlparse
from gallery_dl import job
import requests
from datetime import datetime
import tempfile
import shutil
import mimetypes
from asgiref.wsgi import WsgiToAsgi
app = Flask(__name__)
asgi_app = WsgiToAsgi(app)
class MediaDownloader:
def __init__(self):
self.temp_dir = tempfile.mkdtemp()
self.create_directories()
def create_directories(self):
self.image_path = os.path.join(self.temp_dir, "images")
self.video_path = os.path.join(self.temp_dir, "videos")
os.makedirs(self.image_path, exist_ok=True)
os.makedirs(self.video_path, exist_ok=True)
def cleanup(self):
try:
shutil.rmtree(self.temp_dir)
except Exception as e:
print(f"Cleanup error: {str(e)}")
def download_video(self, url):
output_template = os.path.join(self.video_path, '%(title)s.%(ext)s')
ydl_opts = {
'format': 'best',
'outtmpl': output_template,
'ignoreerrors': True,
'no_warnings': True,
'quiet': True,
'extract_flat': False,
'socket_timeout': 30,
'http_headers': {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1'
}
}
if 'tiktok.com' in url:
ydl_opts.update({
'format': 'best',
'quiet': False,
'no_warnings': False,
'extractor_args': {'TikTok': {'download_api': True}},
'force_generic_extractor': False,
'cookiesfrombrowser': None
})
elif 'instagram.com' in url:
ydl_opts.update({
'format': 'best',
'extract_flat': True,
'quiet': False,
'no_warnings': False,
'socket_timeout': 30,
'retries': 10
})
try:
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=True)
if info:
filename = ydl.prepare_filename(info)
return filename if os.path.exists(filename) else None
return None
except Exception as e:
print(f"Video download error: {str(e)}")
return None
def download_images(self, url):
try:
class UrlDl(job.Job):
def __init__(self, url, parent=None):
job.Job.__init__(self, url, parent)
self.urls = []
def handle_url(self, url, _):
self.urls.append(url)
j = UrlDl(url)
j.run()
if not j.urls:
return None
downloaded_files = []
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
'Accept': 'image/webp,*/*',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive'
}
for img_url in j.urls:
try:
response = requests.get(
img_url,
headers=headers,
timeout=30,
allow_redirects=True
)
response.raise_for_status()
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
ext = os.path.splitext(urlparse(img_url).path)[1]
if not ext:
ext = '.jpg'
filename = f"image_{timestamp}{ext}"
filepath = os.path.join(self.image_path, filename)
with open(filepath, 'wb') as f:
f.write(response.content)
downloaded_files.append(filepath)
except Exception as e:
print(f"Error downloading image {img_url}: {str(e)}")
continue
return downloaded_files if downloaded_files else None
except Exception as e:
print(f"Error in download_images: {str(e)}")
return None
def download_media(self, url):
if not url.startswith(('http://', 'https://')):
url = 'https://' + url
video_path = self.download_video(url)
if video_path:
return [video_path]
image_paths = self.download_images(url)
if image_paths:
return image_paths
return None
@app.route('/')
def home():
return """
<h1>Social Media Downloader API</h1>
<p>Use: /download?url=YOUR_URL_HERE</p>
"""
@app.route('/download')
def download():
try:
url = request.args.get('url')
if not url:
return "No URL provided", 400
downloader = MediaDownloader()
try:
files = downloader.download_media(url)
if files and len(files) > 0:
if len(files) == 1:
response = make_response(send_file(files[0], as_attachment=True))
@response.call_on_close
def cleanup():
downloader.cleanup()
return response
else:
def generate():
for file_path in files:
with open(file_path, 'rb') as f:
content = f.read()
filename = os.path.basename(file_path)
yield f'Content-Disposition: attachment; filename="{filename}"\n'.encode()
yield f'Content-Type: {mimetypes.guess_type(file_path)[0]}\n'.encode()
yield f'Content-Length: {len(content)}\n\n'.encode()
yield content
yield b'\n--boundary--\n'
downloader.cleanup()
response = Response(
generate(),
mimetype='multipart/x-mixed-replace; boundary=boundary',
direct_passthrough=True
)
response.headers['Content-Type'] = 'multipart/x-mixed-replace; boundary=boundary'
return response
else:
downloader.cleanup()
return "Failed to download media", 400
except Exception as e:
downloader.cleanup()
return f"Error: {str(e)}", 500
except Exception as e:
return f"Error: {str(e)}", 500
if __name__ == '__main__':
app.run(host='0.0.0.0', port=7860)