legen / ui.py
RafaG's picture
Upload 24 files
5fa5566 verified
import flet as ft
import os
from pathlib import Path
import subprocess
from typing import Optional
import asyncio
import threading
import sys
import os
import sys
from i18n.i18n import I18nAuto
i18n = I18nAuto()
# Força o uso de UTF-8 para o Python
os.environ["PYTHONIOENCODING"] = "utf-8"
# Para garantir que a codificação esteja correta no terminal
if sys.platform == "win32":
# No Windows, podemos forçar a codificação UTF-8 no console
os.system('chcp 65001')
# Redefine a configuração de codificação da saída padrão
sys.stdout.reconfigure(encoding='utf-8')
sys.stderr.reconfigure(encoding='utf-8')
import re
def clean_ansi(text: str) -> str:
"""Remove códigos ANSI e limpa o texto para exibição no Flet"""
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
return ansi_escape.sub('', text)
def process_output(line: str) -> str:
"""Processa uma linha de saída, limpando-a apenas para exibição no Flet"""
clean_line = clean_ansi(line.strip())
# Verifica por padrões de progresso
if "%" in clean_line:
try:
# Extrai o número do progresso
progress_match = re.search(r'(\d+\.?\d*)%', clean_line)
if progress_match:
progress_value = float(progress_match.group(1)) / 100
return clean_line, progress_value
except ValueError:
pass
return clean_line, None
class LeGenUI:
def __init__(self):
self.input_path: Optional[Path] = None
self.selected_file_text: Optional[ft.Text] = None
self.dark_mode = True
self.progress = None
self.progress_text = None
self.page = None
self.processing = False
self.current_process = None
self.run_button = None
def create_credits_dialog(self):
async def close_dialog(e):
self.credits_dialog.open = False
await self.page.update_async()
async def open_url(e, url):
await self.page.launch_url_async(url)
return ft.AlertDialog(
modal=True,
title=ft.Text("Créditos", size=20, weight=ft.FontWeight.BOLD),
content=ft.Column([
ft.Text("LeGen - Legendas Generator", weight=ft.FontWeight.BOLD),
ft.Text("Desenvolvido com ❤️ pela comunidade"),
ft.Text(""),
ft.Text("Desenvolvedores:", weight=ft.FontWeight.BOLD),
ft.Text("• Matheus bach"),
ft.Text("• UI por Rafa.Godoy"),
ft.Text("• Colaboradores do GitHub"),
ft.Text(""),
ft.Text("Links:", weight=ft.FontWeight.BOLD),
ft.TextButton(
"GitHub",
icon=ft.icons.CODE,
on_click=lambda e: self.page.launch_url("https://github.com/matheusbach/legen")
),
ft.TextButton(
"Telegram",
icon=ft.icons.TELEGRAM,
on_click=lambda e: self.page.launch_url("https://t.me/+Q_Czz4LbbrA0Y2Ux")
),
ft.Text(""),
ft.Text("Tecnologias:", weight=ft.FontWeight.BOLD),
ft.Text("• Python"),
ft.Text("• WhisperX/Whisper"),
ft.Text("• FFmpeg"),
ft.Text("• Flet UI Framework"),
], tight=True),
actions=[
ft.TextButton("Fechar", on_click=close_dialog),
],
actions_alignment=ft.MainAxisAlignment.END,
)
def main(self, page: ft.Page):
self.page = page
page.title = "LeGen UI"
page.padding = 20
page.theme_mode = ft.ThemeMode.DARK if self.dark_mode else ft.ThemeMode.LIGHT
page.window.min_width = 1280
page.window.min_height = 720
page.scroll = ft.ScrollMode.AUTO
def toggle_dark_mode(e):
self.dark_mode = not self.dark_mode
page.theme_mode = ft.ThemeMode.DARK if self.dark_mode else ft.ThemeMode.LIGHT
page.update()
async def show_credits(e):
self.credits_dialog = self.create_credits_dialog()
self.page.dialog = self.credits_dialog
self.credits_dialog.open = True
await self.page.update_async()
async def pick_folder_result(e: ft.FilePickerResultEvent):
if e.path:
self.input_path = Path(e.path)
self.selected_file_text.value = f"{i18n('Selected folder:')} {self.input_path}"
await page.update_async()
async def pick_output_softsubs_result(e: ft.FilePickerResultEvent):
if e.path:
output_softsubs_path.value = e.path
await page.update_async()
async def pick_output_hardsubs_result(e: ft.FilePickerResultEvent):
if e.path:
output_hardsubs_path.value = e.path
await page.update_async()
#file_picker = ft.FilePicker(on_result=pick_file_result)
folder_picker = ft.FilePicker(on_result=pick_folder_result)
softsubs_folder_picker = ft.FilePicker(on_result=pick_output_softsubs_result)
hardsubs_folder_picker = ft.FilePicker(on_result=pick_output_hardsubs_result)
page.overlay.extend([folder_picker, softsubs_folder_picker, hardsubs_folder_picker])
self.selected_file_text = ft.Text(i18n("No folder selected"))
input_section = ft.Container(
content=ft.Column([
ft.Row([
ft.ElevatedButton(i18n("Select Folder"), on_click=lambda _: folder_picker.get_directory_path()),
]),
self.selected_file_text,
]),
padding=10,
)
transcription_engines = ["whisperx", "whisper"]
transcription_models = ["tiny", "base", "small", "medium", "large", "large-v1", "large-v2", "large-v3", "large-v3-turbo"]
compute_types = ["auto", "int8", "float16", "float32"]
devices = ["auto", "cpu", "cuda"]
languages = ["auto", "en", "es", "pt", "fr", "de", "it", "ja", "ko", "zh"]
video_codecs = ["h264", "libx264", "h264_vaapi", "h264_nvenc", "hevc", "libx265", "hevc_vaapi"]
audio_codecs = ["aac", "libopus", "mp3", "vorbis"]
transcription_engine = ft.Dropdown(
label=i18n("Transcription Engine"),
options=[ft.dropdown.Option(e) for e in transcription_engines],
value="whisperx",
expand=1
)
transcription_model = ft.Dropdown(
label=i18n("Model"),
options=[ft.dropdown.Option(m) for m in transcription_models],
value="medium",
expand=1
)
compute_type = ft.Dropdown(
label=i18n("Compute Type"),
options=[ft.dropdown.Option(t) for t in compute_types],
value="auto",
expand=1
)
device = ft.Dropdown(
label=i18n("Device"),
options=[ft.dropdown.Option(d) for d in devices],
value="auto",
expand=1
)
batch_size = ft.TextField(
label=i18n("Batch Size"),
value="4",
expand=1,
input_filter=ft.NumbersOnlyInputFilter()
)
input_lang = ft.Dropdown(
label=i18n("Input Language"),
options=[ft.dropdown.Option(l) for l in languages],
value="auto",
expand=1
)
translate_lang = ft.Dropdown(
label=i18n("Translate to"),
options=[ft.dropdown.Option(l) for l in languages],
value="none",
expand=1
)
video_codec = ft.Dropdown(
label=i18n("Video Codec"),
options=[ft.dropdown.Option(c) for c in video_codecs],
value="h264",
expand=1
)
audio_codec = ft.Dropdown(
label=i18n("Audio Codec"),
options=[ft.dropdown.Option(c) for c in audio_codecs],
value="aac",
expand=1
)
normalize = ft.Checkbox(label=i18n("Normalize folder times"), value=False)
overwrite = ft.Checkbox(label=i18n("Overwrite existing files"), value=False)
copy_files = ft.Checkbox(label=i18n("Copy non-video files"), value=False)
disable_srt = ft.Checkbox(label=i18n("Disable SRT generation"), value=False)
disable_softsubs = ft.Checkbox(label=i18n("Disable softsubs"), value=False)
disable_hardsubs = ft.Checkbox(label=i18n("Disable hardsubs"), value=False)
output_softsubs_path = ft.TextField(
label=i18n("Softsubs Output Path"),
value="softsubs_output",
expand=1
)
output_hardsubs_path = ft.TextField(
label=i18n("Hardsubs Output Path"),
value="hardsubs_output",
expand=1
)
output_paths = ft.Container(
content=ft.Column([
ft.Row([
output_softsubs_path,
ft.ElevatedButton(i18n("Browse"), on_click=lambda _: softsubs_folder_picker.get_directory_path()),
]),
ft.Row([
output_hardsubs_path,
ft.ElevatedButton(i18n("Browse"), on_click=lambda _: hardsubs_folder_picker.get_directory_path()),
]),
]),
padding=10,
)
async def show_alert(msg: str, color: str = "error"):
banner = ft.Banner(
bgcolor=color,
leading=ft.Icon(ft.icons.WARNING_AMBER_ROUNDED if color == "error" else ft.icons.INFO),
content=ft.Text(msg),
actions=[
ft.TextButton("Ok", on_click=lambda _: self.page.overlay.remove(banner)),
],
)
self.page.overlay.append(banner)
await page.update_async()
async def stop_processing():
if self.current_process:
self.current_process.terminate()
self.current_process = None
self.processing = False
self.run_button.text = i18n("Run LeGen")
await page.update_async()
await show_alert(i18n("Processing stopped by user"), "info")
await update_progress(False)
async def update_progress(visible: bool, progress: float = 0):
self.progress.visible = visible
self.progress_text.visible = visible
if visible:
self.progress.value = progress
await page.update_async()
async def update_ui(text):
if not self.progress_text:
return
self.progress_text.value = text
await page.update_async()
import locale
def process_in_thread(cmd):
print(f"Executing command: {' '.join(cmd)}")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
self.current_process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE, # Redireciona stderr também
text=True,
encoding='utf-8',
bufsize=1, # Linha a linha
universal_newlines=True
)
async def read_output():
final_message = None # Variável para armazenar a mensagem final
while True:
if self.current_process is None:
break
# Leitura de stdout
output = self.current_process.stdout.readline()
if output:
print(f"{output.strip()}")
#print(f"Process output (with ANSI): {output.strip()}")
clean_line, progress_value = process_output(output.strip())
# Verifica se a mensagem final está presente e armazena
if "⌛ Processing files for" in clean_line:
final_message = clean_line # Armazena a mensagem final
await update_ui(clean_line)
if progress_value is not None:
await update_progress(True, progress_value)
# Leitura de stderr (para capturar possíveis erros do FFmpeg)
error_output = self.current_process.stderr.readline()
if error_output:
#print(f"Process error: {error_output.strip()}") # Exibe stderr no terminal
clean_line, _ = process_output(error_output.strip())
await update_ui(clean_line)
# Verifica se o processo terminou
if output == '' and error_output == '' and self.current_process.poll() is not None:
break
# Após a conclusão, exibe a mensagem final, se ela existir
if final_message:
await update_ui(final_message)
loop.run_until_complete(read_output())
if self.current_process:
returncode = self.current_process.poll()
if returncode is None:
self.current_process.terminate()
self.current_process.wait()
stdout, stderr = self.current_process.communicate()
loop.run_until_complete(update_progress(False))
if returncode == 0:
loop.run_until_complete(show_alert("Processing completed successfully!", "success"))
else:
print(f"Process error: {stderr}")
loop.run_until_complete(show_alert(f"Error: {stderr}"))
except Exception as e:
print(f"Exception occurred: {str(e)}")
loop.run_until_complete(update_progress(False))
loop.run_until_complete(show_alert(f"Error: {str(e)}"))
finally:
loop.close()
async def reset_ui():
self.processing = False
self.run_button.text = "Run LeGen"
await page.update_async()
loop.run_until_complete(reset_ui())
async def _reset_button():
self.processing = False
self.run_button.text = "Run LeGen"
await self.page.update_async()
async def run_legen(e):
if self.processing:
await stop_processing()
return
if not self.input_path:
await show_alert(i18n("Please select a folder"))
return
if not os.path.exists(str(self.input_path)):
await show_alert(i18n("Selected folder path does not exist"))
return
if not os.path.exists("legen.py"):
await show_alert(i18n("legen.py not found in current directory"))
return
cmd = ["python", "legen.py", "-i", str(self.input_path)]
if normalize.value:
cmd.append("--norm")
if overwrite.value:
cmd.append("--overwrite")
if copy_files.value:
cmd.append("--copy_files")
if disable_srt.value:
cmd.append("--disable_srt")
if disable_softsubs.value:
cmd.append("--disable_softsubs")
if disable_hardsubs.value:
cmd.append("--disable_hardsubs")
cmd.extend(["-ts:e", transcription_engine.value])
cmd.extend(["-ts:m", transcription_model.value])
cmd.extend(["-ts:d", device.value])
cmd.extend(["-ts:c", compute_type.value])
cmd.extend(["-ts:b", batch_size.value])
if translate_lang.value != "none":
cmd.extend(["--translate", translate_lang.value])
if input_lang.value != "auto":
cmd.extend(["--input_lang", input_lang.value])
cmd.extend(["-c:v", video_codec.value])
cmd.extend(["-c:a", audio_codec.value])
if output_softsubs_path.value:
cmd.extend(["-o:s", output_softsubs_path.value])
if output_hardsubs_path.value:
cmd.extend(["-o:h", output_hardsubs_path.value])
self.processing = True
self.run_button.text = i18n("Stop LeGen")
self.page.update()
try:
await update_progress(True, 0)
await asyncio.get_event_loop().run_in_executor(None, process_in_thread, cmd)
except Exception as e:
#print(f"Error during execution: {str(e)}")
await show_alert(f"Error during execution: {str(e)}")
finally:
self.processing = False
self.run_button.text = i18n("Run LeGen")
await update_progress(False)
#await page.update_async() # Ocultar a barra de progresso após a conclusão
# Você pode adicionar um alerta ou mensagem final aqui se necessário
await show_alert("Processing has finished.", "info")
# A seção do layout pode ficar inalterada a menos que você precise de alterações.
# Progress bar configuration
self.progress = ft.ProgressBar(width=400, color="blue", visible=False, value=0)
self.progress_text = ft.Text(i18n("Processing..."), visible=False)
progress_section = ft.Container(
content=ft.Column([
self.progress,
self.progress_text,
])
)
self.run_button = ft.ElevatedButton(i18n("Run LeGen"), on_click=run_legen, width=200)
content = ft.Container(
content=ft.Column([
ft.Row([
ft.Text("LeGen UI", size=30, weight=ft.FontWeight.BOLD),
ft.Row([
ft.ElevatedButton(
"Créditos",
on_click=show_credits,
icon=ft.icons.INFO
),
ft.IconButton(
icon=ft.icons.DARK_MODE if not self.dark_mode else ft.icons.LIGHT_MODE,
on_click=toggle_dark_mode,
),
]),
], alignment=ft.MainAxisAlignment.SPACE_BETWEEN),
input_section,
ft.Divider(),
ft.Text(i18n("Transcription Settings"), size=20, weight=ft.FontWeight.BOLD),
ft.ResponsiveRow([
ft.Column([transcription_engine], col=6),
ft.Column([transcription_model], col=6),
]),
ft.ResponsiveRow([
ft.Column([compute_type], col=6),
ft.Column([device], col=6),
]),
ft.ResponsiveRow([
ft.Column([batch_size], col=6),
ft.Column([input_lang], col=6),
]),
ft.ResponsiveRow([
ft.Column([translate_lang], col=6),
]),
ft.Divider(),
ft.Text(i18n("Output Settings"), size=20, weight=ft.FontWeight.BOLD),
ft.ResponsiveRow([
ft.Column([video_codec], col=6),
ft.Column([audio_codec], col=6),
]),
output_paths,
ft.Divider(),
ft.Text(i18n("Options"), size=20, weight=ft.FontWeight.BOLD),
ft.ResponsiveRow([
ft.Column([normalize], col=4),
ft.Column([overwrite], col=4),
ft.Column([copy_files], col=4),
]),
ft.ResponsiveRow([
ft.Column([disable_srt], col=4),
ft.Column([disable_softsubs], col=4),
ft.Column([disable_hardsubs], col=4),
]),
ft.Divider(),
progress_section,
ft.ElevatedButton(i18n("Run LeGen"), on_click=run_legen, width=200),
]),
padding=20
)
# Add content to a scrollable container
page.add(
ft.Container(
content=content,
expand=True,
)
)
if __name__ == '__main__':
app = LeGenUI()
ft.app(target=app.main)