Omarrran committed on
Commit
4e9e165
·
verified ·
1 Parent(s): da9e6d9

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +247 -3
app.py CHANGED
@@ -243,10 +243,254 @@ class TTSDatasetCollector:
243
  logger.error(traceback.format_exc())
244
  return False, error_msg
245
 
246
- # ... (Rest of the class remains unchanged)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
247
 
248
- # For brevity, the rest of the class methods are unchanged
249
- # Please ensure to include the rest of the methods from the previous code
250
 
251
  def create_interface():
252
  """Create Gradio interface with enhanced features"""
 
243
  logger.error(traceback.format_exc())
244
  return False, error_msg
245
 
246
def generate_filenames(self, dataset_name: str, speaker_id: str, sentence_text: str) -> Tuple[str, str]:
    """Build the matching audio/transcript filenames for the current sentence.

    The shared stem is made of the sanitized dataset name, the sanitized
    speaker ID, the 1-based sentence number, a sanitized excerpt of the
    sentence (first 20 characters) and a second-resolution timestamp.

    Returns:
        A ``(audio_name, text_name)`` pair: the same stem with ``.wav`` and
        ``.txt`` extensions respectively.
    """
    unsafe_chars = r'[^a-zA-Z0-9_-]'

    def _clean(raw):
        # Anything outside ASCII letters, digits, '_' and '-' becomes '_';
        # the result is capped at 50 characters.
        return re.sub(unsafe_chars, '_', raw)[:50]

    sentence_number = self.current_index + 1
    stamp = datetime.now().strftime("%Y%m%d%H%M%S")

    stem = "_".join(
        [
            _clean(dataset_name),
            _clean(speaker_id),
            f"line{sentence_number}",
            _clean(sentence_text[:20]),
            stamp,
        ]
    )
    return stem + ".wav", stem + ".txt"
260
+
261
def save_recording(self, audio_file, speaker_id: str, dataset_name: str) -> Tuple[bool, str]:
    """Save the recording for the current sentence together with its transcription.

    Args:
        audio_file: Source audio handed to ``sf.read``; must be truthy.
        speaker_id: Speaker identifier. After surrounding whitespace is
            removed it must consist of letters and digits only.
        dataset_name: Dataset identifier; same rule as ``speaker_id``.

    Returns:
        ``(True, message)`` on success, otherwise ``(False, reason)``.
        Failures during saving are logged and reported through the return
        value rather than raised.
    """
    missing = [
        label
        for label, value in (
            ("audio recording", audio_file),
            ("speaker ID", speaker_id),
            ("dataset name", dataset_name),
        )
        if not value
    ]
    if missing:
        return False, f"Missing required information: {', '.join(missing)}"

    # A sentence must be loaded and selected before anything can be saved.
    if not self.sentences:
        return False, "No sentences have been loaded. Please load text before saving recordings."
    # Reject negative indices too: they would silently wrap to the end of the list.
    if not 0 <= self.current_index < len(self.sentences):
        return False, "Current sentence index is out of range."

    try:
        # Normalize once and use the stripped values everywhere below, so the
        # value that was validated is the value used for paths and metadata.
        speaker_id = speaker_id.strip()
        dataset_name = dataset_name.strip()
        if not speaker_id.isalnum():
            return False, "Speaker ID must contain only letters and numbers"
        if not dataset_name.isalnum():
            return False, "Dataset name must contain only letters and numbers"

        sentence_text = self.sentences[self.current_index]
        audio_name, text_name = self.generate_filenames(dataset_name, speaker_id, sentence_text)

        # Per-speaker output directories.
        audio_dir = self.root_path / 'audio' / speaker_id
        text_dir = self.root_path / 'transcriptions' / speaker_id
        audio_dir.mkdir(parents=True, exist_ok=True)
        text_dir.mkdir(parents=True, exist_ok=True)

        # Re-encode the incoming audio into the dataset tree.
        audio_path = audio_dir / audio_name
        audio_data, sampling_rate = sf.read(audio_file)
        sf.write(str(audio_path), audio_data, sampling_rate)

        # Transcription file carrying the sentence and its metadata header.
        text_path = text_dir / text_name
        self.save_transcription(
            text_path,
            sentence_text,
            {
                'speaker_id': speaker_id,
                'dataset_name': dataset_name,
                'timestamp': datetime.now().isoformat(),
                'audio_file': audio_name,
                'font_style': self.current_font,
            },
        )

        self.update_metadata(speaker_id, dataset_name)

        self.log_operation(
            f"Saved recording: Speaker={speaker_id}, Dataset={dataset_name}, "
            f"Audio={audio_name}, Text={text_name}"
        )

        return True, f"Recording saved successfully as {audio_name}"

    except Exception as e:
        # Boundary handler: the UI expects a (success, message) tuple, so the
        # failure is logged and converted instead of propagating.
        error_msg = f"Error saving recording: {str(e)}"
        self.log_operation(error_msg, "error")
        logger.error(traceback.format_exc())
        return False, error_msg
337
+
338
def save_transcription(self, file_path: Path, text: str, metadata: Dict) -> None:
    """Write a transcription file: a ``[METADATA]`` header followed by ``[TEXT]``.

    Args:
        file_path: Destination file; overwritten if it already exists.
        text: Sentence text written under the ``[TEXT]`` marker.
        metadata: Must provide the keys ``audio_file``, ``speaker_id``,
            ``dataset_name``, ``timestamp`` and ``font_style``.
    """
    lines = [
        "[METADATA]",
        f"Recording_ID: {metadata['audio_file']}",
        f"Speaker_ID: {metadata['speaker_id']}",
        f"Dataset_Name: {metadata['dataset_name']}",
        f"Timestamp: {metadata['timestamp']}",
        f"Font_Style: {metadata['font_style']}",
        "",
        "[TEXT]",
        f"{text}",
        "",  # yields the trailing newline after the sentence text
    ]
    with open(file_path, 'w', encoding='utf-8') as handle:
        handle.write("\n".join(lines))
352
+
353
def update_metadata(self, speaker_id: str, dataset_name: str) -> None:
    """Record one more saved recording in ``metadata/dataset_info.json``.

    Loads the existing metadata (or starts a fresh structure), bumps the
    per-speaker and per-dataset counters, notes the current sentence index
    and font style, and writes the file back. Any failure is logged and
    swallowed so that it does not propagate to the caller.

    Args:
        speaker_id: Key under ``speakers`` in the metadata file.
        dataset_name: Key under that speaker's ``datasets``.
    """
    metadata_file = self.root_path / 'metadata' / 'dataset_info.json'

    try:
        if metadata_file.exists():
            with open(metadata_file, 'r', encoding='utf-8') as f:
                metadata = json.load(f)
        else:
            metadata = {'speakers': {}, 'last_updated': None}

        # One timestamp for the whole update keeps the fields consistent.
        now = datetime.now().isoformat()

        # setdefault tolerates files that are missing parts of the structure.
        speakers = metadata.setdefault('speakers', {})
        if speaker_id not in speakers:
            speakers[speaker_id] = {
                'total_recordings': 0,
                'datasets': {},
            }
        speaker = speakers[speaker_id]

        datasets = speaker.setdefault('datasets', {})
        if dataset_name not in datasets:
            datasets[dataset_name] = {
                'recordings': 0,
                'sentences': len(self.sentences),
                'recorded_sentences': [],
                'first_recording': now,
                'last_recording': None,
                'font_styles_used': [],
            }
        dataset = datasets[dataset_name]

        # Counters and timestamps.
        speaker['total_recordings'] = speaker.get('total_recordings', 0) + 1
        dataset['recordings'] = dataset.get('recordings', 0) + 1
        dataset['last_recording'] = now

        # Each sentence index and font style is listed at most once.
        recorded_sentences = dataset.setdefault('recorded_sentences', [])
        if self.current_index not in recorded_sentences:
            recorded_sentences.append(self.current_index)

        font_styles = dataset.setdefault('font_styles_used', [])
        if self.current_font not in font_styles:
            font_styles.append(self.current_font)

        metadata['last_updated'] = now

        # The metadata directory may not exist yet on a fresh dataset root.
        metadata_file.parent.mkdir(parents=True, exist_ok=True)
        with open(metadata_file, 'w', encoding='utf-8') as f:
            json.dump(metadata, f, indent=2)

        self.log_operation(f"Updated metadata for {speaker_id} in {dataset_name}")

    except Exception as e:
        # Best-effort by design: a metadata failure is logged and not raised.
        error_msg = f"Error updating metadata: {str(e)}"
        self.log_operation(error_msg, "error")
        logger.error(traceback.format_exc())
409
+
410
def get_navigation_info(self) -> Dict[str, Optional[str]]:
    """Describe the current position in the loaded sentence list.

    Returns:
        A dict with ``current`` and ``next`` (each passed through
        ``get_styled_text``; ``next`` is ``None`` on the last sentence) and a
        ``progress`` label. With no sentences loaded, both texts are ``None``
        and ``progress`` is ``"No text loaded"``.
    """
    sentences = self.sentences
    if not sentences:
        return {'current': None, 'next': None, 'progress': "No text loaded"}

    position = self.current_index
    total = len(sentences)

    shown = self.get_styled_text(sentences[position])
    # Only style a following sentence when one exists.
    upcoming = self.get_styled_text(sentences[position + 1]) if position < total - 1 else None

    return {
        'current': shown,
        'next': upcoming,
        'progress': f"Sentence {position + 1} of {total}",
    }
432
+
433
def navigate(self, direction: str) -> Dict[str, Optional[str]]:
    """Move one sentence forward or backward and report the new position.

    Args:
        direction: ``"next"`` or ``"prev"``. Any other value, or a move past
            either end of the list, leaves the index unchanged.

    Returns:
        The dict from ``get_navigation_info`` with an added ``status`` entry,
        or a "no text loaded" dict when no sentences are available.
    """
    if not self.sentences:
        return {
            'current': None,
            'next': None,
            'progress': "No text loaded",
            'status': "⚠️ Please load a text file first",
        }

    last_position = len(self.sentences) - 1
    if direction == "next":
        if self.current_index < last_position:
            self.current_index += 1
    elif direction == "prev":
        if self.current_index > 0:
            self.current_index -= 1

    result = self.get_navigation_info()
    result.update(status="✅ Navigation successful")
    return result
452
+
453
def get_dataset_statistics(self) -> Dict:
    """Summarize recording progress from ``metadata/dataset_info.json``.

    Returns:
        A dict with ``"Total Sentences"`` (currently loaded sentences),
        ``"Recorded Sentences"`` (recorded-sentence entries summed over every
        speaker and dataset in the metadata file), ``"Remaining Sentences"``
        (never negative) and ``"Last Updated"``. An empty dict is returned
        when the metadata file does not exist or cannot be read.
    """
    try:
        metadata_file = self.root_path / 'metadata' / 'dataset_info.json'
        if not metadata_file.exists():
            return {}
        with open(metadata_file, 'r', encoding='utf-8') as f:
            metadata = json.load(f)

        total_sentences = len(self.sentences)
        recorded = sum(
            len(dataset.get('recorded_sentences', []))
            for speaker in metadata.get('speakers', {}).values()
            for dataset in speaker.get('datasets', {}).values()
        )
        # The recorded count spans all speakers/datasets and can exceed the
        # number of loaded sentences; clamp so the display never goes negative.
        remaining = max(total_sentences - recorded, 0)

        return {
            "Total Sentences": total_sentences,
            "Recorded Sentences": recorded,
            "Remaining Sentences": remaining,
            # 'last_updated' may be present but null; show 'N/A' in that case too.
            "Last Updated": metadata.get('last_updated') or 'N/A',
        }
    except Exception as e:
        # Statistics are display-only: log the problem and show nothing.
        logger.error(f"Error reading dataset statistics: {str(e)}")
        return {}
475
+
476
def get_last_audio_path(self, speaker_id: str) -> Optional[str]:
    """Return the most recently modified ``.wav`` file for a speaker.

    Looks in ``<root_path>/audio/<speaker_id>`` and picks the file with the
    largest modification time.

    Returns:
        The file path as a string, or ``None`` when no ``.wav`` file is found.
    """
    speaker_dir = self.root_path / 'audio' / speaker_id
    newest = max(
        speaker_dir.glob('*.wav'),
        key=lambda candidate: candidate.stat().st_mtime,
        default=None,
    )
    return None if newest is None else str(newest)
484
+
485
def get_last_transcript_path(self, speaker_id: str) -> Optional[str]:
    """Return the most recently modified ``.txt`` transcription for a speaker.

    Looks in ``<root_path>/transcriptions/<speaker_id>`` and picks the file
    with the largest modification time.

    Returns:
        The file path as a string, or ``None`` when no ``.txt`` file is found.
    """
    speaker_dir = self.root_path / 'transcriptions' / speaker_id
    newest = max(
        speaker_dir.glob('*.txt'),
        key=lambda candidate: candidate.stat().st_mtime,
        default=None,
    )
    return None if newest is None else str(newest)
493
 
 
 
494
 
495
  def create_interface():
496
  """Create Gradio interface with enhanced features"""