""" | |
TTS Dataset Collection Tool with Font Support and Enhanced Error Handling | |
""" | |
import os
import json
import shutil
import logging
import traceback
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple, Optional

import nltk
import gradio as gr

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Font configurations
FONT_STYLES = {
    "english_serif": {
        "name": "Times New Roman",
        "family": "serif",
        "css": "font-family: 'Times New Roman', serif;"
    },
    "english_sans": {
        "name": "Arial",
        "family": "sans-serif",
        "css": "font-family: Arial, sans-serif;"
    },
    "nastaliq": {
        "name": "Nastaliq",
        "family": "Jameel Noori Nastaleeq",
        "css": "font-family: 'Jameel Noori Nastaleeq', serif;"
    },
    "naskh": {
        "name": "Naskh",
        "family": "Traditional Arabic",
        "css": "font-family: 'Traditional Arabic', serif;"
    }
}
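
# NOTE: the 'nastaliq' and 'naskh' styles reference custom font families. The
# @font-face rules built in create_interface() look for matching .ttf files
# (e.g. 'Jameel Noori Nastaleeq.ttf') at a relative 'fonts/' URL; presumably
# the dataset/fonts directory created in setup_directories() is where those
# files are meant to live.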

class TTSDatasetCollector:
    """Manages TTS dataset collection and organization with enhanced features"""

    def __init__(self):
        """Initialize the TTS Dataset Collector"""
        # Initialize NLTK
        self._initialize_nltk()

        # Set up paths and directories
        self.root_path = Path(os.path.dirname(os.path.abspath(__file__))) / "dataset"
        self.sentences: List[str] = []
        self.current_index: int = 0
        self.current_font: str = "english_serif"
        self.setup_directories()
        logger.info("TTS Dataset Collector initialized")
    def _initialize_nltk(self) -> None:
        """Initialize NLTK with error handling"""
        try:
            nltk.download('punkt', quiet=True)
            logger.info("NLTK punkt tokenizer downloaded successfully")
        except Exception as e:
            logger.error(f"Failed to download NLTK data: {str(e)}")
            logger.error(traceback.format_exc())
            raise RuntimeError("Failed to initialize NLTK. Please check your internet connection.")
    def setup_directories(self) -> None:
        """Create necessary directory structure with logging"""
        try:
            # Create main dataset directory
            self.root_path.mkdir(exist_ok=True)

            # Create subdirectories
            for subdir in ['audio', 'transcriptions', 'metadata', 'fonts']:
                (self.root_path / subdir).mkdir(exist_ok=True)

            # Initialize log file
            log_file = self.root_path / 'dataset_log.txt'
            if not log_file.exists():
                with open(log_file, 'w', encoding='utf-8') as f:
                    f.write(f"Dataset collection initialized on {datetime.now().isoformat()}\n")

            logger.info("Directory structure created successfully")
        except Exception as e:
            logger.error(f"Failed to create directory structure: {str(e)}")
            logger.error(traceback.format_exc())
            raise RuntimeError("Failed to initialize directory structure")
    def log_operation(self, message: str, level: str = "info") -> None:
        """Log operations with timestamp and level"""
        try:
            log_file = self.root_path / 'dataset_log.txt'
            timestamp = datetime.now().isoformat()
            with open(log_file, 'a', encoding='utf-8') as f:
                f.write(f"[{timestamp}] [{level.upper()}] {message}\n")
            if level.lower() == "error":
                logger.error(message)
            else:
                logger.info(message)
        except Exception as e:
            logger.error(f"Failed to log operation: {str(e)}")
    def load_text_file(self, file) -> Tuple[bool, str]:
        """Process and load text file with enhanced error handling"""
        if not file:
            return False, "No file provided"
        try:
            # Validate file extension
            if not file.name.endswith('.txt'):
                return False, "Only .txt files are supported"

            with open(file.name, 'r', encoding='utf-8') as f:
                text = f.read()

            # Validate text content
            if not text.strip():
                return False, "File is empty"

            # Tokenize sentences
            self.sentences = nltk.sent_tokenize(text)
            if not self.sentences:
                return False, "No valid sentences found in file"

            self.current_index = 0

            # Log success
            self.log_operation(
                f"Loaded text file: {file.name} with {len(self.sentences)} sentences"
            )
            return True, f"Successfully loaded {len(self.sentences)} sentences"

        except UnicodeDecodeError:
            error_msg = "File encoding error. Please ensure the file is UTF-8 encoded"
            self.log_operation(error_msg, "error")
            return False, error_msg
        except Exception as e:
            error_msg = f"Error loading file: {str(e)}"
            self.log_operation(error_msg, "error")
            logger.error(traceback.format_exc())
            return False, error_msg
    def get_styled_text(self, text: str) -> str:
        """Get text with current font styling"""
        font_css = FONT_STYLES[self.current_font]['css']
        return f"<div style='{font_css}'>{text}</div>"
    def generate_filenames(self, dataset_name: str, speaker_id: str) -> Tuple[str, str]:
        """Generate unique filenames for audio and text files"""
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
        sentence_id = f"{self.current_index+1:04d}"
        base_name = f"{dataset_name}_{speaker_id}_{sentence_id}_{timestamp}"
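        # Illustrative example (hypothetical values): dataset "mydata", speaker
        # "spk1", first sentence, recorded at 2024-01-01 12:00:00 would yield
        # "mydata_spk1_0001_20240101120000.wav" and the matching ".txt" name.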
return f"{base_name}.wav", f"{base_name}.txt" | |
    def set_font(self, font_style: str) -> Tuple[bool, str]:
        """Set the current font style"""
        if font_style not in FONT_STYLES:
            return False, f"Invalid font style. Available styles: {', '.join(FONT_STYLES.keys())}"
        self.current_font = font_style
        return True, f"Font style set to {font_style}"
    def save_recording(self, audio_file, speaker_id: str, dataset_name: str) -> Tuple[bool, str]:
        """Save recording with enhanced error handling and logging"""
        if not all([audio_file, speaker_id, dataset_name]):
            missing = []
            if not audio_file:
                missing.append("audio recording")
            if not speaker_id:
                missing.append("speaker ID")
            if not dataset_name:
                missing.append("dataset name")
            return False, f"Missing required information: {', '.join(missing)}"

        try:
            # Validate inputs
            if not speaker_id.strip().isalnum():
                return False, "Speaker ID must contain only letters and numbers"
            if not dataset_name.strip().isalnum():
                return False, "Dataset name must contain only letters and numbers"
            if not self.sentences:
                return False, "No text loaded. Please load a text file before recording"

            # Generate filenames
            audio_name, text_name = self.generate_filenames(dataset_name, speaker_id)

            # Create speaker directories
            audio_dir = self.root_path / 'audio' / speaker_id
            text_dir = self.root_path / 'transcriptions' / speaker_id
            audio_dir.mkdir(exist_ok=True)
            text_dir.mkdir(exist_ok=True)

            # Save audio file
            audio_path = audio_dir / audio_name
            shutil.copy2(audio_file, audio_path)

            # Save transcription
            text_path = text_dir / text_name
            self.save_transcription(
                text_path,
                self.sentences[self.current_index],
                {
                    'speaker_id': speaker_id,
                    'dataset_name': dataset_name,
                    'timestamp': datetime.now().isoformat(),
                    'audio_file': audio_name,
                    'font_style': self.current_font
                }
            )

            # Update metadata
            self.update_metadata(speaker_id, dataset_name)

            # Log success
            self.log_operation(
                f"Saved recording: Speaker={speaker_id}, Dataset={dataset_name}, "
                f"Audio={audio_name}, Text={text_name}"
            )
            return True, f"Recording saved successfully as {audio_name}"

        except Exception as e:
            error_msg = f"Error saving recording: {str(e)}"
            self.log_operation(error_msg, "error")
            logger.error(traceback.format_exc())
            return False, error_msg
    def save_transcription(self, file_path: Path, text: str, metadata: Dict) -> None:
        """Save transcription with metadata"""
        content = f"""[METADATA]
Recording_ID: {metadata['audio_file']}
Speaker_ID: {metadata['speaker_id']}
Dataset_Name: {metadata['dataset_name']}
Timestamp: {metadata['timestamp']}
Font_Style: {metadata['font_style']}
[TEXT]
{text}
"""
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(content)
    def update_metadata(self, speaker_id: str, dataset_name: str) -> None:
        """Update dataset metadata with error handling"""
        metadata_file = self.root_path / 'metadata' / 'dataset_info.json'
        try:
            if metadata_file.exists():
                with open(metadata_file, 'r') as f:
                    metadata = json.load(f)
            else:
                metadata = {'speakers': {}, 'last_updated': None}

            # Update speaker data
            if speaker_id not in metadata['speakers']:
                metadata['speakers'][speaker_id] = {
                    'total_recordings': 0,
                    'datasets': {}
                }
            if dataset_name not in metadata['speakers'][speaker_id]['datasets']:
                metadata['speakers'][speaker_id]['datasets'][dataset_name] = {
                    'recordings': 0,
                    'sentences': len(self.sentences),
                    'first_recording': datetime.now().isoformat(),
                    'last_recording': None,
                    'font_styles_used': []
                }

            # Update counts and timestamps
            dataset_entry = metadata['speakers'][speaker_id]['datasets'][dataset_name]
            metadata['speakers'][speaker_id]['total_recordings'] += 1
            dataset_entry['recordings'] += 1
            dataset_entry['last_recording'] = datetime.now().isoformat()

            # Update font styles
            if self.current_font not in dataset_entry['font_styles_used']:
                dataset_entry['font_styles_used'].append(self.current_font)

            metadata['last_updated'] = datetime.now().isoformat()

            # Save updated metadata
            with open(metadata_file, 'w') as f:
                json.dump(metadata, f, indent=2)

            self.log_operation(f"Updated metadata for {speaker_id} in {dataset_name}")

        except Exception as e:
            error_msg = f"Error updating metadata: {str(e)}"
            self.log_operation(error_msg, "error")
            logger.error(traceback.format_exc())
    def get_navigation_info(self) -> Dict[str, Optional[str]]:
        """Get current and next sentence information"""
        if not self.sentences:
            return {
                'current': None,
                'next': None,
                'progress': "No text loaded"
            }

        current = self.get_styled_text(self.sentences[self.current_index])
        next_text = None
        if self.current_index < len(self.sentences) - 1:
            next_text = self.get_styled_text(self.sentences[self.current_index + 1])

        progress = f"Sentence {self.current_index + 1} of {len(self.sentences)}"
        return {
            'current': current,
            'next': next_text,
            'progress': progress
        }
    def navigate(self, direction: str) -> Dict[str, Optional[str]]:
        """Navigate through sentences"""
        if not self.sentences:
            return {
                'current': None,
                'next': None,
                'progress': "No text loaded",
                'status': "⚠️ Please load a text file first"
            }

        if direction == "next" and self.current_index < len(self.sentences) - 1:
            self.current_index += 1
        elif direction == "prev" and self.current_index > 0:
            self.current_index -= 1

        nav_info = self.get_navigation_info()
        nav_info['status'] = "✅ Navigation successful"
        return nav_info
    def get_dataset_statistics(self) -> Dict:
        """Get current dataset statistics"""
        try:
            metadata_file = self.root_path / 'metadata' / 'dataset_info.json'
            if not metadata_file.exists():
                return {}
            with open(metadata_file, 'r') as f:
                return json.load(f)
        except Exception as e:
            logger.error(f"Error reading dataset statistics: {str(e)}")
            return {}
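
# A minimal headless usage sketch (illustrative only; file names and IDs below
# are hypothetical). load_text_file accepts any object exposing a .name that
# points at a UTF-8 .txt file, so a plain file handle works:
#
#   collector = TTSDatasetCollector()
#   with open("script.txt", "r", encoding="utf-8") as handle:
#       ok, msg = collector.load_text_file(handle)
#   ok, msg = collector.save_recording("take_0001.wav", "spk1", "mydata")
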
def create_interface():
    """Create Gradio interface with enhanced features"""
    # Create custom CSS for fonts
    custom_css = """
    .gradio-container {
        max-width: 1200px !important;
    }
    .record-button {
        font-size: 1.2em !important;
        padding: 20px !important;
    }
    .sentence-display {
        font-size: 1.4em !important;
        padding: 15px !important;
        border: 1px solid #ddd !important;
        border-radius: 8px !important;
        margin: 10px 0 !important;
        min-height: 100px !important;
    }
    """
    # Add font-face declarations
    for font_style, font_info in FONT_STYLES.items():
        if font_style in ['nastaliq', 'naskh']:
            custom_css += f"""
    @font-face {{
        font-family: '{font_info["family"]}';
        src: url('fonts/{font_info["family"]}.ttf') format('truetype');
    }}
    """
    collector = TTSDatasetCollector()

    with gr.Blocks(title="TTS Dataset Collection Tool", css=custom_css) as interface:
        gr.Markdown("# TTS Dataset Collection Tool")

        with gr.Row():
            # Left column - Configuration
            with gr.Column():
                file_input = gr.File(
                    label="Upload Text File (.txt)",
                    file_types=[".txt"]
                )
                speaker_id = gr.Textbox(
                    label="Speaker ID",
                    placeholder="Enter unique speaker identifier (letters and numbers only)"
                )
                dataset_name = gr.Textbox(
                    label="Dataset Name",
                    placeholder="Enter dataset name (letters and numbers only)"
                )
                font_select = gr.Dropdown(
                    choices=list(FONT_STYLES.keys()),
                    value="english_serif",
                    label="Select Font Style"
                )

            # Right column - Recording
            with gr.Column():
                current_text = gr.HTML(
                    label="Current Sentence",
                    elem_classes=["sentence-display"]
                )
                audio_recorder = gr.Audio(
                    label="Record Audio",
                    type="filepath",
                    elem_classes=["record-button"]
                )
                next_text = gr.HTML(
                    label="Next Sentence",
                    elem_classes=["sentence-display"]
                )

        # Controls
        with gr.Row():
            prev_btn = gr.Button("Previous", variant="secondary")
            next_btn = gr.Button("Next", variant="primary")
            save_btn = gr.Button("Save Recording", variant="primary", elem_classes=["record-button"])

        # Status and Progress
        with gr.Row():
            progress = gr.Textbox(
                label="Progress",
                interactive=False
            )
            status = gr.Textbox(
                label="Status",
                interactive=False,
                max_lines=3
            )

        # Dataset Info
        with gr.Row():
            dataset_info = gr.JSON(
                label="Dataset Statistics",
                value={}
            )
        def update_font(font_style):
            """Update font and refresh display"""
            success, msg = collector.set_font(font_style)
            if not success:
                return {status: msg}
            nav_info = collector.get_navigation_info()
            return {
                current_text: nav_info['current'],
                next_text: nav_info['next'],
                status: f"Font updated to {font_style}"
            }

        def load_file(file):
            """Handle file loading with enhanced error reporting"""
            if not file:
                return {
                    current_text: "",
                    next_text: "",
                    progress: "",
                    status: "⚠️ No file selected",
                    dataset_info: collector.get_dataset_statistics()
                }
            success, msg = collector.load_text_file(file)
            if not success:
                return {
                    current_text: "",
                    next_text: "",
                    progress: "",
                    status: f"❌ {msg}",
                    dataset_info: collector.get_dataset_statistics()
                }
            nav_info = collector.get_navigation_info()
            return {
                current_text: nav_info['current'],
                next_text: nav_info['next'],
                progress: nav_info['progress'],
                status: f"✅ {msg}",
                dataset_info: collector.get_dataset_statistics()
            }

        def save_current_recording(audio_file, speaker_id_value, dataset_name_value):
            """Handle saving the current recording"""
            if not audio_file:
                return {status: "⚠️ Please record audio first"}
            success, msg = collector.save_recording(
                audio_file, speaker_id_value, dataset_name_value
            )
            if not success:
                return {
                    status: f"❌ {msg}",
                    dataset_info: collector.get_dataset_statistics()
                }
            # Auto-advance to next sentence after successful save
            nav_info = collector.navigate("next")
            return {
                current_text: nav_info['current'],
                next_text: nav_info['next'],
                progress: nav_info['progress'],
                status: f"✅ {msg}",
                dataset_info: collector.get_dataset_statistics()
            }

        def navigate_sentences(direction):
            """Handle navigation between sentences"""
            nav_info = collector.navigate(direction)
            return {
                current_text: nav_info['current'],
                next_text: nav_info['next'],
                progress: nav_info['progress'],
                status: nav_info['status']
            }
        # Event handlers
        file_input.upload(
            load_file,
            inputs=[file_input],
            outputs=[current_text, next_text, progress, status, dataset_info]
        )
        font_select.change(
            update_font,
            inputs=[font_select],
            outputs=[current_text, next_text, status]
        )
        save_btn.click(
            save_current_recording,
            inputs=[audio_recorder, speaker_id, dataset_name],
            outputs=[current_text, next_text, progress, status, dataset_info]
        )
        prev_btn.click(
            lambda: navigate_sentences("prev"),
            outputs=[current_text, next_text, progress, status]
        )
        next_btn.click(
            lambda: navigate_sentences("next"),
            outputs=[current_text, next_text, progress, status]
        )

        # Initialize dataset info
        dataset_info.value = collector.get_dataset_statistics()
    return interface

if __name__ == "__main__":
    try:
        # Set up any required environment variables
        os.environ["GRADIO_SERVER_NAME"] = "0.0.0.0"
        os.environ["GRADIO_SERVER_PORT"] = "7860"

        # Create and launch the interface
        interface = create_interface()
        interface.queue()  # Enable queuing for better handling of concurrent users
        interface.launch(
            server_name="0.0.0.0",
            server_port=7860,
            share=True,
            debug=True,
            show_error=True
        )
    except Exception as e:
        logger.error(f"Failed to launch interface: {str(e)}")
        logger.error(traceback.format_exc())
        raise