Shakir60 committed on
Commit
1413086
·
verified ·
1 Parent(s): 1f13845

Update rag_utils.py

Browse files
Files changed (1) hide show
  1. rag_utils.py +170 -123
rag_utils.py CHANGED
@@ -1,154 +1,201 @@
 
1
  from langchain.embeddings import HuggingFaceEmbeddings
2
  from langchain.vectorstores import FAISS
3
  from langchain.text_splitter import RecursiveCharacterTextSplitter
4
- from langchain.chains import RetrievalQA
5
- from langchain.prompts import PromptTemplate
6
- from langchain.llms import HuggingFaceHub
7
- import os
8
  import logging
 
 
 
 
9
 
10
- # Configure logging
11
- logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 
12
 
13
class RAGSystem:
    """Retrieval-augmented QA helper over a construction-damage knowledge base.

    The knowledge base is a mapping ``{damage_type: [case, ...]}`` where every
    case is a dict containing the keys listed in ``_REQUIRED_CASE_KEYS``.
    Cases are rendered to text, chunked, embedded into a FAISS index and
    queried through a ``RetrievalQA`` chain.

    NOTE(review): ``_generate_expert_insights``, ``_generate_case_studies``,
    ``_extract_technical_details``, ``_extract_safety_considerations`` and
    ``_extract_recommendations`` are called below but are not defined in the
    visible class. Confirm they are supplied elsewhere (e.g. a subclass or
    mixin); otherwise ``initialize_knowledge_base`` raises ``AttributeError``
    and ``get_enhanced_analysis`` always returns the empty result.
    """

    # Fields every case dict must provide before it can be indexed.
    _REQUIRED_CASE_KEYS = (
        'severity',
        'description',
        'repair_method',
        'estimated_cost',
        'timeframe',
        'location',
        'required_expertise',
        'immediate_action',
        'prevention',
    )

    def __init__(self):
        """Create the embedding model, text splitter and hosted LLM client.

        Raises:
            Exception: whatever the underlying constructors raise; the error
                is logged and re-raised unchanged.
        """
        try:
            self.embeddings = HuggingFaceEmbeddings(
                model_name="sentence-transformers/all-mpnet-base-v2"
            )
            self.vector_store = None
            # Set by initialize_knowledge_base(); defined here so the
            # attribute always exists.
            self.qa_chain = None
            self.text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=500,
                chunk_overlap=50,
            )
            # flan-t5 is a sequence-to-sequence model. With the
            # "text-generation" task the client strips len(prompt) characters
            # from the answer (it assumes the prompt is echoed back), which
            # truncates T5 output; "text2text-generation" returns it intact.
            self.llm = HuggingFaceHub(
                repo_id="google/flan-t5-large",
                task="text2text-generation",
                model_kwargs={"temperature": 0.7, "max_length": 512},
            )
            logging.info("RAG system initialized successfully.")
        except Exception as e:
            logging.error(f"Failed to initialize RAG system: {str(e)}")
            raise

    def initialize_knowledge_base(self, knowledge_base):
        """Validate ``knowledge_base``, index it in FAISS and build the QA chain.

        Args:
            knowledge_base: mapping of damage type to a list of case dicts.

        Raises:
            ValueError: if a case lacks a required field or no document could
                be produced.
            Exception: any error from the embedding / vector-store layers,
                logged and re-raised unchanged.
        """
        try:
            self._validate_knowledge_base(knowledge_base)

            expert_insights = self._generate_expert_insights(knowledge_base)
            case_studies = self._generate_case_studies()

            documents = []
            for damage_type, cases in knowledge_base.items():
                insight = expert_insights.get(damage_type, "")
                studies = case_studies.get(damage_type, "")
                for idx, case in enumerate(cases):
                    try:
                        documents.append(
                            self._render_case(damage_type, case, insight, studies)
                        )
                    except KeyError as e:
                        # Validation above already rejects missing keys; this
                        # is a second line of defence kept from the original.
                        logging.warning(
                            f"Missing key {str(e)} in {damage_type}, case {idx + 1}. Skipping."
                        )

            if not documents:
                raise ValueError("No valid documents to process.")

            splits = self.text_splitter.create_documents(documents)
            self.vector_store = FAISS.from_documents(splits, self.embeddings)

            self.qa_chain = RetrievalQA.from_chain_type(
                llm=self.llm,
                chain_type="stuff",
                retriever=self.vector_store.as_retriever(),
                chain_type_kwargs={"prompt": self._get_qa_prompt()},
            )
            logging.info("Knowledge base initialized successfully.")
        except Exception as e:
            logging.error(f"Failed to initialize knowledge base: {str(e)}")
            raise

    @staticmethod
    def _render_case(damage_type, case, insight, studies):
        """Render one case as the plain-text document that gets embedded.

        Lines are joined explicitly so the indexed text does not depend on
        how this source file happens to be indented.
        """
        return "\n".join([
            f"Damage Type: {damage_type}",
            f"Severity: {case['severity']}",
            f"Description: {case['description']}",
            # NOTE(review): repeats the description, exactly as the original
            # did; confirm whether a dedicated technical field was intended.
            f"Technical Details: {case['description']}",
            f"Expert Insight: {insight}",
            f"Case Studies: {studies}",
            f"Repair Methods: {', '.join(case['repair_method'])}",
            f"Cost Considerations: {case['estimated_cost']}",
            f"Implementation Timeline: {case['timeframe']}",
            f"Location Specifics: {case['location']}",
            f"Required Expertise Level: {case['required_expertise']}",
            f"Emergency Protocol: {case['immediate_action']}",
            f"Preventive Measures: {case['prevention']}",
        ])

    def _validate_knowledge_base(self, knowledge_base):
        """Check that every case carries all required fields.

        Raises:
            ValueError: naming the first missing field, its damage type and
                the 1-based case number.
        """
        for damage_type, cases in knowledge_base.items():
            for idx, case in enumerate(cases):
                for key in self._REQUIRED_CASE_KEYS:
                    if key not in case:
                        message = (
                            f"Missing required field '{key}' in {damage_type}, case {idx + 1}"
                        )
                        logging.error(message)
                        raise ValueError(message)
        logging.info("Knowledge base validation passed.")

    def _get_qa_prompt(self):
        """Return the prompt template used by the "stuff" QA chain."""
        template = "\n".join([
            "Context: {context}",
            "",
            "Question: {question}",
            "",
            "Provide a detailed analysis considering:",
            "1. Technical aspects",
            "2. Safety implications",
            "3. Cost-benefit analysis",
            "4. Long-term considerations",
            "5. Best practices and recommendations",
            "",
            "Answer:",
        ])
        return PromptTemplate(
            template=template,
            input_variables=["context", "question"],
        )

    @staticmethod
    def _empty_analysis():
        """Return a fresh, empty analysis result."""
        return {
            "technical_details": [],
            "safety_considerations": [],
            "expert_recommendations": [],
        }

    def get_enhanced_analysis(self, damage_type, confidence, custom_query=None):
        """Run the QA chain and categorise its answer.

        Args:
            damage_type: damage label the analysis is about.
            confidence: detection confidence in percent, quoted in the
                default query.
            custom_query: optional query that replaces the default one.

        Returns:
            Dict with ``technical_details``, ``safety_considerations`` and
            ``expert_recommendations``. All three are empty lists when the
            chain returns nothing or any error occurs (errors are logged,
            never raised).
        """
        try:
            if not self.vector_store:
                raise ValueError("Vector store is not initialized.")

            if custom_query:
                base_query = custom_query
            else:
                base_query = (
                    f"Provide a comprehensive analysis for {damage_type} damage "
                    f"with {confidence}% confidence level. "
                    "Include technical assessment, safety implications, "
                    "and expert recommendations."
                )

            results = self.qa_chain.run(base_query)
            if not results:
                logging.warning("No results returned for query.")
                return self._empty_analysis()

            return {
                "technical_details": self._extract_technical_details(results, damage_type),
                "safety_considerations": self._extract_safety_considerations(results),
                "expert_recommendations": self._extract_recommendations(results, confidence),
            }
        except Exception as e:
            logging.error(f"Failed to generate enhanced analysis: {str(e)}")
            return self._empty_analysis()
 
1
+ # rag_utils.py
2
  from langchain.embeddings import HuggingFaceEmbeddings
3
  from langchain.vectorstores import FAISS
4
  from langchain.text_splitter import RecursiveCharacterTextSplitter
5
+ from langchain.docstore.document import Document
 
 
 
6
  import logging
7
+ from typing import List, Dict, Any
8
+ import numpy as np
9
+ from tqdm import tqdm
10
+ import streamlit as st
11
 
12
+ # Set up logging
13
+ logging.basicConfig(level=logging.INFO)
14
+ logger = logging.getLogger(__name__)
15
 
16
class RAGSystem:
    """Retrieval helper that indexes construction-damage cases in FAISS.

    The knowledge base is a mapping ``{damage_type: [case, ...]}``. Each case
    is rendered into a three-section text document (technical, repair,
    safety), chunked, embedded and stored. Queries return the closest chunks,
    which are then sorted back into those three sections.
    """

    _EMBEDDING_MODEL = "sentence-transformers/all-mpnet-base-v2"

    # Section headers written by _render_case_text and searched for by
    # _format_response; the two must stay in sync.
    _TECHNICAL_HEADER = "Technical Analysis"
    _REPAIR_HEADER = "Repair and Maintenance"
    _SAFETY_HEADER = "Safety and Prevention"

    # get_similar_cases searches for the queried damage type and then drops
    # hits of that same type, so it needs a candidate pool larger than the
    # number of cases it finally returns.
    _SIMILAR_CASE_POOL = 15

    def __init__(self):
        """Initialize the embedding model and text splitter.

        Raises:
            Exception: whatever the underlying constructors raise; the error
                is logged and re-raised unchanged.
        """
        try:
            self.embeddings = HuggingFaceEmbeddings(
                model_name=self._EMBEDDING_MODEL,
                model_kwargs={'device': self._select_device()},
            )
            self.vector_store = None
            self.text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=500,
                chunk_overlap=50,
                separators=["\n\n", "\n", ". ", ", ", " ", ""],
            )
            logger.info("RAG system initialized successfully")
        except Exception as e:
            logger.error("Error initializing RAG system: %s", e)
            raise

    @staticmethod
    def _select_device() -> str:
        """Return ``'cuda'`` when a CUDA device is usable, else ``'cpu'``.

        The check belongs to torch; streamlit exposes no ``cuda`` attribute,
        so asking it raised ``AttributeError`` on every construction.
        """
        try:
            import torch
        except ImportError:
            return 'cpu'
        return 'cuda' if torch.cuda.is_available() else 'cpu'

    @classmethod
    def _render_case_text(cls, damage_type: str, case: Dict) -> str:
        """Render one case as the text that gets embedded.

        Lines are joined explicitly so the indexed text carries no
        source-code indentation (which would eat into the 500-character
        chunk budget), and sections are separated by a blank line so the
        splitter can break on paragraph boundaries.

        Raises:
            KeyError: if ``case`` lacks a required field.
        """
        repair_method = case['repair_method']
        # A bare string would otherwise be joined character by character.
        if isinstance(repair_method, str):
            repair_steps = repair_method
        else:
            repair_steps = ' -> '.join(repair_method)

        technical_info = "\n".join([
            f"{cls._TECHNICAL_HEADER} for {damage_type}:",
            f"Severity Level: {case['severity']}",
            f"Detailed Description: {case['description']}",
            f"Primary Location: {case['location']}",
            f"Required Expertise: {case['required_expertise']}",
        ])
        repair_info = "\n".join([
            f"{cls._REPAIR_HEADER} Information:",
            f"Repair Methods: {repair_steps}",
            f"Estimated Cost Range: {case['estimated_cost']}",
            f"Expected Timeframe: {case['timeframe']}",
        ])
        safety_info = "\n".join([
            f"{cls._SAFETY_HEADER} Guidelines:",
            f"Immediate Actions Required: {case['immediate_action']}",
            f"Preventive Measures: {case['prevention']}",
            "Critical Considerations: Special attention needed for "
            f"{damage_type} in {case['location']}",
        ])
        return "\n\n".join([technical_info, repair_info, safety_info])

    def _create_documents(self, knowledge_base: Dict) -> List["Document"]:
        """Create one ``Document`` per case, with metadata for retrieval.

        Raises:
            KeyError: if a case lacks a required field; the message names the
                field, the damage type and the 1-based case number.
        """
        documents = []
        try:
            for damage_type, cases in knowledge_base.items():
                for case_number, case in enumerate(cases, start=1):
                    try:
                        doc_text = self._render_case_text(damage_type, case)
                    except KeyError as e:
                        raise KeyError(
                            f"Missing required field {e} in {damage_type}, "
                            f"case {case_number}"
                        ) from e

                    metadata = {
                        'damage_type': damage_type,
                        'severity': case['severity'],
                        'location': case['location'],
                        'document_type': 'construction_damage_analysis',
                    }
                    documents.append(
                        Document(page_content=doc_text, metadata=metadata)
                    )

            logger.info(f"Created {len(documents)} documents from knowledge base")
            return documents
        except Exception as e:
            logger.error("Error creating documents: %s", e)
            raise

    def initialize_knowledge_base(self, knowledge_base: Dict):
        """Build the FAISS vector store from ``knowledge_base``.

        Raises:
            ValueError: if the knowledge base yields no documents.
            Exception: any error from document creation, splitting or
                indexing, logged and re-raised unchanged.
        """
        try:
            documents = self._create_documents(knowledge_base)
            if not documents:
                # Fail with a clear message instead of an opaque error from
                # building an index over zero vectors.
                raise ValueError("No valid documents to process.")

            splits = self.text_splitter.split_documents(documents)
            self.vector_store = FAISS.from_documents(
                documents=splits,
                embedding=self.embeddings,
            )
            logger.info("Knowledge base initialized successfully")
        except Exception as e:
            logger.error("Error initializing knowledge base: %s", e)
            raise

    @staticmethod
    def _extract_section(content: str, header: str, stop_headers=()):
        """Return the text after ``header`` up to the first stop header.

        Returns ``None`` when ``header`` does not occur in ``content``.
        """
        _, found, tail = content.partition(header)
        if not found:
            return None
        for stop in stop_headers:
            tail = tail.partition(stop)[0]
        return tail.strip()

    def _format_response(
        self,
        docs: List["Document"],
        damage_type: str,
        confidence: float,
    ) -> Dict[str, List[str]]:
        """Sort retrieved chunks into technical / safety / repair lists.

        Sections are cut on the full section headers. Cutting on the bare
        words "Repair" or "Safety" truncated a section as soon as one of
        those words appeared inside a field value.
        """
        response = {
            "technical_details": [],
            "safety_considerations": [],
            "expert_recommendations": [],
        }

        try:
            for doc in docs:
                content = doc.page_content

                technical = self._extract_section(
                    content,
                    self._TECHNICAL_HEADER,
                    (self._REPAIR_HEADER, self._SAFETY_HEADER),
                )
                if technical is not None:
                    response["technical_details"].append(
                        f"For {damage_type} (Confidence: {confidence:.1f}%):\n"
                        + technical
                    )

                safety = self._extract_section(content, self._SAFETY_HEADER)
                if safety is not None:
                    response["safety_considerations"].append(safety)

                repair = self._extract_section(
                    content,
                    self._REPAIR_HEADER,
                    (self._SAFETY_HEADER,),
                )
                if repair is not None:
                    response["expert_recommendations"].append(repair)

            return response
        except Exception as e:
            logger.error("Error formatting response: %s", e)
            raise

    def get_enhanced_analysis(
        self,
        damage_type: str,
        confidence: float,
        custom_query: str = None
    ) -> Dict[str, List[str]]:
        """Retrieve and format the chunks most relevant to ``damage_type``.

        Args:
            damage_type: damage label to analyse.
            confidence: detection confidence in percent.
            custom_query: optional free-text query; the damage type is
                appended to it.

        Returns:
            Dict with ``technical_details``, ``safety_considerations`` and
            ``expert_recommendations``. On any failure the lists contain
            human-readable error messages instead (errors are logged, never
            raised).
        """
        try:
            if self.vector_store is None:
                raise ValueError("Vector store not initialized")

            if custom_query:
                query = f"{custom_query} for {damage_type} damage"
            else:
                query = (
                    f"Provide detailed analysis for {damage_type} damage "
                    f"with {confidence}% confidence level. "
                    "Include technical assessment, safety considerations, "
                    "and repair recommendations."
                )

            # Top 3 chunks. No fetch_k: FAISS only uses it together with a
            # metadata filter, so passing it here had no effect.
            docs = self.vector_store.similarity_search(query=query, k=3)

            return self._format_response(docs, damage_type, confidence)

        except Exception as e:
            logger.error("Error getting enhanced analysis: %s", e)
            return {
                "technical_details": [f"Error retrieving analysis: {str(e)}"],
                "safety_considerations": ["Please try again or contact support."],
                "expert_recommendations": ["System currently unavailable."],
            }

    def get_similar_cases(
        self,
        damage_type: str,
        confidence: float,
        max_results: int = 3,
    ) -> List[Dict[str, Any]]:
        """Return up to ``max_results`` related cases of other damage types.

        A pool larger than ``max_results`` is searched because the nearest
        hits for a damage type are mostly that same type, and those are
        excluded. Searching only the top 3 usually left nothing to return.

        Args:
            damage_type: damage label to compare against.
            confidence: accepted for interface compatibility; not used.
            max_results: maximum number of cases to return.

        Returns:
            List of dicts with ``damage_type``, ``severity``, ``location``
            and ``details`` (first 200 characters of the chunk). Empty on
            any failure (errors are logged, never raised).
        """
        try:
            if self.vector_store is None:
                raise ValueError("Vector store not initialized")

            query = f"Find similar cases of {damage_type} damage"
            pool_size = max(max_results, self._SIMILAR_CASE_POOL)
            docs = self.vector_store.similarity_search(query, k=pool_size)

            similar_cases = []
            for doc in docs:
                if len(similar_cases) >= max_results:
                    break
                metadata = doc.metadata
                other_type = metadata.get('damage_type')
                # Skip the queried type itself and chunks with no type.
                if other_type is None or other_type == damage_type:
                    continue
                similar_cases.append({
                    'damage_type': other_type,
                    'severity': metadata.get('severity'),
                    'location': metadata.get('location'),
                    'details': doc.page_content[:200] + '...',
                })

            return similar_cases
        except Exception as e:
            logger.error("Error getting similar cases: %s", e)
            return []