Medical_Reports / app.py
farhananis005's picture
Update app.py
ba06781 verified
import os
import shutil
import openai
import docx
import base64
import gradio as gr
import assemblyai as aai
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.document_loaders import DirectoryLoader
from langchain_community.document_loaders import TextLoader
from langchain_community.document_loaders import Docx2txtLoader
from langchain_community.vectorstores import FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains.question_answering import load_qa_chain
from langchain_community.callbacks.manager import get_openai_callback
from langchain.llms import OpenAI
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field
from langchain import PromptTemplate, LLMChain
os.environ["TOKENIZERS_PARALLELISM"] = "false"
aai.settings.api_key = os.environ.get("AAPI_KEY")
openai.api_key = os.environ.get("OPENAI_API_KEY")
embeddings = OpenAIEmbeddings()
client = OpenAI()
upload_dir="/home/user/app/file/"
upload_files_vector_db="/home/user/app/file_db/"
report_vector_db="/home/user/app/local_db/"
soap_dir="/home/user/app/soap_docs/"
sbar_dir="/home/user/app/sbar_docs/"
temp_reports_dir="/home/user/app/temp_reports/"
temp_vector_db="/home/user/app/temp_db/"
directories = [
upload_dir,
upload_files_vector_db,
report_vector_db,
soap_dir,
sbar_dir,
temp_reports_dir,
temp_vector_db
]
# Create each directory if it doesn't already exist
for directory in directories:
if not os.path.exists(directory):
os.makedirs(directory)
print(f"Created directory: {directory}")
else:
print(f"Directory already exists: {directory}")
llm = ChatOpenAI(model="gpt-4o-mini")
embedding_model = OpenAIEmbeddings()
# report_db = FAISS.load_local(report_vector_db, embeddings=embedding_model, allow_dangerous_deserialization=True)
qa_chain = load_qa_chain(ChatOpenAI(), chain_type="stuff")
"""# Page 1"""
def save_file(input_file):
os.makedirs(upload_dir, exist_ok=True)
for file in input_file:
shutil.copy(file.name, upload_dir)
return "File(s) saved successfully!"
def vectorise(input_dir, output_dir):
loader1 = DirectoryLoader(input_dir, glob="./*.pdf", loader_cls=PyPDFLoader)
document1 = loader1.load()
loader2 = DirectoryLoader(input_dir, glob="./*.txt", loader_cls=TextLoader)
document2 = loader2.load()
loader3 = DirectoryLoader(input_dir, glob="./*.docx", loader_cls=Docx2txtLoader)
document3 = loader3.load()
document1.extend(document2)
document1.extend(document3)
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
length_function=len)
docs = text_splitter.split_documents(document1)
file_db = FAISS.from_documents(docs, embeddings)
file_db.save_local(output_dir)
return "File(s) processed successfully!"
def merge_vectors(vectorDB_path):
docs_db1 = FAISS.load_local(report_vector_db, embeddings,allow_dangerous_deserialization=True)
docs_db2 = FAISS.load_local(vectorDB_path, embeddings,allow_dangerous_deserialization=True)
docs_db2.merge_from(docs_db1)
docs_db2.save_local(report_vector_db)
def formatted_response(docs, response):
formatted_output = response + "\n\nSources"
for i, doc in enumerate(docs):
source_info = doc.metadata.get('source', 'Unknown source')
page_info = doc.metadata.get('page', None)
file_name = source_info.split('/')[-1].strip()
if page_info is not None:
formatted_output += f"\n{file_name}\tpage no {page_info}"
else:
formatted_output += f"\n{file_name}"
return formatted_output
class AI_Medical_Report(BaseModel):
patient_name: str = Field(
...,
description="The full name of the patient if provided in the context. Otherwise Unknown"
)
soap_report: str = Field(
...,
description="""SOAP reports are a structured way to document patient interactions in healthcare:
Subjective: Patient’s own description of symptoms and concerns.
Objective: Factual, measurable data like exam results and vital signs.
Assessment: The healthcare provider’s diagnosis or clinical impression.
Plan: Recommended next steps, treatments, or follow-up actions."""
)
sbar_report: str = Field(
...,
description="""SBAR reports are a structured communication tool in healthcare to convey critical information efficiently:
Situation: Briefly state the current issue or reason for the communication.
Background: Provide context, such as patient history or relevant background info.
Assessment: Share your professional assessment of the problem.
Recommendation: Suggest actions or what you need from the listener."""
)
recommendations_for_doc: str = Field(
...,
description="provide 3 recommendations for the doctor like further questions to ask the patient, follow-up tests etc."
)
def assemblyai_STT(audio_url: str) -> str:
"""
Transcribes an audio file with speaker labels and returns a formatted string.
Parameters:
audio_url (str): URL or path to the audio file to be transcribed.
Returns:
str: A formatted string with each speaker's label and their corresponding text.
"""
# Configure transcription with speaker labels enabled
config = aai.TranscriptionConfig(speaker_labels=True)
# Perform transcription
transcript = aai.Transcriber().transcribe(audio_url, config)
# Format each utterance into a single string with speaker labels
transcription_output = "\n".join(
f"Speaker {utterance.speaker}: {utterance.text}" for utterance in transcript.utterances
)
return transcription_output
def openai_STT(audio_url: str) -> str:
from openai import OpenAI
client = OpenAI()
audio = open(audio_url, "rb")
transcript = client.audio.transcriptions.create(
model="whisper-1",
file=audio,
response_format="text"
)
output = transcript
return output
def generate_report(input_text: str = None, file_path: str = None) -> AI_Medical_Report:
"""
Generates a SOAP report from text or audio input using OpenAI's GPT-4 model.
Args:
client (OpenAI): Initialized OpenAI client.
input_text (str, optional): Text input containing the patient case study.
file_path (str, optional): Path to the audio file. Defaults to None.
model (str): Model name to use for generating the report. Defaults to "gpt-4o-audio-preview".
Returns:
SOAPExtraction: Parsed SOAP information including patient name, subjective, objective, assessment, plan, and doctor recommendations.
"""
from openai import OpenAI
client = OpenAI()
try:
# Prepare message content based on input type
messages = [{"role": "system", "content": (
"You are an AI medical assistant designed to help doctors. Your job is to convert the patient information into SOAP and SBAR reports. in the given JSON format"
)}]
if input_text:
# Text-based input
messages.append({"role": "user", "content": input_text})
model="gpt-4o"
elif file_path:
# Audio-based input: load and encode the audio file
model="gpt-4o-audio-preview-2024-10-01"
with open(file_path, "rb") as audio_file:
wav_data = audio_file.read()
encoded_string = base64.b64encode(wav_data).decode('utf-8')
messages.append({
"role": "user",
"content": [
{
"type": "text",
"text": "Please generate Medical reports based on the following audio input"
},
{
"type": "input_audio",
"input_audio": {
"data": encoded_string,
"format": "wav"
}
}
]
})
else:
raise ValueError("Either input_text or file_path must be provided.")
# Create completion request
completion = client.beta.chat.completions.parse(
model=model,
modalities=["text"],
messages=messages,
response_format=AI_Medical_Report
)
# Retrieve structured SOAP report
report = completion.choices[0].message.parsed
return report
except Exception as e:
print(f"An error occurred: {e}")
return None
# wrapper function for audio
def report_audio(audio_file: str = None, transcription_service: str = "OpenAI"):
return report_main(audio_file=audio_file,transcription_service=transcription_service)
# driver function for making reports
def report_main(input_text: str = None, audio_file: str = None, transcription_service: str = "OpenAI"):
"""
Generates a SOAP and SBAR report based on user input, either from text or audio.
Args:
input_text (str, optional): Text input from the user.
audio_file (str, optional): Path to the audio file (if provided).
transcription_service (str): Selected transcription service ("AssemblyAI" or "OpenAI").
Returns:
tuple: Contains patient_name, SOAP Report, SBAR_Report,
doctor_recommendations, and transcription_text (if audio input was used).
"""
from openai import OpenAI
client = OpenAI() # Initialize OpenAI client
# Initialize empty strings for the SOAP report components
patient_name = ""
soap_report=""
sbar_report = ""
doctor_recommendations = ""
transcription_text = ""
# Process input based on provided input_text or audio_file
if input_text:
# Generate SOAP report from text input
report = generate_report(input_text=input_text)
elif audio_file:
# Use selected transcription service for audio input
if transcription_service == "AssemblyAI":
transcription_text += assemblyai_STT(audio_file)
report = generate_report(input_text=transcription_text)
# print(transcription_text)
elif transcription_service == "OpenAI":
transcription_text += openai_STT(audio_file)
report = generate_report(input_text=transcription_text)
# print(transcription_text)
else:
raise ValueError("Invalid transcription service specified. Choose 'AssemblyAI' or 'OpenAI'.")
print(report)
# Assign values from the generated report
else:
raise ValueError("Either input_text or audio_file must be provided.")
patient_name = report.patient_name
soap_report = report.soap_report
sbar_report = report.sbar_report
doctor_recommendations = report.recommendations_for_doc
# Return structured output in a tuple
if audio_file:
return patient_name, soap_report, sbar_report, doctor_recommendations, transcription_text
else:
return patient_name, soap_report, sbar_report, doctor_recommendations
def delete_dir(dir):
try:
shutil.rmtree(dir)
return "Deleted Successfully"
except:
return "Already Deleted"
def save_reports(file_name, file_content, report_type ,destination_folder):
# Ensure the destination folder exists
if not os.path.exists(destination_folder):
os.makedirs(destination_folder)
# Define the path for the .docx file in the destination folder
destination_path = os.path.join(destination_folder, f"{report_type}_{file_name}.docx")
# Create a new document and add the SOAP response text
doc = docx.Document()
doc.add_paragraph(file_content)
# Save the document to the specified destination folder
doc.save(destination_path)
# Define and create the path for the temp folder
if not os.path.exists(temp_reports_dir):
os.makedirs(temp_reports_dir)
# Define the path for the temp copy
temp_path = os.path.join(temp_reports_dir, f"{report_type}_{file_name}.docx")
# Save a copy of the document in the temp folder
doc.save(temp_path)
return f"Successfully saved"
# driver function for save
def save_reports_main(file_name, soap_report_content, sbar_report_content):
# Save SOAP report
soap_result = save_reports(file_name, soap_report_content, "SOAP", soap_dir)
print(soap_result)
# Save SBAR report
sbar_result = save_reports(file_name, sbar_report_content, "SBAR", sbar_dir)
print(sbar_result)
# Vectorize the reports in the temporary directory
vectorise(temp_reports_dir, temp_vector_db)
# Check if report_vector_db is empty
if not os.listdir(report_vector_db): # If report_vector_db is empty
# Copy all contents from temp_vector_db to report_vector_db
for item in os.listdir(temp_vector_db):
source_path = os.path.join(temp_vector_db, item)
destination_path = os.path.join(report_vector_db, item)
if os.path.isdir(source_path):
shutil.copytree(source_path, destination_path)
else:
shutil.copy2(source_path, destination_path)
print("Copied contents from temp_vector_db to report_vector_db.")
else:
# Call merge_vectors to merge temp_vector_db into report_vector_db
merge_vectors(temp_vector_db)
print("Merged temp_vector_db into report_vector_db.")
# Clean up by deleting the temporary directories
delete_dir(temp_reports_dir)
delete_dir(temp_vector_db)
print("Deleted temporary directories.")
return "Reports saved successfully!"
"""#Page 2"""
def refresh_files(docs_dir):
if not os.path.exists(docs_dir):
os.makedirs(docs_dir)
file_list = []
for root, dirs, files in os.walk(docs_dir):
for file in files:
file_list.append(file)
return gr.Dropdown(choices=file_list, interactive=True)
def soap_refresh():
return refresh_files(soap_dir)
def sbar_refresh():
return refresh_files(sbar_dir)
def get_content(docs_dir, selected_file_name):
docx_path = os.path.join(docs_dir, selected_file_name)
# Check if the file exists and has a .docx extension
if not os.path.isfile(docx_path) or not docx_path.endswith('.docx'):
raise FileNotFoundError(f"File {selected_file_name} not found in {docs_dir} or is not a .docx file.")
try:
# Open and read the document
doc = docx.Document(docx_path)
paragraphs = [paragraph.text for paragraph in doc.paragraphs if paragraph.text]
return "\n\n".join(paragraphs) # Join paragraphs with double newlines for readability
except Exception as e:
raise IOError(f"An error occurred while reading the document: {e}")
def get_soap_report_content(selected_file_name):
return get_content(soap_dir, selected_file_name)
def get_sbar_report_content(selected_file):
return get_content(sbar_dir, selected_file)
# Updated generate_response function
def generate_response(message, history, soap_content):
from openai import OpenAI
client = OpenAI()
# Format history as expected by OpenAI's API
formatted_history = [{"role": "system", "content": "This conversation is based on the following SOAP report content:\n" + soap_content}]
for interaction in history:
if len(interaction) == 2:
user, assistant = interaction
formatted_history.append({"role": "user", "content": user})
formatted_history.append({"role": "assistant", "content": assistant})
# Add the latest user message to the formatted history
formatted_history.append({"role": "user", "content": message})
# Generate the assistant's response with streaming enabled
response = client.chat.completions.create(
model='gpt-4o-mini',
messages=formatted_history,
stream=True
)
partial_message = ""
for chunk in response:
if chunk.choices[0].delta.content is not None:
partial_message += chunk.choices[0].delta.content
yield partial_message # Yield each chunk as it comes
# Updated handle_chat_message function
def handle_chat_message(history, message, soap_content):
response_generator = generate_response(message, history, soap_content)
new_history = history + [[message, ""]] # Initialize with an empty assistant response
for partial_response in response_generator:
new_history[-1][1] = partial_response # Update assistant's response in history
yield new_history, "" # Stream the updated history and clear the text box
def ask_reports(docs_dir, doc_name, question):
# Construct the path to the docx file
docx_path = os.path.join(docs_dir, doc_name)
# Read and extract text from the .docx file
doc = docx.Document(docx_path)
extracted_text = f"You are provided with a medical report of a patient {doc_name}.\n\n"
text = ""
for paragraph in doc.paragraphs:
text += paragraph.text + "\n"
# Append the question to extracted text
extracted_text = extracted_text+text+"\n\nUse the report to answer the following question:\n" + question
if not text:
return "Failed to retrieve text from document."
from openai import OpenAI
client = OpenAI()
# Prepare the messages for the chat completion request
messages = [
{"role": "system", "content": "You are a helpful assistant with medical expertise."},
{"role": "user", "content": extracted_text}
]
# Use the ChatCompletion API to get a response
try:
completion = client.chat.completions.create(
model="gpt-4o",
messages=messages,
)
answer = completion.choices[0].message
except Exception as e:
return f"An error occurred: {e}"
return answer
def ask_soap(selected_file, question):
# Logic to answer the question based on the selected SOAP file
return f"Answer to '{question}' based on {selected_file}"
def ask_sbar(selected_file, question):
# Logic to answer the question based on the selected SBAR file
return f"Answer to '{question}' based on {selected_file}"
"""# page 3"""
def local_search(question):
embeddings = OpenAIEmbeddings()
file_db = FAISS.load_local(report_vector_db, embeddings, allow_dangerous_deserialization=True)
docs = file_db.similarity_search(question)
chain = load_qa_chain(llm, chain_type="stuff")
with get_openai_callback() as cb:
response = chain.run(input_documents=docs, question=question)
print(cb)
return formatted_response_local(docs, response)
def formatted_response_local(docs, response):
formatted_output = response + "\n\nSources"
for i, doc in enumerate(docs):
source_info = doc.metadata.get('source', 'Unknown source')
page_info = doc.metadata.get('page', None)
file_name = source_info.split('/')[-1].strip()
if page_info is not None:
formatted_output += f"\n{file_name}\tpage no {page_info}"
else:
formatted_output += f"\n{file_name}"
return formatted_output
def local_gpt(question):
template = """Question: {question}
Answer: Let's think step by step."""
prompt = PromptTemplate(template=template, input_variables=["question"])
llm_chain = LLMChain(prompt=prompt, llm=llm)
response = llm_chain.run(question)
return response
"""# Page 4"""
def save2_docs(docs):
import shutil
import os
output_dir=upload_dir
if os.path.exists(output_dir):
shutil.rmtree(output_dir)
if not os.path.exists(output_dir):
os.makedirs(output_dir)
for doc in docs:
shutil.copy(doc.name, output_dir)
return "Successful!"
global agent2
def create2_agent():
from langchain.chat_models import ChatOpenAI
from langchain.chains.conversation.memory import ConversationSummaryBufferMemory
from langchain.chains import ConversationChain
global agent2
llm = ChatOpenAI(model_name='gpt-4o-mini')
memory = ConversationSummaryBufferMemory(llm=llm, max_token_limit=1500)
agent2 = ConversationChain(llm=llm, memory=memory, verbose=True)
return "Successful!"
def search2_docs(prompt, question, state):
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.callbacks import get_openai_callback
global agent2
agent2 = agent2
state = state or []
embeddings = OpenAIEmbeddings()
docs_db = FAISS.load_local(upload_files_vector_db, embeddings, allow_dangerous_deserialization=True)
docs = docs_db.similarity_search(question)
prompt += "\n\n"
prompt += question
prompt += "\n\n"
prompt += str(docs)
with get_openai_callback() as cb:
response = agent2.predict(input=prompt)
print(cb)
return formatted_response(docs, question, response, state)
def delete2_docs():
import shutil
path1 = upload_dir
path2 = upload_files_vector_db
try:
shutil.rmtree(path1)
shutil.rmtree(path2)
return "Deleted Successfully"
except:
return "Already Deleted"
def process2_docs():
from langchain.document_loaders import PyPDFLoader
from langchain.document_loaders import DirectoryLoader
from langchain.document_loaders import TextLoader
from langchain.document_loaders import Docx2txtLoader
from langchain.document_loaders.csv_loader import CSVLoader
from langchain.document_loaders import UnstructuredExcelLoader
from langchain.vectorstores import FAISS
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
loader1 = DirectoryLoader(upload_dir, glob="./*.pdf", loader_cls=PyPDFLoader)
document1 = loader1.load()
loader2 = DirectoryLoader(upload_dir, glob="./*.txt", loader_cls=TextLoader)
document2 = loader2.load()
loader3 = DirectoryLoader(upload_dir, glob="./*.docx", loader_cls=Docx2txtLoader)
document3 = loader3.load()
loader4 = DirectoryLoader(upload_dir, glob="./*.csv", loader_cls=CSVLoader)
document4 = loader4.load()
loader5 = DirectoryLoader(upload_dir, glob="./*.xlsx", loader_cls=UnstructuredExcelLoader)
document5 = loader5.load()
document1.extend(document2)
document1.extend(document3)
document1.extend(document4)
document1.extend(document5)
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
length_function=len
)
docs = text_splitter.split_documents(document1)
embeddings = OpenAIEmbeddings()
docs_db = FAISS.from_documents(docs, embeddings)
docs_db.save_local(upload_files_vector_db)
return "Successful!"
def formatted_response(docs, question, response, state):
formatted_output = response + "\n\nSources"
for i, doc in enumerate(docs):
source_info = doc.metadata.get('source', 'Unknown source')
page_info = doc.metadata.get('page', None)
doc_name = source_info.split('/')[-1].strip()
if page_info is not None:
formatted_output += f"\n{doc_name}\tpage no {page_info}"
else:
formatted_output += f"\n{doc_name}"
state.append((question, formatted_output))
return state, state
"""# UI"""
import gradio as gr
css = """
.col {
max-width: 70%;
margin: 0 auto;
display: flex;
flex-direction: column;
justify-content: center;
align-items: center;
}
"""
# Define the Gradio interface
with gr.Blocks(css=css) as demo:
gr.Markdown("## <center>Medical App</center>")
# Page 1----------------------------------------------------------------------
with gr.Tab("SOAP and SBAR Note Creation"):
# Tab for generating from audio
with gr.Tab("From Audio"):
with gr.Row():
with gr.Column():
audio_file = gr.Audio(label="Audio Input", type="filepath")
with gr.Column():
transcription_service = gr.Dropdown(label="Select Transcription Service", choices=["OpenAI", "AssemblyAI"], value="OpenAI")
gr.Markdown("<small>Upload an audio file or select a transcription service.</small>")
generate_with_audio_button = gr.Button("Generate Report", variant="primary")
# Shared output containers
patient_name_box_text = gr.Textbox(label="Patient Name", interactive=True, placeholder="Generated Patient Name", lines=1)
with gr.Row():
with gr.Column():
soap_report_box_text = gr.Textbox(label="SOAP Report", interactive=True, placeholder="Generated SOAP Report", lines=10)
with gr.Column():
sbar_report_box_text = gr.Textbox(label="SBAR Report", interactive=True, placeholder="Generated SBAR Report", lines=10)
audio_doctor_recommendations_box = gr.Textbox(label="Doctor Recommendations", interactive=False, placeholder="Recommendations", lines=5)
audio_transcription_box = gr.Textbox(label="Transcription Text", interactive=False, placeholder="Transcribed Text", lines=5)
# Click event for audio
generate_with_audio_button.click(
fn=report_audio,
inputs=[audio_file, transcription_service],
outputs=[
patient_name_box_text,
soap_report_box_text,
sbar_report_box_text,
audio_doctor_recommendations_box,
audio_transcription_box
]
)
# Add Save Report Button
with gr.Row():
save_button = gr.Button("Save Report", variant="secondary")
save_message = gr.Textbox(label="Save Status", interactive=False, placeholder="Status of the save operation", lines=1)
# Click event for Save Report Button using `patient_name_box_text` as the file name
save_button.click(
fn=save_reports_main,
inputs=[patient_name_box_text, soap_report_box_text, sbar_report_box_text],
outputs=[save_message]
)
# Tab for generating from text input
with gr.Tab("From Transcript"):
with gr.Column():
input_text = gr.Textbox(label="Patient Case Study (Text Input)", placeholder="Enter the patient case study here...", lines=7)
gr.Markdown("<small>Enter the patient's details, symptoms, and any relevant information.</small>")
generate_with_text_button = gr.Button("Generate Report", variant="primary")
# Shared output containers for this tab
patient_name_box_text = gr.Textbox(label="Patient Name", interactive=True, placeholder="Generated Patient Name", lines=1)
with gr.Row():
with gr.Column():
soap_report_box_text = gr.Textbox(label="SOAP Report", interactive=True, placeholder="Generated SOAP Report", lines=10)
with gr.Column():
sbar_report_box_text = gr.Textbox(label="SBAR Report", interactive=True, placeholder="Generated SBAR Report", lines=10)
doctor_recommendations_box_text = gr.Textbox(label="Doctor Recommendations", interactive=False, placeholder="Recommendations", lines=5)
# Click event for text
generate_with_text_button.click(
fn=report_main,
inputs=[input_text],
outputs=[
patient_name_box_text,
soap_report_box_text,
sbar_report_box_text,
doctor_recommendations_box_text
]
)
# Add Save Report Button
with gr.Row():
save_button = gr.Button("Save Report", variant="secondary")
save_message = gr.Textbox(label="Save Status", interactive=False, placeholder="Status of the save operation", lines=1)
# Click event for Save Report Button using `patient_name_box_text` as the file name
save_button.click(
fn=save_reports_main,
inputs=[patient_name_box_text, soap_report_box_text, sbar_report_box_text],
outputs=[save_message]
)
# Page 2----------------------------------------------------------------------
####|
with gr.Tab("SOAP and SBAR Queries"):
with gr.Tab("Query SOAP Reports"):
with gr.Row():
with gr.Column():
soap_refresh_button = gr.Button("Refresh")
ask_soap_input = gr.Dropdown(label="Choose File")
soap_content_display = gr.Textbox(
label="SOAP Report Content", interactive=False, placeholder="Report content will appear here...", lines=5
)
with gr.Column():
# Chatbot for Q&A
soap_chatbot = gr.Chatbot(label="SOAP Chatbot")
soap_chat_input = gr.Textbox(placeholder="Enter your question here...", submit_btn=True)
audio_file = gr.Audio(label="Audio Input", type="filepath",sources="microphone")
submit_audio_btn = gr.Button("Submit Audio")
clear = gr.ClearButton([soap_chat_input, soap_chatbot,audio_file])
# Refresh button for SOAP file dropdown
soap_refresh_button.click(fn=soap_refresh, inputs=None, outputs=ask_soap_input)
# Display selected SOAP report content
ask_soap_input.change(fn=get_soap_report_content, inputs=ask_soap_input, outputs=soap_content_display)
submit_audio_btn.click(openai_STT, inputs=audio_file, outputs=soap_chat_input)
# Handle chatbot input submission with streaming response
soap_chat_input.submit(
handle_chat_message,
inputs=[soap_chatbot, soap_chat_input, soap_content_display],
outputs=[soap_chatbot, soap_chat_input]
)
# Query SBAR Reports Tab
with gr.Tab("Query SBAR Reports"):
with gr.Row():
with gr.Column():
sbar_refresh_button = gr.Button("Refresh")
ask_sbar_input = gr.Dropdown(label="Choose File")
sbar_content_display = gr.Textbox(
label="SBAR Report Content", interactive=False, placeholder="Report content will appear here...", lines=5
)
with gr.Column():
# Chatbot for SBAR Q&A
sbar_chatbot = gr.Chatbot(label="SBAR Chatbot")
sbar_chat_input = gr.Textbox(placeholder="Enter your question here...",submit_btn=True)
audio_file = gr.Audio(label="Audio Input", type="filepath",sources="microphone")
submit_audio_btn = gr.Button("Submit Audio")
clear_sbar = gr.ClearButton([sbar_chat_input, sbar_chatbot,audio_file])
# Refresh button for SBAR file dropdown
sbar_refresh_button.click(fn=sbar_refresh, inputs=None, outputs=ask_sbar_input)
# Display selected SBAR report content
ask_sbar_input.change(fn=get_sbar_report_content, inputs=ask_sbar_input, outputs=sbar_content_display)
submit_audio_btn.click(openai_STT, inputs=audio_file, outputs=sbar_chat_input)
# Handle chatbot input submission with streaming response
sbar_chat_input.submit(
handle_chat_message,
inputs=[sbar_chatbot, sbar_chat_input, sbar_content_display], # Pass the SBAR content
outputs=[sbar_chatbot, sbar_chat_input]
)
# Page 3----------------------------------------------------------------------
####|Chatbot to query all SOAP and SBAR reports (RAG). Chatbot can ask OpenAI for answers directly
with gr.Tab("All Queries"):
with gr.Column(elem_classes="col"):
local_search_input = gr.Textbox(label="Enter Question here")
local_search_button = gr.Button("Search")
local_search_output = gr.Textbox(label="Output")
local_gpt_button = gr.Button("Ask ChatGPT")
local_gpt_output = gr.Textbox(label="Output")
local_search_button.click(local_search, inputs=local_search_input, outputs=local_search_output)
local_gpt_button.click(local_gpt, inputs=local_search_input, outputs=local_gpt_output)
# Page 4----------------------------------------------------------------------
####|
with gr.Tab("Documents Queries"):
with gr.Column(elem_classes="col"):
with gr.Tab("Upload and Process Documents"):
with gr.Column():
docs2_upload_input = gr.Files(label="Upload File(s)")
docs2_upload_button = gr.Button("Upload")
docs2_upload_output = gr.Textbox(label="Output")
docs2_process_button = gr.Button("Process")
docs2_process_output = gr.Textbox(label="Output")
create2_agent_button = gr.Button("Create Agent")
create2_agent_output = gr.Textbox(label="Output")
gr.ClearButton([docs2_upload_input, docs2_upload_output, docs2_process_output, create2_agent_output])
docs2_upload_button.click(save2_docs, inputs=docs2_upload_input, outputs=docs2_upload_output)
docs2_process_button.click(process2_docs, inputs=None, outputs=docs2_process_output)
create2_agent_button.click(create2_agent, inputs=None, outputs=create2_agent_output)
with gr.Tab("Query Documents"):
with gr.Column():
docs2_prompt_input = gr.Textbox(label="Custom Prompt")
docs2_chatbot = gr.Chatbot(label="Chats")
docs2_state = gr.State()
docs2_search_input = gr.Textbox(label="Enter Question")
docs2_search_button = gr.Button("Search")
docs2_delete_button = gr.Button("Delete")
docs2_delete_output = gr.Textbox(label="Output")
gr.ClearButton([docs2_prompt_input, docs2_search_input, docs2_delete_output])
docs2_search_button.click(search2_docs, inputs=[docs2_prompt_input, docs2_search_input, docs2_state], outputs=[docs2_chatbot, docs2_state])
docs2_delete_button.click(delete2_docs, inputs=None, outputs=docs2_delete_output)
demo.launch(debug=True)