ANALYSE_AGENT / audit_page /knowledge_graph.py
Ilyas KHIAT
analyse
07a1739
import streamlit as st
import pickle
import random
import math
from streamlit_agraph import agraph, Node, Edge, Config
from utils.kg.construct_kg import get_graph # if still needed for something else
from utils.audit.rag import get_text_from_content_for_doc, get_text_from_content_for_audio
from utils.audit.response_llm import generate_response_via_langchain,generate_llm_with_tools
from utils.audit.rag import get_vectorstore
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.prompts import PromptTemplate
from itext2kg.models import KnowledgeGraph
from typing_extensions import Annotated, TypedDict
import json
class AddRelationship(TypedDict):
'''Ajouter une relation au graphe'''
source_id : Annotated[str, 'The source node ID']
target_id : Annotated[str, 'The target node ID']
relationship_type : Annotated[str, 'The type of relationship']
class DeleteRelationship(TypedDict):
'''Supprimer une relation du graphe, une information est donnée sur la relation à supprimer'''
source_id : Annotated[str, 'The source node ID']
target_id : Annotated[str, 'The target node ID']
relationship_type : Annotated[str, 'The type of relationship']
tools = [AddRelationship, DeleteRelationship]
################################################################################
# Utility Functions
################################################################################
def if_node_exists(nodes, node_id):
"""Check if a node with the given id already exists in a list of Node objects."""
for node in nodes:
if node.id == node_id:
return node
return False
def generate_random_color():
"""Generate a random pastel-ish RGB color."""
r = random.randint(180, 255)
g = random.randint(180, 255)
b = random.randint(180, 255)
return (r, g, b)
def rgb_to_hex(rgb):
"""Convert an (R, G, B) tuple to a hex string like '#aabbcc'."""
return '#{:02x}{:02x}{:02x}'.format(rgb[0], rgb[1], rgb[2])
def color_distance(color1, color2):
"""Calculate Euclidean distance between two RGB colors."""
return math.sqrt(
(color1[0] - color2[0])**2 +
(color1[1] - color2[1])**2 +
(color1[2] - color2[2])**2
)
def generate_distinct_colors(num_colors, min_distance=30):
"""
Generate a list of distinct pastel-ish colors (in hex), ensuring each is
at least `min_distance` away from the others in RGB space.
"""
colors = []
while len(colors) < num_colors:
new_color = generate_random_color()
if all(color_distance(new_color, existing_color) >= min_distance
for existing_color in colors):
colors.append(new_color)
return [rgb_to_hex(color) for color in colors]
def list_to_dict_colors(node_types):
"""
Create a dict mapping each node type to a random (distinct) hex color.
"""
number_of_colors = len(node_types)
color_hexes = generate_distinct_colors(number_of_colors)
return {typ: color_hexes[i] for i, typ in enumerate(node_types)}
def get_node_types_advanced(graph: KnowledgeGraph):
"""
Extract the set of node labels from an itext2kg KnowledgeGraph.
(graph.entities have .label, relationships have .startEntity, .endEntity)
"""
node_types = set()
dict_node_colors = {}
for node in graph.entities:
node_types.add(node.label)
for relationship in graph.relationships:
node_types.add(relationship.startEntity.label)
node_types.add(relationship.endEntity.label)
dict_node_colors = {node:rgb_to_hex(generate_random_color()) for node in node_types}
return node_types, dict_node_colors
################################################################################
# Graph Conversion
################################################################################
def get_node_types(graph):
"""
Extract the set of node types from a graph that has:
graph.nodes -> [ Node(id, type) ... ]
graph.relationships -> [ Relationship(source, target, type) ... ]
"""
node_types = set()
for node in graph.nodes:
node_types.add(node.type)
for rel in graph.relationships:
node_types.add(rel.source.type)
node_types.add(rel.target.type)
return node_types
def convert_neo4j_to_agraph(neo4j_graph, node_colors):
"""
Convert a “Neo4j-like” object into Agraph Nodes & Edges.
"""
nodes = []
edges = []
# Create nodes
for node in neo4j_graph.nodes:
node_id = node.id.replace(" ", "_")
label = node.id
type_ = node.type
new_node = Node(
id=node_id,
title=type_, # 'title' effectively becomes "type"
label=label,
size=25,
shape="circle",
color=node_colors.get(type_, "#cccccc")
)
if not if_node_exists(nodes, node_id):
nodes.append(new_node)
# Create edges
for rel in neo4j_graph.relationships:
source_id = rel.source.id.replace(" ", "_")
target_id = rel.target.id.replace(" ", "_")
# Ensure nodes exist (if not from the loop above):
if not if_node_exists(nodes, source_id):
nodes.append(Node(
id=source_id,
title=rel.source.type,
label=rel.source.id,
size=25,
shape="circle",
color=node_colors.get(rel.source.type, "#cccccc")
))
if not if_node_exists(nodes, target_id):
nodes.append(Node(
id=target_id,
title=rel.target.type,
label=rel.target.id,
size=25,
shape="circle",
color=node_colors.get(rel.target.type, "#cccccc")
))
edges.append(Edge(
source=source_id,
label=rel.type,
target=target_id
))
config = Config(
width=1200,
height=800,
directed=True,
physics=True,
hierarchical=True,
from_json="config.json"
)
return edges, nodes, config
def convert_advanced_neo4j_to_agraph(neo4j_graph: KnowledgeGraph, node_colors):
"""
Same logic as above, but adapted to an itext2kg.models.KnowledgeGraph object
(graph.entities, graph.relationships).
"""
nodes = []
edges = []
# Create nodes
for node in neo4j_graph.entities:
node_id = node.name.replace(" ", "_")
label = node.name
type_ = node.label
new_node = Node(
id=node_id,
title=type_,
label=label,
size=25,
shape="circle",
color=node_colors[type_]
)
if not if_node_exists(nodes, new_node.id):
nodes.append(new_node)
# Create edges
for relationship in neo4j_graph.relationships:
source = relationship.startEntity
target = relationship.endEntity
source_id = source.name.replace(" ", "_")
target_id = target.name.replace(" ", "_")
# Ensure existence of the source node
if not if_node_exists(nodes, source_id):
nodes.append(Node(
id=source_id,
title=source.label,
label=source.name,
size=25,
shape="circle",
color=node_colors.get(source.label, "#CCCCCC")
))
# Ensure existence of the target node
if not if_node_exists(nodes, target_id):
nodes.append(Node(
id=target_id,
title=target.label,
label=target.name,
size=25,
shape="circle",
color=node_colors.get(target.label, "#CCCCCC")
))
edges.append(Edge(
source=source_id,
label=relationship.name,
target=target_id
))
config = Config(
width=1200,
height=800,
directed=True,
physics=True,
hierarchical=True,
from_json="config.json"
)
return edges, nodes, config
def display_graph(edges, nodes, config):
"""Render Agraph."""
return agraph(edges=edges, nodes=nodes, config=config)
def filter_nodes_by_types(nodes, node_types_filter):
"""
Filter out Agraph nodes by the node’s 'title' field (which is used as 'type' here).
"""
if not node_types_filter:
return nodes
return [node for node in nodes if node.title in node_types_filter]
def format_relationships(relationships : list[Edge]):
"""Format relationships for display in the chat."""
return "\n".join(
f"- **{rel.source}** -- {rel.label} --> **{rel.to}**"
for rel in relationships
)
def fortmat_nodes(nodes : list[Node]):
"""Format nodes for display in the chat."""
return "\n".join(
f"- **{node.label}** ({node.title})"
for node in nodes
)
def add_relationship_to_graph(source_id, target_id, relationship_type):
st.session_state.edges.append(Edge(source=source_id, label=relationship_type, target=target_id))
print(f"Relation ajoutée: {source_id} -- {relationship_type} --> {target_id}")
if not if_node_exists(st.session_state.nodes, source_id):
st.session_state.nodes.append(Node(
id=source_id,
title="Autre",
label=source_id,
size=25,
shape="circle",
color=st.session_state.node_types.get(target.label, "#CCCCCC")
))
print(f"Node ajouté: {source_id}")
print(f"Nodes: {fortmat_nodes(st.session_state.nodes)}")
def delete_relationship_from_graph(source_id, target_id, relationship_type):
st.session_state.edges = [edge for edge in st.session_state.edges if not (
edge.source == source_id and edge.to == target_id and edge.label == relationship_type
)]
################################################################################
# Dialog Components (same as your original code)
################################################################################
@st.dialog(title="Changer la vue")
def change_view_dialog():
"""
Dialog to rename or delete existing views from st.session_state.filter_views
and choose the active one (st.session_state.current_view).
"""
st.write("Changer la vue")
for index, item in enumerate(st.session_state.filter_views.keys()):
emp = st.empty()
col1, col2, col3 = emp.columns([8, 1, 1])
# Delete the view (except for the default if you want)
if index > 0 and col2.button("🗑️", key=f"del{index}"):
del st.session_state.filter_views[item]
st.session_state.current_view = "Vue par défaut"
st.rerun()
# Choose the view
but_content = "🔍" if st.session_state.current_view != item else "✅"
if col3.button(but_content, key=f"valid{index}"):
st.session_state.current_view = item
st.rerun()
# Show details / rename
if len(st.session_state.filter_views.keys()) > index:
with col1.expander(item):
# Don’t allow renaming the default view (index=0) if you want
if index > 0:
change_name = st.text_input(
"Nom de la vue",
label_visibility="collapsed",
placeholder="Changez le nom de la vue",
key=f"change_name{index}"
)
if st.button("Renommer", key=f"rename{index}"):
if change_name.strip():
st.session_state.filter_views[change_name] = st.session_state.filter_views.pop(item)
st.session_state.current_view = change_name
st.rerun()
st.markdown(
"\n".join(f"- {label.strip()}"
for label in st.session_state.filter_views[item])
)
else:
emp.empty()
@st.dialog(title="Ajouter une vue")
def add_view_dialog(filters):
"""
Dialog to add a new “view” to st.session_state.filter_views, specifying which types to filter by.
"""
st.write("Ajouter une vue")
view_name = st.text_input("Nom de la vue")
st.markdown("Les filtres actuels :")
st.write(filters)
if st.button("Ajouter la vue"):
if view_name.strip():
st.session_state.filter_views[view_name] = filters
st.session_state.current_view = view_name
st.rerun()
@st.dialog(title="Changer la couleur")
def change_color_dialog():
"""Dialog to interactively change colors of each node type via color pickers."""
st.write("Changer la couleur")
for node_type, color in st.session_state.node_types.items():
new_color = st.color_picker(
f"La couleur de l'entité **{node_type.strip()}**",
color
)
print("New color:", new_color)
print("Old color:", color)
st.session_state.node_types[node_type] = new_color
if st.button("Valider"):
st.rerun()
@st.dialog(title="Modifier l'etiquette du noeud")
def change_node_label_dialog(selected_node_id):
"""Dialog to change the label of a node."""
node : Node = if_node_exists(st.session_state.nodes, selected_node_id)
st.write("- **Nom:** ", node.label)
st.write("- **Etiquette:** ", node.title)
if node:
new_label = st.selectbox("Etiquette du noeud",list(st.session_state.node_types.keys())+["Autre"],index=list(st.session_state.node_types.keys()).index(node.title))
if new_label == "Autre":
new_label_text = st.text_input("Nouvelle étiquette")
if st.button("Valider") and new_label:
if new_label == "Autre" and new_label_text:
st.session_state.node_types[new_label_text] = rgb_to_hex(generate_random_color())
node.title = new_label_text
st.success(f"Etiquette du noeud {selected_node_id} modifiée en {new_label_text}")
st.rerun()
node.title = new_label
st.success(f"Etiquette du noeud {selected_node_id} modifiée en {new_label}")
st.rerun()
################################################################################
# Main KG Function
################################################################################
def kg_main():
# 1. Load your pickles (if not already loaded in session state)
if "scenes" not in st.session_state:
with open("./utils/assets/scenes.pkl", "rb") as f:
st.session_state.scenes = pickle.load(f)
st.session_state.vectorstore = get_vectorstore(st.session_state.scenes)
if "graph" not in st.session_state:
with open("./utils/assets/kg_ia_signature.pkl", "rb") as f:
# Depending on how you stored it, it might be a tuple (graph, extra_info)
# or directly a single object. Adjust as needed.
st.session_state.graph = pickle.load(f)
print("Graph loaded.")
# 2. Initialize other session keys if they don’t exist
if "filter_views" not in st.session_state:
st.session_state.filter_views = {}
if "current_view" not in st.session_state:
st.session_state.current_view = None
if "node_types" not in st.session_state:
st.session_state.node_types = None
if "chat_graph_history" not in st.session_state:
st.session_state.chat_graph_history = []
st.title("Graphe de connaissance")
# If we haven’t set up node types yet, do it now
if st.session_state.node_types is None:
# st.session_state.graph is presumably a list/tuple => st.session_state.graph[0]
# Or just st.session_state.graph if you stored it directly as a single obj
node_types, st.session_state.node_types = get_node_types_advanced(st.session_state.graph)
# st.write(f"Types d'entités trouvés : {node_types}")
print("Couleurs attribuées")
# Initialize a default filter view
st.session_state.filter_views["Vue par défaut"] = list(node_types)
st.session_state.filter_views["Personnages"] = "Person"
st.session_state.filter_views["Lieux"] = ["Location"]
st.session_state.filter_views["Concepts"] = ["Concept"]
st.session_state.current_view = "Personnages"
if "edges" not in st.session_state or "nodes" not in st.session_state:
# Convert the graph to Agraph format
st.session_state.edges, st.session_state.nodes, st.session_state.config = convert_advanced_neo4j_to_agraph(
st.session_state.graph, st.session_state.node_types
)
# 3. Convert the graph to agraph format
edges = st.session_state.edges
nodes = st.session_state.nodes
config = st.session_state.config
print("Graph converti en Agraph")
#ask chatgpt to analyse the graph
prompt = ("Tu es un expert en graphes de connaissances, analyse le graphe et donne une synthèse et differentes conclusions sur les elements du recit, tout en etant pertinent et precis",
"**Graphe**:,"
f"**Noeuds**: {fortmat_nodes(st.session_state.nodes)}\n"
f"Relations: {format_relationships(st.session_state.edges)}",
"Output: tu dois donner une synthèse et des conclusions sur les elements du recit , ca sera le premier message de la conversation"),
response = generate_response_via_langchain(prompt)
st.session_state.chat_graph_history.append(AIMessage(content=response))
# 4. UI layout: (left) the graph itself, (right) the chat
col1, col2 = st.columns([3, 1])
with col1.container(border=True,height=800):
st.write(f"#### Visualisation du graphe (**{st.session_state.current_view}**)")
filter_col, add_view_col, change_view_col, color_col = st.columns([9, 1, 1, 1])
if color_col.button("🎨", help="Changer la couleur"):
change_color_dialog()
if change_view_col.button("🔍", help="Changer de vue"):
change_view_dialog()
# Currently selected filter for the chosen view
current_filters = st.session_state.filter_views.get(st.session_state.current_view, [])
filter_selection = filter_col.multiselect(
"Filtrer selon l'étiquette",
st.session_state.node_types.keys(),
default=current_filters,
label_visibility="collapsed"
)
if add_view_col.button("➕", help="Ajouter une vue"):
add_view_dialog(filter_selection)
# Filter out nodes that don’t match the chosen types
filtered_nodes = filter_nodes_by_types(nodes, filter_selection)
col_graph , col_buttons = st.columns([12, 1])
# Render the graph
print("Affichage du graphe")
with col_graph.container():
selected_node_id = display_graph(edges, filtered_nodes, config)
print("Graphe affiché")
with col_buttons.container():
# modify node button with emoji
if selected_node_id:
if st.button("📝",key="change label"):
st.write(f"**Node sélectionné**: `{selected_node_id}`")
change_node_label_dialog(selected_node_id)
if selected_node_id:
st.write(f"**Noeud sélectionné**: `{selected_node_id}`")
# 5. Chat UI
with col2.container(border=True,height=800):
st.markdown("#### Dialoguer avec le graphe")
user_query = st.chat_input("Votre question ...")
if user_query:
st.session_state.chat_graph_history.append(HumanMessage(content=user_query))
with st.container():
# Display the existing chat
for message in st.session_state.chat_graph_history:
if isinstance(message, AIMessage):
with st.chat_message("AI"):
st.markdown(message.content)
elif isinstance(message, HumanMessage):
with st.chat_message("Human"):
st.write(message.content)
# If the last message is from the user, we try to generate a response
if (len(st.session_state.chat_graph_history) > 0 and
isinstance(st.session_state.chat_graph_history[-1], HumanMessage)):
last_message = st.session_state.chat_graph_history[-1]
with st.chat_message("AI"):
# Example retrieval (if you have a vectorstore in session state)
# and want to incorporate scenes or graph data:
prompt_tool_calling = ("Ta mission est de decider selon la query de l'utilisateur s'il y'a un outil qui correspont et il faut l'appeler, tu dois aussi savoir si on va supprimer une relation ou plutot ajouter une relation\n"
"Tu as 2 outils , un pour supprimer une relation et l'autre ajouter une relation dans un graphe\n"
"si un outil est appelé, tu dois le dire à l'utilisateur et tu dois bien extraire les id des noeuds et le type de relation\n"
"si l'id du noeud existe dans le graphe, extrait le exactement et si le type de relation existe dans le graphe, extrait le exactement\n"
f"**query de l'utilisateur** : {last_message.content}\n"
f"**Graph**: {format_relationships(st.session_state.edges)}\n"
f"sinon tu dois renvoyé: 'Pas d'outils appelé'\n"
f"les outils sont: {tools}\n"
f"Output: tu dois ecrire soit 'outil appelé' apres avoir identifier les differents elements soit 'Pas d'outils appelé'\n")
tools_called = generate_llm_with_tools(tools=tools,query=prompt_tool_calling)
print(tools_called)
if 'tool_calls' in tools_called.additional_kwargs:
for tool_call in tools_called.additional_kwargs['tool_calls']:
func_name = tool_call["function"]["name"]
raw_args = tool_call["function"]["arguments"]
parsed_args = json.loads(raw_args) # Convert JSON string to dict
source_id = parsed_args["source_id"]
target_id = parsed_args["target_id"]
relationship_type = parsed_args["relationship_type"]
if func_name == "AddRelationship":
add_relationship_to_graph(source_id, target_id, relationship_type)
st.write(f"Relation ajoutée: {source_id} -- {relationship_type} --> {target_id}")
elif func_name == "DeleteRelationship":
delete_relationship_from_graph(source_id, target_id, relationship_type)
st.write(f"Relation supprimée: {source_id} -- {relationship_type} --> {target_id}")
if "vectorstore" in st.session_state:
retriever = st.session_state.vectorstore.as_retriever()
context = retriever.invoke(last_message.content)
prompt = (
f"Contexte depuis les 'scenes': {st.session_state.scenes}\n"
f"Contexte vectorstore: {context}\n"
f"Question: {last_message.content}\n"
f"Graph: {st.session_state.graph}\n" # If you want to embed your entire graph
)
response = st.write_stream(
generate_response_via_langchain(prompt, stream=True)
)
st.session_state.chat_graph_history.append(AIMessage(content=response))
else:
# Fallback if no vectorstore
st.write("Aucune base de vecteurs disponible.")
st.session_state.chat_graph_history.append(AIMessage(content="(Pas de vectorstore)"))
# If the user clicked on a node in the graph, we can propose quick prompts
if selected_node_id:
with st.chat_message("AI"):
st.markdown(f"**Vous avez sélectionné**: `{selected_node_id}`")
quick_prompts = [
f"Donne-moi plus d'informations sur le noeud '{selected_node_id}'",
f"Montre-moi les relations de '{selected_node_id}' dans ce graphe"
]
for i, qprompt in enumerate(quick_prompts):
if st.button(qprompt, key=f"qp_{i}"):
st.session_state.chat_graph_history.append(HumanMessage(content=qprompt))
kg_main()