# Make sure to install the required packages: chainlit and groq
import asyncio
import os
import re
import time
# Import threadpools to run the crawl_url function in a separate thread
from concurrent.futures import ThreadPoolExecutor
from io import BytesIO

import chainlit as cl
import requests
from chainlit.element import ElementBased
from groq import Groq  # noqa: F401  (kept: other parts of the project may rely on it)
from openai import AsyncOpenAI

client = AsyncOpenAI(
    base_url="https://api.groq.com/openai/v1",
    api_key=os.getenv("GROQ_API_KEY"),
)

# Instrument the OpenAI client
cl.instrument_openai()

settings = {
    "model": "llama3-8b-8192",
    "temperature": 0.5,
    "max_tokens": 500,
    "top_p": 1,
    "frequency_penalty": 0,
    "presence_penalty": 0,
}

# Compiled once; reused for every incoming message.
_URL_PATTERN = re.compile(r"(https?://\S+)")

# Punctuation that commonly follows a URL in running text but is not part of it.
_URL_TRAILING_PUNCTUATION = ".,;:!?'\""

# Upper bound (seconds) for a single crawl request so a stalled server cannot
# hang a worker thread forever.
CRAWL_TIMEOUT_SECONDS = 60

# Dedicated pool for the blocking crawl requests; worker threads are only
# started when work is first submitted.
_CRAWL_EXECUTOR = ThreadPoolExecutor(max_workers=8)

# Instructions placed in front of the crawled context in the system prompt.
_CONTEXT_INSTRUCTIONS = (
    "You are a helpful bot. Use the following context for answering questions. "
    "Refer to the sources using the REF number in square brackets, e.g., [1], only if the source is given in the appendices below.\n\n"
    "If the question requires any information from the provided appendices or context, refer to the sources. "
    "If not, there is no need to add a references section. "
    "At the end of your response, provide a reference section listing the URLs and their REF numbers only if sources from the appendices were used.\n\n"
)


def extract_urls(text):
    """Return the http(s) URLs found in *text*, in order of first appearance.

    Trailing sentence punctuation (e.g. the full stop in ``"see https://x.y."``)
    is stripped, and repeated URLs are returned only once.
    """
    cleaned = (
        match.rstrip(_URL_TRAILING_PUNCTUATION)
        for match in _URL_PATTERN.findall(text)
    )
    # dict.fromkeys removes duplicates while preserving order.
    return list(dict.fromkeys(url for url in cleaned if url))


def crawl_url(url):
    """Fetch *url* through the crawl4ai service and return its markdown.

    This function blocks on network I/O, so async callers should run it in a
    worker thread.

    Raises:
        requests.RequestException: on network errors, timeouts or a non-2xx
            HTTP status.
        ValueError: if the response is not JSON or lacks the expected
            ``results[0]["markdown"]`` entry.
    """
    payload = {
        "urls": [url],
        "include_raw_html": True,
        "word_count_threshold": 10,
        "extraction_strategy": "NoExtractionStrategy",
        "chunking_strategy": "RegexChunking",
    }
    response = requests.post(
        "https://crawl4ai.com/crawl",
        json=payload,
        timeout=CRAWL_TIMEOUT_SECONDS,
    )
    response.raise_for_status()
    try:
        return response.json()["results"][0]["markdown"]
    except (KeyError, IndexError, TypeError) as err:
        raise ValueError(f"Unexpected crawl response for {url}") from err


def _build_system_message(context):
    """Build the system message, embedding crawled pages when there are any."""
    if not context:
        return {"role": "system", "content": "You are a helpful assistant."}

    # Label every appendix with its reference number and URL so the model can
    # actually cite it in the "[n]" form the instructions ask for.
    appendices = [
        f'[{ref.split("_")[1]}] {data["url"]}\n{data["content"]}\n'
        for ref, data in context.items()
    ]
    return {
        "role": "system",
        "content": _CONTEXT_INSTRUCTIONS + "\n\n".join(appendices),
    }


async def _crawl_new_urls(urls, context):
    """Crawl *urls* concurrently and store each successful result in *context*.

    URLs already present in *context* are skipped. A failed crawl is reported
    to the user in a chat message and otherwise ignored, so that one bad link
    does not abort the whole reply.
    """
    already_crawled = {entry["url"] for entry in context.values()}
    pending = [url for url in urls if url not in already_crawled]
    if not pending:
        return

    loop = asyncio.get_running_loop()
    # Run the blocking requests in worker threads and await them, instead of
    # blocking the event loop on future.result().
    outcomes = await asyncio.gather(
        *(loop.run_in_executor(_CRAWL_EXECUTOR, crawl_url, url) for url in pending),
        return_exceptions=True,
    )
    for url, outcome in zip(pending, outcomes):
        if isinstance(outcome, BaseException):
            await cl.Message(content=f"Could not fetch {url}: {outcome}").send()
            continue
        ref_number = f"REF_{len(context) + 1}"
        context[ref_number] = {"url": url, "content": outcome}


@cl.on_chat_start
async def on_chat_start():
    """Initialise the per-session history and context, then greet the user."""
    cl.user_session.set("session", {"history": [], "context": {}})
    await cl.Message(
        content="Welcome to the chat! How can I assist you today?"
    ).send()


@cl.on_message
async def on_message(message: cl.Message):
    """Answer a user message, using any URLs it mentions as extra context.

    URLs in the message are crawled and stored in the session context, the
    message is added to the history, and the model's reply is streamed back.
    When the session holds crawled sources, a references list is appended to
    the reply.
    """
    user_session = cl.user_session.get("session")
    context = user_session["context"]
    history = user_session["history"]

    # Extract URLs from the user's message and crawl the ones not seen yet
    await _crawl_new_urls(extract_urls(message.content), context)

    history.append({"role": "user", "content": message.content})

    # Create a system message that includes the context
    system_message = _build_system_message(context)

    msg = cl.Message(content="")
    await msg.send()

    # Get response from the LLM
    stream = await client.chat.completions.create(
        messages=[system_message, *history],
        stream=True,
        **settings,
    )

    tokens = []
    async for part in stream:
        # Skip chunks that carry no choices instead of indexing into an
        # empty list.
        if not part.choices:
            continue
        if token := part.choices[0].delta.content:
            tokens.append(token)
            await msg.stream_token(token)

    # Add assistant message to the history
    history.append({"role": "assistant", "content": "".join(tokens)})

    # Append the reference section to the assistant's response, but only when
    # there are sources to list.
    if context:
        reference_lines = "".join(
            f"[{ref.split('_')[1]}]: {data['url']}\n"
            for ref, data in context.items()
        )
        msg.content += "\n\nReferences:\n" + reference_lines

    await msg.update()


@cl.on_audio_chunk
async def on_audio_chunk(chunk: cl.AudioChunk):
    """Collect incoming audio chunks in a per-session in-memory buffer."""
    if chunk.isStart:
        buffer = BytesIO()
        # This is required for whisper to recognize the file type. Any MIME
        # parameters (e.g. ";codecs=opus") are dropped so the name ends in a
        # plain extension.
        extension = chunk.mimeType.split("/")[1].split(";")[0]
        buffer.name = f"input_audio.{extension}"
        # Initialize the session for a new audio stream
        cl.user_session.set("audio_buffer", buffer)
        cl.user_session.set("audio_mime_type", chunk.mimeType)

    # Write the chunks to a buffer and transcribe the whole audio at the end
    audio_buffer = cl.user_session.get("audio_buffer")
    if audio_buffer is None:
        # No start chunk has been seen for this session; nothing to append to.
        return
    audio_buffer.write(chunk.data)


@cl.step(type="tool")
async def speech_to_text(audio_file):
    """Transcribe *audio_file* with ``whisper-large-v3`` and return the text.

    *audio_file* is forwarded unchanged to the module-level async client's
    transcription endpoint.
    """
    response = await client.audio.transcriptions.create(
        model="whisper-large-v3",
        file=audio_file,
    )
    return response.text


@cl.on_audio_end
async def on_audio_end(elements: list[ElementBased]):
    """Transcribe the buffered audio and handle it like a typed message."""
    # Get the audio buffer from the session
    audio_buffer: BytesIO = cl.user_session.get("audio_buffer")
    if audio_buffer is None:
        # The stream ended without any audio having been buffered.
        return

    audio_buffer.seek(0)  # Move the file pointer to the beginning
    audio_file = audio_buffer.read()
    audio_mime_type: str = cl.user_session.get("audio_mime_type")

    start_time = time.perf_counter()
    whisper_input = (audio_buffer.name, audio_file, audio_mime_type)
    transcription = await speech_to_text(whisper_input)
    end_time = time.perf_counter()
    print(f"Transcription took {end_time - start_time} seconds")

    user_msg = cl.Message(
        author="You",
        type="user_message",
        content=transcription,
    )
    await user_msg.send()
    await on_message(user_msg)


if __name__ == "__main__":
    from chainlit.cli import run_chainlit

    run_chainlit(__file__)