# Make sure to install the required packages: chainlit, openai, and requests
import os, time
from openai import AsyncOpenAI
import chainlit as cl
import re
import requests
from io import BytesIO
from chainlit.element import ElementBased
# Import ThreadPoolExecutor to run the crawl_url function in separate threads
from concurrent.futures import ThreadPoolExecutor

client = AsyncOpenAI(base_url="https://api.groq.com/openai/v1", api_key=os.getenv("GROQ_API_KEY"))

# Instrument the OpenAI client so Chainlit traces every LLM call
cl.instrument_openai()
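
# Generation parameters for Groq's llama3-8b-8192 model; temperature and
# max_tokens are kept moderate so answers stay focused and short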
settings = {
    "model": "llama3-8b-8192",
    "temperature": 0.5,
    "max_tokens": 500,
    "top_p": 1,
    "frequency_penalty": 0,
    "presence_penalty": 0,
}
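
# Find every http(s) URL in a message so each page can be crawled for context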
def extract_urls(text):
    url_pattern = re.compile(r'(https?://\S+)')
    return url_pattern.findall(text)
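
# Send a URL to the hosted Crawl4AI service and return the page content as
# markdown; the strategy fields follow the crawl service's request format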
def crawl_url(url):
    data = {
        "urls": [url],
        "include_raw_html": True,
        "word_count_threshold": 10,
        "extraction_strategy": "NoExtractionStrategy",
        "chunking_strategy": "RegexChunking"
    }
    response = requests.post("https://crawl4ai.com/crawl", json=data)
    response.raise_for_status()  # Fail fast on HTTP errors instead of parsing an error page
    response_data = response.json()
    return response_data['results'][0]['markdown']
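
# Each chat session keeps its own message history and a context dict that maps
# REF_n keys to crawled pages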
@cl.on_chat_start
async def on_chat_start():
    cl.user_session.set("session", {
        "history": [],
        "context": {}
    })
    await cl.Message(
        content="Welcome to the chat! How can I assist you today?"
    ).send()
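
# Main text pipeline: crawl any URLs in the message, register them as numbered
# appendices, then stream the LLM's answer token by token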
@cl.on_message
async def on_message(message: cl.Message):
    user_session = cl.user_session.get("session")

    # Extract URLs from the user's message
    urls = extract_urls(message.content)

    # Crawl all URLs concurrently; note that future.result() blocks until
    # each crawl finishes
    futures = []
    with ThreadPoolExecutor() as executor:
        for url in urls:
            futures.append(executor.submit(crawl_url, url))
    results = [future.result() for future in futures]

    # Register each crawled page under a sequential REF_n key
    for url, result in zip(urls, results):
        ref_number = f"REF_{len(user_session['context']) + 1}"
        user_session["context"][ref_number] = {
            "url": url,
            "content": result
        }

    user_session["history"].append({
        "role": "user",
        "content": message.content
    })

    # Create a system message that includes the context
    context_messages = [
        f'<appendix ref="{ref}">\n{data["content"]}\n</appendix>'
        for ref, data in user_session["context"].items()
    ]
    if context_messages:
        system_message = {
            "role": "system",
            "content": (
                "You are a helpful bot. Use the following context for answering questions. "
                "Refer to the sources using the REF number in square brackets, e.g., [1], only if the source is given in the appendices below.\n\n"
                "If the question requires any information from the provided appendices or context, refer to the sources. "
                "If not, there is no need to add a references section. "
                "At the end of your response, provide a reference section listing the URLs and their REF numbers only if sources from the appendices were used.\n\n"
                + "\n\n".join(context_messages)
            )
        }
    else:
        system_message = {
            "role": "system",
            "content": "You are a helpful assistant."
        }

    msg = cl.Message(content="")
    await msg.send()

    # Get response from the LLM
    stream = await client.chat.completions.create(
        messages=[
            system_message,
            *user_session["history"]
        ],
        stream=True,
        **settings
    )

    assistant_response = ""
    async for part in stream:
        if token := part.choices[0].delta.content:
            assistant_response += token
            await msg.stream_token(token)

    # Add assistant message to the history
    user_session["history"].append({
        "role": "assistant",
        "content": assistant_response
    })
    await msg.update()

    # Append a reference section only when crawled sources exist, so plain
    # conversations are not padded with an empty "References:" header
    if user_session["context"]:
        reference_section = "\n\nReferences:\n"
        for ref, data in user_session["context"].items():
            reference_section += f"[{ref.split('_')[1]}]: {data['url']}\n"
        msg.content += reference_section
        await msg.update()
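
# Voice input arrives as a stream of chunks; buffer them in the session and
# transcribe the complete recording once the stream ends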
@cl.on_audio_chunk
async def on_audio_chunk(chunk: cl.AudioChunk):
    if chunk.isStart:
        buffer = BytesIO()
        # This is required for whisper to recognize the file type
        buffer.name = f"input_audio.{chunk.mimeType.split('/')[1]}"
        # Initialize the session for a new audio stream
        cl.user_session.set("audio_buffer", buffer)
        cl.user_session.set("audio_mime_type", chunk.mimeType)
    # Write each chunk to the buffer; the whole recording is transcribed at the end
    cl.user_session.get("audio_buffer").write(chunk.data)

async def speech_to_text(audio_file):
    # Groq's OpenAI-compatible endpoint also serves Whisper, so the same
    # AsyncOpenAI client handles transcription
    response = await client.audio.transcriptions.create(
        model="whisper-large-v3", file=audio_file
    )
    return response.text
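
# When recording stops, transcribe the buffered audio and hand the text to the
# regular message handler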
@cl.on_audio_end
async def on_audio_end(elements: list[ElementBased]):
    # Get the audio buffer from the session
    audio_buffer: BytesIO = cl.user_session.get("audio_buffer")
    audio_buffer.seek(0)  # Move the file pointer to the beginning
    audio_file = audio_buffer.read()
    audio_mime_type: str = cl.user_session.get("audio_mime_type")

    start_time = time.time()
    # Whisper accepts a (filename, bytes, mime_type) tuple for in-memory audio
    whisper_input = (audio_buffer.name, audio_file, audio_mime_type)
    transcription = await speech_to_text(whisper_input)
    end_time = time.time()
    print(f"Transcription took {end_time - start_time} seconds")

    user_msg = cl.Message(
        author="You",
        type="user_message",
        content=transcription
    )
    await user_msg.send()
    await on_message(user_msg)

if __name__ == "__main__":
    from chainlit.cli import run_chainlit
    run_chainlit(__file__)