SEO / app.py
AnalysisWithMSR's picture
Update app.py
6aaea8c verified
import googleapiclient.discovery
import re
import yt_dlp
import whisper
from pydub import AudioSegment
import tempfile
from transformers import pipeline
from youtube_transcript_api import YouTubeTranscriptApi
import torch
import openai
import json
from urllib.parse import urlparse, parse_qs
import os
import gradio as gr
def extract_video_id(url):
    """Extract the video ID from a YouTube URL.

    Recognised forms:
      * ``https://www.youtube.com/watch?v=<id>`` (``youtube.com`` or any sub-domain)
      * ``https://youtu.be/<id>``
      * ``https://www.youtube.com/shorts/<id>``, ``/embed/<id>``, ``/live/<id>``, ``/v/<id>``
      * any of the above written without a scheme (``youtube.com/watch?v=<id>``)

    Args:
        url: URL string as typed by the user.

    Returns:
        The video ID, or ``None`` when the URL is empty, malformed, or does not
        point at a YouTube video.
    """
    try:
        candidate = (url or "").strip()
        # urlparse() only populates the host when a scheme (or a leading "//")
        # is present, so add one for inputs such as "youtube.com/watch?v=...".
        if candidate.startswith("//"):
            candidate = "https:" + candidate
        elif candidate and "://" not in candidate.split("?", 1)[0]:
            candidate = "https://" + candidate

        parsed_url = urlparse(candidate)
        host = (parsed_url.hostname or "").lower()
        path_parts = [part for part in parsed_url.path.split("/") if part]

        if host in ("youtu.be", "www.youtu.be"):
            return path_parts[0] if path_parts else None

        # Compare the host exactly (or as a sub-domain) so look-alike domains
        # such as "notyoutube.com" are rejected.
        if host == "youtube.com" or host.endswith(".youtube.com"):
            video_id = parse_qs(parsed_url.query).get('v', [None])[0]
            if video_id:
                return video_id
            if len(path_parts) >= 2 and path_parts[0] in ("shorts", "embed", "live", "v"):
                return path_parts[1]
            return None

        print("Invalid YouTube URL.")
        return None
    except Exception as e:
        print(f"Error parsing URL: {e}")
        return None
def _parse_iso8601_duration_minutes(duration):
    """Convert an ISO 8601 duration (``PT1H2M3S``, ``P1DT2H``, ``P0D``) to minutes.

    Returns ``None`` when *duration* is not a recognisable ISO 8601 duration.
    """
    match = re.fullmatch(
        r'P(?:(\d+)W)?(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?',
        duration or "",
    )
    if match is None:
        return None
    weeks, days, hours, minutes, seconds = (
        int(group) if group else 0 for group in match.groups()
    )
    return (weeks * 7 + days) * 24 * 60 + hours * 60 + minutes + seconds / 60


def get_video_duration(video_id, api_key):
    """Fetch a video's duration, in minutes, from the YouTube Data API.

    Args:
        video_id: YouTube video ID.
        api_key: YouTube Data API developer key.

    Returns:
        The duration in minutes as a float, or ``None`` when the video is not
        found, the duration cannot be parsed, or the API call fails (the
        reason is printed).
    """
    try:
        youtube = googleapiclient.discovery.build("youtube", "v3", developerKey=api_key)
        request = youtube.videos().list(part="contentDetails", id=video_id)
        response = request.execute()

        items = response.get("items")
        if not items:
            print("No video details found.")
            return None

        duration = items[0]["contentDetails"]["duration"]
        # Per the YouTube Data API reference, videos of a day or longer are
        # reported as P#DT#H#M#S, so the day component must be parsed too.
        total_minutes = _parse_iso8601_duration_minutes(duration)
        if total_minutes is None:
            print(f"Error fetching video duration: unrecognised duration {duration!r}")
        return total_minutes
    except Exception as e:
        print(f"Error fetching video duration: {e}")
        return None
def download_and_transcribe_with_whisper(youtube_url):
    """Download a video's audio with yt-dlp and transcribe it with Whisper.

    The audio is saved into a temporary directory, re-encoded to WAV through
    pydub, and fed to the Whisper ``large`` model. The temporary directory is
    removed when the function returns.

    Args:
        youtube_url: URL handed to yt-dlp.

    Returns:
        The transcript text, or ``None`` if any step raised (the error is
        printed).
    """
    try:
        with tempfile.TemporaryDirectory() as workdir:
            download_path = os.path.join(workdir, "audio.mp3")
            wav_path = os.path.join(workdir, "audio.wav")

            # NOTE(review): confirm 'extractaudio' / 'audioquality' are honoured
            # by the YoutubeDL Python API rather than being CLI-only options.
            downloader_options = dict(
                format='bestaudio/best',
                outtmpl=download_path,
                extractaudio=True,
                audioquality=1,
            )

            # Fetch the audio track
            with yt_dlp.YoutubeDL(downloader_options) as downloader:
                downloader.download([youtube_url])

            # Re-encode to WAV before transcription
            AudioSegment.from_file(download_path).export(wav_path, format="wav")

            # Transcribe with the Whisper "large" model
            whisper_model = whisper.load_model("large")
            return whisper_model.transcribe(wav_path)['text']
    except Exception as e:
        print(f"Error during transcription: {e}")
        return None
def get_transcript_from_youtube_api(video_id, video_length):
    """Return a transcript from YouTube's own captions, if one is usable.

    A manually created transcript is preferred. When none exists, an
    auto-generated English transcript is used, but only for videos longer
    than 15 minutes.

    Args:
        video_id: YouTube video ID.
        video_length: Video duration in minutes.

    Returns:
        The caption text joined with single spaces, or ``None`` when no
        suitable transcript exists or the lookup raised (the error is printed).
    """
    def _joined_text(entry):
        # Fetch the caption segments and concatenate their text fields.
        return " ".join(piece['text'] for piece in entry.fetch())

    try:
        available = YouTubeTranscriptApi.list_transcripts(video_id)

        # First manually created transcript, if any
        manual = next((item for item in available if not item.is_generated), None)
        if manual is not None:
            return _joined_text(manual)

        if video_length > 15:
            generated = available.find_generated_transcript(['en'])
            if generated:
                return _joined_text(generated)

        print("Manual transcript not available, and video is too short for auto-transcript.")
        return None
    except Exception as e:
        print(f"Error fetching transcript: {e}")
        return None
def get_transcript(youtube_url, api_key):
    """Obtain a transcript for a YouTube video.

    YouTube's own captions are tried first; if none are usable the audio is
    downloaded and transcribed with Whisper.

    Args:
        youtube_url: URL of the video.
        api_key: YouTube Data API key, used to look up the video duration.

    Returns:
        The transcript text, or ``None`` when the URL is invalid, the duration
        lookup fails, or transcription fails.
    """
    video_id = extract_video_id(youtube_url)
    if not video_id:
        print("Invalid or unsupported YouTube URL.")
        return None

    video_length = get_video_duration(video_id, api_key)
    if video_length is None:
        print("Error fetching video duration.")
        return None

    print(f"Video length: {video_length:.2f} minutes.")
    api_transcript = get_transcript_from_youtube_api(video_id, video_length)
    if not api_transcript:
        # No usable captions: fall back to local speech-to-text.
        print("Using Whisper for transcription.")
        return download_and_transcribe_with_whisper(youtube_url)
    return api_transcript
def summarize_text_huggingface(text):
    """Summarize text with the ``facebook/bart-large-cnn`` model.

    The text is cut into overlapping 1024-character chunks (100 characters of
    overlap), each chunk is summarized independently, and the partial
    summaries are joined with single spaces.

    Args:
        text: The text to summarize.

    Returns:
        The concatenated summary, or ``""`` when *text* is empty or contains
        only whitespace.
    """
    # Nothing to summarize: skip the (expensive) model load entirely.
    if not text or not text.strip():
        return ""

    summarizer = pipeline(
        "summarization",
        model="facebook/bart-large-cnn",
        device=0 if torch.cuda.is_available() else -1,
    )
    max_input_length = 1024
    chunk_overlap = 100
    step = max_input_length - chunk_overlap

    text_chunks = []
    for start in range(0, len(text), step):
        text_chunks.append(text[start:start + max_input_length])
        # Stop once a chunk reaches the end of the text; the next window would
        # lie entirely inside the overlap and only repeat already-seen content.
        if start + max_input_length >= len(text):
            break

    summaries = [
        # truncation=True guards against chunks whose token count exceeds the
        # model's input limit (e.g. text where one character is several tokens).
        summarizer(
            chunk, max_length=100, min_length=50, do_sample=False, truncation=True
        )[0]['summary_text']
        for chunk in text_chunks
    ]
    return " ".join(summaries)
def _parse_json_object(raw_text):
    """Parse a JSON object from model output, tolerating Markdown fences or prose around it."""
    text = (raw_text or "").strip()
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Chat models often wrap JSON in ```json fences or add a sentence
        # before/after it; retry on the outermost {...} span.
        start, end = text.find("{"), text.rfind("}")
        if start == -1 or end <= start:
            raise
        return json.loads(text[start:end + 1])


def generate_optimized_content(api_key, summarized_transcript):
    """Ask the OpenAI chat model for SEO metadata derived from a summary.

    Args:
        api_key: OpenAI API key.
        summarized_transcript: Summary of the video transcript.

    Returns:
        The parsed JSON reply (expected keys: ``keywords``, ``title``,
        ``description``, ``tags``), or ``None`` if the request or the JSON
        parsing fails (the error is printed).
    """
    openai.api_key = api_key
    prompt = f"""
Analyze the following summarized YouTube video transcript and:
1. Extract the top 10 keywords.
2. Generate an optimized title (less than 65 characters).
3. Create an engaging description.
4. Generate related tags for the video.
Summarized Transcript:
{summarized_transcript}
Provide the results in the following JSON format:
{{
"keywords": ["keyword1", "keyword2", ..., "keyword10"],
"title": "Generated Title",
"description": "Generated Description",
"tags": ["tag1", "tag2", ..., "tag10"]
}}
"""
    messages = [
        {"role": "system", "content": "You are an SEO expert."},
        {"role": "user", "content": prompt},
    ]
    try:
        client_class = getattr(openai, "OpenAI", None)
        if client_class is not None:
            # openai>=1.0: ChatCompletion was removed in favour of a client object.
            client = client_class(api_key=api_key)
            response = client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=messages,
            )
            response_content = response.choices[0].message.content
        else:
            # Legacy openai<1.0 interface.
            response = openai.ChatCompletion.create(
                model="gpt-3.5-turbo",
                messages=messages,
            )
            response_content = response['choices'][0]['message']['content']
        return _parse_json_object(response_content)
    except Exception as e:
        print(f"Error generating content: {e}")
        return None
def process_youtube_url(youtube_url, youtube_api_key, openai_api_key):
    """Run the full pipeline: transcript, then summary, then SEO metadata.

    Args:
        youtube_url: URL of the video to analyse.
        youtube_api_key: YouTube Data API key.
        openai_api_key: OpenAI API key.

    Returns:
        The generated metadata as a JSON string indented by four spaces, or a
        human-readable error message when a stage produced nothing.
    """
    transcript = get_transcript(youtube_url, youtube_api_key)
    if not transcript:
        return "Could not fetch the transcript. Please try another video."

    condensed = summarize_text_huggingface(transcript)
    seo_payload = generate_optimized_content(openai_api_key, condensed)
    if not seo_payload:
        return "Error generating optimized content."
    return json.dumps(seo_payload, indent=4)
# Gradio Interface
def gradio_interface(youtube_url, youtube_api_key, openai_api_key):
    """Gradio callback: forward the three form fields to ``process_youtube_url``."""
    result_text = process_youtube_url(youtube_url, youtube_api_key, openai_api_key)
    return result_text
# Build the Gradio UI: the video URL plus both API keys in, one text box out.
iface = gr.Interface(
    fn=gradio_interface,
    inputs=[
        gr.Textbox(label="YouTube URL"),
        gr.Textbox(label="YouTube API Key", type="password"),
        gr.Textbox(label="OpenAI API Key", type="password"),
    ],
    outputs=gr.Textbox(label="Optimized Content"),
    # live=True re-runs the callback on every input change, which would start
    # a download / Whisper / OpenAI round-trip for each keystroke of a
    # half-typed URL or key. Require an explicit submit instead.
    live=False,
)

if __name__ == "__main__":
    iface.launch()