# Spaces: Sleeping (hosting-page status text captured with the file; not part of the program)
import os | |
import shutil | |
import openai | |
import docx | |
import base64 | |
import gradio as gr | |
import assemblyai as aai | |
from langchain_community.document_loaders import PyPDFLoader | |
from langchain_community.document_loaders import PyPDFLoader | |
from langchain_community.document_loaders import DirectoryLoader | |
from langchain_community.document_loaders import TextLoader | |
from langchain_community.document_loaders import Docx2txtLoader | |
from langchain_community.vectorstores import FAISS | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain.chains.question_answering import load_qa_chain | |
from langchain_community.callbacks.manager import get_openai_callback | |
from langchain.llms import OpenAI | |
from langchain_openai import ChatOpenAI, OpenAIEmbeddings | |
from langchain_core.prompts import ChatPromptTemplate | |
from pydantic import BaseModel, Field | |
from langchain import PromptTemplate, LLMChain | |
# Module-level configuration: environment flags, API keys, shared model
# objects and the working directories used by the handlers below.
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Credentials are read from the environment; a missing variable yields None.
aai.settings.api_key = os.environ.get("AAPI_KEY")
openai.api_key = os.environ.get("OPENAI_API_KEY")

embeddings = OpenAIEmbeddings()
# NOTE(review): `OpenAI` at this point is the class imported from
# `langchain.llms`, not the `openai` SDK client. Functions that need the SDK
# client import it locally under the same name.
client = OpenAI()

# Working directories (absolute paths inside the app container).
upload_dir = "/home/user/app/file/"
upload_files_vector_db = "/home/user/app/file_db/"
report_vector_db = "/home/user/app/local_db/"
soap_dir = "/home/user/app/soap_docs/"
sbar_dir = "/home/user/app/sbar_docs/"
temp_reports_dir = "/home/user/app/temp_reports/"
temp_vector_db = "/home/user/app/temp_db/"

directories = [
    upload_dir,
    upload_files_vector_db,
    report_vector_db,
    soap_dir,
    sbar_dir,
    temp_reports_dir,
    temp_vector_db,
]

# Create each directory if it doesn't already exist, logging what happened.
for directory in directories:
    if os.path.exists(directory):
        print(f"Directory already exists: {directory}")
        continue
    os.makedirs(directory)
    print(f"Created directory: {directory}")

llm = ChatOpenAI(model="gpt-4o-mini")
embedding_model = OpenAIEmbeddings()
# report_db = FAISS.load_local(report_vector_db, embeddings=embedding_model, allow_dangerous_deserialization=True)
qa_chain = load_qa_chain(ChatOpenAI(), chain_type="stuff")
"""# Page 1""" | |
def save_file(input_file):
    """Copy every uploaded file into ``upload_dir``.

    Args:
        input_file: Iterable of uploaded-file objects; each must expose a
            ``name`` attribute holding the path of the file to copy.

    Returns:
        str: A fixed confirmation message.
    """
    os.makedirs(upload_dir, exist_ok=True)
    for uploaded in input_file:
        source_path = uploaded.name
        shutil.copy(source_path, upload_dir)
    return "File(s) saved successfully!"
def vectorise(input_dir, output_dir):
    """Build a FAISS index from the PDF, TXT and DOCX files in a directory.

    Files directly inside ``input_dir`` are loaded (PDF first, then TXT, then
    DOCX), split into 1000-character chunks with 200 characters of overlap,
    embedded with the module-level ``embeddings`` object and saved to
    ``output_dir``.

    Args:
        input_dir: Directory containing the source documents.
        output_dir: Directory the FAISS index is written to.

    Returns:
        str: A fixed confirmation message.
    """
    loader_by_pattern = (
        ("./*.pdf", PyPDFLoader),
        ("./*.txt", TextLoader),
        ("./*.docx", Docx2txtLoader),
    )

    documents = []
    for pattern, loader_cls in loader_by_pattern:
        directory_loader = DirectoryLoader(input_dir, glob=pattern, loader_cls=loader_cls)
        documents.extend(directory_loader.load())

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
        length_function=len,
    )
    chunks = splitter.split_documents(documents)

    FAISS.from_documents(chunks, embeddings).save_local(output_dir)
    return "File(s) processed successfully!"
def merge_vectors(vectorDB_path):
    """Merge the report index into the index at ``vectorDB_path`` and store it.

    Both indexes are loaded with the module-level ``embeddings`` object. The
    existing index from ``report_vector_db`` is merged into the one loaded
    from ``vectorDB_path``, and the combined index is written back to
    ``report_vector_db``.

    Args:
        vectorDB_path: Directory of the FAISS index to combine with the
            report index.
    """
    existing_reports = FAISS.load_local(
        report_vector_db, embeddings, allow_dangerous_deserialization=True
    )
    incoming = FAISS.load_local(
        vectorDB_path, embeddings, allow_dangerous_deserialization=True
    )
    incoming.merge_from(existing_reports)
    incoming.save_local(report_vector_db)
def formatted_response(docs, response):
    """Append a "Sources" list (file name and optional page) to ``response``.

    NOTE(review): a second ``formatted_response`` taking four arguments is
    defined further down this file and rebinds this name at import time, so
    this two-argument version is not the one callers reach.

    Args:
        docs: Iterable of objects with a ``metadata`` mapping; the ``source``
            and ``page`` keys are read when present.
        response: Answer text placed before the source list.

    Returns:
        str: ``response`` followed by one line per document.
    """
    lines = [response + "\n\nSources"]
    for doc in docs:
        metadata = doc.metadata
        source = metadata.get('source', 'Unknown source')
        page = metadata.get('page', None)
        # Keep only the last path component of the source.
        name = source.split('/')[-1].strip()
        lines.append(name if page is None else f"{name}\tpage no {page}")
    return "\n".join(lines)
# Field descriptions for AI_Medical_Report, kept as named constants so the
# model definition below stays compact.
_PATIENT_NAME_DESCRIPTION = (
    "The full name of the patient if provided in the context. Otherwise Unknown"
)

_SOAP_REPORT_DESCRIPTION = """SOAP reports are a structured way to document patient interactions in healthcare:
Subjective: Patient’s own description of symptoms and concerns.
Objective: Factual, measurable data like exam results and vital signs.
Assessment: The healthcare provider’s diagnosis or clinical impression.
Plan: Recommended next steps, treatments, or follow-up actions."""

_SBAR_REPORT_DESCRIPTION = """SBAR reports are a structured communication tool in healthcare to convey critical information efficiently:
Situation: Briefly state the current issue or reason for the communication.
Background: Provide context, such as patient history or relevant background info.
Assessment: Share your professional assessment of the problem.
Recommendation: Suggest actions or what you need from the listener."""

_RECOMMENDATIONS_DESCRIPTION = (
    "provide 3 recommendations for the doctor like further questions to ask the patient, follow-up tests etc."
)


class AI_Medical_Report(BaseModel):
    # Structured result requested from the model by generate_report: four
    # required string fields. Documented with comments rather than a class
    # docstring so the model's schema text is left untouched.

    # Patient's full name.
    patient_name: str = Field(..., description=_PATIENT_NAME_DESCRIPTION)
    # Subjective / Objective / Assessment / Plan note.
    soap_report: str = Field(..., description=_SOAP_REPORT_DESCRIPTION)
    # Situation / Background / Assessment / Recommendation note.
    sbar_report: str = Field(..., description=_SBAR_REPORT_DESCRIPTION)
    # Suggestions addressed to the doctor.
    recommendations_for_doc: str = Field(..., description=_RECOMMENDATIONS_DESCRIPTION)
def assemblyai_STT(audio_url: str) -> str:
    """Transcribe an audio file with AssemblyAI and label each speaker turn.

    Args:
        audio_url: URL or path of the audio file to transcribe.

    Returns:
        One line per utterance in the form ``Speaker <label>: <text>``,
        joined with newlines. Empty when no utterances were produced.

    Raises:
        RuntimeError: If AssemblyAI reports the transcription as failed.
    """
    # Speaker labels are required to get per-utterance speaker attribution.
    config = aai.TranscriptionConfig(speaker_labels=True)
    transcript = aai.Transcriber().transcribe(audio_url, config)

    # A failed job carries no utterances; surface the service's own error
    # message instead of failing later while iterating over None.
    if transcript.status == aai.TranscriptStatus.error:
        raise RuntimeError(f"AssemblyAI transcription failed: {transcript.error}")

    utterances = transcript.utterances or []
    return "\n".join(
        f"Speaker {utterance.speaker}: {utterance.text}" for utterance in utterances
    )
def openai_STT(audio_url: str) -> str:
    """Transcribe an audio file with OpenAI's ``whisper-1`` model.

    Args:
        audio_url: Path of the audio file to transcribe. ``None`` or an empty
            string (e.g. the UI button was pressed with nothing recorded)
            yields an empty transcript instead of an error.

    Returns:
        The transcription as returned by the API for
        ``response_format="text"``.
    """
    if not audio_url:
        return ""

    # Imported locally: the module-level name `OpenAI` refers to the
    # LangChain class, not the SDK client.
    from openai import OpenAI

    client = OpenAI()
    # The context manager closes the file handle even when the request fails.
    with open(audio_url, "rb") as audio:
        transcript = client.audio.transcriptions.create(
            model="whisper-1",
            file=audio,
            response_format="text",
        )
    return transcript
def generate_report(input_text: str = None, file_path: str = None) -> "AI_Medical_Report | None":
    """Generate SOAP and SBAR reports from a transcript or an audio file.

    ``input_text`` takes precedence when both arguments are supplied.

    Args:
        input_text: Patient case text; sent to the ``gpt-4o`` model.
        file_path: Path to an audio file. Its bytes are base64-encoded and
            sent to ``gpt-4o-audio-preview-2024-10-01``, declared as ``wav``.

    Returns:
        The parsed ``AI_Medical_Report``, or ``None`` when reading the audio
        file or calling the API fails (the error is printed).

    Raises:
        ValueError: If neither ``input_text`` nor ``file_path`` is provided.
    """
    # Validate outside the try block so this error reaches the caller instead
    # of being swallowed by the broad handler below.
    if not input_text and not file_path:
        raise ValueError("Either input_text or file_path must be provided.")

    # Imported locally: the module-level name `OpenAI` refers to the
    # LangChain class, not the SDK client.
    from openai import OpenAI

    client = OpenAI()
    try:
        messages = [{"role": "system", "content": (
            "You are an AI medical assistant designed to help doctors. Your job is to convert the patient information into SOAP and SBAR reports. in the given JSON format"
        )}]

        if input_text:
            # Text-based input.
            model = "gpt-4o"
            messages.append({"role": "user", "content": input_text})
        else:
            # Audio-based input: load and base64-encode the audio file.
            model = "gpt-4o-audio-preview-2024-10-01"
            with open(file_path, "rb") as audio_file:
                encoded_string = base64.b64encode(audio_file.read()).decode('utf-8')
            messages.append({
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": "Please generate Medical reports based on the following audio input"
                    },
                    {
                        "type": "input_audio",
                        "input_audio": {
                            "data": encoded_string,
                            "format": "wav"
                        }
                    }
                ]
            })

        # Ask for a completion parsed directly into AI_Medical_Report.
        completion = client.beta.chat.completions.parse(
            model=model,
            modalities=["text"],
            messages=messages,
            response_format=AI_Medical_Report
        )
        return completion.choices[0].message.parsed
    except Exception as e:
        # Best-effort: report the failure and let the caller handle None.
        print(f"An error occurred: {e}")
        return None
# wrapper function for audio | |
def report_audio(audio_file: str = None, transcription_service: str = "OpenAI"):
    """Generate reports from audio by delegating to ``report_main``.

    Args:
        audio_file: Path to the audio file.
        transcription_service: Name of the transcription service to use.

    Returns:
        Whatever ``report_main`` returns for the given audio input.
    """
    forwarded = {
        "audio_file": audio_file,
        "transcription_service": transcription_service,
    }
    return report_main(**forwarded)
# driver function for making reports | |
def report_main(input_text: str = None, audio_file: str = None, transcription_service: str = "OpenAI"):
    """Generate SOAP and SBAR reports from text or from an audio recording.

    ``input_text`` takes precedence over ``audio_file``. Audio is first
    transcribed with the selected service and the transcript is then turned
    into a report.

    Args:
        input_text: Text input from the user.
        audio_file: Path to the audio file.
        transcription_service: ``"AssemblyAI"`` or ``"OpenAI"``.

    Returns:
        tuple: ``(patient_name, soap_report, sbar_report,
        doctor_recommendations)``; when ``audio_file`` is given the
        transcription text is appended as a fifth element.

    Raises:
        ValueError: If neither input is provided or the transcription
            service name is not recognised.
        RuntimeError: If no report could be generated.
    """
    transcription_text = ""

    if input_text:
        report = generate_report(input_text=input_text)
    elif audio_file:
        # Pick the transcription function for the selected service.
        if transcription_service == "AssemblyAI":
            transcribe = assemblyai_STT
        elif transcription_service == "OpenAI":
            transcribe = openai_STT
        else:
            raise ValueError("Invalid transcription service specified. Choose 'AssemblyAI' or 'OpenAI'.")
        transcription_text = transcribe(audio_file)
        report = generate_report(input_text=transcription_text)
        print(report)
    else:
        raise ValueError("Either input_text or audio_file must be provided.")

    # generate_report returns None on failure; fail with a clear message
    # rather than an AttributeError on the attribute reads below.
    if report is None:
        raise RuntimeError("Report generation failed; see the server log for details.")

    patient_name = report.patient_name
    soap_report = report.soap_report
    sbar_report = report.sbar_report
    doctor_recommendations = report.recommendations_for_doc

    if audio_file:
        return patient_name, soap_report, sbar_report, doctor_recommendations, transcription_text
    return patient_name, soap_report, sbar_report, doctor_recommendations
def delete_dir(dir):
    """Recursively delete a directory and report the outcome as a message.

    Args:
        dir: Path of the directory to remove.

    Returns:
        str: ``"Deleted Successfully"`` when the tree was removed,
        ``"Already Deleted"`` when the path does not exist, or a
        ``"Failed to delete: ..."`` message for any other OS-level error.
    """
    try:
        shutil.rmtree(dir)
    except FileNotFoundError:
        return "Already Deleted"
    except OSError as exc:
        # E.g. permission problems: report them instead of claiming the
        # directory was already gone.
        return f"Failed to delete: {exc}"
    return "Deleted Successfully"
def save_reports(file_name, file_content, report_type, destination_folder):
    """Write a report to a ``.docx`` file in two places.

    The document is saved as ``<report_type>_<file_name>.docx`` in
    ``destination_folder`` and again in ``temp_reports_dir``. Both folders
    are created when missing.

    Args:
        file_name: Base name for the file (the patient name in this app).
            Path separators are replaced with ``_`` so the name cannot point
            into another directory.
        file_content: Report text, stored as a single paragraph.
        report_type: Prefix for the file name, e.g. ``"SOAP"`` or ``"SBAR"``.
        destination_folder: Permanent folder for the report.

    Returns:
        str: A fixed confirmation message.
    """
    # A name such as "Doe/John" would otherwise be treated as a sub-path and
    # make the save fail.
    safe_name = str(file_name)
    for separator in ("/", "\\"):
        safe_name = safe_name.replace(separator, "_")
    docx_name = f"{report_type}_{safe_name}.docx"

    document = docx.Document()
    document.add_paragraph(file_content)

    # Permanent copy first, then the temporary copy.
    for folder in (destination_folder, temp_reports_dir):
        os.makedirs(folder, exist_ok=True)
        document.save(os.path.join(folder, docx_name))

    return "Successfully saved"
# driver function for save | |
def save_reports_main(file_name, soap_report_content, sbar_report_content):
    """Save both reports and add them to the report vector index.

    The SOAP and SBAR reports are written as ``.docx`` files, the temporary
    copies are vectorised into ``temp_vector_db``, and that index is either
    copied into an empty ``report_vector_db`` or merged into the existing
    one. The temporary directories are removed afterwards, including when an
    earlier step fails, so leftover reports are not indexed by a later save.

    Args:
        file_name: Base name for the report files.
        soap_report_content: Text of the SOAP report.
        sbar_report_content: Text of the SBAR report.

    Returns:
        str: A fixed confirmation message.
    """
    try:
        print(save_reports(file_name, soap_report_content, "SOAP", soap_dir))
        print(save_reports(file_name, sbar_report_content, "SBAR", sbar_dir))

        # Vectorize the reports in the temporary directory.
        vectorise(temp_reports_dir, temp_vector_db)

        # A missing index directory is treated the same as an empty one.
        os.makedirs(report_vector_db, exist_ok=True)
        if not os.listdir(report_vector_db):
            # First save: copy the fresh index over as-is.
            for item in os.listdir(temp_vector_db):
                source_path = os.path.join(temp_vector_db, item)
                destination_path = os.path.join(report_vector_db, item)
                if os.path.isdir(source_path):
                    shutil.copytree(source_path, destination_path)
                else:
                    shutil.copy2(source_path, destination_path)
            print("Copied contents from temp_vector_db to report_vector_db.")
        else:
            merge_vectors(temp_vector_db)
            print("Merged temp_vector_db into report_vector_db.")
    finally:
        # Always clean up the temporary directories.
        delete_dir(temp_reports_dir)
        delete_dir(temp_vector_db)
        print("Deleted temporary directories.")

    return "Reports saved successfully!"
"""#Page 2""" | |
def refresh_files(docs_dir):
    """Return a dropdown listing every file name found under ``docs_dir``.

    The directory is created when it does not exist. Sub-directories are
    walked as well; only bare file names (no paths) are listed.

    Args:
        docs_dir: Directory to scan.

    Returns:
        gr.Dropdown: Interactive dropdown whose choices are the file names.
    """
    if not os.path.exists(docs_dir):
        os.makedirs(docs_dir)
    file_names = [
        name
        for _root, _dirs, names in os.walk(docs_dir)
        for name in names
    ]
    return gr.Dropdown(choices=file_names, interactive=True)
def soap_refresh():
    """Return a refreshed file dropdown for the SOAP report directory."""
    soap_dropdown = refresh_files(soap_dir)
    return soap_dropdown
def sbar_refresh():
    """Return a refreshed file dropdown for the SBAR report directory."""
    sbar_dropdown = refresh_files(sbar_dir)
    return sbar_dropdown
def get_content(docs_dir, selected_file_name):
    """Return the text of a ``.docx`` report.

    Args:
        docs_dir: Directory containing the report.
        selected_file_name: File name chosen by the user. ``None`` or an
            empty string (nothing selected) yields an empty string.

    Returns:
        str: Non-empty paragraphs joined by blank lines.

    Raises:
        FileNotFoundError: If the file does not exist or is not a ``.docx``.
        IOError: If the document cannot be read.
    """
    # Nothing selected: show no content rather than failing in os.path.join.
    if not selected_file_name:
        return ""

    docx_path = os.path.join(docs_dir, selected_file_name)
    if not os.path.isfile(docx_path) or not docx_path.endswith('.docx'):
        raise FileNotFoundError(f"File {selected_file_name} not found in {docs_dir} or is not a .docx file.")

    try:
        doc = docx.Document(docx_path)
        paragraphs = [paragraph.text for paragraph in doc.paragraphs if paragraph.text]
        return "\n\n".join(paragraphs)
    except Exception as e:
        # Keep the original failure attached as the cause.
        raise IOError(f"An error occurred while reading the document: {e}") from e
def get_soap_report_content(selected_file_name):
    """Return the text of the chosen report from the SOAP directory."""
    soap_text = get_content(soap_dir, selected_file_name)
    return soap_text
def get_sbar_report_content(selected_file):
    """Return the text of the chosen report from the SBAR directory."""
    sbar_text = get_content(sbar_dir, selected_file)
    return sbar_text
# Updated generate_response function | |
def generate_response(message, history, soap_content):
    """Stream an answer to ``message`` grounded in a report's text.

    Args:
        message: Latest user message.
        history: Earlier turns as ``(user, assistant)`` pairs; entries that
            do not have exactly two items are skipped.
        soap_content: Report text placed in the system message.

    Yields:
        str: The answer accumulated so far, once per content-bearing chunk.
    """
    # Imported locally: the module-level name `OpenAI` refers to the
    # LangChain class, not the SDK client.
    from openai import OpenAI
    client = OpenAI()

    chat_messages = [{
        "role": "system",
        "content": "This conversation is based on the following SOAP report content:\n" + soap_content,
    }]
    for turn in history:
        if len(turn) != 2:
            continue
        user_text, assistant_text = turn
        chat_messages += [
            {"role": "user", "content": user_text},
            {"role": "assistant", "content": assistant_text},
        ]
    chat_messages.append({"role": "user", "content": message})

    stream = client.chat.completions.create(
        model='gpt-4o-mini',
        messages=chat_messages,
        stream=True
    )

    answer_so_far = ""
    for chunk in stream:
        delta_text = chunk.choices[0].delta.content
        if delta_text is None:
            continue
        answer_so_far += delta_text
        yield answer_so_far
# Updated handle_chat_message function | |
def handle_chat_message(history, message, soap_content):
    """Stream chatbot updates for a newly submitted question.

    Args:
        history: Existing chat history as a list of ``[user, assistant]``
            pairs.
        message: The question just submitted.
        soap_content: Report text the answer is based on.

    Yields:
        tuple: ``(updated_history, "")``. The empty string is the second
        output value, which the UI wires to the question text box.
    """
    # Start the new turn with an empty assistant reply and fill it in as the
    # streamed answer grows.
    updated_history = history + [[message, ""]]
    latest_turn = updated_history[-1]
    for answer_so_far in generate_response(message, history, soap_content):
        latest_turn[1] = answer_so_far
        yield updated_history, ""
def ask_reports(docs_dir, doc_name, question):
    """Answer a question about one ``.docx`` report using ``gpt-4o``.

    Args:
        docs_dir: Directory containing the report.
        doc_name: File name of the report.
        question: Question to answer from the report.

    Returns:
        str: The model's answer text, ``"Failed to retrieve text from
        document."`` when the report has no text, or an
        ``"An error occurred: ..."`` message when the API call fails.
    """
    docx_path = os.path.join(docs_dir, doc_name)
    doc = docx.Document(docx_path)
    text = "".join(paragraph.text + "\n" for paragraph in doc.paragraphs)

    # Check before building the prompt, and ignore whitespace: every
    # paragraph contributes at least a newline, so a plain emptiness test
    # would only catch documents with no paragraphs at all.
    if not text.strip():
        return "Failed to retrieve text from document."

    extracted_text = (
        f"You are provided with a medical report of a patient {doc_name}.\n\n"
        + text
        + "\n\nUse the report to answer the following question:\n"
        + question
    )

    # Imported locally: the module-level name `OpenAI` refers to the
    # LangChain class, not the SDK client.
    from openai import OpenAI
    client = OpenAI()

    messages = [
        {"role": "system", "content": "You are a helpful assistant with medical expertise."},
        {"role": "user", "content": extracted_text}
    ]
    try:
        completion = client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
        )
        # Return the answer text, matching the string returned on error,
        # rather than the message object.
        answer = completion.choices[0].message.content
    except Exception as e:
        return f"An error occurred: {e}"
    return answer
def ask_soap(selected_file, question):
    """Return a placeholder answer for a SOAP report question.

    The selected file is not read; the result only echoes the question and
    the file name.
    """
    placeholder = "Answer to '{}' based on {}"
    return placeholder.format(question, selected_file)
def ask_sbar(selected_file, question):
    """Return a placeholder answer for an SBAR report question.

    The selected file is not read; the result only echoes the question and
    the file name.
    """
    placeholder = "Answer to '{}' based on {}"
    return placeholder.format(question, selected_file)
"""# page 3""" | |
def local_search(question):
    """Answer a question from the saved-reports vector index.

    The index in ``report_vector_db`` is loaded, the most similar chunks are
    retrieved and passed with the question to a "stuff" QA chain built on the
    module-level ``llm``. OpenAI usage for the call is printed.

    Args:
        question: Question to answer.

    Returns:
        str: The answer followed by the list of source documents.
    """
    report_index = FAISS.load_local(
        report_vector_db,
        OpenAIEmbeddings(),
        allow_dangerous_deserialization=True,
    )
    matches = report_index.similarity_search(question)
    answer_chain = load_qa_chain(llm, chain_type="stuff")
    with get_openai_callback() as usage:
        answer = answer_chain.run(input_documents=matches, question=question)
        print(usage)
    return formatted_response_local(matches, answer)
def formatted_response_local(docs, response):
    """Return ``response`` followed by a "Sources" list of the given documents.

    Args:
        docs: Iterable of objects with a ``metadata`` mapping; the ``source``
            and ``page`` keys are read when present.
        response: Answer text placed before the source list.

    Returns:
        str: The answer plus one line per document: the file name, and the
        page number when the metadata has one.
    """
    def describe(doc):
        # Last path component of the source, plus the page when known.
        source = doc.metadata.get('source', 'Unknown source')
        page = doc.metadata.get('page', None)
        name = source.split('/')[-1].strip()
        if page is None:
            return f"\n{name}"
        return f"\n{name}\tpage no {page}"

    header = response + "\n\nSources"
    return header + "".join(describe(doc) for doc in docs)
def local_gpt(question):
    """Ask the module-level ``llm`` a question directly, without retrieval.

    Args:
        question: Question inserted into the prompt template.

    Returns:
        The chain's response for the question.
    """
    step_by_step_prompt = PromptTemplate(
        template="""Question: {question}
Answer: Let's think step by step.""",
        input_variables=["question"],
    )
    chain = LLMChain(prompt=step_by_step_prompt, llm=llm)
    return chain.run(question)
"""# Page 4""" | |
def save2_docs(docs):
    """Replace the contents of ``upload_dir`` with the uploaded files.

    Args:
        docs: Uploaded-file objects, each exposing a ``name`` attribute with
            the path of the file to copy. ``None`` or an empty list leaves
            the upload directory untouched.

    Returns:
        str: ``"Successful!"`` after copying, or ``"No files were
        provided."`` when there was nothing to upload.
    """
    # Pressing "Upload" with nothing selected passes None; do not wipe the
    # existing uploads (or fail while iterating) in that case.
    if not docs:
        return "No files were provided."

    output_dir = upload_dir
    # Start from an empty directory so earlier uploads are discarded.
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)
    os.makedirs(output_dir, exist_ok=True)
    for doc in docs:
        shutil.copy(doc.name, output_dir)
    return "Successful!"
# Conversation agent shared by the document-query handlers. It stays None
# until create2_agent() has run. (A module-level `global` statement has no
# effect, so the name is initialised explicitly instead.)
agent2 = None


def create2_agent():
    """Create the conversation agent and store it in the global ``agent2``.

    The agent is a ``ConversationChain`` over ``gpt-4o-mini`` with a
    summary-buffer memory capped at 1500 tokens.

    Returns:
        str: A fixed confirmation message.
    """
    global agent2
    from langchain.chat_models import ChatOpenAI
    from langchain.chains.conversation.memory import ConversationSummaryBufferMemory
    from langchain.chains import ConversationChain

    chat_model = ChatOpenAI(model_name='gpt-4o-mini')
    memory = ConversationSummaryBufferMemory(llm=chat_model, max_token_limit=1500)
    agent2 = ConversationChain(llm=chat_model, memory=memory, verbose=True)
    return "Successful!"
def search2_docs(prompt, question, state):
    """Answer a question about the uploaded documents with the agent.

    The index in ``upload_files_vector_db`` is searched for chunks similar to
    ``question``; the custom prompt, the question and the retrieved chunks
    are sent to the conversation agent, and the answer is appended to the
    chat state. OpenAI usage for the call is printed.

    Args:
        prompt: Custom instructions placed before the question.
        question: The user's question.
        state: Chat history as a list of ``(question, answer)`` tuples, or
            ``None`` for a new conversation.

    Returns:
        tuple: ``(state, state)``, matching the chatbot and state outputs
        wired in the UI.
    """
    from langchain.embeddings.openai import OpenAIEmbeddings
    from langchain.vectorstores import FAISS
    from langchain.callbacks import get_openai_callback
    global agent2

    # Searching before "Create Agent" was pressed would otherwise fail on an
    # undefined or None agent; create it on demand.
    if globals().get("agent2") is None:
        create2_agent()

    state = state or []
    embeddings = OpenAIEmbeddings()
    docs_db = FAISS.load_local(upload_files_vector_db, embeddings, allow_dangerous_deserialization=True)
    docs = docs_db.similarity_search(question)

    # Final input: custom prompt, question, then the retrieved chunks.
    prompt = prompt + "\n\n" + question + "\n\n" + str(docs)

    with get_openai_callback() as cb:
        response = agent2.predict(input=prompt)
        print(cb)
    return formatted_response(docs, question, response, state)
def delete2_docs():
    """Delete the uploaded documents and their vector index.

    Each directory is removed independently, so a missing upload directory
    no longer prevents the index directory from being deleted.

    Returns:
        str: ``"Deleted Successfully"`` when at least one directory was
        removed, ``"Already Deleted"`` when neither existed, or a
        ``"Failed to delete: ..."`` message listing paths that could not be
        removed.
    """
    removed_any = False
    failures = []
    for path in (upload_dir, upload_files_vector_db):
        try:
            shutil.rmtree(path)
        except FileNotFoundError:
            continue
        except OSError as exc:
            failures.append(f"{path} ({exc})")
        else:
            removed_any = True

    if failures:
        return "Failed to delete: " + ", ".join(failures)
    return "Deleted Successfully" if removed_any else "Already Deleted"
def process2_docs():
    """Build the vector index for the uploaded documents.

    PDF, TXT, DOCX, CSV and XLSX files directly inside ``upload_dir`` are
    loaded in that order, split into 1000-character chunks with 200
    characters of overlap, embedded with OpenAI embeddings and saved as a
    FAISS index in ``upload_files_vector_db``.

    Returns:
        str: A fixed confirmation message.
    """
    from langchain.document_loaders import PyPDFLoader
    from langchain.document_loaders import DirectoryLoader
    from langchain.document_loaders import TextLoader
    from langchain.document_loaders import Docx2txtLoader
    from langchain.document_loaders.csv_loader import CSVLoader
    from langchain.document_loaders import UnstructuredExcelLoader
    from langchain.vectorstores import FAISS
    from langchain.embeddings.openai import OpenAIEmbeddings
    from langchain.text_splitter import RecursiveCharacterTextSplitter

    # One (glob pattern, loader class) pair per supported file type.
    loader_by_pattern = (
        ("./*.pdf", PyPDFLoader),
        ("./*.txt", TextLoader),
        ("./*.docx", Docx2txtLoader),
        ("./*.csv", CSVLoader),
        ("./*.xlsx", UnstructuredExcelLoader),
    )

    collected = []
    for pattern, loader_cls in loader_by_pattern:
        directory_loader = DirectoryLoader(upload_dir, glob=pattern, loader_cls=loader_cls)
        collected.extend(directory_loader.load())

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
        length_function=len
    )
    chunks = splitter.split_documents(collected)

    index = FAISS.from_documents(chunks, OpenAIEmbeddings())
    index.save_local(upload_files_vector_db)
    return "Successful!"
def formatted_response(docs, question, response, state):
    """Append an answer with its source list to the chat state.

    Args:
        docs: Iterable of objects with a ``metadata`` mapping; the ``source``
            and ``page`` keys are read when present.
        question: The question that was asked.
        response: Answer text placed before the source list.
        state: List of ``(question, answer)`` tuples; mutated in place.

    Returns:
        tuple: ``(state, state)``, the same list object twice.
    """
    source_lines = []
    for doc in docs:
        source = doc.metadata.get('source', 'Unknown source')
        page = doc.metadata.get('page', None)
        # Last path component of the source identifies the document.
        doc_name = source.split('/')[-1].strip()
        if page is None:
            source_lines.append(f"\n{doc_name}")
        else:
            source_lines.append(f"\n{doc_name}\tpage no {page}")

    answer_with_sources = response + "\n\nSources" + "".join(source_lines)
    state.append((question, answer_with_sources))
    return state, state
"""# UI""" | |
import gradio as gr | |
# Stylesheet for the app: `.col` centres a column and caps its width.
css = """
.col {
max-width: 70%;
margin: 0 auto;
display: flex;
flex-direction: column;
justify-content: center;
align-items: center;
}
"""

# Gradio interface.
# NOTE(review): the nesting of the Row/Column/Tab containers below was
# reconstructed from statement order; confirm it against the intended layout.
with gr.Blocks(css=css) as demo:
    gr.Markdown("## <center>Medical App</center>")

    # Page 1: report creation ------------------------------------------------
    with gr.Tab("SOAP and SBAR Note Creation"):
        # Generate reports from an audio recording.
        with gr.Tab("From Audio"):
            with gr.Row():
                with gr.Column():
                    audio_file = gr.Audio(label="Audio Input", type="filepath")
                with gr.Column():
                    transcription_service = gr.Dropdown(
                        label="Select Transcription Service",
                        choices=["OpenAI", "AssemblyAI"],
                        value="OpenAI",
                    )
                    gr.Markdown("<small>Upload an audio file or select a transcription service.</small>")
            generate_with_audio_button = gr.Button("Generate Report", variant="primary")

            # Output fields; name and reports are editable before saving.
            patient_name_box_text = gr.Textbox(
                label="Patient Name", interactive=True, placeholder="Generated Patient Name", lines=1
            )
            with gr.Row():
                with gr.Column():
                    soap_report_box_text = gr.Textbox(
                        label="SOAP Report", interactive=True, placeholder="Generated SOAP Report", lines=10
                    )
                with gr.Column():
                    sbar_report_box_text = gr.Textbox(
                        label="SBAR Report", interactive=True, placeholder="Generated SBAR Report", lines=10
                    )
            audio_doctor_recommendations_box = gr.Textbox(
                label="Doctor Recommendations", interactive=False, placeholder="Recommendations", lines=5
            )
            audio_transcription_box = gr.Textbox(
                label="Transcription Text", interactive=False, placeholder="Transcribed Text", lines=5
            )

            # Audio input produces five outputs (the fifth is the transcript).
            generate_with_audio_button.click(
                fn=report_audio,
                inputs=[audio_file, transcription_service],
                outputs=[
                    patient_name_box_text,
                    soap_report_box_text,
                    sbar_report_box_text,
                    audio_doctor_recommendations_box,
                    audio_transcription_box
                ]
            )

            with gr.Row():
                save_button = gr.Button("Save Report", variant="secondary")
                save_message = gr.Textbox(
                    label="Save Status", interactive=False, placeholder="Status of the save operation", lines=1
                )
            # The patient name is used as the report file name.
            save_button.click(
                fn=save_reports_main,
                inputs=[patient_name_box_text, soap_report_box_text, sbar_report_box_text],
                outputs=[save_message]
            )

        # Generate reports from typed or pasted text.
        with gr.Tab("From Transcript"):
            with gr.Column():
                input_text = gr.Textbox(
                    label="Patient Case Study (Text Input)",
                    placeholder="Enter the patient case study here...",
                    lines=7,
                )
                gr.Markdown("<small>Enter the patient's details, symptoms, and any relevant information.</small>")
                generate_with_text_button = gr.Button("Generate Report", variant="primary")

            # Output fields for this tab; the names are re-bound to new
            # components, the audio tab's events were wired above.
            patient_name_box_text = gr.Textbox(
                label="Patient Name", interactive=True, placeholder="Generated Patient Name", lines=1
            )
            with gr.Row():
                with gr.Column():
                    soap_report_box_text = gr.Textbox(
                        label="SOAP Report", interactive=True, placeholder="Generated SOAP Report", lines=10
                    )
                with gr.Column():
                    sbar_report_box_text = gr.Textbox(
                        label="SBAR Report", interactive=True, placeholder="Generated SBAR Report", lines=10
                    )
            doctor_recommendations_box_text = gr.Textbox(
                label="Doctor Recommendations", interactive=False, placeholder="Recommendations", lines=5
            )

            # Text input produces four outputs.
            generate_with_text_button.click(
                fn=report_main,
                inputs=[input_text],
                outputs=[
                    patient_name_box_text,
                    soap_report_box_text,
                    sbar_report_box_text,
                    doctor_recommendations_box_text
                ]
            )

            with gr.Row():
                save_button = gr.Button("Save Report", variant="secondary")
                save_message = gr.Textbox(
                    label="Save Status", interactive=False, placeholder="Status of the save operation", lines=1
                )
            # The patient name is used as the report file name.
            save_button.click(
                fn=save_reports_main,
                inputs=[patient_name_box_text, soap_report_box_text, sbar_report_box_text],
                outputs=[save_message]
            )

    # Page 2: chat about a single saved report -------------------------------
    with gr.Tab("SOAP and SBAR Queries"):
        with gr.Tab("Query SOAP Reports"):
            with gr.Row():
                with gr.Column():
                    soap_refresh_button = gr.Button("Refresh")
                    ask_soap_input = gr.Dropdown(label="Choose File")
                    soap_content_display = gr.Textbox(
                        label="SOAP Report Content", interactive=False, placeholder="Report content will appear here...", lines=5
                    )
                with gr.Column():
                    soap_chatbot = gr.Chatbot(label="SOAP Chatbot")
                    soap_chat_input = gr.Textbox(placeholder="Enter your question here...", submit_btn=True)
                    audio_file = gr.Audio(label="Audio Input", type="filepath", sources="microphone")
                    submit_audio_btn = gr.Button("Submit Audio")
                    clear = gr.ClearButton([soap_chat_input, soap_chatbot, audio_file])

            # Reload the list of saved SOAP reports.
            soap_refresh_button.click(fn=soap_refresh, inputs=None, outputs=ask_soap_input)
            # Show the selected report's text.
            ask_soap_input.change(fn=get_soap_report_content, inputs=ask_soap_input, outputs=soap_content_display)
            # Transcribe a spoken question into the question box.
            submit_audio_btn.click(openai_STT, inputs=audio_file, outputs=soap_chat_input)
            # Stream the answer into the chatbot and clear the question box.
            soap_chat_input.submit(
                handle_chat_message,
                inputs=[soap_chatbot, soap_chat_input, soap_content_display],
                outputs=[soap_chatbot, soap_chat_input]
            )

        with gr.Tab("Query SBAR Reports"):
            with gr.Row():
                with gr.Column():
                    sbar_refresh_button = gr.Button("Refresh")
                    ask_sbar_input = gr.Dropdown(label="Choose File")
                    sbar_content_display = gr.Textbox(
                        label="SBAR Report Content", interactive=False, placeholder="Report content will appear here...", lines=5
                    )
                with gr.Column():
                    sbar_chatbot = gr.Chatbot(label="SBAR Chatbot")
                    sbar_chat_input = gr.Textbox(placeholder="Enter your question here...", submit_btn=True)
                    audio_file = gr.Audio(label="Audio Input", type="filepath", sources="microphone")
                    submit_audio_btn = gr.Button("Submit Audio")
                    clear_sbar = gr.ClearButton([sbar_chat_input, sbar_chatbot, audio_file])

            # Reload the list of saved SBAR reports.
            sbar_refresh_button.click(fn=sbar_refresh, inputs=None, outputs=ask_sbar_input)
            # Show the selected report's text.
            ask_sbar_input.change(fn=get_sbar_report_content, inputs=ask_sbar_input, outputs=sbar_content_display)
            # Transcribe a spoken question into the question box.
            submit_audio_btn.click(openai_STT, inputs=audio_file, outputs=sbar_chat_input)
            # Stream the answer; the SBAR report text is passed as context.
            sbar_chat_input.submit(
                handle_chat_message,
                inputs=[sbar_chatbot, sbar_chat_input, sbar_content_display],
                outputs=[sbar_chatbot, sbar_chat_input]
            )

    # Page 3: query all saved SOAP and SBAR reports (RAG), or ask the model
    # directly ---------------------------------------------------------------
    with gr.Tab("All Queries"):
        with gr.Column(elem_classes="col"):
            local_search_input = gr.Textbox(label="Enter Question here")
            local_search_button = gr.Button("Search")
            local_search_output = gr.Textbox(label="Output")
            local_gpt_button = gr.Button("Ask ChatGPT")
            local_gpt_output = gr.Textbox(label="Output")

            local_search_button.click(local_search, inputs=local_search_input, outputs=local_search_output)
            local_gpt_button.click(local_gpt, inputs=local_search_input, outputs=local_gpt_output)

    # Page 4: upload, index and query arbitrary documents --------------------
    with gr.Tab("Documents Queries"):
        with gr.Column(elem_classes="col"):
            with gr.Tab("Upload and Process Documents"):
                with gr.Column():
                    docs2_upload_input = gr.Files(label="Upload File(s)")
                    docs2_upload_button = gr.Button("Upload")
                    docs2_upload_output = gr.Textbox(label="Output")
                    docs2_process_button = gr.Button("Process")
                    docs2_process_output = gr.Textbox(label="Output")
                    create2_agent_button = gr.Button("Create Agent")
                    create2_agent_output = gr.Textbox(label="Output")
                    gr.ClearButton([docs2_upload_input, docs2_upload_output, docs2_process_output, create2_agent_output])

                    docs2_upload_button.click(save2_docs, inputs=docs2_upload_input, outputs=docs2_upload_output)
                    docs2_process_button.click(process2_docs, inputs=None, outputs=docs2_process_output)
                    create2_agent_button.click(create2_agent, inputs=None, outputs=create2_agent_output)

            with gr.Tab("Query Documents"):
                with gr.Column():
                    docs2_prompt_input = gr.Textbox(label="Custom Prompt")
                    docs2_chatbot = gr.Chatbot(label="Chats")
                    docs2_state = gr.State()
                    docs2_search_input = gr.Textbox(label="Enter Question")
                    docs2_search_button = gr.Button("Search")
                    docs2_delete_button = gr.Button("Delete")
                    docs2_delete_output = gr.Textbox(label="Output")
                    gr.ClearButton([docs2_prompt_input, docs2_search_input, docs2_delete_output])

                    docs2_search_button.click(
                        search2_docs,
                        inputs=[docs2_prompt_input, docs2_search_input, docs2_state],
                        outputs=[docs2_chatbot, docs2_state],
                    )
                    docs2_delete_button.click(delete2_docs, inputs=None, outputs=docs2_delete_output)

demo.launch(debug=True)