Spaces:
Sleeping
Sleeping
import os | |
import re | |
import requests | |
import json | |
import gradio as gr | |
from langchain.chat_models import ChatOpenAI | |
from langchain import LLMChain, PromptTemplate | |
from langchain.memory import ConversationBufferMemory | |
OPENAI_API_KEY=os.getenv('OPENAI_API_KEY') | |
PLAY_HT_API_KEY=os.getenv('PLAY_HT_API_KEY') | |
PLAY_HT_USER_ID=os.getenv('PLAY_HT_USER_ID') | |
PLAY_HT_VOICE_ID=os.getenv('PLAY_HT_VOICE_ID') | |
play_ht_api_get_audio_url = "https://play.ht/api/v2/tts" | |
template = """Meet Charlie, your youthful and witty personal assistant! At 20 years old, he's full of energy and always eager to help. Charlie's goal is to assist you with any questions or problems you might have. His enthusiasm shines through in every response, making interactions with his enjoyable and engaging. | |
{chat_history} | |
User: {user_message} | |
Chatbot:""" | |
prompt = PromptTemplate( | |
input_variables=["chat_history", "user_message"], template=template | |
) | |
memory = ConversationBufferMemory(memory_key="chat_history") | |
llm_chain = LLMChain( | |
llm=ChatOpenAI(temperature='0.5', model_name="gpt-3.5-turbo"), | |
prompt=prompt, | |
verbose=True, | |
memory=memory, | |
) | |
headers = { | |
"accept": "text/event-stream", | |
"content-type": "application/json", | |
"AUTHORIZATION": "Bearer "+ PLAY_HT_API_KEY, | |
"X-USER-ID": PLAY_HT_USER_ID | |
} | |
def get_payload(text): | |
return { | |
"text": text, | |
"voice": PLAY_HT_VOICE_ID, | |
"quality": "medium", | |
"output_format": "mp3", | |
"speed": 1, | |
"sample_rate": 24000, | |
"seed": None, | |
"temperature": None | |
} | |
def get_generated_audio(text): | |
payload = get_payload(text) | |
generated_response = {} | |
try: | |
response = requests.post(play_ht_api_get_audio_url, json=payload, headers=headers) | |
response.raise_for_status() | |
generated_response["type"]= 'SUCCESS' | |
generated_response["response"] = response.text | |
except requests.exceptions.RequestException as e: | |
generated_response["type"]= 'ERROR' | |
try: | |
response_text = json.loads(response.text) | |
if response_text['error_message']: | |
generated_response["response"] = response_text['error_message'] | |
else: | |
generated_response["response"] = response.text | |
except Exception as e: | |
generated_response["response"] = response.text | |
except Exception as e: | |
generated_response["type"]= 'ERROR' | |
generated_response["response"] = response.text | |
return generated_response | |
def extract_urls(text): | |
# Define the regex pattern for URLs | |
url_pattern = r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+[/\w\.-]*' | |
# Find all occurrences of URLs in the text | |
urls = re.findall(url_pattern, text) | |
return urls | |
def get_audio_reply_for_question(text): | |
generated_audio_event = get_generated_audio(text) | |
#From get_generated_audio, you will get events in a string format, from that we need to extract the url | |
final_response = { | |
"audio_url": '', | |
"message": '' | |
} | |
if generated_audio_event["type"] == 'SUCCESS': | |
audio_urls = extract_urls(generated_audio_event["response"]) | |
if len(audio_urls) == 0: | |
final_response['message'] = "No audio file link found in generated event" | |
else: | |
final_response['audio_url'] = audio_urls[-1] | |
else: | |
final_response['message'] = generated_audio_event['response'] | |
return final_response | |
def download_url(url): | |
try: | |
# Send a GET request to the URL to fetch the content | |
final_response = { | |
'content':'', | |
'error':'' | |
} | |
response = requests.get(url) | |
# Check if the request was successful (status code 200) | |
if response.status_code == 200: | |
final_response['content'] = response.content | |
else: | |
final_response['error'] = f"Failed to download the URL. Status code: {response.status_code}" | |
except Exception as e: | |
final_response['error'] = f"Failed to download the URL. Error: {e}" | |
return final_response | |
def get_filename_from_url(url): | |
# Use os.path.basename() to extract the file name from the URL | |
file_name = os.path.basename(url) | |
return file_name | |
def get_text_response(user_message): | |
response = llm_chain.predict(user_message = user_message) | |
return response | |
def get_text_response_and_audio_response(user_message): | |
response = get_text_response(user_message) # Getting the reply from Open AI | |
audio_reply_for_question_response = get_audio_reply_for_question(response) | |
final_response = { | |
'output_file_path': '', | |
'message':'' | |
} | |
audio_url = audio_reply_for_question_response['audio_url'] | |
if audio_url: | |
output_file_path=get_filename_from_url(audio_url) | |
download_url_response = download_url(audio_url) | |
audio_content = download_url_response['content'] | |
if audio_content: | |
with open(output_file_path, "wb") as audio_file: | |
audio_file.write(audio_content) | |
final_response['output_file_path'] = output_file_path | |
else: | |
final_response['message'] = download_url_response['error'] | |
else: | |
final_response['message'] = audio_reply_for_question_response['message'] | |
return final_response | |
def chat_bot_response(message, history): | |
text_and_audio_response = get_text_response_and_audio_response(message) | |
output_file_path = text_and_audio_response['output_file_path'] | |
if output_file_path: | |
return (text_and_audio_response['output_file_path'],) | |
else: | |
return text_and_audio_response['message'] | |
demo = gr.ChatInterface(chat_bot_response,examples=["How are you doing?","What are your interests?","Which places do you like to visit?"]) | |
if __name__ == "__main__": | |
demo.launch() #To create a public link, set `share=True` in `launch()`. To enable errors and logs, set `debug=True` in `launch()`. | |