uzi007 committed on
Commit
955f567
·
1 Parent(s): 1c387c0

Removed Unneeded Files

Browse files
Files changed (7) hide show
  1. .gitignore +3 -0
  2. audiobook.py +52 -0
  3. main.py +196 -0
  4. media_download.py +395 -0
  5. requirements.txt +13 -0
  6. summarizer.py +429 -0
  7. transcription.py +221 -0
.gitignore ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ Output/*
2
+ *.pyc
3
+ *.sh
audiobook.py ADDED
@@ -0,0 +1,52 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ from IPython.display import Audio
3
+ import nltk # we'll use this to split into sentences
4
+ import numpy as np
5
+
6
+ from bark.generation import (
7
+ generate_text_semantic,
8
+ preload_models,
9
+ )
10
+ from bark.api import semantic_to_waveform
11
+ from bark import generate_audio, SAMPLE_RATE
12
+ import soundfile as sf
13
# Expose only GPU index 0 to CUDA for anything initialised after this point.
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

# Load the Bark models a single time, when this module is imported.
preload_models()
17
+
18
class AudioBook:
    """Narrate text into a single WAV file, one Bark generation per sentence."""

    # Bark voice presets selectable through the public ``speaker`` argument.
    _SPEAKER_PRESETS = {
        "male": "v2/en_speaker_6",
        "female": "v2/en_speaker_9",
    }

    def __init__(self, output_folder="output"):
        """Remember ``output_folder`` and create it when it does not exist.

        Args:
            output_folder: Directory that generated WAV files are written to.
        """
        self.output_folder = output_folder

        # exist_ok avoids the check-then-create race of a separate exists() test.
        os.makedirs(output_folder, exist_ok=True)

    def generate_audio_from_text(self, text, speaker="male", filename="output_audio"):
        """Synthesise ``text`` sentence by sentence and save it as a WAV file.

        Args:
            text: Text to narrate; newlines are treated as spaces.
            speaker: ``"male"`` or ``"female"``.
            filename: Output file name without the ``.wav`` extension.

        Returns:
            Path of the written WAV file inside ``self.output_folder``.

        Raises:
            ValueError: If ``speaker`` is not a known voice, or if ``text``
                contains nothing to narrate.
        """
        # Validate cheap arguments first so bad input fails before any model work.
        if speaker not in self._SPEAKER_PRESETS:
            raise ValueError("Invalid speaker selection. Use 'male' or 'female'.")
        history_prompt = self._SPEAKER_PRESETS[speaker]

        # Preprocess text
        text = text.replace("\n", " ").strip()
        if not text:
            # Without this guard np.concatenate([]) fails with an unrelated message.
            raise ValueError("Cannot generate audio from empty text.")

        sentences = nltk.sent_tokenize(text)
        if not sentences:
            raise ValueError("Cannot generate audio from empty text.")

        silence = np.zeros(int(0.25 * SAMPLE_RATE))  # quarter-second of silence

        pieces = []
        for sentence in sentences:
            audio_array = generate_audio(
                sentence,
                history_prompt=history_prompt,
                text_temp=0.7,
                waveform_temp=0.7,
            )
            # np.concatenate copies its inputs, so the same silence buffer can be reused.
            pieces.extend((audio_array, silence))

        audio_data = np.concatenate(pieces)

        # Save the audio to a WAV file in the output folder
        output_path = os.path.join(self.output_folder, f"{filename}.wav")
        sf.write(output_path, audio_data, SAMPLE_RATE)

        return output_path
main.py ADDED
@@ -0,0 +1,196 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import uvicorn
2
+ from fastapi import FastAPI, Request
3
+
4
+ from media_download import YoutubeDownloader
5
+ from transcription import StableWhisper
6
+ from summarizer import Extract_Summary, AudioBookNarration
7
+ from audiobook import AudioBook
8
+
9
# FastAPI application that the route decorators in this module register on.
app = FastAPI()

# Folder name passed to the downloader, the transcriber and the audiobook writer.
output_folder = 'Output'

# In-memory session store: one dict of intermediate results per client IP.
users_context = {}
14
+
15
+
16
@app.get("/get_media_info")
async def get_media_info(request: Request, url: str):
    """Build a downloader for ``url``, store it for the caller and return its media info.

    The caller's IP address is the session key; any state previously stored
    for that IP is replaced.
    """
    client_ip = request.client.host

    downloader = YoutubeDownloader(url, output_folder)
    info = downloader.get_media_info()

    # Fresh session: only the downloader and the requested URL are kept.
    users_context[client_ip] = {'downloader': downloader, 'url': url}

    return info
35
+
36
+
37
@app.get("/download_media")
async def download_media(request: Request, media_type: str, media_format: str, media_quality: str):
    """Download the media previously selected through ``/get_media_info``.

    Returns:
        ``{"status": 1}`` on success, ``{"status": 0}`` when the download
        produced no path. When the caller has no stored downloader the
        response is ``{"status": 0, "error": ...}`` instead of an unhandled
        ``KeyError``.
    """
    # Getting User's IP
    user_ip = request.client.host

    # A client that never called /get_media_info has no session (or no downloader).
    session = users_context.get(user_ip)
    if not session or 'downloader' not in session:
        return {"status": 0, "error": "No media selected for this client; call /get_media_info first."}

    # Downloading Media for User
    media_path = session['downloader'].download(media_type, media_format, media_quality)

    # Getting Status
    status = 1 if media_path else 0

    if status:
        # Storing Media Info in the context for this user's session
        session['media_path'] = media_path
        session['media_type'] = media_type

    return {"status": status}
55
+
56
+
57
@app.get("/get_transcript")
async def get_transcript(request: Request, subtitle_format: str = 'srt', word_level: bool = True):
    """Transcribe the caller's downloaded media with StableWhisper.

    For media stored as ``'video'`` the audio track is extracted first.

    Returns:
        ``{"status": 1, "transcript": ...}`` on success. When there is no
        downloaded media for the caller, or audio extraction fails, the
        response is ``{"status": 0, "transcript": None, "error": ...}``.
    """
    # Getting User's IP
    user_ip = request.client.host

    # Without a prior successful /download_media there is nothing to transcribe.
    session = users_context.get(user_ip)
    if not session or 'media_path' not in session:
        return {"status": 0, "transcript": None,
                "error": "No downloaded media for this client; call /download_media first."}

    # Retrieving the media_path from the context for this user's session
    media_path = session['media_path']

    # Checking if the media_type is Video, then extract its audio
    if session.get('media_type') == 'video':
        media_path = session['downloader'].extract_audio(media_path)
        if not media_path:
            # extract_audio reports failure by returning nothing.
            return {"status": 0, "transcript": None,
                    "error": "Could not extract audio from the downloaded video."}

    # Whisper based transcription
    stable_whisper_transcript = StableWhisper(media_path, output_folder,
                                              subtitle_format=subtitle_format, word_level=word_level)
    transcript = stable_whisper_transcript.generate_transcript()
    transcript_path = stable_whisper_transcript.save_transcript()

    # Getting Status
    status = 1 if transcript else 0

    if status:
        # Storing Transcript Info in the context for this user's session
        session['transcript'] = transcript
        session['transcript_path'] = transcript_path

    return {"status": status, "transcript": transcript}
85
+
86
+
87
@app.get("/get_summary")
async def get_summary(request: Request, Summary_type: str, Summary_strategy: str, Target_Person_type: str,
                      Response_length: str, Writing_style: str, text_input: str = None):
    """Summarise ``text_input``, or the caller's stored transcript when it is omitted.

    Returns:
        ``{"status": 1, "summary": ...}`` on success, ``status`` 0 when the
        chain returned nothing. When no text is supplied and no transcript is
        stored the response is ``{"status": 0, "summary": None, "error": ...}``.
    """
    # Getting User's IP
    user_ip = request.client.host

    # Callers that pass text_input directly may have no session yet; create one
    # so storing the result below cannot raise KeyError after the LLM call.
    session = users_context.setdefault(user_ip, {})

    # Getting Transcript if not provided
    if not text_input:
        text_input = session.get('transcript')
        if not text_input:
            return {"status": 0, "summary": None,
                    "error": "No text_input given and no transcript stored; call /get_transcript first."}

    # Extracting Summary
    summary_extractor = Extract_Summary(text_input=text_input)
    output = summary_extractor.define_chain(Summary_type=Summary_type,
                                            Summary_strategy=Summary_strategy,
                                            Target_Person_type=Target_Person_type,
                                            Response_length=Response_length,
                                            Writing_style=Writing_style,
                                            key_information=False)

    # Getting Status
    status = 1 if output else 0

    if status:
        # Storing Summary Info in the context for this user's session
        session['summary'] = output

    return {"status": status, "summary": output}
115
+
116
+
117
@app.get("/get_key_info")
async def get_key_info(request: Request, Summary_type: str, Summary_strategy: str, Target_Person_type: str,
                       Response_length: str, Writing_style: str, text_input: str = None):
    """Extract key information from ``text_input``, or from the caller's stored transcript.

    Returns:
        ``{"status": 1, "key_info": ...}`` on success, ``status`` 0 when the
        chain returned nothing. When no text is supplied and no transcript is
        stored the response is ``{"status": 0, "key_info": None, "error": ...}``.
    """
    # Getting User's IP
    user_ip = request.client.host

    # Callers that pass text_input directly may have no session yet; create one
    # so storing the result below cannot raise KeyError after the LLM call.
    session = users_context.setdefault(user_ip, {})

    # Getting Transcript if not provided
    if not text_input:
        text_input = session.get('transcript')
        if not text_input:
            return {"status": 0, "key_info": None,
                    "error": "No text_input given and no transcript stored; call /get_transcript first."}

    # Same chain as the summary endpoint, switched to key-information mode.
    summary_extractor = Extract_Summary(text_input=text_input)
    output = summary_extractor.define_chain(Summary_type=Summary_type,
                                            Summary_strategy=Summary_strategy,
                                            Target_Person_type=Target_Person_type,
                                            Response_length=Response_length,
                                            Writing_style=Writing_style,
                                            key_information=True)

    # Getting Status
    status = 1 if output else 0

    if status:
        # Storing Key Info in the context for this user's session
        session['key_info'] = output

    return {"status": status, "key_info": output}
145
+
146
+
147
@app.get("/get_narration")
async def get_narration(request: Request, Narration_style: str, text_input: str = None):
    """Rewrite ``text_input`` (or the stored transcript) as narration in ``Narration_style``.

    Returns:
        ``{"status": 1, "narration": ...}`` on success, ``status`` 0 when the
        chain returned nothing. When no text is supplied and no transcript is
        stored the response is ``{"status": 0, "narration": None, "error": ...}``.
    """
    # Getting User's IP
    user_ip = request.client.host

    # Callers that pass text_input directly may have no session yet; create one
    # so storing the result below cannot raise KeyError after the LLM call.
    session = users_context.setdefault(user_ip, {})

    # Getting Transcript if not provided
    if not text_input:
        text_input = session.get('transcript')
        if not text_input:
            return {"status": 0, "narration": None,
                    "error": "No text_input given and no transcript stored; call /get_transcript first."}

    # Extracting Narration
    narrator = AudioBookNarration(text_input=text_input)
    output = narrator.define_chain(Narration_style=Narration_style)

    # Getting Status
    status = 1 if output else 0

    if status:
        # Storing Narration Info in the context for this user's session
        session['narration'] = output

    return {"status": status, "narration": output}
169
+
170
+
171
@app.get("/get_audiobook")
async def get_audiobook(request: Request, speaker: str = "male", text_input: str = None):
    """Generate an audiobook WAV from ``text_input``, or from the caller's stored narration.

    Returns:
        ``{"status": 1, "audiobook_path": ...}`` on success. When no text is
        supplied and no narration is stored the response is
        ``{"status": 0, "audiobook_path": None, "error": ...}``.
    """
    # Getting User's IP
    user_ip = request.client.host

    # Callers that pass text_input directly may have no session yet; create one
    # so storing the result below cannot raise KeyError after synthesis.
    session = users_context.setdefault(user_ip, {})

    # Getting Narration if text is not provided
    if not text_input:
        text_input = session.get('narration')
        if not text_input:
            return {"status": 0, "audiobook_path": None,
                    "error": "No text_input given and no narration stored; call /get_narration first."}

    # Generating Audiobook
    audiobook = AudioBook(output_folder=output_folder)
    audio_path = audiobook.generate_audio_from_text(text_input, speaker=speaker, filename="output_audio")

    # Getting Status
    status = 1 if audio_path else 0

    if status:
        # Storing Audiobook path in the context for this user's session
        session['audiobook_path'] = audio_path

    return {"status": status, "audiobook_path": audio_path}
193
+
194
+
195
if __name__ == "__main__":
    # Script entry point: serve the app on the loopback interface, port 8000.
    uvicorn.run(app, host="127.0.0.1", port=8000)
media_download.py ADDED
@@ -0,0 +1,395 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import re
3
+ import json
4
+ import time
5
+ import subprocess
6
+
7
+ import numpy as np
8
+ import pandas as pd
9
+
10
+ from abc import ABC, abstractmethod
11
+
12
+ from pytube import YouTube
13
+
14
+
15
class MediaDownloader(ABC):
    """Abstract interface for downloaders that save media from a URL into a folder."""

    def __init__(self, url, output_path, start_time=None, end_time=None):
        """Store the request parameters and make sure the output folder exists.

        ``output_path`` is joined onto the current working directory.
        """
        self.url = url
        self.output_path = os.path.join(os.getcwd(), output_path)
        self.start_time = start_time
        self.end_time = end_time
        self.__create_output_dir()

    def __create_output_dir(self):
        """Create ``self.output_path`` unless it is already present."""
        target = self.output_path
        if os.path.exists(target):
            return
        os.makedirs(target)

    @abstractmethod
    def _get_supported_media_formats(self):
        """Describe the media formats the source offers."""

    @abstractmethod
    def download(self, media_type, media_format, media_quality):
        """Public entry point for downloading the requested media."""

    @abstractmethod
    def _download_media(self, media_type, media_format, media_quality):
        """Download the complete media."""

    @abstractmethod
    def _download_audio(self, audio_format, audio_quality):
        """Download the complete audio."""

    @abstractmethod
    def _download_video(self, video_format, video_quality):
        """Download the complete video."""

    @abstractmethod
    def _download_audio_and_video(self, media_format, media_quality):
        """Download audio and video for the requested format and quality."""

    @abstractmethod
    def _download_media_chunk(self, media_type, media_format, media_quality):
        """Download a chunk of the media."""

    @abstractmethod
    def _download_audio_chunk(self, audio_format, audio_quality):
        """Download a chunk of the audio."""

    @abstractmethod
    def _download_video_chunk(self, video_format, video_quality):
        """Download a chunk of the video."""
63
+
64
class YoutubeDownloader(MediaDownloader):
    """Download YouTube audio/video, whole or as a time range.

    pytube supplies metadata and stream downloads; the ``yt-dlp`` and
    ``ffmpeg`` command-line tools are invoked through ``subprocess`` for audio
    extraction, ranged downloads and merging. Download methods return the
    output path on success and ``None`` when a stream is missing or an
    external command exits with a non-zero status.
    """

    def __init__(self, url, output_path, start_time=None, end_time=None):
        """Fetch metadata and streams for ``url`` and build the format tables."""
        super().__init__(url, output_path, start_time, end_time)
        self.youtube = YouTube(url)
        self.title = self.youtube.title
        self.media_length = self.youtube.length
        self.thumbnail_url = self.youtube.thumbnail_url
        self.streams = self.youtube.streams
        self.streams_df, self.media_formats_dict = self._get_supported_media_formats()

    def __get_quality_int(self, media_quality):
        '''
        Returns the Quality in Integer
        E.g: Given input 1080p, it returns 1080
        Returns None when the value does not start with digits (or is empty/None).
        '''
        match = re.search(r'^\d+', media_quality or '')
        return int(match.group()) if match else None

    def _file_stem(self):
        '''
        Returns the title with path separators replaced by "_", so that it is
        always a single file name when joined onto the output folder.
        '''
        return re.sub(r'[\\/]', '_', self.title)

    @staticmethod
    def _run_command(command):
        '''
        Runs an external command and returns True when it exits with status 0
        '''
        return subprocess.run(command).returncode == 0

    def _chunk_args(self):
        '''
        Returns the ffmpeg "-ss"/"-to" argument string for the requested range
        '''
        parts = []
        if self.start_time:
            parts += ["-ss", str(self.start_time)]
        if self.end_time:
            parts += ["-to", str(self.end_time)]
        return " ".join(parts)

    def _get_supported_media_formats(self):
        '''
        Returns all supported media formats for both audio & video
        as (streams dataframe, {media_type: {media_format: [qualities]}})
        '''

        # Creating Pandas Dataframe for Video Streams' Details
        streams_details = []
        for stream in self.streams.filter(only_video=True):
            streams_details.append([
                stream.type,
                stream.mime_type.split('/')[1],
                stream.resolution,
                stream.is_progressive,
            ])
        cols = ['media_type', 'media_format', 'media_quality', 'progressive']
        streams_df = pd.DataFrame(streams_details, columns=cols)

        # Adding Custom Audio Streams
        streams_df.loc[len(streams_df)] = ['audio', 'mp3', '128kbps', False]
        streams_df.loc[len(streams_df)] = ['audio', 'mp3', '256kbps', False]
        streams_df.loc[len(streams_df)] = ['audio', 'wav', '1411kbps', False]

        # Converting to Dictionary for Unique User Options
        media_formats_dict = dict()
        for media_type in sorted(streams_df['media_type'].unique()):
            media_formats_dict[media_type] = dict()
            media_type_df = streams_df[streams_df['media_type'] == media_type]
            for media_format in sorted(media_type_df['media_format'].unique()):
                media_format_df = media_type_df[media_type_df['media_format'] == media_format]
                # "or 0" keeps the sort working if a quality cannot be parsed.
                media_qualities = sorted(media_format_df['media_quality'].unique(),
                                         key=lambda quality: self.__get_quality_int(quality) or 0)
                media_formats_dict[media_type][media_format] = media_qualities

        return streams_df, media_formats_dict

    def get_media_formats(self):
        '''
        Returns a dictionary for supported media formats
        '''
        return self.media_formats_dict

    def _select_media_format(self):
        '''
        For selecting media format to download (interactive, reads from stdin)
        '''
        print(json.dumps(self.media_formats_dict, indent=12))

        # Getting Media Type
        media_types = list(self.media_formats_dict.keys())
        media_type = input(f'Select a Media Type from {media_types}: ')
        assert(media_type in media_types)

        # Getting Media Format
        media_formats = list(self.media_formats_dict[media_type].keys())
        media_format = input(f'Select a Media Format from {media_formats}: ')
        assert(media_format in media_formats)

        # Getting Media Quality
        media_qualities = self.media_formats_dict[media_type][media_format]
        media_quality = input(f'Select a Media Quality from {media_qualities}: ')
        assert(media_quality in media_qualities)

        return media_type, media_format, media_quality

    def download(self, media_type, media_format, media_quality):
        '''
        Download Handler Function:
        Handles all types of media download.
        A ranged download is used when start_time or end_time is set.
        '''
        if self.start_time or self.end_time:
            return self._download_media_chunk(media_type, media_format, media_quality)
        return self._download_media(media_type, media_format, media_quality)

    def _download_media(self, media_type, media_format, media_quality):
        '''
        Media Download Handler Function:
        Checks which type of media download is required & passes it onto the relevant method.
        Raises ValueError for a media_type other than 'audio' or 'video'.
        '''
        if media_type == 'audio':
            return self._download_audio(media_format, media_quality)

        if media_type == 'video':
            # Checking for the Media in Dataframe
            media_mask = (self.streams_df['media_type'] == media_type) & \
                         (self.streams_df['media_format'] == media_format) & \
                         (self.streams_df['media_quality'] == media_quality)
            media_df = self.streams_df[media_mask]

            # Checking if Progressive Video is Available
            if bool(media_df['progressive'].any()):
                return self._download_video(media_format, media_quality)
            return self._download_audio_and_video(media_format, media_quality)

        raise ValueError(f"Unsupported media type: {media_type!r}. Use 'audio' or 'video'.")

    def _download_audio(self, audio_format, audio_quality):
        '''
        Downloads the audio with yt-dlp in the requested format & quality
        '''

        # Getting Quality Command String
        quality = f"{self.__get_quality_int(audio_quality)}K"

        # Getting Output Path
        output_path = os.path.join(self.output_path, f"{self._file_stem()}.{audio_format}")

        # Download Command
        command = [
            "yt-dlp",
            "-x", "--audio-format", audio_format,
            "--audio-quality", quality,
            "-o", output_path,
            self.url, "-q"
        ]

        # Running the command using Subprocess
        return output_path if self._run_command(command) else None

    def _download_video(self, video_format, video_quality):
        '''
        Filters the required video stream & downloads it
        Only for Progressive media i.e containing both audio & video streams
        '''
        stream = self.streams.filter(progressive=True, file_extension=video_format, resolution=video_quality).first()
        print(stream)
        if stream is None:
            return None
        return stream.download(output_path=self.output_path, filename=f"{self._file_stem()}.{video_format}")

    def _download_audio_and_video(self, media_format, media_quality):
        '''
        Filters the required video stream & downloads it
        Filters the best quality audio stream of the same format & downloads it
        Merges both with ffmpeg (stream copy) and removes the two partial files
        '''
        stem = self._file_stem()

        audio_stream = self.streams.filter(file_extension=media_format, only_audio=True).order_by('abr').desc().first()
        print(audio_stream)
        video_stream = self.streams.filter(file_extension=media_format, resolution=media_quality).first()
        print(video_stream)
        if audio_stream is None or video_stream is None:
            return None

        # Downloading Audio
        audio_path = audio_stream.download(output_path=self.output_path, filename=f"{stem} - Audio.{media_format}")

        # Downloading Video
        video_path = video_stream.download(output_path=self.output_path, filename=f"{stem} - Video.{media_format}")

        # Combining the Audio & Video Files using FFMPEG Command
        # NOTE(review): '-loglevel quiet' follows the output file and no -y/-n is
        # given, so ffmpeg may prompt if output_path already exists - confirm.
        output_path = os.path.join(self.output_path, f"{stem}.{media_format}")
        command = ['ffmpeg', '-i', video_path, '-i', audio_path,
                   '-c:v', 'copy', '-c:a', 'copy', output_path,
                   '-loglevel', 'quiet']
        merged = self._run_command(command)

        # The partial files are removed whether or not the merge succeeded.
        os.remove(audio_path)
        os.remove(video_path)

        return output_path if merged else None

    def _download_media_chunk(self, media_type, media_format, media_quality):
        '''
        Media Download Handler Function:
        Checks which type of media download is required for particular chunk & passes it onto the relevant method.
        Raises ValueError for a media_type other than 'audio' or 'video'.
        '''
        if media_type == 'audio':
            return self._download_audio_chunk(media_format, media_quality)

        if media_type == 'video':
            return self._download_video_chunk(media_format, media_quality)

        raise ValueError(f"Unsupported media type: {media_type!r}. Use 'audio' or 'video'.")

    def _download_audio_chunk(self, audio_format, audio_quality):
        '''
        Downloads the audio with yt-dlp for particular chunk
        '''

        # Getting Chunk Command String
        chunk_string = self._chunk_args()

        # Getting Quality Command String
        quality = f"{self.__get_quality_int(audio_quality)}K"

        # Getting Output Path
        output_path = os.path.join(self.output_path, f"{self._file_stem()}.{audio_format}")

        # Download Command
        # NOTE(review): the range is handed to ffmpeg as one unnamed
        # --external-downloader-args value - confirm yt-dlp applies it as intended.
        command = [
            "yt-dlp",
            "-x", "--audio-format", audio_format,
            "--audio-quality", quality,
            "--external-downloader", "ffmpeg",
            "--external-downloader-args", chunk_string,
            "-o", output_path,
            self.url, "-q"
        ]

        # Running the command using Subprocess
        return output_path if self._run_command(command) else None

    def _download_video_chunk(self, video_format, video_quality):
        '''
        Downloads the video with yt-dlp for particular chunk.
        Raises ValueError for a video_format other than 'mp4' or 'webm'.
        '''

        # Preferred codecs per container, used in the yt-dlp sort expression
        codecs = {
            'mp4': ("h264", "m4a"),
            'webm': ("vp9", "opus"),
        }
        if video_format not in codecs:
            raise ValueError(f"Unexpected Video Format Encountered: {video_format}")
        video_codec, audio_codec = codecs[video_format]

        # Getting Output Path
        output_path = os.path.join(self.output_path, f"{self._file_stem()}.{video_format}")

        # Getting Video Quality Integer
        video_quality = self.__get_quality_int(video_quality)

        # A missing bound becomes the start ("0") or the end ("inf") of the media
        # instead of the literal text "None".
        section = f"*{self.start_time or 0}-{self.end_time or 'inf'}"

        command = [
            "yt-dlp",
            self.url,
            "-S", f"res:{video_quality},vcodec:{video_codec},acodec:{audio_codec}",
            "--merge-output-format", video_format,
            "--download-sections", section,
            "-o", f"{output_path}",
        ]

        print(' '.join(command))

        # Running the command using Subprocess
        return output_path if self._run_command(command) else None

    def get_media_info(self):
        '''
        Returns title, length, thumbnail URL and supported formats as a dictionary
        '''
        return {
            'title': self.title,
            'media_length': self.media_length,
            'thumbnail_url': self.thumbnail_url,
            'formats': self.media_formats_dict
        }

    @staticmethod
    def extract_audio(video_path):
        """
        Extract audio from a video file and save it as an MP3 file using ffmpeg.

        Args:
            video_path (str): Path to the input video file.

        Returns:
            str | None: Path of the MP3 file (the input path with its extension
            replaced by ``.mp3``), or None when ffmpeg exits with an error.
        """
        # Split off the extension to derive the output name and pick the codec
        filename, extension = os.path.splitext(video_path)

        # Extracted audio path
        audio_path = filename + '.mp3'

        # Choose the appropriate codec for the output audio format (MP3)
        audio_codec = "libmp3lame" if extension.lower() in (".mp4", ".webm") else "mp3"

        try:
            # Run the ffmpeg command to extract audio
            subprocess.run(["ffmpeg", "-i", video_path, "-vn", "-acodec",
                            audio_codec, audio_path, '-loglevel', 'quiet'], check=True)
        except subprocess.CalledProcessError as e:
            print(f"Error: {e}")
            return None

        return audio_path
requirements.txt ADDED
@@ -0,0 +1,13 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ fastapi
2
+ faster-whisper
3
+ langchain
4
+ openai
5
+ pandas
6
+ pytube
7
+ scikit-learn
8
+ soundfile
9
+ stable-ts
10
+ uvicorn
11
+ wordcloud
12
+ youtube-transcript-api
13
+ git+https://github.com/suno-ai/bark.git
summarizer.py ADDED
@@ -0,0 +1,429 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from langchain import PromptTemplate
2
+ from langchain.chat_models import ChatOpenAI
3
+ from langchain.chains.summarize import load_summarize_chain
4
+ from langchain.text_splitter import RecursiveCharacterTextSplitter
5
+ from langchain.document_loaders import DirectoryLoader
6
+ from wordcloud import WordCloud, STOPWORDS
7
+ import numpy as np
8
+ from langchain.embeddings import OpenAIEmbeddings
9
+ from sklearn.cluster import KMeans
10
+ from sklearn.metrics import silhouette_score
11
+ import os
12
+ from langchain.docstore.document import Document
13
+
14
# SECURITY: API keys must never be committed to source control. The key that
# used to be hard-coded on this line is exposed in the repository history and
# should be revoked. Provide OPENAI_API_KEY through the process environment.
if not os.environ.get("OPENAI_API_KEY"):
    print("WARNING: OPENAI_API_KEY is not set; OpenAI-backed calls in this module will fail.")
15
+
16
+ class Extract_Summary:
17
+
18
def __init__(self, text_input, file_path=None, chunks=2000, chunking_strategy=None,
             LLM_Model="gpt-3.5-turbo", temperature=1, top_p=None, top_k=None):
    """Store the input source, chunking options and LLM settings on the instance."""
    # Where the text comes from
    self.text_input = text_input
    self.file_path = file_path

    # Chunking options ("chuking_strategy" is the attribute name this class uses)
    self.chunks = chunks
    self.chuking_strategy = chunking_strategy

    # LLM settings
    self.LLM_Model = LLM_Model
    self.temperature = temperature
    self.top_p = top_p
    self.top_k = top_k
27
+
28
+
29
def doc_summary(self, docs):
    """Return ``(word_count, document_count)`` for ``docs``.

    Words are counted by splitting each ``page_content`` on single spaces.
    """
    word_total = 0
    for document in docs:
        word_total += len(document.page_content.split(" "))
    return word_total, len(docs)
34
+
35
def load_docs(self):
    """Load the documents and choose the summarisation chain type.

    Documents come from ``self.file_path`` (every ``.txt`` file under it) when
    it is set, otherwise from ``self.text_input`` wrapped in one ``Document``.

    Returns:
        ``(docs, chain_type)`` where ``chain_type`` is ``'map_reduce'`` or
        ``'stuff'``.
    """
    if self.file_path is None:
        documents = [Document(page_content=f"{self.text_input}", metadata={"source": "local"})]
    else:
        documents = DirectoryLoader(self.file_path, glob="**/*.txt").load()

    word_total, doc_total = self.doc_summary(documents)

    # Large input (more than 8 documents or 6000 words): chunk it, keep only the
    # chunks picked by summarise_large_documents, then recount.
    if doc_total > 8 or word_total > 6000:
        documents = self.summarise_large_documents(self.chunk_docs(documents))
        word_total, doc_total = self.doc_summary(documents)

    # More than 2000 words is chunked again and handled with map_reduce.
    if word_total > 2000:
        documents = self.chunk_docs(documents)
        strategy = 'map_reduce'
    else:
        strategy = 'stuff'

    print("=="*20)
    print(word_total)
    print(strategy)
    return documents, strategy
61
+
62
+ ## Add ensemble retriver for this as well.
63
+
64
def summarise_large_documents(self, docs):
    """Reduce ``docs`` to one representative document per embedding cluster.

    Documents are embedded with ``OpenAIEmbeddings`` and clustered with
    K-means. The cluster count is the one with the best silhouette score, and
    the document closest to each cluster centre is kept, in original order.

    With fewer than three documents no clustering is possible (a silhouette
    score needs 2 to n-1 clusters), so the documents are returned unchanged.
    """
    print("=="*20)
    print('Orignial Docs size : ' ,len(docs))

    if len(docs) < 3:
        return list(docs)

    embeddings = OpenAIEmbeddings()
    vectors = np.asarray(embeddings.embed_documents([x.page_content for x in docs]))

    # Silhouette score: try 2..10 clusters, capped at n-1 so that KMeans and
    # silhouette_score stay valid when there are fewer than 11 documents.
    n_clusters_range = range(2, min(10, len(docs) - 1) + 1)
    silhouette_scores = []
    for i in n_clusters_range:
        kmeans = KMeans(n_clusters=i, init='k-means++',
                        max_iter=300, n_init=10, random_state=0)
        kmeans.fit(vectors)
        silhouette_scores.append(silhouette_score(vectors, kmeans.labels_))

    optimal_n_clusters = n_clusters_range[int(np.argmax(silhouette_scores))]
    kmeans = KMeans(n_clusters=optimal_n_clusters,
                    random_state=42).fit(vectors)

    # Getting the document closest to each centroid
    closest_indices = set()
    for centre in kmeans.cluster_centers_:
        distances = np.linalg.norm(vectors - centre, axis=1)
        closest_indices.add(int(np.argmin(distances)))

    # Sorting restores document order; the set drops an index picked twice.
    selected_docs = [docs[index] for index in sorted(closest_indices)]

    print('Selected Docs size : ' ,len(selected_docs))

    return selected_docs
103
+
104
def chunk_docs(self, docs):
    """Split ``docs`` into chunks of at most ``self.chunks`` characters with a 50-character overlap."""
    splitter_options = dict(
        chunk_size=self.chunks,
        chunk_overlap=50,
        length_function=len,
        is_separator_regex=False,
    )
    return RecursiveCharacterTextSplitter(**splitter_options).split_documents(docs)
115
+
116
def get_key_information_stuff(self):
    """Return the single-pass ('stuff') prompt for key-information extraction.

    The template has one input variable, ``text``.
    """
    key_info_template = """
Extract Key Informtion from the text below. This key information can include People Names & their Role/rank, Locations, Organization,Nationalities,Religions,
Events such as Historical, social, sporting and naturally occurring events, Products , Address & email, URL, Date & Time, Provide the list of Key information each
should be labeled with thier crossponding category.if key information related to category is not present, dont add that category in Response.
{text}

"""
    return PromptTemplate(template=key_info_template, input_variables=['text'])
129
+
130
+
131
def get_key_information_map_reduce(self):
    """Return the ``(map, combine)`` prompts for map-reduce key-information extraction.

    Both templates take a single input variable, ``text``.
    """
    per_chunk_text = """
Extract Key Informtion from the text below. This key information can include People Names & their Role/rank, Locations, Organization,Nationalities,Religions,
Events such as Historical, social, sporting and naturally occurring events, Products , Address & email, URL, Date & Time, Provide the list of Key information each
should be labeled with thier crossponding category.if key information related to category is not present, dont add that category in Response.
{text}

"""
    merge_text = """
Below Text contains Key Information that was extracted from text. You job is to combine the Key Information and Return the results.This key information can include People Names & their Role/rank,
Locations, Organization,Nationalities,Religions,Events such as Historical, social, sporting and naturally occurring events, Products ,
Address & email, URL, Date & Time, Provide the list of Key information each should be labeled with thier crossponding category.
if key information related to category is not present, dont add that category in Response.
{text}

"""
    return (
        PromptTemplate(template=per_chunk_text, input_variables=['text']),
        PromptTemplate(template=merge_text, input_variables=['text']),
    )
156
+
157
+
158
+
159
def get_stuff_prompt(self):
    """Return the single-pass ('stuff') summary prompt.

    Input variables: ``Summary_type``, ``Summary_strategy``,
    ``Target_Person_type``, ``Response_length``, ``Writing_style`` and ``text``.
    """
    summary_template = """

Write a {Summary_type} and {Summary_strategy} for {Target_Person_type} lenght of the summary should be of {Response_length} words and writing style should be of {Writing_style}.
From the text below by identifying most important topics based on their importance in text corpus and summary should be based on these important topics.

{text}

"""
    variables = ['Summary_type', 'Summary_strategy', 'Target_Person_type',
                 'Response_length', 'Writing_style', 'text']
    return PromptTemplate(template=summary_template, input_variables=variables)
176
+
177
def define_prompts(self):
    """Build the two prompts used by the map-reduce summarisation chain.

    Returns:
        A ``(map_template, combine_template)`` tuple. The map template only
        needs ``text``; the combine template additionally needs the summary
        options (``Summary_type``, ``Summary_strategy``,
        ``Target_Person_type``, ``Response_length``, ``Writing_style``).
    """
    # Applied to every chunk: extract a short summary plus scored topics.
    per_chunk_prompt = """
    "Identify the key topics in the following text. in your response only add the most relevant and most important topics and Concised yet eloborative summary of text below.
    Dont add all the topics that you find.if you didnt find any important topic,dont return anything in response.Also provide me importance score of each idenfied topics out of 1.
    'Your response should be like this , eg: Summary of text: blah blah blah,list of comma saperated topic names `Topic 1 Topic 2 Topic 3`
    and list of comma saperated importance scores for these topics `1 , 0.5,0.2`, so response should be formated like this.

    Summary:
    blah Blah blah
    Topic Names : Topic 1, Topic 2, Topic 3
    Importance Score: 1,0.4,0.3

    {text}
    """

    # Applied once to the collected per-chunk results.
    merge_prompt = """
    Here is list of summaries ,Topics Names and thier respective importance score that were extracted from text.
    your job is to provide best possible summary based on the list of summaries below and Use most important topics present based on thier importance score.
    Write a {Summary_type} and {Summary_strategy} for {Target_Person_type} lenght of the summary should be of {Response_length} words and writing style should be of {Writing_style}.

    {text}

    output Format should be like this.Dont try Return to multiple summaries.Only return one combined summary for above mentioned summaries.

    Summary:
    blah blah blah

    """

    merge_variables = [
        'Summary_type',
        'Summary_strategy',
        'Target_Person_type',
        'Response_length',
        'Writing_style',
        'text',
    ]

    return (
        PromptTemplate(template=per_chunk_prompt, input_variables=['text']),
        PromptTemplate(template=merge_prompt, input_variables=merge_variables),
    )
215
+ # pass
216
+
217
def define_chain(self, Summary_type, Summary_strategy,
                 Target_Person_type, Response_length, Writing_style,
                 chain_type=None, key_information=False):
    """Load the documents, build the matching LangChain chain and run it.

    Args:
        Summary_type, Summary_strategy, Target_Person_type, Response_length,
        Writing_style: values substituted into the summary prompts. They are
            ignored when ``key_information`` is true, because the
            key-information prompts only take ``text``.
        chain_type: accepted for compatibility but not used; the chain type
            is always the one returned by ``self.load_docs()``.
        key_information: when true, extract key information instead of
            writing a summary.

    Returns:
        Whatever ``chain.run`` returns for the loaded documents.

    Raises:
        ValueError: if ``load_docs`` reports a chain type other than
            ``'stuff'`` or ``'map_reduce'``.
    """
    docs, chain_type = self.load_docs()
    llm = ChatOpenAI(model='gpt-3.5-turbo', temperature=0)

    if chain_type == 'stuff':
        if key_information:
            prompt = self.get_key_information_stuff()
        else:
            prompt = self.get_stuff_prompt()
        chain = load_summarize_chain(
            llm=llm, chain_type='stuff', verbose=False, prompt=prompt)

    elif chain_type == 'map_reduce':
        if key_information:
            map_prompts, combine_prompt = self.get_key_information_map_reduce()
        else:
            map_prompts, combine_prompt = self.define_prompts()
        chain = load_summarize_chain(
            llm=llm, map_prompt=map_prompts, combine_prompt=combine_prompt,
            chain_type='map_reduce', verbose=False)

    else:
        # Previously this fell through and failed later on an unbound `chain`.
        raise ValueError(f"Unsupported chain type: {chain_type!r}")

    # The original test was `~key_information`: bitwise NOT of a bool is -1 or
    # -2, both truthy, so the key-information branch could never run.
    if not key_information:
        output = chain.run(
            Summary_type=Summary_type, Summary_strategy=Summary_strategy,
            Target_Person_type=Target_Person_type,
            Response_length=Response_length, Writing_style=Writing_style,
            input_documents=docs)
    else:
        output = chain.run(input_documents=docs)

    return output
256
+
257
def create_wordcloud(self, output):
    """Render a word cloud of ``output`` and write it to ``WordCloud.png``.

    Args:
        output: text to build the word cloud from.
    """
    cloud = WordCloud(width=300, height=500, stopwords=STOPWORDS)
    cloud.generate(output)
    # Written relative to the current working directory.
    cloud.to_file('WordCloud.png')
261
+
262
+
263
class AudioBookNarration:
    """Rewrite text as an audiobook-style narration using an LLM chain.

    The input is either ``text_input`` or, when ``file_path`` is given, every
    ``.txt`` file found under that directory. Large inputs are chunked,
    reduced to a few representative chunks, and then narrated with a
    ``stuff`` or ``map_reduce`` chain depending on their size.
    """

    def __init__(self, text_input, file_path=None, chunks=2000, chunking_strategy=None, LLM_Model="gpt-3.5-turbo", temperature=1, top_p=None, top_k=None):
        """Store the input and configuration; no work is done here.

        Args:
            text_input: raw text to narrate (used when ``file_path`` is None).
            file_path: optional directory searched recursively for ``.txt`` files.
            chunks: chunk size, in characters, handed to the text splitter.
            chunking_strategy, LLM_Model, temperature, top_p, top_k: stored on
                the instance; the methods of this class do not read them.
        """
        self.chunks = chunks
        self.file_path = file_path
        self.text_input = text_input
        # Attribute name (including its spelling) is kept for compatibility.
        self.chuking_strategy = chunking_strategy
        self.LLM_Model = LLM_Model
        self.temperature = temperature
        self.top_p = top_p
        self.top_k = top_k

    def doc_summary(self, docs):
        """Return ``(word_count, document_count)`` for ``docs``.

        Words are counted by splitting each ``page_content`` on single
        spaces, so this is a rough size estimate rather than a token count.
        """
        num_words = sum(len(doc.page_content.split(" ")) for doc in docs)
        return num_words, len(docs)

    def load_docs(self):
        """Load the input documents and choose a chain type.

        Returns:
            ``(docs, chain_type)`` where ``chain_type`` is ``'map_reduce'``
            when the (possibly reduced) input exceeds 2000 words and
            ``'stuff'`` otherwise.
        """
        if self.file_path is not None:
            docs = DirectoryLoader(self.file_path, glob="**/*.txt").load()
        else:
            docs = [Document(page_content=f"{self.text_input}", metadata={"source": "local"})]

        tokens, documents_count = self.doc_summary(docs)

        # Very large inputs are chunked and reduced to representative chunks.
        if documents_count > 8 or tokens > 6000:
            docs = self.chunk_docs(docs)
            docs = self.summarise_large_documents(docs)
            tokens, documents_count = self.doc_summary(docs)

        if tokens > 2000:
            docs = self.chunk_docs(docs)
            chain_type = 'map_reduce'
        else:
            chain_type = 'stuff'

        print("=="*20)
        print(tokens)
        print(chain_type)
        return docs, chain_type

    def summarise_large_documents(self, docs):
        """Keep only the chunks closest to the centres of embedding clusters.

        The chunks are embedded, clustered with KMeans (the cluster count is
        picked by silhouette score), and the chunk nearest to each cluster
        centre is kept, in original document order.

        Args:
            docs: list of documents exposing ``page_content``.

        Returns:
            A list containing the selected documents. When there are fewer
            than three documents, clustering is not possible and all of them
            are returned unchanged.
        """
        print("=="*20)
        print('Orignial Docs size : ', len(docs))

        # silhouette_score needs 2 <= n_clusters <= n_samples - 1, so the scan
        # range must be capped; the fixed 2..10 range failed on small inputs.
        max_clusters = min(10, len(docs) - 1)
        if max_clusters < 2:
            print('Selected Docs size : ', len(docs))
            return list(docs)

        embeddings = OpenAIEmbeddings()
        vectors = np.asarray(
            embeddings.embed_documents([x.page_content for x in docs]))

        # Pick the cluster count with the best silhouette score.
        n_clusters_range = range(2, max_clusters + 1)
        silhouette_scores = []
        for n_clusters in n_clusters_range:
            kmeans = KMeans(n_clusters=n_clusters, init='k-means++',
                            max_iter=300, n_init=10, random_state=0)
            kmeans.fit(vectors)
            silhouette_scores.append(silhouette_score(vectors, kmeans.labels_))

        optimal_n_clusters = n_clusters_range[int(np.argmax(silhouette_scores))]
        kmeans = KMeans(n_clusters=optimal_n_clusters,
                        random_state=42).fit(vectors)

        # For every cluster, find the chunk nearest to its centre.
        closest_indices = set()
        for centre in kmeans.cluster_centers_:
            distances = np.linalg.norm(vectors - centre, axis=1)
            closest_indices.add(int(np.argmin(distances)))

        # Using a set avoids selecting the same chunk twice; sorting keeps
        # the chunks in their original order.
        selected_docs = [docs[index] for index in sorted(closest_indices)]

        print('Selected Docs size : ', len(selected_docs))

        return selected_docs

    def chunk_docs(self, docs):
        """Split ``docs`` into chunks of ``self.chunks`` characters (50 overlap)."""
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.chunks,
            chunk_overlap=50,
            length_function=len,
            is_separator_regex=False,
        )
        return text_splitter.split_documents(docs)

    def get_stuff_prompt(self):
        """Return the single-call narration prompt (``Narration_style``, ``text``)."""
        prompt_template = """
        Create a {Narration_style} narration for this below text. This narration will be used for audiobook generation.
        So provide the output that is verbose, easier to understand and full of expressions.
        {text}

        """
        return PromptTemplate(
            template=prompt_template, input_variables=['Narration_style', 'text'])

    def define_prompts(self):
        """Return ``(map_template, combine_template)`` for map-reduce narration.

        Both templates take ``Narration_style`` and ``text``.
        """
        map_prompts = """
        Create a {Narration_style} narration for this below text. This narration will be used for audiobook generation.
        So provide the output that is verbose, easier to understand and full of expressions.
        {text}
        """

        combine_prompt = """
        Below are the list of text that represent narration from the text.
        Your job is to combine these narrations and craete one verbose,easier to understand and full of experssions {Narration_style} narration.
        {text}

        """

        map_template = PromptTemplate(
            template=map_prompts, input_variables=['Narration_style', 'text'])
        combine_template = PromptTemplate(
            template=combine_prompt, input_variables=['Narration_style', 'text'])

        return map_template, combine_template

    def define_chain(self, Narration_style=None, chain_type=None):
        """Load the documents, build the narration chain and run it.

        Args:
            Narration_style: value substituted into the narration prompts.
            chain_type: accepted for compatibility but not used; the chain
                type is always the one returned by ``load_docs``.

        Returns:
            Whatever ``chain.run`` returns for the loaded documents.

        Raises:
            ValueError: if ``load_docs`` reports an unknown chain type.
        """
        docs, chain_type = self.load_docs()
        # The model name and temperature are fixed here; the values stored
        # by __init__ are not consulted.
        llm = ChatOpenAI(model='gpt-3.5-turbo', temperature=0)

        if chain_type == 'stuff':
            prompt = self.get_stuff_prompt()
            chain = load_summarize_chain(
                llm=llm, chain_type='stuff', verbose=False, prompt=prompt)

        elif chain_type == 'map_reduce':
            map_prompts, combine_prompt = self.define_prompts()
            chain = load_summarize_chain(
                llm=llm, map_prompt=map_prompts, combine_prompt=combine_prompt,
                chain_type='map_reduce', verbose=False)

        else:
            # Previously this fell through and failed on an unbound `chain`.
            raise ValueError(f"Unsupported chain type: {chain_type!r}")

        output = chain.run(Narration_style=Narration_style, input_documents=docs)

        return output
429
+
transcription.py ADDED
@@ -0,0 +1,221 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ from abc import ABC, abstractmethod
3
+
4
+ from youtube_transcript_api import YouTubeTranscriptApi
5
+ from youtube_transcript_api.formatters import SRTFormatter, WebVTTFormatter
6
+
7
+ # import whisperx
8
+ import stable_whisper
9
+ from faster_whisper import WhisperModel
10
+
11
+
12
+
13
class Transcription(ABC):
    """Abstract base shared by every transcript backend.

    Subclasses must implement ``generate_transcript`` and ``save_transcript``.
    """

    def __init__(self, media_path, output_path, subtitle_format):
        """Record where the media lives and how subtitles should be written.

        Args:
            media_path: path of the media file to transcribe.
            output_path: output directory, resolved against the current
                working directory.
            subtitle_format: subtitle format name (stored as given).
        """
        stem, _extension = os.path.splitext(media_path)

        self.media_path = media_path
        self.output_path = os.path.join(os.getcwd(), output_path)
        # Media path without its extension.
        self.filename = stem
        self.subtitle_format = subtitle_format

    @abstractmethod
    def generate_transcript(self):
        """Produce the transcript for the media file."""

    @abstractmethod
    def save_transcript(self):
        """Write the transcript to a file."""
28
+
29
class YouTubeTranscriptAPI(Transcription):
    """Fetch an existing transcript of a YouTube video via youtube_transcript_api."""

    def __init__(self, url, media_path, output_path, subtitle_format='srt', transcript_language='en'):
        """Store the request parameters and derive the video id from ``url``.

        Args:
            url: YouTube video URL.
            media_path: media path; its extension-less form names the output file.
            output_path: output directory (see ``Transcription``).
            subtitle_format: ``'srt'`` or ``'vtt'`` (case-insensitive).
            transcript_language: language code of the transcript to fetch.

        Raises:
            IndexError: if no video id can be found in ``url``.
            AssertionError: if ``subtitle_format`` is not supported.
        """
        super().__init__(media_path, output_path, subtitle_format)
        self.url = url
        self.video_id = self._extract_video_id(url)
        self.transcript_language = transcript_language
        self.supported_subtitle_formats = ['srt', 'vtt']
        # Normalise once so that validation and the later comparisons agree;
        # previously 'SRT' passed validation but matched no formatter.
        self.subtitle_format = self.subtitle_format.lower()
        assert self.subtitle_format in self.supported_subtitle_formats, (
            f'Unsupported subtitle format: {subtitle_format!r}')

    @staticmethod
    def _extract_video_id(url):
        """Return the video id from ``url`` without any trailing parameters."""
        from urllib.parse import parse_qs, urlparse

        parsed = urlparse(url)
        ids = parse_qs(parsed.query).get('v')
        if ids:
            # Only the value of `v`, not "&t=..." or other parameters.
            return ids[0]
        if parsed.netloc.lower().endswith('youtu.be'):
            short_id = parsed.path.strip('/')
            if short_id:
                return short_id
        # Legacy behaviour: raises IndexError when 'v=' is absent.
        return url.split('v=')[1]

    def get_available_transcripts(self):
        '''
        Returns a dictionary of available transcripts & their info,
        keyed by language code
        '''
        transcript_list = YouTubeTranscriptApi.list_transcripts(self.video_id)
        return {
            transcript.language_code: {
                'language': transcript.language,
                'is_generated': transcript.is_generated,
                'is_translatable': transcript.is_translatable
            }
            for transcript in transcript_list
        }

    def generate_transcript(self):
        '''
        Fetches the transcript in the requested language and stores it
        on ``self.transcript``
        '''
        self.transcript = YouTubeTranscriptApi.get_transcript(
            self.video_id, languages=[self.transcript_language])

    def save_transcript(self):
        '''
        Writes the transcript into file and returns the file path.
        ``generate_transcript`` must have been called first.
        '''
        if self.subtitle_format == 'srt':
            formatter = SRTFormatter()
        elif self.subtitle_format == 'vtt':
            formatter = WebVTTFormatter()
        else:
            # Reachable if the attribute was changed after construction or
            # assertions are disabled; fail clearly instead of on an
            # unbound `formatter`.
            raise ValueError(f'Unsupported subtitle format: {self.subtitle_format!r}')

        formatted_transcript = formatter.format_transcript(self.transcript)

        file_path = f'{self.filename}.{self.subtitle_format}'
        with open(file_path, 'w', encoding='utf-8') as transcript_file:
            transcript_file.write(formatted_transcript)
        return file_path
82
+
83
+
84
class Whisper(Transcription):
    """Shared base for the Whisper-based transcription backends."""

    def __init__(self, media_path, output_path, subtitle_format, word_level):
        """Store the options and validate the subtitle format.

        Args:
            media_path: path of the media file to transcribe.
            output_path: output directory (see ``Transcription``).
            subtitle_format: ``'ass'``, ``'srt'`` or ``'vtt'`` (case-insensitive).
            word_level: whether timestamps are produced per word or per segment.

        Raises:
            AssertionError: if ``subtitle_format`` is not supported.
        """
        super().__init__(media_path, output_path, subtitle_format)
        self.word_level = word_level
        self.supported_subtitle_formats = ['ass', 'srt', 'vtt']
        # Store the lower-cased form: validation was case-insensitive while
        # subclasses compare exactly, so 'SRT' used to pass here and then
        # match no writer.
        self.subtitle_format = self.subtitle_format.lower()
        assert self.subtitle_format in self.supported_subtitle_formats, (
            f'Unsupported subtitle format: {subtitle_format!r}')
90
+
91
+
92
class FasterWhisper(Whisper):
    """Transcription backend built on ``faster_whisper.WhisperModel``."""

    def __init__(self, media_path, output_path, subtitle_format='srt', word_level=True):
        """Store the options and load the ``large-v2`` model on CUDA in float16."""
        super().__init__(media_path, output_path, subtitle_format, word_level)
        self.model = WhisperModel("large-v2", device="cuda", compute_type="float16")

    def generate_transcript(self):
        '''
        Generates the transcript for the media file.

        Sets ``self.text``, ``self.language`` and ``self.segments`` and returns
        them as a dictionary. Each entry of ``segments`` has ``text``, ``start``
        and ``end`` (times rounded to 2 decimals); entries are words when
        ``word_level`` is true and whole segments otherwise.
        '''
        if self.word_level:
            # Word-level transcript: one entry per recognised word
            segments, info = self.model.transcribe(self.media_path, word_timestamps=True)
            timed_units = (
                (word.word, word.start, word.end)
                for segment in segments
                for word in segment.words
            )
        else:
            # Segment-level transcript: one entry per segment
            segments, info = self.model.transcribe(self.media_path, beam_size=5)
            timed_units = (
                (segment.text, segment.start, segment.end)
                for segment in segments
            )

        entries = [
            {'text': text, 'start': round(start, 2), 'end': round(end, 2)}
            for text, start, end in timed_units
        ]

        # Setting Transcript Properties
        self.text = ' '.join(entry['text'] for entry in entries)
        self.language = info.language
        self.segments = entries

        return {
            'language': self.language,
            'text': self.text,
            'segments': self.segments
        }

    def save_transcript(self, transcript, output_file):
        '''
        Writes the transcript into file (not implemented: does nothing)
        '''
        # TODO: Can't seem to find any built-in methods for writing transcript
        return None
157
+
158
class StableWhisper(Whisper):
    """Transcription backend built on ``stable_whisper``."""

    def __init__(self, media_path, output_path, subtitle_format='srt', word_level=True):
        """Store the options and load the ``large-v2`` model."""
        super().__init__(media_path, output_path, subtitle_format, word_level)
        self.model = stable_whisper.load_model('large-v2')

    def generate_transcript(self):
        '''
        Generates the transcript for the media file.

        Keeps the raw result on ``self.result`` / ``self.resultdict``, sets
        ``self.text``, ``self.language`` and ``self.segments`` and returns those
        three as a dictionary. Entries of ``segments`` are words when
        ``word_level`` is true and whole segments otherwise.
        '''
        self.result = self.model.transcribe(self.media_path, word_timestamps=self.word_level)
        self.resultdict = self.result.to_dict()

        raw_segments = self.resultdict['segments']
        if self.word_level:
            entries = [
                {
                    'text': word['word'],
                    'start': round(word['start'], 2),
                    'end': round(word['end'], 2)
                }
                for segment in raw_segments
                for word in segment['words']
            ]
        else:
            entries = [
                {
                    'text': segment['text'],
                    'start': round(segment['start'], 2),
                    'end': round(segment['end'], 2)
                }
                for segment in raw_segments
            ]

        # Setting Transcript Properties
        self.text = self.resultdict['text']
        self.language = self.resultdict['language']
        self.segments = entries

        return {
            'language': self.language,
            'text': self.text,
            'segments': self.segments
        }

    def save_transcript(self):
        '''
        Writes the transcript into file and returns the file path.
        ``generate_transcript`` must have been called first. Nothing is
        written when the format is not one of ass / srt / vtt.
        '''
        fmt = self.subtitle_format
        file_path = f'{self.filename}.{fmt}'
        if fmt in ('srt', 'vtt'):
            self.result.to_srt_vtt(file_path, segment_level=True, word_level=self.word_level)
        elif fmt == 'ass':
            self.result.to_ass(file_path, segment_level=True, word_level=self.word_level)
        return file_path