Test / app.py
Shakir60's picture
Update app.py
9b091b5 verified
from streamlit_extras.colored_header import colored_header
from streamlit_extras.add_vertical_space import add_vertical_space
from PIL import Image
import numpy as np
from transformers import ViTFeatureExtractor, ViTForImageClassification
from sentence_transformers import SentenceTransformer
import streamlit as st
import torch
import matplotlib.pyplot as plt
import logging
import faiss
from typing import List, Dict
from datetime import datetime
from groq import Groq
import os
from functools import lru_cache
import time
from streamlit_card import card
import plotly.graph_objects as go
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RAGSystem:
def __init__(self):
# Load models only when needed
self._embedding_model = None
self._vector_store = None
self._knowledge_base = None
@property
def embedding_model(self):
if self._embedding_model is None:
self._embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
return self._embedding_model
@property
def knowledge_base(self):
if self._knowledge_base is None:
self._knowledge_base = self.load_knowledge_base()
return self._knowledge_base
@property
def vector_store(self):
if self._vector_store is None:
self._vector_store = self.create_vector_store()
return self._vector_store
@staticmethod
@lru_cache(maxsize=1) # Cache the knowledge base
def load_knowledge_base() -> List[Dict]:
"""Load and preprocess knowledge base"""
kb = {
"spalling": [
{
"severity": "Critical",
"description": "Severe concrete spalling with exposed reinforcement",
"repair_method": "Remove deteriorated concrete, clean reinforcement",
"immediate_action": "Evacuate area, install support",
"prevention": "Regular inspections, waterproofing"
}
],
"structural_cracks": [
{
"severity": "High",
"description": "Active structural cracks >5mm width",
"repair_method": "Structural analysis, epoxy injection",
"immediate_action": "Install crack monitors",
"prevention": "Regular monitoring, load management"
}
],
"surface_deterioration": [
{
"severity": "Medium",
"description": "Surface scaling and deterioration",
"repair_method": "Surface preparation, patch repair",
"immediate_action": "Document extent, plan repairs",
"prevention": "Surface sealers, proper drainage"
}
],
"corrosion": [
{
"severity": "High",
"description": "Corrosion of reinforcement leading to cracks",
"repair_method": "Remove rust, apply inhibitors",
"immediate_action": "Isolate affected area",
"prevention": "Anti-corrosion coatings, proper drainage"
}
],
"efflorescence": [
{
"severity": "Low",
"description": "White powder deposits on concrete surfaces",
"repair_method": "Surface cleaning, sealant application",
"immediate_action": "Identify moisture source",
"prevention": "Improve waterproofing, reduce moisture ingress"
}
],
"delamination": [
{
"severity": "Medium",
"description": "Separation of layers in concrete",
"repair_method": "Resurface or replace delaminated sections",
"immediate_action": "Inspect bonding layers",
"prevention": "Proper curing and bonding agents"
}
],
"honeycombing": [
{
"severity": "Medium",
"description": "Voids in concrete caused by improper compaction",
"repair_method": "Grout injection, patch repair",
"immediate_action": "Assess structural impact",
"prevention": "Proper vibration during pouring"
}
],
"water_leakage": [
{
"severity": "High",
"description": "Water ingress through cracks or joints",
"repair_method": "Injection grouting, waterproofing membranes",
"immediate_action": "Stop water flow, apply sealants",
"prevention": "Drainage systems, joint sealing"
}
],
"settlement_cracks": [
{
"severity": "High",
"description": "Cracks due to uneven foundation settlement",
"repair_method": "Foundation underpinning, grouting",
"immediate_action": "Monitor movement, stabilize foundation",
"prevention": "Soil compaction, proper foundation design"
}
],
"shrinkage_cracks": [
{
"severity": "Low",
"description": "Minor cracks caused by shrinkage during curing",
"repair_method": "Sealant application, surface repairs",
"immediate_action": "Monitor cracks",
"prevention": "Proper curing and moisture control"
}
]
}
documents = []
for category, items in kb.items():
for item in items:
doc_text = f"Category: {category}\n"
for key, value in item.items():
doc_text += f"{key}: {value}\n"
documents.append({"text": doc_text, "metadata": {"category": category}})
return documents
def create_vector_store(self):
"""Create FAISS vector store"""
texts = [doc["text"] for doc in self.knowledge_base]
embeddings = self.embedding_model.encode(texts)
dimension = embeddings.shape[1]
index = faiss.IndexFlatL2(dimension)
index.add(np.array(embeddings).astype('float32'))
return index
@lru_cache(maxsize=32) # Cache recent query results
def get_relevant_context(self, query: str, k: int = 2) -> str:
"""Retrieve relevant context based on query"""
try:
query_embedding = self.embedding_model.encode([query])
D, I = self.vector_store.search(np.array(query_embedding).astype('float32'), k)
context = "\n\n".join([self.knowledge_base[i]["text"] for i in I[0]])
return context
except Exception as e:
logger.error(f"Error retrieving context: {e}")
return ""
class ImageAnalyzer:
def __init__(self, model_name="microsoft/swin-base-patch4-window7-224-in22k"):
self.device = "cpu"
self.defect_classes = ["spalling", "structural_cracks", "surface_deterioration"]
self.model_name = model_name
self._model = None
self._feature_extractor = None
@property
def model(self):
if self._model is None:
self._model = self._load_model()
return self._model
@property
def feature_extractor(self):
if self._feature_extractor is None:
self._feature_extractor = self._load_feature_extractor()
return self._feature_extractor
def _load_feature_extractor(self):
"""Load the appropriate feature extractor based on model type"""
try:
if "swin" in self.model_name:
from transformers import AutoFeatureExtractor
return AutoFeatureExtractor.from_pretrained(self.model_name)
elif "convnext" in self.model_name:
from transformers import ConvNextFeatureExtractor
return ConvNextFeatureExtractor.from_pretrained(self.model_name)
else:
from transformers import ViTFeatureExtractor
return ViTFeatureExtractor.from_pretrained(self.model_name)
except Exception as e:
logger.error(f"Feature extractor initialization error: {e}")
return None
def _load_model(self):
try:
if "swin" in self.model_name:
from transformers import SwinForImageClassification
model = SwinForImageClassification.from_pretrained(
self.model_name,
num_labels=len(self.defect_classes),
ignore_mismatched_sizes=True
)
elif "convnext" in self.model_name:
from transformers import ConvNextForImageClassification
model = ConvNextForImageClassification.from_pretrained(
self.model_name,
num_labels=len(self.defect_classes),
ignore_mismatched_sizes=True
)
else:
from transformers import ViTForImageClassification
model = ViTForImageClassification.from_pretrained(
self.model_name,
num_labels=len(self.defect_classes),
ignore_mismatched_sizes=True
)
model = model.to(self.device)
# Reinitialize the classifier layer
with torch.no_grad():
if hasattr(model, 'classifier'):
in_features = model.classifier.in_features
model.classifier = torch.nn.Linear(in_features, len(self.defect_classes))
elif hasattr(model, 'head'):
in_features = model.head.in_features
model.head = torch.nn.Linear(in_features, len(self.defect_classes))
return model
except Exception as e:
logger.error(f"Model initialization error: {e}")
return None
def preprocess_image(self, image_bytes):
"""Preprocess image for model input"""
return _cached_preprocess_image(image_bytes, self.model_name)
def analyze_image(self, image):
"""Analyze image for defects"""
try:
if self.model is None:
raise ValueError("Model not properly initialized")
inputs = self.feature_extractor(
images=image,
return_tensors="pt"
)
inputs = {k: v.to(self.device) for k, v in inputs.items()}
with torch.no_grad():
outputs = self.model(**inputs)
probs = torch.nn.functional.softmax(outputs.logits, dim=1)[0]
confidence_threshold = 0.3
results = {
self.defect_classes[i]: float(probs[i])
for i in range(len(self.defect_classes))
if float(probs[i]) > confidence_threshold
}
if not results:
max_idx = torch.argmax(probs)
results = {self.defect_classes[int(max_idx)]: float(probs[max_idx])}
return results
except Exception as e:
logger.error(f"Analysis error: {str(e)}")
return None
@st.cache_data
def _cached_preprocess_image(image_bytes, model_name):
"""Cached version of image preprocessing"""
try:
image = Image.open(image_bytes)
if image.mode != 'RGB':
image = image.convert('RGB')
# Adjust size based on model requirements
if "convnext" in model_name:
width, height = 384, 384
else:
width, height = 224, 224
image = image.resize((width, height), Image.Resampling.LANCZOS)
return image
except Exception as e:
logger.error(f"Image preprocessing error: {e}")
return None
@st.cache_data
def get_groq_response(query: str, context: str) -> str:
"""Get response from Groq LLM with caching"""
try:
if not os.getenv("GROQ_API_KEY"):
return "Error: Groq API key not configured"
client = Groq(api_key=os.getenv("GROQ_API_KEY"))
prompt = f"""Based on the following context about construction defects, answer the question.
Context: {context}
Question: {query}
Provide a detailed answer based on the given context."""
response = client.chat.completions.create(
messages=[
{
"role": "system",
"content": "You are a construction defect analysis expert."
},
{
"role": "user",
"content": prompt
}
],
model="llama-3.3-70b-versatile",
temperature=0.7,
)
return response.choices[0].message.content
except Exception as e:
logger.error(f"Groq API error: {e}", exc_info=True)
return f"Error: Unable to get response from AI model. Exception: {str(e)}"
def get_theme():
"""Get current theme from query parameters"""
theme = st.query_params.get("theme", "light")
return "dark" if theme == "dark" else "light"
def create_plotly_confidence_chart(results, unique_key):
"""Create an interactive confidence chart using Plotly"""
theme = get_theme()
colors = {
'light': {'bg': 'white', 'text': 'black', 'grid': '#eee'},
'dark': {'bg': '#2d2d2d', 'text': 'white', 'grid': '#444'}
}
fig = go.Figure(data=[
go.Bar(
x=list(results.values()),
y=list(results.keys()),
orientation='h',
marker_color='rgb(26, 118, 255)',
text=[f'{v:.1%}' for v in results.values()],
textposition='auto',
)
])
fig.update_layout(
title='Defect Detection Confidence',
xaxis_title='Confidence Level',
yaxis_title='Defect Type',
template='plotly_dark' if theme == 'dark' else 'plotly_white',
height=400,
margin=dict(l=20, r=20, t=40, b=20),
xaxis=dict(range=[0, 1]),
plot_bgcolor=colors[theme]['bg'],
paper_bgcolor=colors[theme]['bg'],
font=dict(color=colors[theme]['text'])
)
return fig
def create_defect_card(title, description, severity, repair_method):
"""Create a styled card for defect information"""
theme = get_theme()
severity_colors = {
"Critical": "#ff4444",
"High": "#ffa000",
"Medium": "#ffeb3b",
"Low": "#4caf50"
}
bg_color = '#1e1e1e' if theme == 'dark' else '#ffffff'
text_color = '#ffffff' if theme == 'dark' else '#000000'
border_color = '#333333' if theme == 'dark' else '#dddddd'
return f"""
<div style="border: 1px solid {border_color};
border-radius: 10px;
padding: 15px;
margin: 10px 0;
background-color: {bg_color};
color: {text_color};">
<h3 style="color: {'#00a0dc' if theme == 'dark' else '#1f77b4'};
margin: 0 0 10px 0;">{title}</h3>
<p><strong>Description:</strong> {description}</p>
<p><strong>Severity:</strong>
<span style="color: {severity_colors.get(severity, '#808080')}">
{severity}
</span>
</p>
<p><strong>Repair Method:</strong> {repair_method}</p>
</div>
"""
def apply_theme_styles():
"""Apply theme-specific CSS styles"""
theme = get_theme()
is_dark = theme == "dark"
styles = """
<style>
.stApp {
background-color: """ + ('#0e1117' if is_dark else '#f8f9fa') + """;
}
.upload-area {
text-align: center;
padding: 2rem;
border-radius: 10px;
border: 2px dashed """ + ('#444' if is_dark else '#ccc') + """;
background-color: """ + ('#1e1e1e' if is_dark else '#ffffff') + """;
margin-bottom: 1rem;
}
.info-box {
padding: 1rem;
border-radius: 10px;
margin: 1rem 0;
background-color: """ + ('#262730' if is_dark else '#e9ecef') + """;
border: 1px solid """ + ('#333' if is_dark else '#dee2e6') + """;
}
.stButton>button {
width: 100%;
}
</style>
"""
st.markdown(styles, unsafe_allow_html=True)
def main():
st.set_page_config(
page_title="Construction Defect Analyzer",
page_icon="πŸ—οΈ",
layout="wide",
initial_sidebar_state="expanded"
)
# Apply theme styles
apply_theme_styles()
# Initialize session state
if 'analyzer' not in st.session_state:
st.session_state.analyzer = ImageAnalyzer()
if 'rag_system' not in st.session_state:
st.session_state.rag_system = RAGSystem()
if 'analysis_history' not in st.session_state:
st.session_state.analysis_history = []
# Sidebar
with st.sidebar:
st.title("πŸ”§ Controls")
# Theme selector
theme = st.selectbox(
"Theme",
options=["light", "dark"],
index=0 if get_theme() == "light" else 1,
key="theme_selector"
)
if theme != get_theme():
st.query_params["theme"] = theme
st.rerun()
st.divider()
# API Status
if os.getenv("GROQ_API_KEY"):
st.success("🟒 AI System Connected")
else:
st.error("πŸ”΄ AI System Not Connected")
with st.expander("ℹ️ About", expanded=True):
st.write("""
### Construction Defect Analyzer
Advanced AI-powered tool for:
- Visual defect detection
- Repair recommendations
- Expert consultations
- Analysis tracking
""")
# Settings
with st.expander("βš™οΈ Settings"):
if st.button("Clear History"):
st.session_state.analysis_history = []
st.cache_data.clear()
st.success("History cleared!")
# Main content
st.title("Construction Defect Analyzer")
tabs = st.tabs(["πŸ“Έ Analysis", "❓ Expert Help", "πŸ“Š History"])
with tabs[0]: # Analysis Tab
col1, col2 = st.columns([1, 1])
with col1:
st.markdown('<div class="upload-area">', unsafe_allow_html=True)
uploaded_file = st.file_uploader(
"Upload construction image",
type=["jpg", "jpeg", "png"]
)
st.markdown('</div>', unsafe_allow_html=True)
if uploaded_file:
try:
with st.spinner('Processing image...'):
image = st.session_state.analyzer.preprocess_image(uploaded_file)
if image:
st.image(image, caption='Analyzed Image', use_column_width=True)
results = st.session_state.analyzer.analyze_image(image)
if results:
st.session_state.analysis_history.append({
'timestamp': datetime.now(),
'results': results,
'image': image
})
except Exception as e:
st.error(f"Error: {str(e)}")
with col2:
if uploaded_file and 'results' in locals():
st.markdown("### Analysis Results")
fig = create_plotly_confidence_chart(results, "main_analysis")
st.plotly_chart(fig, use_container_width=True, key="main_chart")
primary_defect = max(results.items(), key=lambda x: x[1])[0]
st.info(f"πŸ” Primary Defect: {primary_defect}")
context = st.session_state.rag_system.get_relevant_context(primary_defect)
if context:
lines = context.split('\n')
st.markdown(create_defect_card(
primary_defect,
next((line.split(': ')[1] for line in lines if 'description' in line.lower()), ''),
next((line.split(': ')[1] for line in lines if 'severity' in line.lower()), ''),
next((line.split(': ')[1] for line in lines if 'repair_method' in line.lower()), '')
), unsafe_allow_html=True)
with tabs[1]: # Expert Help Tab
st.markdown("### Ask Our Expert")
query = st.text_input(
"Your Question:",
placeholder="Example: What are the best repair methods for spalling?"
)
if query:
with st.spinner('Consulting AI expert...'):
context = st.session_state.rag_system.get_relevant_context(query)
if context:
response = get_groq_response(query, context)
if not response.startswith("Error"):
st.markdown("### Expert Response")
st.markdown(response)
with st.expander("View Source"):
st.markdown(context)
else:
st.error(response)
with tabs[2]: # History Tab
if st.session_state.analysis_history:
for i, analysis in enumerate(reversed(st.session_state.analysis_history)):
with st.expander(
f"Analysis {i+1} - {analysis['timestamp'].strftime('%Y-%m-%d %H:%M')}"
):
cols = st.columns([1, 1])
with cols[0]:
st.image(analysis['image'], caption='Image', use_column_width=True)
with cols[1]:
fig = create_plotly_confidence_chart(
analysis['results'],
f"history_{i}"
)
st.plotly_chart(fig, use_container_width=True, key=f"history_{i}")
else:
st.info("No analysis history available")
if __name__ == "__main__":
main()