import flet as ft import os from pathlib import Path import subprocess from typing import Optional import asyncio import threading import sys import os import sys from i18n.i18n import I18nAuto i18n = I18nAuto() # Força o uso de UTF-8 para o Python os.environ["PYTHONIOENCODING"] = "utf-8" # Para garantir que a codificação esteja correta no terminal if sys.platform == "win32": # No Windows, podemos forçar a codificação UTF-8 no console os.system('chcp 65001') # Redefine a configuração de codificação da saída padrão sys.stdout.reconfigure(encoding='utf-8') sys.stderr.reconfigure(encoding='utf-8') import re def clean_ansi(text: str) -> str: """Remove códigos ANSI e limpa o texto para exibição no Flet""" ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') return ansi_escape.sub('', text) def process_output(line: str) -> str: """Processa uma linha de saída, limpando-a apenas para exibição no Flet""" clean_line = clean_ansi(line.strip()) # Verifica por padrões de progresso if "%" in clean_line: try: # Extrai o número do progresso progress_match = re.search(r'(\d+\.?\d*)%', clean_line) if progress_match: progress_value = float(progress_match.group(1)) / 100 return clean_line, progress_value except ValueError: pass return clean_line, None class LeGenUI: def __init__(self): self.input_path: Optional[Path] = None self.selected_file_text: Optional[ft.Text] = None self.dark_mode = True self.progress = None self.progress_text = None self.page = None self.processing = False self.current_process = None self.run_button = None def create_credits_dialog(self): async def close_dialog(e): self.credits_dialog.open = False await self.page.update_async() async def open_url(e, url): await self.page.launch_url_async(url) return ft.AlertDialog( modal=True, title=ft.Text("Créditos", size=20, weight=ft.FontWeight.BOLD), content=ft.Column([ ft.Text("LeGen - Legendas Generator", weight=ft.FontWeight.BOLD), ft.Text("Desenvolvido com ❤️ pela comunidade"), ft.Text(""), ft.Text("Desenvolvedores:", weight=ft.FontWeight.BOLD), ft.Text("• Matheus bach"), ft.Text("• UI por Rafa.Godoy"), ft.Text("• Colaboradores do GitHub"), ft.Text(""), ft.Text("Links:", weight=ft.FontWeight.BOLD), ft.TextButton( "GitHub", icon=ft.icons.CODE, on_click=lambda e: self.page.launch_url("https://github.com/matheusbach/legen") ), ft.TextButton( "Telegram", icon=ft.icons.TELEGRAM, on_click=lambda e: self.page.launch_url("https://t.me/+Q_Czz4LbbrA0Y2Ux") ), ft.Text(""), ft.Text("Tecnologias:", weight=ft.FontWeight.BOLD), ft.Text("• Python"), ft.Text("• WhisperX/Whisper"), ft.Text("• FFmpeg"), ft.Text("• Flet UI Framework"), ], tight=True), actions=[ ft.TextButton("Fechar", on_click=close_dialog), ], actions_alignment=ft.MainAxisAlignment.END, ) def main(self, page: ft.Page): self.page = page page.title = "LeGen UI" page.padding = 20 page.theme_mode = ft.ThemeMode.DARK if self.dark_mode else ft.ThemeMode.LIGHT page.window.min_width = 1280 page.window.min_height = 720 page.scroll = ft.ScrollMode.AUTO def toggle_dark_mode(e): self.dark_mode = not self.dark_mode page.theme_mode = ft.ThemeMode.DARK if self.dark_mode else ft.ThemeMode.LIGHT page.update() async def show_credits(e): self.credits_dialog = self.create_credits_dialog() self.page.dialog = self.credits_dialog self.credits_dialog.open = True await self.page.update_async() async def pick_folder_result(e: ft.FilePickerResultEvent): if e.path: self.input_path = Path(e.path) self.selected_file_text.value = f"{i18n('Selected folder:')} {self.input_path}" await page.update_async() async def pick_output_softsubs_result(e: ft.FilePickerResultEvent): if e.path: output_softsubs_path.value = e.path await page.update_async() async def pick_output_hardsubs_result(e: ft.FilePickerResultEvent): if e.path: output_hardsubs_path.value = e.path await page.update_async() #file_picker = ft.FilePicker(on_result=pick_file_result) folder_picker = ft.FilePicker(on_result=pick_folder_result) softsubs_folder_picker = ft.FilePicker(on_result=pick_output_softsubs_result) hardsubs_folder_picker = ft.FilePicker(on_result=pick_output_hardsubs_result) page.overlay.extend([folder_picker, softsubs_folder_picker, hardsubs_folder_picker]) self.selected_file_text = ft.Text(i18n("No folder selected")) input_section = ft.Container( content=ft.Column([ ft.Row([ ft.ElevatedButton(i18n("Select Folder"), on_click=lambda _: folder_picker.get_directory_path()), ]), self.selected_file_text, ]), padding=10, ) transcription_engines = ["whisperx", "whisper"] transcription_models = ["tiny", "base", "small", "medium", "large", "large-v1", "large-v2", "large-v3", "large-v3-turbo"] compute_types = ["auto", "int8", "float16", "float32"] devices = ["auto", "cpu", "cuda"] languages = ["auto", "en", "es", "pt", "fr", "de", "it", "ja", "ko", "zh"] video_codecs = ["h264", "libx264", "h264_vaapi", "h264_nvenc", "hevc", "libx265", "hevc_vaapi"] audio_codecs = ["aac", "libopus", "mp3", "vorbis"] transcription_engine = ft.Dropdown( label=i18n("Transcription Engine"), options=[ft.dropdown.Option(e) for e in transcription_engines], value="whisperx", expand=1 ) transcription_model = ft.Dropdown( label=i18n("Model"), options=[ft.dropdown.Option(m) for m in transcription_models], value="medium", expand=1 ) compute_type = ft.Dropdown( label=i18n("Compute Type"), options=[ft.dropdown.Option(t) for t in compute_types], value="auto", expand=1 ) device = ft.Dropdown( label=i18n("Device"), options=[ft.dropdown.Option(d) for d in devices], value="auto", expand=1 ) batch_size = ft.TextField( label=i18n("Batch Size"), value="4", expand=1, input_filter=ft.NumbersOnlyInputFilter() ) input_lang = ft.Dropdown( label=i18n("Input Language"), options=[ft.dropdown.Option(l) for l in languages], value="auto", expand=1 ) translate_lang = ft.Dropdown( label=i18n("Translate to"), options=[ft.dropdown.Option(l) for l in languages], value="none", expand=1 ) video_codec = ft.Dropdown( label=i18n("Video Codec"), options=[ft.dropdown.Option(c) for c in video_codecs], value="h264", expand=1 ) audio_codec = ft.Dropdown( label=i18n("Audio Codec"), options=[ft.dropdown.Option(c) for c in audio_codecs], value="aac", expand=1 ) normalize = ft.Checkbox(label=i18n("Normalize folder times"), value=False) overwrite = ft.Checkbox(label=i18n("Overwrite existing files"), value=False) copy_files = ft.Checkbox(label=i18n("Copy non-video files"), value=False) disable_srt = ft.Checkbox(label=i18n("Disable SRT generation"), value=False) disable_softsubs = ft.Checkbox(label=i18n("Disable softsubs"), value=False) disable_hardsubs = ft.Checkbox(label=i18n("Disable hardsubs"), value=False) output_softsubs_path = ft.TextField( label=i18n("Softsubs Output Path"), value="softsubs_output", expand=1 ) output_hardsubs_path = ft.TextField( label=i18n("Hardsubs Output Path"), value="hardsubs_output", expand=1 ) output_paths = ft.Container( content=ft.Column([ ft.Row([ output_softsubs_path, ft.ElevatedButton(i18n("Browse"), on_click=lambda _: softsubs_folder_picker.get_directory_path()), ]), ft.Row([ output_hardsubs_path, ft.ElevatedButton(i18n("Browse"), on_click=lambda _: hardsubs_folder_picker.get_directory_path()), ]), ]), padding=10, ) async def show_alert(msg: str, color: str = "error"): banner = ft.Banner( bgcolor=color, leading=ft.Icon(ft.icons.WARNING_AMBER_ROUNDED if color == "error" else ft.icons.INFO), content=ft.Text(msg), actions=[ ft.TextButton("Ok", on_click=lambda _: self.page.overlay.remove(banner)), ], ) self.page.overlay.append(banner) await page.update_async() async def stop_processing(): if self.current_process: self.current_process.terminate() self.current_process = None self.processing = False self.run_button.text = i18n("Run LeGen") await page.update_async() await show_alert(i18n("Processing stopped by user"), "info") await update_progress(False) async def update_progress(visible: bool, progress: float = 0): self.progress.visible = visible self.progress_text.visible = visible if visible: self.progress.value = progress await page.update_async() async def update_ui(text): if not self.progress_text: return self.progress_text.value = text await page.update_async() import locale def process_in_thread(cmd): print(f"Executing command: {' '.join(cmd)}") loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: self.current_process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, # Redireciona stderr também text=True, encoding='utf-8', bufsize=1, # Linha a linha universal_newlines=True ) async def read_output(): final_message = None # Variável para armazenar a mensagem final while True: if self.current_process is None: break # Leitura de stdout output = self.current_process.stdout.readline() if output: print(f"{output.strip()}") #print(f"Process output (with ANSI): {output.strip()}") clean_line, progress_value = process_output(output.strip()) # Verifica se a mensagem final está presente e armazena if "⌛ Processing files for" in clean_line: final_message = clean_line # Armazena a mensagem final await update_ui(clean_line) if progress_value is not None: await update_progress(True, progress_value) # Leitura de stderr (para capturar possíveis erros do FFmpeg) error_output = self.current_process.stderr.readline() if error_output: #print(f"Process error: {error_output.strip()}") # Exibe stderr no terminal clean_line, _ = process_output(error_output.strip()) await update_ui(clean_line) # Verifica se o processo terminou if output == '' and error_output == '' and self.current_process.poll() is not None: break # Após a conclusão, exibe a mensagem final, se ela existir if final_message: await update_ui(final_message) loop.run_until_complete(read_output()) if self.current_process: returncode = self.current_process.poll() if returncode is None: self.current_process.terminate() self.current_process.wait() stdout, stderr = self.current_process.communicate() loop.run_until_complete(update_progress(False)) if returncode == 0: loop.run_until_complete(show_alert("Processing completed successfully!", "success")) else: print(f"Process error: {stderr}") loop.run_until_complete(show_alert(f"Error: {stderr}")) except Exception as e: print(f"Exception occurred: {str(e)}") loop.run_until_complete(update_progress(False)) loop.run_until_complete(show_alert(f"Error: {str(e)}")) finally: loop.close() async def reset_ui(): self.processing = False self.run_button.text = "Run LeGen" await page.update_async() loop.run_until_complete(reset_ui()) async def _reset_button(): self.processing = False self.run_button.text = "Run LeGen" await self.page.update_async() async def run_legen(e): if self.processing: await stop_processing() return if not self.input_path: await show_alert(i18n("Please select a folder")) return if not os.path.exists(str(self.input_path)): await show_alert(i18n("Selected folder path does not exist")) return if not os.path.exists("legen.py"): await show_alert(i18n("legen.py not found in current directory")) return cmd = ["python", "legen.py", "-i", str(self.input_path)] if normalize.value: cmd.append("--norm") if overwrite.value: cmd.append("--overwrite") if copy_files.value: cmd.append("--copy_files") if disable_srt.value: cmd.append("--disable_srt") if disable_softsubs.value: cmd.append("--disable_softsubs") if disable_hardsubs.value: cmd.append("--disable_hardsubs") cmd.extend(["-ts:e", transcription_engine.value]) cmd.extend(["-ts:m", transcription_model.value]) cmd.extend(["-ts:d", device.value]) cmd.extend(["-ts:c", compute_type.value]) cmd.extend(["-ts:b", batch_size.value]) if translate_lang.value != "none": cmd.extend(["--translate", translate_lang.value]) if input_lang.value != "auto": cmd.extend(["--input_lang", input_lang.value]) cmd.extend(["-c:v", video_codec.value]) cmd.extend(["-c:a", audio_codec.value]) if output_softsubs_path.value: cmd.extend(["-o:s", output_softsubs_path.value]) if output_hardsubs_path.value: cmd.extend(["-o:h", output_hardsubs_path.value]) self.processing = True self.run_button.text = i18n("Stop LeGen") self.page.update() try: await update_progress(True, 0) await asyncio.get_event_loop().run_in_executor(None, process_in_thread, cmd) except Exception as e: #print(f"Error during execution: {str(e)}") await show_alert(f"Error during execution: {str(e)}") finally: self.processing = False self.run_button.text = i18n("Run LeGen") await update_progress(False) #await page.update_async() # Ocultar a barra de progresso após a conclusão # Você pode adicionar um alerta ou mensagem final aqui se necessário await show_alert("Processing has finished.", "info") # A seção do layout pode ficar inalterada a menos que você precise de alterações. # Progress bar configuration self.progress = ft.ProgressBar(width=400, color="blue", visible=False, value=0) self.progress_text = ft.Text(i18n("Processing..."), visible=False) progress_section = ft.Container( content=ft.Column([ self.progress, self.progress_text, ]) ) self.run_button = ft.ElevatedButton(i18n("Run LeGen"), on_click=run_legen, width=200) content = ft.Container( content=ft.Column([ ft.Row([ ft.Text("LeGen UI", size=30, weight=ft.FontWeight.BOLD), ft.Row([ ft.ElevatedButton( "Créditos", on_click=show_credits, icon=ft.icons.INFO ), ft.IconButton( icon=ft.icons.DARK_MODE if not self.dark_mode else ft.icons.LIGHT_MODE, on_click=toggle_dark_mode, ), ]), ], alignment=ft.MainAxisAlignment.SPACE_BETWEEN), input_section, ft.Divider(), ft.Text(i18n("Transcription Settings"), size=20, weight=ft.FontWeight.BOLD), ft.ResponsiveRow([ ft.Column([transcription_engine], col=6), ft.Column([transcription_model], col=6), ]), ft.ResponsiveRow([ ft.Column([compute_type], col=6), ft.Column([device], col=6), ]), ft.ResponsiveRow([ ft.Column([batch_size], col=6), ft.Column([input_lang], col=6), ]), ft.ResponsiveRow([ ft.Column([translate_lang], col=6), ]), ft.Divider(), ft.Text(i18n("Output Settings"), size=20, weight=ft.FontWeight.BOLD), ft.ResponsiveRow([ ft.Column([video_codec], col=6), ft.Column([audio_codec], col=6), ]), output_paths, ft.Divider(), ft.Text(i18n("Options"), size=20, weight=ft.FontWeight.BOLD), ft.ResponsiveRow([ ft.Column([normalize], col=4), ft.Column([overwrite], col=4), ft.Column([copy_files], col=4), ]), ft.ResponsiveRow([ ft.Column([disable_srt], col=4), ft.Column([disable_softsubs], col=4), ft.Column([disable_hardsubs], col=4), ]), ft.Divider(), progress_section, ft.ElevatedButton(i18n("Run LeGen"), on_click=run_legen, width=200), ]), padding=20 ) # Add content to a scrollable container page.add( ft.Container( content=content, expand=True, ) ) if __name__ == '__main__': app = LeGenUI() ft.app(target=app.main)