Spaces:
Paused
Paused
import os | |
import gradio as gr | |
import threading | |
import discord | |
from discord import app_commands | |
from typing import List | |
from elevenlabs import set_api_key, voices, generate, Voice, VoiceSettings, User | |
import tempfile | |
import io | |
import speech_recognition as sr | |
from pydub import AudioSegment | |
import logging | |
import google.generativeai as genai | |
import asyncio | |
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configure Gemini AI. `model` is bound on both branches so later code can
# test it instead of risking a NameError when the key is missing.
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
if GEMINI_API_KEY:
    genai.configure(api_key=GEMINI_API_KEY)
    model = genai.GenerativeModel('gemini-pro')
else:
    model = None
    logger.warning("GEMINI_API_KEY not found! Accent modification will be disabled.")

# Set your ElevenLabs API key. Only hand a real value to the client; passing
# None gives no usable credentials and hides the actual configuration problem.
ELEVENLABS_API_KEY = os.getenv("ELEVENLABS_API_KEY")
if ELEVENLABS_API_KEY:
    set_api_key(ELEVENLABS_API_KEY)
else:
    logger.warning("ELEVENLABS_API_KEY not found! ElevenLabs requests will likely fail.")
# Role configurations: role names allowed to use the bot and the maximum
# prompt length (in characters) granted to each.
ALLOWED_ROLES = ["+mechanic", "+trusted"]
CHAR_LIMITS = {
    "+mechanic": 500,
    "+trusted": 3000,
    "Administrator": float("inf"),
}

# Accent configurations
ACCENT_OPTIONS = ["American", "Arabian", "Russian"]

# Rules 3-5 are identical for every accent prompt, so they are written once.
_SHARED_ACCENT_RULES = (
    "3. DO NOT add any annotations, asterisks, or explanations\n"
    "4. DO NOT add formatting or parentheses\n"
    "5. Return ONLY the modified text\n"
)

# Prompt templates sent to the language model, keyed by accent name.
ACCENT_PROMPTS = {
    "Russian": (
        "Modify this text to sound like someone speaking with a strong Russian accent. Rules:\n"
        "1. Use simple Russian words like 'da', 'nyet', 'tovarisch' occasionally\n"
        "2. Drop articles ('the', 'a') sometimes and insist on the 'r' sometimes\n"
        + _SHARED_ACCENT_RULES
        + 'Example input: "We will work tomorrow my friend"\n'
        "Example output: Ve vill vork tomorrow, tovarisch"
    ),
    "Arabian": (
        "Modify this text to sound like someone speaking with an Arabian accent. Rules:\n"
        "1. Use simple Arabic words like 'habibi', 'yalla', 'wallah' occasionally\n"
        "2. Modify 'th' sounds to 'z' sometimes\n"
        + _SHARED_ACCENT_RULES
        + 'Example input: "Hello my friend, how are you today?"\n'
        "Example output: Habibi, how are you zis beautiful day?"
    ),
    "American": (
        "Modify this text to sound like someone speaking with a strong American accent. Rules:\n"
        "1. Use American colloquialisms\n"
        "2. Add casual American phrases\n"
        + _SHARED_ACCENT_RULES
        + 'Example input: "Hello there, how are you?"\n'
        "Example output: Howdy partner, how ya doin'?"
    ),
}
def get_available_voices():
    """Return a ``{name: voice_id}`` mapping of every voice whose category is not "premade"."""
    custom_voices = {}
    for entry in voices():
        # Skip the stock voices; only the remaining ones are offered.
        if entry.category != "premade":
            custom_voices[entry.name] = entry.voice_id
    return custom_voices
def get_remaining_credits():
    """Fetch the account subscription and return its character counters.

    Returns:
        A dict with the subscription's ``character_count`` and
        ``character_limit`` values under keys of the same names.
    """
    plan = User.from_api().subscription
    count = plan.character_count
    limit = plan.character_limit
    return {"character_count": count, "character_limit": limit}
def format_credits_message(credits_info):
    """Render a credits dict as a one-line ``Credits Status: <count> / <limit>`` string.

    Args:
        credits_info: Mapping providing ``character_count`` and
            ``character_limit`` entries.
    """
    count = credits_info['character_count']
    limit = credits_info['character_limit']
    return f"Credits Status: {count} / {limit}"
def has_permission(member: discord.Member) -> tuple[bool, int]:
    """Check whether a member may use the bot and return their character limit.

    Args:
        member: The invoking user. Objects without ``guild_permissions`` or
            ``roles`` attributes are treated as having no permissions and no
            roles.

    Returns:
        ``(allowed, limit)``. Administrators get ``CHAR_LIMITS["Administrator"]``
        (infinite). Otherwise the limit is the highest one among the member's
        roles listed in ``ALLOWED_ROLES``; ``(False, 0)`` when none match.
    """
    permissions = getattr(member, "guild_permissions", None)
    if permissions is not None and permissions.administrator:
        return True, CHAR_LIMITS["Administrator"]

    member_roles = {role.name for role in getattr(member, "roles", ())}
    # A member may hold several allowed roles; grant the most generous limit
    # rather than whichever role happens to come first in ALLOWED_ROLES.
    granted_limits = [
        CHAR_LIMITS.get(role_name, 0)
        for role_name in ALLOWED_ROLES
        if role_name in member_roles
    ]
    if granted_limits:
        return True, max(granted_limits)
    return False, 0
async def modify_accent(text: str, accent: str, enhance: bool = False) -> str:
    """Rewrite ``text`` in the requested accent using Gemini AI.

    Args:
        text: The text to rewrite.
        accent: A key of ``ACCENT_PROMPTS``. "American", an empty value, or an
            unknown accent leaves the text unchanged.
        enhance: When true, extra instructions asking for stronger accent
            elements are appended to the prompt.

    Returns:
        The first line of the model's answer with ``*``, ``(`` and ``)``
        removed. The original ``text`` is returned whenever Gemini is not
        configured, the accent is not handled, the answer is empty or
        unusable, or the request raises.
    """
    if not GEMINI_API_KEY or not accent or accent == "American":
        return text

    # The accent arrives as free-form input; an unknown value must not raise.
    base_prompt = ACCENT_PROMPTS.get(accent)
    if base_prompt is None:
        logger.warning("Unknown accent %r requested; leaving text unchanged.", accent)
        return text

    if enhance:
        base_prompt += """\n\nAdd more authentic elements:
1. Include more cultural phrases
2. Use more native words (but keep text mostly understandable)
3. Adjust speech patterns
BUT REMEMBER:
- DO NOT add any annotations or explanations
- DO NOT use asterisks or parentheses
- Return ONLY the modified text"""
    prompt = f"{base_prompt}\n\nInput text: {text}\nModified text:"

    try:
        response = await model.generate_content_async(prompt)
        # Get content from the first part of the first candidate
        parts = response.candidates[0].content.parts
        if not parts:
            return text
        modified_text = parts[0].text.strip()
        # Clean up any remaining annotations or formatting
        modified_text = modified_text.replace('*', '').replace('(', '').replace(')', '')
        # Keep only the first line; anything after it is treated as commentary.
        modified_text = modified_text.split('\n')[0].strip()
        # If the response is empty or still carries a label, return the original
        if not modified_text or 'Enhanced Text:' in modified_text:
            return text
        return modified_text
    except Exception as e:
        logger.error(f"Error modifying accent: {str(e)}")
        return text
# Resolve the voice catalogue once, at import time; the rest of the module
# reads it as a name -> voice_id mapping.
VOICE_LIST = get_available_voices()
# Discord bot setup
class VoiceBot(discord.Client):
    """Discord client that owns an application-command tree for one guild."""

    def __init__(self):
        """Create the client with default intents, a command tree and a presence."""
        default_intents = discord.Intents.default()
        super().__init__(intents=default_intents)
        self.tree = app_commands.CommandTree(self)
        # Guild to sync commands to; 0 when DISCORD_GUILD_ID is not set.
        self.guild_id = int(os.getenv('DISCORD_GUILD_ID', '0'))
        self.activity = discord.Activity(
            name="voice creation | /create /list",
            type=discord.ActivityType.watching,
        )

    async def setup_hook(self):
        """Copy the global commands to the configured guild and sync them there."""
        target_guild = discord.Object(id=self.guild_id)
        self.tree.copy_global_to(guild=target_guild)
        await self.tree.sync(guild=target_guild)
# Single shared bot instance, plus a module-level alias for its command tree.
client = VoiceBot()
tree = client.tree
async def voice_list(interaction: discord.Interaction):
    """Reply with an embed listing every known voice followed by the credits status."""
    await interaction.response.defer()

    bullet_lines = [f"• {name}" for name in VOICE_LIST.keys()]
    voices_text = "\n".join(bullet_lines)
    credits_msg = format_credits_message(get_remaining_credits())

    listing = discord.Embed(
        title="Available Voices",
        description=f"{voices_text}\n\n{credits_msg}",
        color=0x2B2D31,
    )
    await interaction.followup.send(embed=listing)
async def voice_autocomplete(interaction: discord.Interaction, current: str) -> List[app_commands.Choice[str]]:
    """Return up to 25 choices for voices whose name contains ``current``, ignoring case."""
    needle = current.lower()
    matches = []
    for voice in VOICE_LIST.keys():
        if needle in voice.lower():
            matches.append(app_commands.Choice(name=voice, value=voice))
    # Only the first 25 matches are returned.
    return matches[:25]
async def voice_create(
    interaction: discord.Interaction,
    text: str,
    voice_name: str,
    stability: float = 0.5,
    clarity: float = 0.75,
    style: float = 0.5,
    accent: str = None,
    accent_enhance: bool = False
):
    """Generate speech for ``text`` with the chosen voice and post it as a WAV file.

    The caller must pass ``has_permission`` and stay within their character
    limit. When ``accent`` is given the text is first rewritten through
    ``modify_accent``. Every outcome (denial, limit exceeded, unknown voice,
    success, failure) is reported through ``interaction.followup``.

    Args:
        interaction: The Discord interaction being answered.
        text: Text to synthesize.
        voice_name: A key of ``VOICE_LIST``.
        stability: Passed to ``VoiceSettings`` as ``stability``.
        clarity: Passed to ``VoiceSettings`` as ``similarity_boost``.
        style: Passed to ``VoiceSettings`` as ``style``.
        accent: Optional accent name forwarded to ``modify_accent``.
        accent_enhance: Forwarded to ``modify_accent`` as ``enhance``.
    """
    await interaction.response.defer()

    # Check permissions
    has_perm, char_limit = has_permission(interaction.user)
    if not has_perm:
        embed = discord.Embed(
            title="Permission Denied",
            description="You need to be an administrator or have the +mechanic/+trusted role to use this command.",
            color=0xFF0000
        )
        await interaction.followup.send(embed=embed)
        return

    # Check character limit
    if len(text) > char_limit:
        embed = discord.Embed(
            title="Character Limit Exceeded",
            description=f"Your message exceeds your character limit of {char_limit}. Current length: {len(text)}",
            color=0xFF0000
        )
        await interaction.followup.send(embed=embed)
        return

    # Validate the voice before spending a Gemini request on accent rewriting.
    if voice_name not in VOICE_LIST:
        embed = discord.Embed(
            title="Voice Not Found",
            description=f"The voice '{voice_name}' was not found. Use `/list` to see available voices.",
            color=0x2B2D31
        )
        await interaction.followup.send(embed=embed)
        return

    # Process accent if specified
    if accent:
        text = await modify_accent(text, accent, accent_enhance)

    temp_path = None
    try:
        voice_settings = VoiceSettings(
            stability=stability,
            similarity_boost=clarity,
            style=style,
            use_speaker_boost=True
        )
        # The ElevenLabs and pydub calls block; run them in worker threads so
        # the bot's event loop stays responsive while audio is produced.
        audio = await asyncio.to_thread(
            generate,
            text=text,
            voice=Voice(
                voice_id=VOICE_LIST[voice_name],
                settings=voice_settings
            )
        )
        temp_path = await asyncio.to_thread(_export_wav_tempfile, audio)
        credits_info = await asyncio.to_thread(get_remaining_credits)
        credits_msg = format_credits_message(credits_info)

        accent_info = f"\nAccent: {accent}" if accent else ""
        accent_enhance_info = f"\nAccent Enhancement: {'On' if accent_enhance else 'Off'}" if accent else ""
        embed = discord.Embed(
            title="Voice Generated",
            description=f"Prompt: {text}\nVoice: {voice_name}\nStability: {stability}\nClarity: {clarity}\nStyle: {style}{accent_info}{accent_enhance_info}\n\n{credits_msg}",
            color=0x57F287
        )
        await interaction.followup.send(
            embed=embed,
            file=discord.File(temp_path)
        )
    except Exception as e:
        logger.error(f"Error generating audio: {str(e)}")
        await interaction.followup.send(f"Error generating audio: {str(e)}")
    finally:
        # Remove the temporary WAV even when sending (or anything else) failed.
        if temp_path is not None and os.path.exists(temp_path):
            os.unlink(temp_path)


def _export_wav_tempfile(audio: bytes) -> str:
    """Convert MP3 bytes to a 22050 Hz, mono, 16-bit WAV temp file and return its path."""
    # Reserve a path, then close the handle before pydub writes to it.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_file:
        temp_path = temp_file.name
    try:
        audio_segment = AudioSegment.from_file(io.BytesIO(audio), format="mp3")
        audio_segment = audio_segment.set_frame_rate(22050).set_channels(1).set_sample_width(2)
        audio_segment.export(temp_path, format='wav')
    except Exception:
        # Do not leave a half-written file behind when conversion fails.
        os.unlink(temp_path)
        raise
    return temp_path
async def on_ready():
    """Log the logged-in user and apply the client's configured activity as presence."""
    logger.info(f"Bot is ready and logged in as {client.user}")
    presence = client.activity
    await client.change_presence(activity=presence)
# Gradio interface functions
def text_to_speech(text, voice_name, stability, clarity, style):
    """Synthesize ``text`` with the named voice and save it to a temporary MP3.

    Args:
        text: Text to synthesize.
        voice_name: A key of ``VOICE_LIST``; an unknown name raises ``KeyError``.
        stability: Passed to ``VoiceSettings`` as ``stability``.
        clarity: Passed to ``VoiceSettings`` as ``similarity_boost``.
        style: Passed to ``VoiceSettings`` as ``style``.

    Returns:
        ``(path, credits_message)``: the path of the written ``.mp3`` file
        (not deleted by this function) and the formatted credits status.
    """
    tuning = VoiceSettings(
        stability=stability,
        similarity_boost=clarity,
        style=style,
        use_speaker_boost=True,
    )
    selected_voice = Voice(voice_id=VOICE_LIST[voice_name], settings=tuning)
    audio_bytes = generate(text=text, voice=selected_voice)

    credits_message = format_credits_message(get_remaining_credits())

    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as mp3_file:
        mp3_file.write(audio_bytes)
        output_path = mp3_file.name
    return output_path, credits_message
def speech_to_text(audio_file):
    """Transcribe an audio file using Google speech recognition.

    The input is first converted to a temporary WAV file, which is always
    removed before returning.

    Args:
        audio_file: Anything ``AudioSegment.from_file`` accepts.

    Returns:
        The recognized text, ``"Could not understand audio"`` when recognition
        yields nothing, or ``"Error in speech recognition service"`` when the
        recognition request fails.
    """
    recognizer = sr.Recognizer()

    # Create the temp file atomically (tempfile.mktemp is deprecated and
    # race-prone), then close it so pydub can write to the path.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as placeholder:
        wav_path = placeholder.name

    try:
        AudioSegment.from_file(audio_file).export(wav_path, format="wav")
        with sr.AudioFile(wav_path) as source:
            audio_data = recognizer.record(source)
        return recognizer.recognize_google(audio_data)
    except sr.UnknownValueError:
        return "Could not understand audio"
    except sr.RequestError:
        return "Error in speech recognition service"
    finally:
        # Runs on every path, including conversion failures before recognition.
        os.unlink(wav_path)
def speech_to_speech(audio_file, voice_name, stability, clarity, style):
    """Transcribe ``audio_file`` and re-synthesize the transcript with the chosen voice.

    Returns:
        ``(audio_path, recognized_text, credits_message)``. When the
        transcript starts with "Error" or "Could not", synthesis is skipped
        and ``(None, transcript, "")`` is returned.
    """
    recognized = speech_to_text(audio_file)
    failure_prefixes = ("Error", "Could not")
    if recognized.startswith(failure_prefixes):
        return None, recognized, ""

    generated_path, credits_message = text_to_speech(
        recognized, voice_name, stability, clarity, style
    )
    return generated_path, recognized, credits_message
# Create Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("# ElevenLabs Voice Generation")
    # Credits shown in the header are fetched once, when the UI is built.
    credits_info = get_remaining_credits()
    credits_display = gr.Markdown(format_credits_message(credits_info))

    with gr.Tab("Text to Speech"):
        with gr.Row():
            with gr.Column():
                text_input = gr.Textbox(label="Text to convert", lines=5)
                voice_dropdown = gr.Dropdown(choices=list(VOICE_LIST.keys()), label="Select Voice")
                with gr.Row():
                    stability = gr.Slider(minimum=0, maximum=1, value=0.5, label="Stability")
                    clarity = gr.Slider(minimum=0, maximum=1, value=0.75, label="Clarity/Similarity Boost")
                    style = gr.Slider(minimum=0, maximum=1, value=0.5, label="Style")
                convert_btn = gr.Button("Convert")
            with gr.Column():
                audio_output = gr.Audio(label="Generated Audio")
                credits_output = gr.Markdown()
        convert_btn.click(
            fn=text_to_speech,
            inputs=[text_input, voice_dropdown, stability, clarity, style],
            outputs=[audio_output, credits_output]
        )

    with gr.Tab("Speech to Speech"):
        with gr.Row():
            with gr.Column():
                # speech_to_speech hands this value to a file loader, so ask
                # Gradio for a file path instead of its default numpy payload.
                audio_input = gr.Audio(
                    label="Input Audio",
                    sources=["microphone", "upload"],
                    type="filepath"
                )
                voice_dropdown_s2s = gr.Dropdown(choices=list(VOICE_LIST.keys()), label="Select Voice")
                with gr.Row():
                    stability_s2s = gr.Slider(minimum=0, maximum=1, value=0.5, label="Stability")
                    clarity_s2s = gr.Slider(minimum=0, maximum=1, value=0.75, label="Clarity/Similarity Boost")
                    style_s2s = gr.Slider(minimum=0, maximum=1, value=0.5, label="Style")
                convert_btn_s2s = gr.Button("Convert")
            with gr.Column():
                text_output = gr.Textbox(label="Recognized Text", lines=3)
                audio_output_s2s = gr.Audio(label="Generated Audio")
                credits_output_s2s = gr.Markdown()
        convert_btn_s2s.click(
            fn=speech_to_speech,
            # Use this tab's own stability slider, not the Text to Speech one.
            inputs=[audio_input, voice_dropdown_s2s, stability_s2s, clarity_s2s, style_s2s],
            outputs=[audio_output_s2s, text_output, credits_output_s2s]
        )
def start_discord_bot():
    """Run the Discord client with the token from DISCORD_BOT_TOKEN.

    Logs an error and returns when the token is missing. Any exception raised
    while starting or running the client is logged rather than propagated.
    """
    token = os.getenv("DISCORD_BOT_TOKEN")
    if token:
        logger.info("Starting Discord bot...")
        try:
            # Install a fresh event loop for the calling thread before running.
            fresh_loop = asyncio.new_event_loop()
            asyncio.set_event_loop(fresh_loop)
            client.run(token)
        except Exception as exc:
            logger.error(f"Failed to start Discord bot: {str(exc)}")
    else:
        logger.error("DISCORD_BOT_TOKEN not found!")
# Run the Discord bot on a daemon thread so the Gradio server below can own
# the main thread.
discord_thread = threading.Thread(target=start_discord_bot, daemon=True)
discord_thread.start()

# Serve the Gradio UI on all interfaces, without a public share link.
demo.launch(server_name="0.0.0.0", share=False)