# script_search_api.py
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import asyncio
from datetime import datetime, timedelta
from typing import Dict, Optional
from pydantic import BaseModel
from dataclasses import dataclass
import logging
import requests
from bs4 import BeautifulSoup
from difflib import get_close_matches
from model.analyzer import analyze_content

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@dataclass
class ProgressState:
    """In-memory progress record for a single analysis task."""
    progress: float
    status: str
    timestamp: datetime
    task_id: str
    is_complete: bool = False
    result: Optional[dict] = None
    error: Optional[str] = None

class ProgressResponse(BaseModel):
    progress: float
    status: str
    is_complete: bool
    result: Optional[dict] = None
    error: Optional[str] = None

# Global progress tracker
progress_tracker: Dict[str, ProgressState] = {}

BASE_URL = "https://imsdb.com"
ALL_SCRIPTS_URL = f"{BASE_URL}/all-scripts.html"

def create_task_id(movie_name: str) -> str:
    """Create a unique task ID for a movie analysis request."""
    return f"{movie_name}-{datetime.now().timestamp()}"

async def cleanup_old_tasks():
    """Remove tasks older than 1 hour."""
    while True:
        current_time = datetime.now()
        expired_tasks = [
            task_id for task_id, state in progress_tracker.items()
            if current_time - state.timestamp > timedelta(hours=1)
        ]
        for task_id in expired_tasks:
            del progress_tracker[task_id]
        await asyncio.sleep(300)  # Clean up every 5 minutes

@app.on_event("startup")
async def startup_event():
    """Initialize the server and start the background cleanup task."""
    progress_tracker.clear()
    asyncio.create_task(cleanup_old_tasks())
    logger.info("Server started, progress tracker initialized")

def update_progress(task_id: str, progress: float, status: str,
                    result: Optional[dict] = None, error: Optional[str] = None):
    """Update the progress state for a task."""
    is_complete = progress >= 1.0
    progress_tracker[task_id] = ProgressState(
        progress=progress,
        status=status,
        timestamp=datetime.now(),
        task_id=task_id,
        is_complete=is_complete,
        result=result,
        error=error
    )
    logger.info(f"Task {task_id}: {status} (Progress: {progress * 100:.0f}%)")

async def start_analysis(movie_name: str):
    """Start a new analysis task."""
    task_id = create_task_id(movie_name)
    update_progress(task_id, 0.0, "Starting analysis...")
    # Run the analysis task in the background
    asyncio.create_task(run_analysis(task_id, movie_name))
    return {"task_id": task_id}

async def get_progress(task_id: str) -> ProgressResponse:
    """Get current progress for a task."""
    if task_id not in progress_tracker:
        raise HTTPException(status_code=404, detail="Task not found")
    state = progress_tracker[task_id]
    return ProgressResponse(
        progress=state.progress,
        status=state.status,
        is_complete=state.is_complete,
        result=state.result,
        error=state.error
    )

def find_movie_link(movie_name: str, soup: BeautifulSoup) -> str | None:
    """Find the closest matching movie link from the script database."""
    movie_links = {
        link.text.strip().lower(): link['href']
        for link in soup.find_all('a', href=True)
    }
    close_matches = get_close_matches(movie_name.lower(), movie_links.keys(), n=1, cutoff=0.6)
    if close_matches:
        logger.info(f"Close match found: {close_matches[0]}")
        return BASE_URL + movie_links[close_matches[0]]
    logger.info("No close match found.")
    return None

def find_script_link(soup: BeautifulSoup, movie_name: str) -> str | None:
    """Find the script download link for a given movie."""
    patterns = [
        f'Read "{movie_name}" Script',
        f'Read "{movie_name.title()}" Script',
        f'Read "{movie_name.upper()}" Script',
        f'Read "{movie_name.lower()}" Script'
    ]
    for link in soup.find_all('a', href=True):
        link_text = link.text.strip()
        if any(pattern.lower() in link_text.lower() for pattern in patterns):
            return link['href']
        elif all(word.lower() in link_text.lower() for word in ["Read", "Script", movie_name]):
            return link['href']
    return None

def fetch_script(movie_name: str, task_id: Optional[str] = None) -> str | None:
    """Fetch and extract the script content for a given movie.

    Progress updates are recorded under `task_id` when one is supplied
    (the task-id based flow); otherwise they are keyed by the movie name.
    """
    key = task_id or movie_name

    # Initial page load
    update_progress(key, 0.1, "Fetching the script database...")
    try:
        response = requests.get(ALL_SCRIPTS_URL)
        response.raise_for_status()
    except requests.RequestException as e:
        logger.error(f"Failed to load the main page: {str(e)}")
        return None

    # Search for movie
    update_progress(key, 0.2, "Searching for the movie...")
    soup = BeautifulSoup(response.text, 'html.parser')
    movie_link = find_movie_link(movie_name, soup)
    if not movie_link:
        logger.error(f"Script for '{movie_name}' not found.")
        return None

    # Fetch movie page
    update_progress(key, 0.3, "Loading movie details...")
    try:
        response = requests.get(movie_link)
        response.raise_for_status()
    except requests.RequestException as e:
        logger.error(f"Failed to load the movie page: {str(e)}")
        return None

    # Find script link
    update_progress(key, 0.4, "Locating script download...")
    soup = BeautifulSoup(response.text, 'html.parser')
    script_link = find_script_link(soup, movie_name)
    if not script_link:
        logger.error(f"Unable to find script link for '{movie_name}'.")
        return None

    # Fetch script content
    script_page_url = BASE_URL + script_link
    update_progress(key, 0.5, "Downloading script content...")
    try:
        response = requests.get(script_page_url)
        response.raise_for_status()
    except requests.RequestException as e:
        logger.error(f"Failed to load the script: {str(e)}")
        return None

    # Extract script text
    update_progress(key, 0.6, "Extracting script text...")
    soup = BeautifulSoup(response.text, 'html.parser')
    script_content = soup.find('pre')
    if script_content:
        update_progress(key, 0.7, "Script extracted successfully")
        return script_content.get_text()
    logger.error("Failed to extract script content.")
    return None

async def run_analysis(task_id: str, movie_name: str):
    """Run the actual analysis task."""
    try:
        # Fetch the script in a worker thread so the event loop
        # (and progress polling) stays responsive during blocking HTTP calls
        update_progress(task_id, 0.2, "Fetching script...")
        script_text = await asyncio.to_thread(fetch_script, movie_name, task_id)
        if not script_text:
            raise Exception("Script not found")

        # Analyze content
        update_progress(task_id, 0.6, "Analyzing content...")
        result = await analyze_content(script_text)

        # Complete
        update_progress(task_id, 1.0, "Analysis complete", result=result)
    except Exception as e:
        logger.error(f"Error in analysis: {str(e)}", exc_info=True)
        update_progress(task_id, 1.0, "Error occurred", error=str(e))

async def fetch_and_analyze(movie_name: str):
    """Fetch and analyze a movie script, with progress tracking."""
    task_id = create_task_id(movie_name)
    try:
        # Initialize progress
        update_progress(task_id, 0.0, "Starting script search...")

        # Fetch the script without blocking the event loop
        script_text = await asyncio.to_thread(fetch_script, movie_name, task_id)
        if not script_text:
            raise HTTPException(status_code=404, detail="Script not found or error occurred")

        # Analyze content
        update_progress(task_id, 0.8, "Analyzing script content...")
        result = await analyze_content(script_text)

        # Finalize
        update_progress(task_id, 1.0, "Analysis complete!")
        return result
    except HTTPException:
        # Preserve intentional HTTP errors (e.g. the 404 above)
        raise
    except Exception as e:
        logger.error(f"Error in fetch_and_analyze: {str(e)}", exc_info=True)
        # Clean up the progress tracker entry in case of error
        if task_id in progress_tracker:
            del progress_tracker[task_id]
        raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")

def get_progress_by_movie(movie_name: str):
    """Get the current progress and status for an analysis keyed by movie name.

    The task-id based lookup is handled by `get_progress` above.
    """
    if movie_name not in progress_tracker:
        return {
            "progress": 0,
            "status": "Waiting to start..."
        }

    progress_info = progress_tracker[movie_name]

    # Clean up old entries (optional)
    current_time = datetime.now()
    if (current_time - progress_info.timestamp).total_seconds() > 3600:  # 1 hour timeout
        del progress_tracker[movie_name]
        return {
            "progress": 0,
            "status": "Session expired. Please try again."
        }

    return {
        "progress": progress_info.progress,
        "status": progress_info.status
    }

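# The handlers above are not wired to any routes in this listing. A minimal
# registration sketch, assuming illustrative paths (the project's actual
# paths are not shown here):
app.add_api_route("/api/start_analysis", start_analysis, methods=["POST"])
app.add_api_route("/api/progress/{task_id}", get_progress,
                  methods=["GET"], response_model=ProgressResponse)
app.add_api_route("/api/analyze", fetch_and_analyze, methods=["GET"])
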
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
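
# Illustrative client flow against the assumed routes above:
#
#   curl -X POST "http://localhost:8000/api/start_analysis?movie_name=Inception"
#       -> {"task_id": "..."}
#   curl "http://localhost:8000/api/progress/<task_id>"
#       -> progress, status, is_complete, and the analysis result once finished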