Spaces:
Sleeping
Sleeping
import streamlit as st | |
import pickle | |
import random | |
import math | |
from streamlit_agraph import agraph, Node, Edge, Config | |
from utils.kg.construct_kg import get_graph # if still needed for something else | |
from utils.audit.rag import get_text_from_content_for_doc, get_text_from_content_for_audio | |
from utils.audit.response_llm import generate_response_via_langchain,generate_llm_with_tools | |
from utils.audit.rag import get_vectorstore | |
from langchain_core.messages import AIMessage, HumanMessage | |
from langchain_core.prompts import PromptTemplate | |
from itext2kg.models import KnowledgeGraph | |
from typing_extensions import Annotated, TypedDict | |
import json | |
# Tool schema for adding an edge to the graph. The class is listed in `tools`
# and handed to the LLM helper, so the docstring and the Annotated
# descriptions below are runtime text (presumably surfaced to the model as the
# tool description) and are kept verbatim rather than translated.
class AddRelationship(TypedDict):
    """Ajouter une relation au graphe"""

    source_id: Annotated[str, "The source node ID"]
    target_id: Annotated[str, "The target node ID"]
    relationship_type: Annotated[str, "The type of relationship"]
# Tool schema for removing an edge from the graph. The class is listed in
# `tools` and handed to the LLM helper, so the docstring and the Annotated
# descriptions below are runtime text (presumably surfaced to the model as the
# tool description) and are kept verbatim rather than translated.
class DeleteRelationship(TypedDict):
    """Supprimer une relation du graphe, une information est donnée sur la relation à supprimer"""

    source_id: Annotated[str, "The source node ID"]
    target_id: Annotated[str, "The target node ID"]
    relationship_type: Annotated[str, "The type of relationship"]
# Tool schemas passed to generate_llm_with_tools() in kg_main; the tool-call
# names returned by the model are matched against these class names there.
tools = [AddRelationship, DeleteRelationship]
################################################################################ | |
# Utility Functions | |
################################################################################ | |
def if_node_exists(nodes, node_id):
    """Look up a node by id.

    Returns the first object in ``nodes`` whose ``id`` attribute equals
    ``node_id``; returns ``False`` (not ``None``) when nothing matches, so the
    result can be used directly in a truthiness test.
    """
    return next((candidate for candidate in nodes if candidate.id == node_id), False)
def generate_random_color():
    """Return a random (R, G, B) tuple with every channel drawn from 180..255 inclusive."""
    low, high = 180, 255
    # Channels are drawn in R, G, B order, one randint call each.
    return tuple(random.randint(low, high) for _ in range(3))
def rgb_to_hex(rgb):
    """Format the first three items of ``rgb`` as a lowercase ``#rrggbb`` string."""
    red, green, blue = rgb[0], rgb[1], rgb[2]
    return f"#{red:02x}{green:02x}{blue:02x}"
def color_distance(color1, color2):
    """Return the Euclidean distance between two colors, using their first three channels."""
    squared = sum((color1[i] - color2[i]) ** 2 for i in range(3))
    return math.sqrt(squared)
def generate_distinct_colors(num_colors, min_distance=30, max_attempts=100):
    """Generate ``num_colors`` pastel-ish hex colors that are mutually far apart.

    Colors are drawn with ``generate_random_color`` and accepted only when
    they are at least ``min_distance`` away (Euclidean, RGB space) from every
    color accepted so far.

    The pastel cube (180..255 per channel) can only hold a limited number of
    colors at a given spacing, so pure rejection sampling would spin forever
    for a large ``num_colors``. To guarantee termination, the required
    distance is halved each time ``max_attempts`` consecutive candidates are
    rejected, and dropped to zero once it falls to 1 or below.

    Args:
        num_colors: number of colors to produce.
        min_distance: initial minimum pairwise distance between colors.
        max_attempts: consecutive rejections tolerated before the distance
            requirement is relaxed.

    Returns:
        A list of ``num_colors`` strings of the form ``'#rrggbb'``.
    """
    colors = []
    threshold = min_distance
    rejected_in_a_row = 0
    while len(colors) < num_colors:
        candidate = generate_random_color()
        if all(color_distance(candidate, existing) >= threshold for existing in colors):
            colors.append(candidate)
            rejected_in_a_row = 0
            continue
        rejected_in_a_row += 1
        if rejected_in_a_row >= max_attempts:
            # The space is (nearly) saturated at this spacing: relax it.
            threshold = threshold / 2 if threshold > 1 else 0
            rejected_in_a_row = 0
    return [rgb_to_hex(color) for color in colors]
def list_to_dict_colors(node_types):
    """Map every entry of ``node_types`` to its own hex color.

    One color per entry is requested from ``generate_distinct_colors`` and the
    two sequences are paired positionally, in iteration order of
    ``node_types``.
    """
    palette = generate_distinct_colors(len(node_types))
    return dict(zip(node_types, palette))
def get_node_types_advanced(graph: KnowledgeGraph):
    """Collect entity labels from a knowledge graph and give each a random color.

    Labels are read from ``graph.entities[*].label`` and from the
    ``startEntity.label`` / ``endEntity.label`` of every item in
    ``graph.relationships``.

    Returns:
        A ``(labels, colors)`` pair: the set of labels found, and a dict
        mapping each label to a random pastel hex color (colors are drawn
        independently, so they are not guaranteed to be distinct).
    """
    labels = {entity.label for entity in graph.entities}
    for rel in graph.relationships:
        labels.update((rel.startEntity.label, rel.endEntity.label))

    colors_by_label = {}
    for label in labels:
        colors_by_label[label] = rgb_to_hex(generate_random_color())
    return labels, colors_by_label
################################################################################ | |
# Graph Conversion | |
################################################################################ | |
def get_node_types(graph):
    """Return the set of node types present in ``graph``.

    Types are read from ``graph.nodes[*].type`` and from the ``source.type`` /
    ``target.type`` of every item in ``graph.relationships``.
    """
    found = {node.type for node in graph.nodes}
    for rel in graph.relationships:
        found.update((rel.source.type, rel.target.type))
    return found
def convert_neo4j_to_agraph(neo4j_graph, node_colors):
    """Convert a "Neo4j-like" graph object into Agraph nodes, edges and config.

    Args:
        neo4j_graph: object exposing ``nodes`` (items with ``id`` and ``type``)
            and ``relationships`` (items with ``source``, ``target`` and
            ``type``, where source/target have ``id`` and ``type``).
        node_colors: dict mapping a node type to a color string; types missing
            from it fall back to ``"#cccccc"``.

    Returns:
        ``(edges, nodes, config)``. Node ids are the original ids with spaces
        replaced by underscores; the original id is kept as the label and the
        node type is stored in the Agraph ``title`` field. Each id appears at
        most once in ``nodes`` (first occurrence wins).
    """
    nodes = []
    edges = []
    # Ids already emitted; replaces a linear scan of `nodes` per lookup.
    seen_ids = set()

    def ensure_node(raw_id, type_):
        """Append a Node for ``raw_id`` unless one exists; return its Agraph id."""
        node_id = raw_id.replace(" ", "_")
        if node_id not in seen_ids:
            seen_ids.add(node_id)
            nodes.append(Node(
                id=node_id,
                title=type_,  # 'title' effectively carries the node type
                label=raw_id,
                size=25,
                shape="circle",
                color=node_colors.get(type_, "#cccccc"),
            ))
        return node_id

    for node in neo4j_graph.nodes:
        ensure_node(node.id, node.type)

    for rel in neo4j_graph.relationships:
        # Endpoints may be absent from neo4j_graph.nodes, so create them on demand.
        source_id = ensure_node(rel.source.id, rel.source.type)
        target_id = ensure_node(rel.target.id, rel.target.type)
        edges.append(Edge(source=source_id, label=rel.type, target=target_id))

    config = Config(
        width=1200,
        height=800,
        directed=True,
        physics=True,
        hierarchical=True,
        from_json="config.json",
    )
    return edges, nodes, config
def convert_advanced_neo4j_to_agraph(neo4j_graph: KnowledgeGraph, node_colors):
    """Convert an itext2kg ``KnowledgeGraph`` into Agraph nodes, edges and config.

    Args:
        neo4j_graph: graph exposing ``entities`` (items with ``name`` and
            ``label``) and ``relationships`` (items with ``startEntity``,
            ``endEntity`` and ``name``).
        node_colors: dict mapping an entity label to a color string; labels
            missing from it fall back to ``"#CCCCCC"`` (previously a missing
            label raised ``KeyError`` for entities but not for relationship
            endpoints).

    Returns:
        ``(edges, nodes, config)``. Node ids are entity names with spaces
        replaced by underscores; the name is kept as the label and the entity
        label is stored in the Agraph ``title`` field. Each id appears at most
        once in ``nodes`` (first occurrence wins).
    """
    nodes = []
    edges = []
    # Ids already emitted; replaces a linear scan of `nodes` per lookup.
    seen_ids = set()

    def ensure_node(entity):
        """Append a Node for ``entity`` unless one exists; return its Agraph id."""
        node_id = entity.name.replace(" ", "_")
        if node_id not in seen_ids:
            seen_ids.add(node_id)
            nodes.append(Node(
                id=node_id,
                title=entity.label,
                label=entity.name,
                size=25,
                shape="circle",
                color=node_colors.get(entity.label, "#CCCCCC"),
            ))
        return node_id

    for entity in neo4j_graph.entities:
        ensure_node(entity)

    for relationship in neo4j_graph.relationships:
        # Endpoints may be absent from neo4j_graph.entities, so create them on demand.
        source_id = ensure_node(relationship.startEntity)
        target_id = ensure_node(relationship.endEntity)
        edges.append(Edge(source=source_id, label=relationship.name, target=target_id))

    config = Config(
        width=1200,
        height=800,
        directed=True,
        physics=True,
        hierarchical=True,
        from_json="config.json",
    )
    return edges, nodes, config
def display_graph(edges, nodes, config):
    """Render the graph with ``agraph`` and return whatever ``agraph`` returns.

    kg_main treats the return value as the id of the node the user selected
    (falsy when nothing is selected).
    """
    selection = agraph(nodes=nodes, edges=edges, config=config)
    return selection
def filter_nodes_by_types(nodes, node_types_filter):
    """Keep only the nodes whose ``title`` (used as the node type) is in the filter.

    An empty or falsy ``node_types_filter`` means "no filtering": the input
    ``nodes`` object is returned unchanged.
    """
    if node_types_filter:
        return [candidate for candidate in nodes if candidate.title in node_types_filter]
    return nodes
def format_relationships(relationships : list[Edge]):
    """Render edges as a Markdown bullet list, one ``source -- label --> to`` line per edge."""
    lines = []
    for edge in relationships:
        lines.append(f"- **{edge.source}** -- {edge.label} --> **{edge.to}**")
    return "\n".join(lines)
def fortmat_nodes(nodes : list[Node]):
    """Render nodes as a Markdown bullet list, one ``label (title)`` line per node.

    The misspelled function name is kept as-is because callers in this file
    use it.
    """
    lines = []
    for item in nodes:
        lines.append(f"- **{item.label}** ({item.title})")
    return "\n".join(lines)
def add_relationship_to_graph(source_id, target_id, relationship_type):
    """Append an edge to the session graph, creating missing endpoint nodes.

    Args:
        source_id: id of the edge's source node.
        target_id: id of the edge's target node.
        relationship_type: label of the new edge.

    Side effects:
        Mutates ``st.session_state.edges`` and, when an endpoint id is not yet
        present, ``st.session_state.nodes``. Nodes created here get the type
        "Autre" and the color registered for that type (grey if none).

    The previous version referenced an undefined ``target`` variable when
    building the source node (NameError) and never created a missing target.
    """
    st.session_state.edges.append(Edge(source=source_id, label=relationship_type, target=target_id))
    print(f"Relation ajoutée: {source_id} -- {relationship_type} --> {target_id}")

    fallback_color = st.session_state.node_types.get("Autre", "#CCCCCC")
    for node_id in (source_id, target_id):
        if not if_node_exists(st.session_state.nodes, node_id):
            st.session_state.nodes.append(Node(
                id=node_id,
                title="Autre",
                label=node_id,
                size=25,
                shape="circle",
                color=fallback_color,
            ))
            print(f"Node ajouté: {node_id}")
    print(f"Nodes: {fortmat_nodes(st.session_state.nodes)}")
def delete_relationship_from_graph(source_id, target_id, relationship_type):
    """Drop every session edge matching the given source, target and label.

    ``st.session_state.edges`` is replaced by a new list that omits all edges
    whose ``source``, ``to`` and ``label`` equal the three arguments.
    """
    def is_target_edge(edge):
        # Checked in this order, stopping at the first mismatch.
        if edge.source != source_id:
            return False
        if edge.to != target_id:
            return False
        return edge.label == relationship_type

    remaining = []
    for edge in st.session_state.edges:
        if not is_target_edge(edge):
            remaining.append(edge)
    st.session_state.edges = remaining
################################################################################ | |
# Dialog Components (same as your original code) | |
################################################################################ | |
def change_view_dialog():
    """Let the user select, rename or delete saved filter views.

    Reads and mutates ``st.session_state.filter_views`` (view name -> list of
    node types) and ``st.session_state.current_view``. The first view in the
    dict is treated as the default one: it can be selected but neither
    deleted nor renamed. Every successful action ends with ``st.rerun()``.
    """
    # NOTE(review): kg_main calls this from inside `if button(...)`, so these
    # widgets are only rendered during the run triggered by that click and the
    # inner buttons may never be observed as clicked. It looks like the
    # function was meant to be wrapped with Streamlit's dialog decorator -
    # confirm against the Streamlit version in use.
    st.write("Changer la vue")
    views = st.session_state.filter_views
    # Iterate over a snapshot so deleting/renaming a view cannot disturb the loop.
    for index, item in enumerate(list(views)):
        emp = st.empty()
        col1, col2, col3 = emp.columns([8, 1, 1])

        # Delete the view (never the default one at index 0).
        if index > 0 and col2.button("🗑️", key=f"del{index}"):
            del views[item]
            st.session_state.current_view = "Vue par défaut"
            st.rerun()

        # Activate the view; the current one shows a check mark.
        but_content = "🔍" if st.session_state.current_view != item else "✅"
        if col3.button(but_content, key=f"valid{index}"):
            st.session_state.current_view = item
            st.rerun()

        # Show details / rename.
        with col1.expander(item):
            if index > 0:
                change_name = st.text_input(
                    "Nom de la vue",
                    label_visibility="collapsed",
                    placeholder="Changez le nom de la vue",
                    key=f"change_name{index}",
                )
                if st.button("Renommer", key=f"rename{index}"):
                    new_name = change_name.strip()
                    if new_name and new_name in views:
                        # Renaming onto an existing view would silently overwrite it.
                        st.warning(f"Une vue nommée '{new_name}' existe déjà.")
                    elif new_name:
                        views[new_name] = views.pop(item)
                        st.session_state.current_view = new_name
                        st.rerun()
            labels = views[item]
            if isinstance(labels, str):
                # A view stored as a bare string would otherwise be listed
                # one character per bullet.
                labels = [labels]
            st.markdown("\n".join(f"- {label.strip()}" for label in labels))
def add_view_dialog(filters):
    """Save ``filters`` as a new named view and make it the current one.

    Shows a name input and the filters that will be stored. When the button is
    clicked with a non-blank name, the view is written to
    ``st.session_state.filter_views`` under that name, selected as
    ``st.session_state.current_view``, and the script is rerun.
    """
    st.write("Ajouter une vue")
    view_name = st.text_input("Nom de la vue")
    st.markdown("Les filtres actuels :")
    st.write(filters)

    clicked = st.button("Ajouter la vue")
    if not clicked or not view_name.strip():
        return
    st.session_state.filter_views[view_name] = filters
    st.session_state.current_view = view_name
    st.rerun()
def change_color_dialog():
    """Let the user pick a color for each node type and apply it to the graph.

    Shows one color picker per entry of ``st.session_state.node_types``. The
    picked color is stored back into that dict and also written to the
    ``color`` of every node in ``st.session_state.nodes`` whose ``title``
    (its type) matches; previously only the dict was updated, so the nodes
    already built and cached in session state kept their old color.
    """
    # NOTE(review): kg_main calls this from inside `if button(...)`, so the
    # pickers are only rendered during the run triggered by that click. It
    # looks like the function was meant to be wrapped with Streamlit's dialog
    # decorator - confirm against the Streamlit version in use.
    st.write("Changer la couleur")
    node_types = st.session_state.node_types
    # Snapshot the items: values are reassigned while looping.
    for node_type, color in list(node_types.items()):
        new_color = st.color_picker(
            f"La couleur de l'entité **{node_type.strip()}**",
            color
        )
        print("New color:", new_color)
        print("Old color:", color)
        node_types[node_type] = new_color
        # Keep the already-built Agraph nodes in sync with the palette.
        for node in st.session_state.get("nodes", []):
            if node.title == node_type:
                node.color = new_color
    if st.button("Valider"):
        st.rerun()
def change_node_label_dialog(selected_node_id):
    """Let the user change the type ("étiquette") of the selected node.

    The node is looked up in ``st.session_state.nodes``. The user picks one of
    the known types from ``st.session_state.node_types`` or "Autre"; with
    "Autre" a free-text type can be entered, which is registered with a random
    color if it is new. On validation the node's ``title`` (its type) and its
    ``color`` are updated and the script is rerun.

    Fixes over the previous version: the node is checked before being
    dereferenced, a node whose current type is not a known type (e.g. "Autre")
    no longer raises ``ValueError`` when computing the preselected option, and
    typing the name of an existing type no longer overwrites that type's color.
    """
    # NOTE(review): kg_main calls this from inside `if button(...)`, so these
    # widgets are only rendered during the run triggered by that click. It
    # looks like the function was meant to be wrapped with Streamlit's dialog
    # decorator - confirm against the Streamlit version in use.
    node = if_node_exists(st.session_state.nodes, selected_node_id)
    if not node:
        st.warning(f"Noeud introuvable: {selected_node_id}")
        return

    st.write("- **Nom:** ", node.label)
    st.write("- **Etiquette:** ", node.title)

    node_types = st.session_state.node_types
    options = list(node_types.keys())
    if "Autre" not in options:
        options.append("Autre")
    # Preselect the node's current type; unknown types fall back to "Autre".
    if node.title in options:
        current_index = options.index(node.title)
    else:
        current_index = options.index("Autre")
    new_label = st.selectbox("Etiquette du noeud", options, index=current_index)

    new_label_text = ""
    if new_label == "Autre":
        new_label_text = st.text_input("Nouvelle étiquette")

    if st.button("Valider") and new_label:
        # A free-text type wins when "Autre" is selected and text was entered.
        chosen = new_label_text if (new_label == "Autre" and new_label_text) else new_label
        if chosen != "Autre" and chosen not in node_types:
            node_types[chosen] = rgb_to_hex(generate_random_color())
        node.title = chosen
        # Node color encodes its type, so keep it consistent with the new type.
        node.color = node_types.get(chosen, "#CCCCCC")
        st.success(f"Etiquette du noeud {selected_node_id} modifiée en {chosen}")
        st.rerun()
################################################################################ | |
# Main KG Function | |
################################################################################ | |
def kg_main():
    """Render the knowledge-graph page: graph view on the left, chat on the right.

    Steps:
      1. Load the pickled scenes and graph into ``st.session_state`` (once).
      2. Initialise view/color/chat state and the default filter views.
      3. Convert the graph to Agraph nodes/edges (once) and, when the chat is
         still empty, ask the LLM for an opening analysis of the graph.
      4. Draw the filter controls and the graph.
      5. Draw the chat, dispatch LLM tool calls that add/delete relationships,
         and answer the user's last message.

    Fixes over the previous version: the opening-analysis prompt is built as
    a single string (it was a nested tuple because of a trailing comma) and is
    only requested when the chat is empty (it ran on every rerun); the
    "Personnages" view is stored as a list; multiselect defaults are limited
    to existing options; quick-prompt buttons trigger a rerun so the question
    is answered immediately; malformed tool-call arguments are reported
    instead of crashing the page.
    """
    # 1. Load the pickles (if not already loaded in session state).
    # NOTE: pickle.load executes arbitrary code from the file; these assets
    # must only ever come from a trusted source.
    if "scenes" not in st.session_state:
        with open("./utils/assets/scenes.pkl", "rb") as f:
            st.session_state.scenes = pickle.load(f)
        st.session_state.vectorstore = get_vectorstore(st.session_state.scenes)
    if "graph" not in st.session_state:
        with open("./utils/assets/kg_ia_signature.pkl", "rb") as f:
            st.session_state.graph = pickle.load(f)
        print("Graph loaded.")

    # 2. Initialize other session keys if they don't exist.
    if "filter_views" not in st.session_state:
        st.session_state.filter_views = {}
    if "current_view" not in st.session_state:
        st.session_state.current_view = None
    if "node_types" not in st.session_state:
        st.session_state.node_types = None
    if "chat_graph_history" not in st.session_state:
        st.session_state.chat_graph_history = []

    st.title("Graphe de connaissance")

    # First run: compute node types/colors and the predefined views.
    if st.session_state.node_types is None:
        node_types, st.session_state.node_types = get_node_types_advanced(st.session_state.graph)
        print("Couleurs attribuées")
        st.session_state.filter_views["Vue par défaut"] = list(node_types)
        # Every view is a list of type names.
        st.session_state.filter_views["Personnages"] = ["Person"]
        st.session_state.filter_views["Lieux"] = ["Location"]
        st.session_state.filter_views["Concepts"] = ["Concept"]
        st.session_state.current_view = "Personnages"

    if "edges" not in st.session_state or "nodes" not in st.session_state:
        # Convert the graph to Agraph format (kept in session state so that
        # later edits made through the chat tools persist across reruns).
        st.session_state.edges, st.session_state.nodes, st.session_state.config = convert_advanced_neo4j_to_agraph(
            st.session_state.graph, st.session_state.node_types
        )

    # 3. Agraph data for this run.
    edges = st.session_state.edges
    nodes = st.session_state.nodes
    config = st.session_state.config
    print("Graph converti en Agraph")

    # Opening analysis by the LLM: only once, as the first chat message.
    if not st.session_state.chat_graph_history:
        prompt = (
            "Tu es un expert en graphes de connaissances, analyse le graphe et donne une synthèse et differentes conclusions sur les elements du recit, tout en etant pertinent et precis\n"
            "**Graphe**:\n"
            f"**Noeuds**: {fortmat_nodes(st.session_state.nodes)}\n"
            f"Relations: {format_relationships(st.session_state.edges)}\n"
            "Output: tu dois donner une synthèse et des conclusions sur les elements du recit , ca sera le premier message de la conversation"
        )
        response = generate_response_via_langchain(prompt)
        st.session_state.chat_graph_history.append(AIMessage(content=response))

    # 4. UI layout: (left) the graph itself, (right) the chat.
    col1, col2 = st.columns([3, 1])
    with col1.container(border=True, height=800):
        st.write(f"#### Visualisation du graphe (**{st.session_state.current_view}**)")
        filter_col, add_view_col, change_view_col, color_col = st.columns([9, 1, 1, 1])

        if color_col.button("🎨", help="Changer la couleur"):
            change_color_dialog()
        if change_view_col.button("🔍", help="Changer de vue"):
            change_view_dialog()

        # Filters of the currently selected view, restricted to types that
        # actually exist (the multiselect rejects defaults missing from its
        # options).
        type_options = list(st.session_state.node_types.keys())
        current_filters = st.session_state.filter_views.get(st.session_state.current_view, [])
        if isinstance(current_filters, str):
            current_filters = [current_filters]
        valid_defaults = [label for label in current_filters if label in type_options]
        filter_selection = filter_col.multiselect(
            "Filtrer selon l'étiquette",
            type_options,
            default=valid_defaults,
            label_visibility="collapsed"
        )
        if add_view_col.button("➕", help="Ajouter une vue"):
            add_view_dialog(filter_selection)

        # Keep only the nodes matching the chosen types (all nodes if none chosen).
        filtered_nodes = filter_nodes_by_types(nodes, filter_selection)

        col_graph, col_buttons = st.columns([12, 1])
        print("Affichage du graphe")
        with col_graph.container():
            selected_node_id = display_graph(edges, filtered_nodes, config)
        print("Graphe affiché")
        with col_buttons.container():
            # Edit-label button, only when a node is selected.
            if selected_node_id:
                if st.button("📝", key="change label"):
                    st.write(f"**Node sélectionné**: `{selected_node_id}`")
                    change_node_label_dialog(selected_node_id)
        if selected_node_id:
            st.write(f"**Noeud sélectionné**: `{selected_node_id}`")

    # 5. Chat UI.
    with col2.container(border=True, height=800):
        st.markdown("#### Dialoguer avec le graphe")
        user_query = st.chat_input("Votre question ...")
        if user_query:
            st.session_state.chat_graph_history.append(HumanMessage(content=user_query))

        with st.container():
            # Display the existing chat.
            for message in st.session_state.chat_graph_history:
                if isinstance(message, AIMessage):
                    with st.chat_message("AI"):
                        st.markdown(message.content)
                elif isinstance(message, HumanMessage):
                    with st.chat_message("Human"):
                        st.write(message.content)

            # If the last message is from the user, generate a response.
            if (len(st.session_state.chat_graph_history) > 0 and
                    isinstance(st.session_state.chat_graph_history[-1], HumanMessage)):
                last_message = st.session_state.chat_graph_history[-1]
                with st.chat_message("AI"):
                    # First let the LLM decide whether the request maps to one
                    # of the graph-editing tools.
                    prompt_tool_calling = ("Ta mission est de decider selon la query de l'utilisateur s'il y'a un outil qui correspont et il faut l'appeler, tu dois aussi savoir si on va supprimer une relation ou plutot ajouter une relation\n"
                        "Tu as 2 outils , un pour supprimer une relation et l'autre ajouter une relation dans un graphe\n"
                        "si un outil est appelé, tu dois le dire à l'utilisateur et tu dois bien extraire les id des noeuds et le type de relation\n"
                        "si l'id du noeud existe dans le graphe, extrait le exactement et si le type de relation existe dans le graphe, extrait le exactement\n"
                        f"**query de l'utilisateur** : {last_message.content}\n"
                        f"**Graph**: {format_relationships(st.session_state.edges)}\n"
                        f"sinon tu dois renvoyé: 'Pas d'outils appelé'\n"
                        f"les outils sont: {tools}\n"
                        f"Output: tu dois ecrire soit 'outil appelé' apres avoir identifier les differents elements soit 'Pas d'outils appelé'\n")
                    tools_called = generate_llm_with_tools(tools=tools, query=prompt_tool_calling)
                    print(tools_called)

                    if 'tool_calls' in tools_called.additional_kwargs:
                        for tool_call in tools_called.additional_kwargs['tool_calls']:
                            try:
                                func_name = tool_call["function"]["name"]
                                # Arguments arrive as a JSON string.
                                parsed_args = json.loads(tool_call["function"]["arguments"])
                                source_id = parsed_args["source_id"]
                                target_id = parsed_args["target_id"]
                                relationship_type = parsed_args["relationship_type"]
                            except (KeyError, TypeError, json.JSONDecodeError) as err:
                                # Model output is untrusted: skip a malformed
                                # call instead of crashing the page.
                                st.warning(f"Appel d'outil ignoré (arguments invalides): {err}")
                                continue
                            if func_name == "AddRelationship":
                                add_relationship_to_graph(source_id, target_id, relationship_type)
                                st.write(f"Relation ajoutée: {source_id} -- {relationship_type} --> {target_id}")
                            elif func_name == "DeleteRelationship":
                                delete_relationship_from_graph(source_id, target_id, relationship_type)
                                st.write(f"Relation supprimée: {source_id} -- {relationship_type} --> {target_id}")

                    if "vectorstore" in st.session_state:
                        retriever = st.session_state.vectorstore.as_retriever()
                        context = retriever.invoke(last_message.content)
                        prompt = (
                            f"Contexte depuis les 'scenes': {st.session_state.scenes}\n"
                            f"Contexte vectorstore: {context}\n"
                            f"Question: {last_message.content}\n"
                            f"Graph: {st.session_state.graph}\n"  # embeds the entire graph
                        )
                        response = st.write_stream(
                            generate_response_via_langchain(prompt, stream=True)
                        )
                        st.session_state.chat_graph_history.append(AIMessage(content=response))
                    else:
                        # Fallback if no vectorstore.
                        st.write("Aucune base de vecteurs disponible.")
                        st.session_state.chat_graph_history.append(AIMessage(content="(Pas de vectorstore)"))

            # If the user clicked on a node in the graph, propose quick prompts.
            if selected_node_id:
                with st.chat_message("AI"):
                    st.markdown(f"**Vous avez sélectionné**: `{selected_node_id}`")
                    quick_prompts = [
                        f"Donne-moi plus d'informations sur le noeud '{selected_node_id}'",
                        f"Montre-moi les relations de '{selected_node_id}' dans ce graphe"
                    ]
                    for i, qprompt in enumerate(quick_prompts):
                        if st.button(qprompt, key=f"qp_{i}"):
                            st.session_state.chat_graph_history.append(HumanMessage(content=qprompt))
                            # Rerun so the question is displayed and answered
                            # right away rather than on the next interaction.
                            st.rerun()
# Module-level entry point: the page is rendered whenever this file is
# executed (note that importing the module also triggers this call).
kg_main()