import os
import random
import re
import textwrap
import time
from html import unescape
from urllib.parse import urlparse, parse_qs

import streamlit as st
import openai
from openai import OpenAI
from pydub import AudioSegment
from youtube_transcript_api import YouTubeTranscriptApi
from deep_translator import GoogleTranslator
import yt_dlp as youtube_dl
from transformers import AutoModelForCausalLM, GPT2Tokenizer
import torch
import torch.nn.functional as F
from tqdm import trange
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Read the API key from the environment rather than hardcoding a secret in source.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
def generate_response(article_text, lang):
    messages = [
        {"role": "system", "content": "You are an expert in summarizing text in two languages: English and Vietnamese"},
        {"role": "user", "content": f"summarize the following text professionally and return the summary according to the input language:\n{article_text}\nSummary:"}
    ]
    if lang == 'vi':
        messages = [
            {"role": "system", "content": "Bạn là chuyên gia tóm tắt văn bản bằng hai ngôn ngữ: tiếng Anh và tiếng Việt"},
            {"role": "user", "content": f"hãy tóm tắt văn bản sau đây một cách chuyên nghiệp và trả về bản tóm tắt theo ngôn ngữ đầu vào:\n{article_text}\nBản Tóm tắt:"}
        ]
    response = client.chat.completions.create(
        model='ft:gpt-3.5-turbo-0125:personal::9eZjpJwa',
        messages=messages,
        max_tokens=150,         # raised to leave more room for the summary
        temperature=0.3,        # lowered for more focused output
        top_p=0.95,             # slightly raised to widen the vocabulary range
        frequency_penalty=0.5,  # raised to encourage varied wording
        presence_penalty=0.5    # raised to encourage mentioning new topics
    )
    # Extract and return the generated summary
    summary = response.choices[0].message.content.strip()
    return summary
def cleaning_input(input_text):
    """Normalize whitespace and punctuation in raw transcript text."""
    text = str(input_text)
    text = re.sub(r'\n\s*\n', '\n', text)            # collapse blank lines
    text = re.sub(r'[ ]+', ' ', text)                # collapse runs of spaces
    text = re.sub(r'\.{2,}', '.', text)              # collapse repeated punctuation
    text = re.sub(r',{2,}', ',', text)
    text = re.sub(r'-{2,}', '-', text)
    text = re.sub(r'_{2,}', '_', text)
    text = re.sub(r'!{2,}', '!', text)
    text = re.sub(r'\?{2,}', '?', text)
    text = re.sub(r'(\d)([A-Za-z])', r'\1 \2', text)  # split digit-letter runs: "3pm" -> "3 pm"
    text = re.sub(r'([A-Za-z])(\d)', r'\1 \2', text)  # split letter-digit runs: "GPT2" -> "GPT 2"
    text = unescape(text)                             # decode HTML entities
    # Drop anything outside a whitelist of word characters and common punctuation.
    text = re.sub(r'[^\w\s\[\]()$\\.\n/:#<>{},_"!@*=-]', '', text)
    text = re.sub(r'\s+', ' ', text)
    return text
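# An illustrative example of what cleaning_input normalizes (traced by hand
# against the rules above, not taken from the original source):
#
#   cleaning_input("Hello...   world!!\n\n\nGPT2 rocks")
#   -> 'Hello. world! GPT 2 rocks'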
def top_k_top_p_filtering(logits, top_k, top_p, filter_value=-float('Inf')):
    """ Filter a distribution of logits using top-k and/or nucleus (top-p) filtering
        Args:
            logits: logits distribution shape (vocabulary size)
            top_k > 0: keep only top k tokens with highest probability (top-k filtering).
            top_p > 0.0: keep the top tokens with cumulative probability >= top_p (nucleus filtering).
                Nucleus filtering is described in Holtzman et al. (http://arxiv.org/abs/1904.09751)
        From: https://gist.github.com/thomwolf/1a5a29f6962089e871b94cbd09daf317
    """
    assert logits.dim() == 1  # batch size 1 for now - could be updated for more but the code would be less clear
    top_k = min(top_k, logits.size(-1))  # Safety check
    if top_k > 0:
        # Remove all tokens with a probability less than the last token of the top-k
        indices_to_remove = logits < torch.topk(logits, top_k)[0][..., -1, None]
        logits[indices_to_remove] = filter_value
    if top_p > 0.0:
        sorted_logits, sorted_indices = torch.sort(logits, descending=True)
        cumulative_probs = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1)
        # Remove tokens with cumulative probability above the threshold
        sorted_indices_to_remove = cumulative_probs > top_p
        # Shift the indices to the right to keep also the first token above the threshold
        sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
        sorted_indices_to_remove[..., 0] = 0
        indices_to_remove = sorted_indices[sorted_indices_to_remove]
        logits[indices_to_remove] = filter_value
    return logits
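# A minimal usage sketch (illustrative values, not from the original app): keep
# the 10 most likely tokens, then keep only the smallest subset of those whose
# cumulative probability reaches 0.5, and sample from what survives.
#
#   logits = torch.randn(50257)                        # one vocabulary-sized row
#   filtered = top_k_top_p_filtering(logits, top_k=10, top_p=0.5)
#   probs = F.softmax(filtered, dim=-1)                # masked entries become 0
#   next_id = torch.multinomial(probs, num_samples=1)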
def sample_seq(model, context, length, device, temperature, top_k, top_p):
    """ Generates a sequence of tokens
        Args:
            model: gpt/gpt2 model
            context: tokenized text using gpt/gpt2 tokenizer
            length: length of generated sequence.
            device: torch.device object.
            temperature > 0: used to control the randomness of predictions by scaling the logits before applying softmax.
            top_k > 0: keep only top k tokens with highest probability (top-k filtering).
            top_p > 0.0: keep the top tokens with cumulative probability >= top_p (nucleus filtering).
    """
    context = torch.tensor(context, dtype=torch.long, device=device)
    context = context.unsqueeze(0)
    generated = context
    with torch.no_grad():
        for _ in trange(length):
            inputs = {'input_ids': generated}
            # Note: we could also use 'past' with GPT-2/Transfo-XL/XLNet (cached hidden-states)
            outputs = model(**inputs)
            next_token_logits = outputs[0][0, -1, :] / temperature
            filtered_logits = top_k_top_p_filtering(next_token_logits, top_k=top_k, top_p=top_p)
            next_token = torch.multinomial(F.softmax(filtered_logits, dim=-1), num_samples=1)
            generated = torch.cat((generated, next_token.unsqueeze(0)), dim=1)
    return generated
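# A sketch of calling sample_seq directly (assumes a loaded `model`; the
# add_special_tokens helper is defined below):
#
#   tokenizer = add_special_tokens('en')
#   ids = tokenizer.encode("The quick brown fox")
#   out = sample_seq(model, ids, length=20, device=torch.device('cpu'),
#                    temperature=1.0, top_k=10, top_p=0.5)
#   new_ids = out[0, len(ids):].tolist()  # strip the prompt, keep generated ids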
def add_special_tokens(lang):
    """ Returns GPT2 tokenizer after adding separator and padding tokens """
    token = 'gpt2'
    if lang == 'vi':
        token = 'NlpHUST/gpt2-vietnamese'
    tokenizer = GPT2Tokenizer.from_pretrained(token)
    special_tokens = {'pad_token': '<|pad|>', 'sep_token': '<|sep|>'}
    tokenizer.add_special_tokens(special_tokens)
    return tokenizer
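# Note: adding tokens grows the tokenizer's vocabulary. If the new ids were ever
# fed to the model, model.resize_token_embeddings(len(tokenizer)) would be
# required; here plain text never encodes to <|pad|> or <|sep|>, so the
# pretrained embedding matrix can be left untouched.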
def gene(text, lang):
    """Generate a short summary with the fine-tuned GPT-2 model for the given language."""
    tokenizer = add_special_tokens(lang)
    article = tokenizer.encode(text)[:900]  # truncate so the prompt fits the context window
    # Load model directly
    model = AutoModelForCausalLM.from_pretrained("tiennlu/GPT2en_CNNen_3k")
    if lang == "vi":
        model = AutoModelForCausalLM.from_pretrained("tiennlu/GPT2vi_CNNvi_3k")
    generated_text = sample_seq(model, article, 50, torch.device('cpu'), temperature=1, top_k=10, top_p=0.5)
    generated_text = generated_text[0, len(article):].tolist()  # drop the prompt, keep only new ids
    tokens = tokenizer.convert_ids_to_tokens(generated_text, skip_special_tokens=True)
    return tokenizer.convert_tokens_to_string(tokens)
def find_audio_files(path, extension=".mp3"):
    audio_files = []
    for root, dirs, files in os.walk(path):
        for f in files:
            if f.endswith(extension):
                audio_files.append(os.path.join(root, f))
    return audio_files
def youtube_to_mp3(youtube_url: str, output_dir: str) -> str:
    ydl_config = {
        "format": "bestaudio/best",
        "postprocessors": [
            {
                "key": "FFmpegExtractAudio",
                "preferredcodec": "mp3",
                "preferredquality": "192",
            }
        ],
        "outtmpl": os.path.join(output_dir, "%(title)s.%(ext)s"),
        "verbose": True,
    }
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    with youtube_dl.YoutubeDL(ydl_config) as ydl:
        ydl.download([youtube_url])
    return find_audio_files(output_dir)[0]
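# Hypothetical usage (the FFmpegExtractAudio post-processor requires ffmpeg on
# the PATH; the URL is only an example):
#
#   mp3_path = youtube_to_mp3("https://www.youtube.com/watch?v=dQw4w9WgXcQ",
#                             "./outputs/raw_audio")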
def chunk_audio(filename, segment_length: int, output_dir):
    """segment_length is in seconds"""
    if not os.path.isdir(output_dir):
        os.mkdir(output_dir)
    # Load audio file
    audio = AudioSegment.from_mp3(filename)
    # Duration in milliseconds
    duration = len(audio)
    # Number of segments (the last one may be shorter)
    num_segments = duration // (segment_length * 1000) + 1
    print(f"Chunking into {num_segments} chunks...")
    # Iterate through segments and save them
    for i in range(num_segments):
        start = i * segment_length * 1000
        end = min((i + 1) * segment_length * 1000, duration)
        segment = audio[start:end]
        segment.export(os.path.join(output_dir, f"segment_{i}.mp3"), format="mp3")
    chunked_audio_files = find_audio_files(output_dir)
    # Sort numerically by segment index; a plain lexicographic sort would put
    # "segment_10.mp3" before "segment_2.mp3" and scramble the transcript order.
    return sorted(chunked_audio_files,
                  key=lambda p: int(re.search(r'segment_(\d+)', p).group(1)))
def translate_text(text):
    wrapped_text = textwrap.wrap(text, 3500)
    tran_text = ""
    for line in wrapped_text:
        translation = GoogleTranslator(source='en', target='vi').translate(line)
        tran_text += translation + " "
    return tran_text
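# The 3,500-character wrap keeps each request safely under the translation
# backend's per-request length limit (deep_translator's GoogleTranslator rejects
# texts of roughly 5,000+ characters).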
def transcribe_audio(audio_files: list, model_name="whisper-1"):
    transcripts = ""
    for audio_file in audio_files:
        try:
            with open(audio_file, "rb") as audio:
                response = completions_with_backoff(
                    model=model_name, file=audio, response_format="text"
                )
            # With response_format="text" the client returns the transcript as a
            # plain string; fall back to .text for object-style responses.
            transcripts += (response if isinstance(response, str) else response.text) + " "
        except openai.OpenAIError as e:
            print(f"An error occurred: {e}")
            return None
    return transcripts
# define a retry decorator
def retry_with_exponential_backoff(
    func,
    initial_delay: float = 1,
    exponential_base: float = 2,
    jitter: bool = True,
    max_retries: int = 10,
    errors: tuple = (openai.RateLimitError,),
):
    def wrapper(*args, **kwargs):
        num_retries = 0
        delay = initial_delay
        while True:
            try:
                return func(*args, **kwargs)
            except errors as e:
                print(f"Error: {e}")
                num_retries += 1
                if num_retries > max_retries:
                    raise Exception(f"Maximum number of retries ({max_retries}) exceeded.")
                # Grow the delay geometrically, with optional random jitter.
                delay *= exponential_base * (1 + jitter * random.random())
                time.sleep(delay)
    return wrapper
# Apply the backoff decorator so rate-limited calls to the audio endpoint are retried.
@retry_with_exponential_backoff
def completions_with_backoff(**kwargs):
    return client.audio.translations.create(**kwargs)
def get_video_id(youtube_url):
    """Extract video ID from YouTube URL."""
    parsed_url = urlparse(youtube_url)
    video_id = parse_qs(parsed_url.query).get("v")
    return video_id[0] if video_id else None
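# Example: get_video_id("https://www.youtube.com/watch?v=dQw4w9WgXcQ") -> "dQw4w9WgXcQ"
# Note that short youtu.be links carry the id in the path rather than the query
# string and would need separate handling.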
def get_transcript(video_id):
    tran = []
    transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
    transcript = transcript_list.find_generated_transcript(['vi', 'en'])
    translated_transcript = transcript.translate('en')
    transcript_data = translated_transcript.fetch()
    # Drop non-speech annotations emitted by YouTube's auto-captions.
    words_to_remove = ['[music]', '[clause]', '[smile]', '[laugh]', '[cry]', '[sigh]', '[uh]', '[um]', '[uh-huh]', '[sob]', '[giggle]', '[hmm]']
    for t in transcript_data:
        if t['text'].lower() not in words_to_remove:
            tran.append(t['text'])
    return ' '.join(tran)
def chunk_text(text, chunk_size=1000, overlap_size=24):
    # Split on token boundaries so each chunk fits the summarizer's context window.
    encoder = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
        model_name="gpt-3.5-turbo", chunk_size=chunk_size, chunk_overlap=overlap_size)
    return encoder.split_text(text=text)
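# With the defaults above, a transcript of roughly 3,500 tokens would yield
# about four chunks of up to 1,000 tokens each, with 24 tokens repeated across
# each seam so no sentence loses its lead-in. (Illustrative numbers only.)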
def summarize_youtube_video(youtube_url, outputs_dir):
    # Build the full output paths for this video
    video_id = get_video_id(youtube_url)
    en_transcript = get_transcript(video_id)
    if not os.path.exists(outputs_dir):
        os.makedirs(outputs_dir)
    if not en_transcript:
        # No usable YouTube transcript: download the audio and transcribe it instead.
        outputs_dir = os.path.join(outputs_dir, video_id)
        raw_audio_dir = os.path.join(outputs_dir, "raw_audio")
        chunks_dir = os.path.join(outputs_dir, "chunks")
        segment_length = 10 * 60  # chunk into 10-minute segments
        if not os.path.exists(outputs_dir):
            os.makedirs(outputs_dir)
        audio_filename = youtube_to_mp3(youtube_url, output_dir=raw_audio_dir)
        chunked_audio_files = chunk_audio(
            audio_filename, segment_length=segment_length, output_dir=chunks_dir
        )
        en_transcript = transcribe_audio(chunked_audio_files)
    en_transcript = cleaning_input(en_transcript)
    vi_transcript = translate_text(en_transcript)
    summ_en = summary(en_transcript, 'en')
    summ_vi = summary(vi_transcript, 'vi')
    return tuple(summ_en), tuple(summ_vi)
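# Hypothetical end-to-end usage (needs network access and a valid API key; each
# returned pair is (summary, chunk-joined transcript)):
#
#   (sum_en, script_en), (sum_vi, script_vi) = summarize_youtube_video(
#       "https://www.youtube.com/watch?v=dQw4w9WgXcQ", "./outputs")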
def main():
    st.set_page_config(layout="wide")
    st.title("YouTube Video Summarizer 🎥")
    st.markdown('<style>h1{color: orange; text-align: center;}</style>', unsafe_allow_html=True)
    st.subheader('Built with GPT-2, Streamlit and ❤️')
    st.markdown('<style>h3{color: pink; text-align: center;}</style>', unsafe_allow_html=True)
    # Expander for app details
    with st.expander("About the App"):
        st.write("This app allows you to summarize while watching a YouTube video.")
        st.write(
            "Enter a YouTube URL in the input box below and click 'Submit' to start. This app is built by AI Anytime.")
    # Input box for YouTube URL
    youtube_url = st.text_input("Enter YouTube URL")
    # Submit button
    if st.button("Submit") and youtube_url:
        start_time = time.time()  # Start the timer
        summ_en, summ_vi = summarize_youtube_video(youtube_url, "./outputs")
        summary_en, transcript_en = summ_en  # avoid shadowing the built-in sum()
        summary_vi, transcript_vi = summ_vi
        end_time = time.time()  # End the timer
        elapsed_time = end_time - start_time
        # Centering the video and elapsed time
        st.markdown("""
            <div style="display: flex; justify-content: center; flex-direction: column; align-items: center;">
                <div style="width: 60%; max-width: 720px;">
                    <iframe width="100%" height="315" src="{youtube_url}" frameborder="0" allow="accelerometer; autoplay; encrypted-media; gyroscope; picture-in-picture" allowfullscreen></iframe>
                </div>
                <h2>Summarization of YouTube Video</h2>
                <p>Time taken: {elapsed_time:.2f} seconds</p>
            </div>
        """.format(youtube_url=youtube_url.replace("watch?v=", "embed/"), elapsed_time=elapsed_time),
            unsafe_allow_html=True)
        col1, col2 = st.columns(2)
        with col1:
            st.subheader("English transcript")
            st.markdown(
                f'<div style="height: 400px; overflow-y: auto; border: 1px solid #ccc; padding: 10px;">{transcript_en}</div>',
                unsafe_allow_html=True)
            st.subheader("English summary")
            st.write(summary_en)
        with col2:
            st.subheader("Vietnamese transcript")
            st.markdown(
                f'<div style="height: 400px; overflow-y: auto; border: 1px solid #ccc; padding: 10px;">{transcript_vi}</div>',
                unsafe_allow_html=True)
            st.subheader("Vietnamese summary")
            st.write(summary_vi)
def summary(text, lang):
    chunks = chunk_text(text)
    rs = ""
    print(f"Number of chunks: {len(chunks)}")
    for t in chunks:
        generated_summary = generate_response(t, lang)
        rs += generated_summary + " "
    # Rejoin the chunks so the displayed transcript matches what was summarized.
    text = " ".join(chunks)
    return rs, text
if __name__ == "__main__":
    main()