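"""Streamlit page for identifying a brand's stakeholders ("parties prenantes").

Sources can be a website (fast scrape or FireCrawl), a company PDF, or an
existing CSV map; stakeholders are extracted with a RAG chain and the energy
and carbon impact of the calls is tracked with EcoLogits and CodeCarbon.
"""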
import os
import re
import random
import time

import numpy as np
import pandas as pd
import streamlit as st
from dotenv import load_dotenv

from langchain_experimental.text_splitter import SemanticChunker
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_community.chat_models import ChatOpenAI
from langchain import hub
from langchain_core.runnables import RunnablePassthrough
from langchain_community.document_loaders import WebBaseLoader, FireCrawlLoader, PyPDFLoader
from langchain_core.prompts.prompt import PromptTemplate

from high_chart import test_chart
from chat_with_pps import get_response
from ecologits.tracers.utils import compute_llm_impacts
from codecarbon import EmissionsTracker

load_dotenv()

def get_docs_from_website(urls):
    """Load the raw text of the given web pages; return None if loading fails."""
    loader = WebBaseLoader(urls, header_template={
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36',
    })
    try:
        docs = loader.load()
        return docs
    except Exception:
        return None


def get_docs_from_website_fc(urls, firecrawl_api_key):
    """Scrape each URL with FireCrawl; return the loaded documents, or None on failure."""
    docs = []
    try:
        for url in urls:
            loader = FireCrawlLoader(api_key=firecrawl_api_key, url=url, mode="scrape")
            docs += loader.load()
        return docs
    except Exception:
        return None


def get_doc_chunks(docs):
    """Split loaded documents into semantically coherent chunks."""
    text_splitter = SemanticChunker(OpenAIEmbeddings(model="text-embedding-3-small"))
    docs = text_splitter.split_documents(docs)
    return docs


def get_doc_chunks_fc(docs):
    """Split raw text strings (FireCrawl output) into semantic chunks."""
    text_splitter = SemanticChunker(OpenAIEmbeddings(model="text-embedding-3-small"))
    docs_splitted = []
    for text in docs:
        text_splitted = text_splitter.split_text(text)
        docs_splitted += text_splitted
    return docs_splitted

def get_vectorstore_from_docs(doc_chunks):
    embedding = OpenAIEmbeddings(model="text-embedding-3-small")
    vectorstore = FAISS.from_documents(documents=doc_chunks, embedding=embedding)
    return vectorstore


def get_vectorstore_from_text(texts):
    embedding = OpenAIEmbeddings(model="text-embedding-3-small")
    vectorstore = FAISS.from_texts(texts=texts, embedding=embedding)
    return vectorstore


def get_conversation_chain(vectorstore):
    llm = ChatOpenAI(model="gpt-4o", temperature=0.5, max_tokens=2048)
    retriever = vectorstore.as_retriever()
    prompt = hub.pull("rlm/rag-prompt")
    rag_chain = (
        {"context": retriever, "question": RunnablePassthrough()}
        | prompt
        | llm
    )
    return rag_chain
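
# Usage sketch (assumes OPENAI_API_KEY is set in the environment, as in the rest of this module):
#   chain = get_conversation_chain(get_vectorstore_from_docs(get_doc_chunks(docs)))
#   answer = chain.invoke("Quelles sont les parties prenantes de la marque ?").content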

def fill_promptQ_template(input_variables, template):
    """Fill the brand name and description into the given prompt template."""
    prompt = PromptTemplate(input_variables=["BRAND_NAME", "BRAND_DESCRIPTION"], template=template)
    return prompt.format(BRAND_NAME=input_variables["BRAND_NAME"],
                         BRAND_DESCRIPTION=input_variables["BRAND_DESCRIPTION"])


def text_to_list(text):
    """Parse a bulleted "<name> <score>" listing into [name, score] pairs."""
    lines = text.replace("- ", "").split('\n')
    lines = [line.split() for line in lines if line.strip()]  # skip empty lines
    items = [[' '.join(line[:-1]), line[-1]] for line in lines]
    # Keep only the digits of the score (drops stray punctuation).
    for item in items:
        item[1] = re.sub(r'\D', '', item[1])
    return items
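
# Example (sketch): text_to_list("- Clients 5\n- Fournisseurs 3")
# returns [['Clients', '5'], ['Fournisseurs', '3']].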

def delete_pp(pps):
    for pp in pps:
        for i in range(len(st.session_state['pp_grouped'])):
            if st.session_state['pp_grouped'][i]['name'] == pp:
                del st.session_state['pp_grouped'][i]
                break


def display_list_urls():
    for index, item in enumerate(st.session_state["urls"]):
        emp = st.empty()
        col1, col2 = emp.columns([7, 3])

        if col2.button("❌", key=f"but{index}"):
            temp = st.session_state['parties_prenantes'][index]
            delete_pp(temp)
            del st.session_state.urls[index]
            del st.session_state["parties_prenantes"][index]
            st.rerun()

        if len(st.session_state.urls) > index:
            with col1.expander(f"Source {index+1}: {item}"):
                pp = st.session_state["parties_prenantes"][index]
                st.write(pd.DataFrame(pp, columns=["Partie prenante"]))
        else:
            emp.empty()


def colored_circle(color):
    return f'<span style="display: inline-block; width: 15px; height: 15px; border-radius: 50%; background-color: {color};"></span>'


def display_list_pps():
    for index, item in enumerate(st.session_state["pp_grouped"]):
        emp = st.empty()
        col1, col2 = emp.columns([7, 3])

        if col2.button("❌", key=f"butp{index}"):
            del st.session_state["pp_grouped"][index]
            st.rerun()

        if len(st.session_state["pp_grouped"]) > index:
            name = st.session_state["pp_grouped"][index]["name"]
            color = st.session_state["pp_grouped"][index]["color"]
            col1.markdown(f'<p>{colored_circle(color)} {name}</p>', unsafe_allow_html=True)
        else:
            emp.empty()

def extract_pp(docs, input_variables):
    """Extract the brand's stakeholders from the documents via the RAG chain.

    Returns the list of stakeholder names, "445" when no documents were
    provided, or "444" when the model could not identify any stakeholders.
    """
    template_extraction_PP = """
    Objectif : Identifiez toutes les parties prenantes de la marque suivante :

    Le nom de la marque de référence est le suivant : {BRAND_NAME}

    TA RÉPONSE DOIT ÊTRE SOUS FORME DE LISTE DE NOMS DE MARQUES, CHAQUE NOM SUR UNE LIGNE SÉPARÉE.
    """

    if docs is None:
        return "445"

    text_chunks = get_doc_chunks(docs)
    vectorstore = get_vectorstore_from_docs(text_chunks)
    chain = get_conversation_chain(vectorstore)
    question = fill_promptQ_template(input_variables, template_extraction_PP)

    start = time.perf_counter()
    response = chain.invoke(question)
    response_latency = time.perf_counter() - start

    if "ne sais pas" in response.content:
        return "444"

    # Estimate the environmental impact of the LLM call (EcoLogits).
    nbre_out_tokens = response.response_metadata["token_usage"]["completion_tokens"]
    provider = "openai"
    model = "gpt-4o"
    impact = compute_llm_impacts(
        provider=provider,
        model_name=model,
        output_token_count=nbre_out_tokens,
        request_latency=response_latency,
    )
    st.session_state["partial_emissions"]["extraction_pp"]["el"] += impact.gwp.value

    partie_prenante = response.content.replace("- ", "").split('\n')
    partie_prenante = [item.strip() for item in partie_prenante]

    return partie_prenante

def generate_random_color():
    """Return a random colour as a hex string, e.g. '#a3f210'."""
    r = random.randint(0, 255)
    g = random.randint(0, 255)
    b = random.randint(0, 255)
    color_hex = '#{:02x}{:02x}{:02x}'.format(r, g, b)
    return color_hex


def format_pp_add_viz(pp):
    """Place a stakeholder on the chart, nudging it away from already occupied positions."""
    y = 50
    x = 50
    for i in range(len(st.session_state['pp_grouped'])):
        if st.session_state['pp_grouped'][i]['y'] == y and st.session_state['pp_grouped'][i]['x'] == x:
            y += 5
            if y > 95:
                y = 50
                x += 5
        if st.session_state['pp_grouped'][i]['name'] == pp:
            return None  # already on the chart, keep the existing entry
    st.session_state['pp_grouped'].append({'name': pp, 'x': x, 'y': y, 'color': generate_random_color()})
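
# Each entry of st.session_state['pp_grouped'] has the shape (sketch):
#   {'name': 'Clients', 'x': 50, 'y': 50, 'color': '#1a2b3c'}
# where x is the "influence" axis and y the "pouvoir" axis (see add_existing_pps below).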

def add_pp(new_pp, default_value=50):
    """Register a new list of stakeholders and place each one on the chart."""
    new_pp = sorted(new_pp)
    new_pp = [item.lower().capitalize().strip() for item in new_pp]
    st.session_state['parties_prenantes'].append(new_pp)
    for pp in new_pp:
        format_pp_add_viz(pp)


def add_existing_pps(pp, pouvoir, influence):
    """Add a stakeholder with known coordinates, or update it if it already exists."""
    for i in range(len(st.session_state['pp_grouped'])):
        if st.session_state['pp_grouped'][i]['name'] == pp:
            st.session_state['pp_grouped'][i]['x'] = influence
            st.session_state['pp_grouped'][i]['y'] = pouvoir
            return None
    st.session_state['pp_grouped'].append({'name': pp, 'x': influence, 'y': pouvoir, 'color': generate_random_color()})


def load_csv(file):
    """Load an existing stakeholder map from a CSV export."""
    df = pd.read_csv(file)
    for index, row in df.iterrows():
        add_existing_pps(row['parties prenantes'], row['pouvoir'], row['influence'])
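
# Expected CSV layout (sketch, column names as read by load_csv above):
#   parties prenantes,pouvoir,influence
#   Clients,80,60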

def add_pp_input_text():
    """Free-text input plus a button to add a single stakeholder by hand."""
    new_pp = st.text_input("Ajouter une partie prenante")
    if st.button("Ajouter", key="add_single_pp"):
        format_pp_add_viz(new_pp)


def complete_and_verify_url(partial_url):
    """Prefix a partial URL with a scheme if needed and check that it is well formed."""
    regex = re.compile(
        r'^(?:http|ftp)s?://'
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,8}\.?|'
        r'localhost|'
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
        r'(?::\d+)?'
        r'(?:[/?#][^\s]*)?$',
        re.IGNORECASE)

    if partial_url.startswith('www.'):
        complete_url = 'https://' + partial_url
    elif not partial_url.startswith(('http://', 'https://')):
        complete_url = 'https://www.' + partial_url
    else:
        complete_url = partial_url

    if re.match(regex, complete_url):
        return (True, complete_url)
    else:
        return (False, complete_url)
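
# Example (sketch): complete_and_verify_url("example.com")
# returns (True, "https://www.example.com").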

@st.dialog("Conseil IA", width="large")
def show_conseil_ia():
    """Stream AI advice on which stakeholders to engage, based on the latest collected document."""
    prompt = "Prenant compte les données de l'entreprise (activité, produits, services ...), quelles sont les principales parties prenantes à animer pour une démarche RSE réussie ?"
    st.markdown(f"**{prompt}**")
    st.write_stream(get_response(prompt, "", st.session_state["latest_doc"][0].page_content))
    st.warning("Quittez et saisissez une autre URL")

def display_pp():
    """Render the stakeholder page: collect sources, extract stakeholders, display the map."""
    if "emission" not in st.session_state:
        tracker = EmissionsTracker()
        tracker.start()
        st.session_state["emission"] = tracker

    load_dotenv()
    fire_crawl_api_key = os.getenv("FIRECRAWL_API_KEY")

    if "Nom de la marque" not in st.session_state:
        st.session_state["Nom de la marque"] = ""
    if "urls" not in st.session_state:
        st.session_state["urls"] = []
    if "parties_prenantes" not in st.session_state:
        st.session_state['parties_prenantes'] = []
    if "pp_grouped" not in st.session_state:
        st.session_state['pp_grouped'] = []
    if "latest_doc" not in st.session_state:
        st.session_state['latest_doc'] = ""
    if "not_pp" not in st.session_state:
        st.session_state["not_pp"] = ""

    st.title("IDENTIFIER ET ANIMER VOS PARTIES PRENANTES")

    brand_name = st.text_input("Nom de la marque", st.session_state["Nom de la marque"])
    st.session_state["Nom de la marque"] = brand_name

    option = st.radio("Source", ("A partir de votre site web", "A partir de vos documents entreprise", "A partir de cartographie existante"))

    if option == "A partir de votre site web":
        url = st.text_input("Ajouter une URL")

        captions = ["L’IA prend en compte uniquement les textes contenus dans les pages web analysées", "L’IA prend en compte les textes, les images et les liens URL contenus dans les pages web analysées"]
        scraping_option = st.radio("Mode", ("Analyse rapide", "Analyse profonde"), horizontal=True, captions=captions)

        if st.button("ajouter", key="add_pp"):
            st.session_state["not_pp"] = ""

            is_valid, url = complete_and_verify_url(url)
            if not is_valid:
                st.error("URL invalide")
            elif url in st.session_state["urls"]:
                st.error("URL déjà ajoutée")
            else:
                if scraping_option == "Analyse profonde":
                    with st.spinner("Collecte des données..."):
                        docs = get_docs_from_website_fc([url], fire_crawl_api_key)
                        if docs is None:
                            st.warning("Erreur lors de la collecte des données, 2ème essai avec collecte rapide...")
                            with st.spinner("2ème essai, collecte rapide..."):
                                docs = get_docs_from_website([url])

                if scraping_option == "Analyse rapide":
                    with st.spinner("Collecte des données..."):
                        docs = get_docs_from_website([url])

                if docs is None:
                    st.error("Erreur lors de la collecte des données, URL invalide")
                    st.session_state["latest_doc"] = ""
                else:
                    st.session_state["partial_emissions"]["Scrapping"]["cc"] = st.session_state["emission"].stop()
                    st.session_state["latest_doc"] = docs

                    with st.spinner("Processing..."):
                        input_variables = {"BRAND_NAME": brand_name, "BRAND_DESCRIPTION": ""}
                        partie_prenante = extract_pp(docs, input_variables)

                        if "444" in partie_prenante:
                            st.session_state["not_pp"] = "444"
                        elif "445" in partie_prenante:
                            st.error("Aucun site web trouvé avec l'URL donnée")
                            st.session_state["not_pp"] = ""
                        else:
                            st.session_state["not_pp"] = ""
                            partie_prenante = sorted(partie_prenante)
                            st.session_state["urls"].append(url)
                            add_pp(partie_prenante)
                            st.session_state["partial_emissions"]["extraction_pp"]["cc"] = st.session_state["emission"].stop()

    if option == "A partir de vos documents entreprise":
        uploaded_file = st.file_uploader("Télécharger le fichier PDF", type="pdf")
        if uploaded_file is not None:
            if st.button("ajouter", key="add_pp_pdf"):
                st.session_state["not_pp"] = ""

                with st.spinner("Processing..."):
                    # Persist the upload locally so PyPDFLoader can read it.
                    file_name = uploaded_file.name
                    with open(file_name, mode='wb') as w:
                        w.write(uploaded_file.getvalue())
                    pdf = PyPDFLoader(file_name)
                    text = pdf.load()
                    st.session_state["latest_doc"] = text
                    input_variables = {"BRAND_NAME": brand_name, "BRAND_DESCRIPTION": ""}
                    partie_prenante = extract_pp(text, input_variables)

                    if "444" in partie_prenante:
                        st.session_state["not_pp"] = "444"
                    elif "445" in partie_prenante:
                        st.error("Aucun contenu trouvé dans le document fourni")
                        st.session_state["not_pp"] = ""
                    else:
                        st.session_state["not_pp"] = ""
                        partie_prenante = sorted(partie_prenante)
                        st.session_state["urls"].append(file_name)
                        add_pp(partie_prenante)

    if option == "A partir de cartographie existante":
        uploaded_file = st.file_uploader("Télécharger le fichier CSV", type="csv")
        if uploaded_file is not None:
            if st.button("ajouter", key="add_pp_csv"):
                file_name = uploaded_file.name
                with open(file_name, mode='wb') as w:
                    w.write(uploaded_file.getvalue())

                try:
                    load_csv(file_name)
                    # Assumes the file name embeds the brand name as its second "-"-separated token.
                    brand_name_from_csv = file_name.split("-")[1]
                    st.session_state["Nom de la marque"] = brand_name_from_csv
                except Exception:
                    st.error("Erreur lors de la lecture du fichier")

    if st.session_state["not_pp"] == "444":
        st.warning("Aucune partie prenante n'est identifiable sur l'URL fournie. Fournissez une autre URL ou bien cliquez sur le bouton ci-dessous pour un Conseil IA")

        if st.button("Conseil IA"):
            show_conseil_ia()

    if st.session_state["latest_doc"] != "":
        with st.expander("Cliquez ici pour éditer et voir le document"):
            docs = st.session_state["latest_doc"]
            cleaned_text = re.sub(r'\n\n+', '\n\n', docs[0].page_content.strip())
            text_value = st.text_area("Modifier le texte ci-dessous:", value=cleaned_text, height=300)
            if st.button('Sauvegarder', key="save_doc_fake"):
                st.success("Texte sauvegardé avec succès!")

    display_list_urls()
    with st.expander("Liste des parties prenantes"):
        add_pp_input_text()
        display_list_pps()
    test_chart()