"""Gradio front-end for the ESRS_QA retrieval-augmented chatbot.

The module wires together, at import time:

* a Pinecone-backed retriever using BGE embeddings,
* a LangChain (LCEL) pipeline: load memory -> reformulate the question ->
  retrieve documents -> answer with source attribution,
* a Gradio UI gated by a Hugging Face login / organisation check.
"""

import os
import logging
from operator import itemgetter

import gradio as gr
from pinecone import Pinecone
from huggingface_hub import whoami
from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough, RunnableLambda
from langchain_community.embeddings import HuggingFaceBgeEmbeddings
from langchain_openai import AzureChatOpenAI
from langchain.prompts.prompt import PromptTemplate
from langchain.memory import ConversationBufferMemory
from langchain_community.vectorstores import Pinecone as PineconeVectorstore

from eki_esrsqa.utils import (
    make_html_source,
    make_pairs,
    _format_chat_history,
    _combine_documents,
    get_llm,
    init_env,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

init_env()
# NOTE(review): `chat_model_init` is not referenced anywhere else in this
# file (the chain uses `chat_model` below). The call is kept because
# `get_llm()` may have side effects - confirm before removing.
chat_model_init = get_llm()
demo_name = "ESRS_QA"
hf_model = "BAAI/bge-base-en-v1.5"

embeddings = HuggingFaceBgeEmbeddings(
    model_name=hf_model,
    encode_kwargs={"normalize_embeddings": True},
)

pc = Pinecone(api_key=os.getenv("PINECONE_API_KEY"))
index = pc.Index(os.getenv("PINECONE_API_INDEX"))
vectorstore = PineconeVectorstore(index, embeddings, "page_content")
retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
chat_model = AzureChatOpenAI()

# Background text injected into the answering prompt via `{esrs_wiki}`.
# The prompt strings below are reproduced character-for-character from the
# previous revision (including its whitespace), only split across source
# lines with implicit string concatenation.
esrs_wiki = (
    " The Corporate Sustainability Reporting Directive (CSRD) is a mandate "
    "that requires all companies to report on their sustainability "
    "initiatives. In response to this directive, the European Sustainability "
    "Reporting Standards (ESRS) were developed. These standards are a key "
    "tool in promoting the transition to a sustainable economy within the EU, "
    "providing a structured framework for companies to disclose their "
    "sustainability initiatives. The ESRS cover a wide range of "
    "environmental, social, and governance (ESG) issues, including climate "
    "change, biodiversity, and human rights. \n"
    "Companies that adhere to the ESRS can provide investors with valuable "
    "insights into their sustainability impact, thereby informing investment "
    "decisions. The ESRS are designed to be highly interoperable with global "
    "reporting standards, which helps to avoid unnecessary duplication in "
    "reporting by companies. The reporting requirements based on the ESRS "
    "will be gradually implemented for different companies over time. In "
    "summary, the ESRS play a critical role in fostering sustainable finance "
    "and enabling companies to demonstrate their commitment to the green deal "
    "agenda while accessing sustainable finance. --- "
)

# Prompt that turns a follow-up question plus chat history into a
# standalone question.
reformulation_template = (
    "Given the following conversation and a follow up question, rephrase the "
    "follow up question to be a standalone question, in its original "
    "language. Chat History: {chat_history} Follow Up Input: {question} "
    "Standalone question:"
)

CONDENSE_QUESTION_PROMPT = PromptTemplate.from_template(reformulation_template)

# Prompt used for the final answer; expects `esrs_wiki`, `context` and
# `question` variables.
answering_template = (
    " You are an ESG expert, with 20 years experience analyzing corporate "
    "sustainability reports. You are specialist in the upcoming CSRD "
    "regulation and in general with corporate sustainability disclosure "
    "requirements. {esrs_wiki} You will answer the question based on the "
    "following passages extracted from CSRD specific sustainability "
    "guidelines and reports: ``` {context} ``` Guidelines: "
    "1. Context: You'll receive relevant excerpts from a CSRD-specific "
    "sustainability guideline or report to address a given question. "
    "2. Relevance: Only include passages directly pertaining to the question; "
    "omit irrelevant content. "
    "3. Facts and Figures: Prioritize factual information in your response. "
    "4. Conciseness: Keep answers sharp and succinct, avoiding unnecessary "
    "context. "
    "5. Focus: Address the specific question without veering into related "
    "topics. "
    "6. Honesty: If unsure, state that you don't know rather than inventing "
    "an answer. "
    "7. \n"
    "Source Attribution: When using information from a passage, mention it as "
    "[Doc i] at the end of the sentence (where 'i' represents the document "
    "number). "
    "8. Multiple Sources: If the same content appears in multiple documents, "
    "cite them collectively (e.g., [Doc i, Doc j, Doc k]). "
    "9. Structured Paragraphs: Instead of bullet-point summaries, compile your "
    "responses into well-structured paragraphs. "
    '10. Method Focus: When addressing "how" questions, emphasize methods and '
    "procedures over outcomes. "
    "11. Selective Usage: You're not obligated to use every passage; include "
    "only those relevant to the question. "
    "12. Insufficient Information: If documents lack necessary details, "
    "indicate that you don't have enough information. "
    "Question: {question} Answer: "
)

ANSWER_PROMPT = ChatPromptTemplate.from_template(answering_template)

DEFAULT_DOCUMENT_PROMPT = PromptTemplate.from_template(template="{page_content}")

# NOTE(review): this memory object is module-level, so it is shared by every
# caller of `chat`; `chat` clears and refills it on each call. Confirm
# whether concurrent sessions can interleave between those steps.
memory = ConversationBufferMemory(
    return_messages=True, output_key="answer", input_key="question"
)

# First we add a step to load memory.
# This adds a "chat_history" key to the input object.
loaded_memory = RunnablePassthrough.assign(
    chat_history=RunnableLambda(memory.load_memory_variables) | itemgetter("history"),
)

# Now we calculate the standalone question.
standalone_question = {
    "standalone_question": {
        "question": lambda x: x["question"],
        "chat_history": lambda x: _format_chat_history(x["chat_history"]),
    }
    | CONDENSE_QUESTION_PROMPT
    | chat_model
    | StrOutputParser(),
}

# Now we retrieve the documents.
retrieved_documents = {
    "docs": itemgetter("standalone_question") | retriever,
    "question": lambda x: x["standalone_question"],
}

# Now we construct the inputs for the final prompt.
final_inputs = {
    "context": lambda x: _combine_documents(x["docs"], DEFAULT_DOCUMENT_PROMPT),
    "question": itemgetter("question"),
    "esrs_wiki": lambda x: esrs_wiki,
}

# And finally, we do the part that returns the answers.
answer = {
    "answer": final_inputs | ANSWER_PROMPT | chat_model,
    "docs": itemgetter("docs"),
}

# And now we put it all together!
final_chain = loaded_memory | standalone_question | retrieved_documents | answer


async def chat(
    query: str,
    history: list | None = None,
):
    """Stream an answer to *query* through the reformulate/retrieve/answer chain.

    Args:
        query: The user's question.
        history: Previous messages; each item must expose ``.content`` and be
            accepted by ``memory.chat_memory.add_message``. ``None`` (the
            default) is treated as an empty history.

    Yields:
        4-tuples ``("", gradio_pairs, history, source_html)``:

        * ``""`` - always the empty string (first output slot),
        * ``gradio_pairs`` - list of ``(question, answer)`` tuples, the last
          one growing as answer tokens arrive,
        * ``history`` - the incoming history while streaming, then the
          memory's updated ``"history"`` on the final yield,
        * ``source_html`` - retrieved documents rendered with
          ``make_html_source`` and joined by blank lines (empty until the
          retriever has finished).
    """
    # A fresh list per call instead of a shared mutable default argument.
    history = [] if history is None else history

    source_string = ""
    gradio_format = make_pairs([a.content for a in history]) + [(query, "")]

    # Reset the shared memory and replay this conversation's history into it.
    memory.clear()
    for message in history:
        memory.chat_memory.add_message(message)

    inputs = {"question": query}
    result = final_chain.astream_log(inputs)

    # Log paths emitted by `astream_log`. They embed the run names of the
    # chain steps, so they must be adapted if the chat model class changes
    # (e.g. "/logs/ChatGroq:2/streamed_output_str/-" for ChatGroq).
    # The reformulation step streams on
    # "/logs/AzureChatOpenAI/streamed_output_str/-"; its tokens are not shown.
    retriever_path_id = "/logs/Retriever/final_output"
    final_answer_path_id = "/logs/AzureChatOpenAI:2/streamed_output_str/-"

    async for patch in result:
        # Only the first operation of each patch is inspected.
        op = patch.ops[0]
        path = op["path"]

        if path == retriever_path_id:  # documents
            sources = op["value"]["documents"]  # List[Document]
            source_string = "\n\n".join(
                make_html_source(i, doc) for i, doc in enumerate(sources, 1)
            )
        elif path == final_answer_path_id:  # final answer
            new_token = op["value"]  # str
            answer_yet = gradio_format[-1][1]
            gradio_format[-1] = (query, answer_yet + new_token)

        yield "", gradio_format, history, source_string

    memory.save_context(inputs, {"answer": gradio_format[-1][1]})
    yield "", gradio_format, memory.load_memory_variables({})["history"], source_string


# Explicit encoding so the stylesheet is read the same way on every platform.
with open("./assets/style.css", "r", encoding="utf-8") as f:
    css = f.read()


def _show_only(target):
    """Return Gradio updates making *target* the only visible bloc."""
    return {
        bloc: gr.update(visible=bloc is target)
        for bloc in (bloc_1, bloc_2, bloc_3)
    }


def update_visible(oauth_token: gr.OAuthToken | None):
    """Choose which UI bloc to display from the user's Hugging Face login.

    Args:
        oauth_token: Token of the logged-in user, or ``None`` when nobody is
            logged in.

    Returns:
        A dict mapping ``bloc_1``, ``bloc_2`` and ``bloc_3`` to visibility
        updates: ``bloc_1`` when not logged in, ``bloc_2`` when the user
        belongs to the ``ekimetrics-esrsqa`` organisation, ``bloc_3``
        otherwise.
    """
    if oauth_token is None:
        return _show_only(bloc_1)

    org_names = [org["name"] for org in whoami(oauth_token.token)["orgs"]]
    logger.info(org_names)

    if "ekimetrics-esrsqa" in org_names:  # logged in group
        return _show_only(bloc_2)
    # logged but not in group
    return _show_only(bloc_3)


# NOTE(review): the widget nesting below was reconstructed from a revision
# whose indentation had been lost - confirm the layout matches the intent.
with gr.Blocks(title=demo_name, css=css) as demo:
    gr.LoginButton()
    # NOTE(review): this literal looks like it lost surrounding markup
    # (only blank lines remain around the name) - confirm the intended heading.
    gr.Markdown(f"\n\n{demo_name}\n\n")

    with gr.Column() as bloc_1:
        textbox_1 = gr.Textbox("You are not logged to Hugging Face !", show_label=False)

    with gr.Column(visible=False) as bloc_3:
        textbox_3 = gr.Textbox(
            "You are not part of the ESRSQA Project, ask access here : https://huggingface.co/ekimetrics-esrsqa"
        )

    with gr.Column(visible=False) as bloc_2:
        with gr.Row():
            with gr.Column(scale=2):
                chatbot = gr.Chatbot(
                    elem_id="chatbot", label=f"{demo_name} chatbot", show_label=False
                )
                state = gr.State([])

                with gr.Row():
                    ask = gr.Textbox(
                        show_label=False,
                        placeholder="Input your question then press enter",
                    )

            with gr.Column(scale=1, variant="panel"):
                gr.Markdown("### Sources")
                sources_textbox = gr.Markdown(show_label=False)

    # `chat` yields (textbox value, chatbot pairs, state, sources markdown).
    ask.submit(
        fn=chat,
        inputs=[
            ask,
            state,
        ],
        outputs=[ask, chatbot, state, sources_textbox],
    )

    demo.load(update_visible, inputs=None, outputs=[bloc_1, bloc_2, bloc_3])

demo.launch(
    share=True,
    # auth=("", ""),
    debug=True,
)