CPU Upgrade
CPU Upgrade
import streamlit as st | |
import anthropic, openai, base64, cv2, glob, json, math, os, pytz, random, re, requests, time, zipfile | |
import plotly.graph_objects as go | |
import streamlit.components.v1 as components | |
from datetime import datetime | |
from audio_recorder_streamlit import audio_recorder | |
from bs4 import BeautifulSoup | |
from collections import defaultdict, deque | |
from dotenv import load_dotenv | |
from gradio_client import Client | |
from huggingface_hub import InferenceClient | |
from io import BytesIO | |
from PIL import Image | |
from PyPDF2 import PdfReader | |
from urllib.parse import quote | |
from xml.etree import ElementTree as ET | |
from openai import OpenAI | |
import extra_streamlit_components as stx | |
import asyncio | |
import edge_tts | |
# 1. App Configuration | |
# Display name shared by the browser tab title and the "About" menu entry.
Site_Name = '🔬 Research Assistant Pro'

st.set_page_config(
    page_title=Site_Name,
    page_icon="🔬",
    layout="wide",
    initial_sidebar_state="auto",
    menu_items={
        # These two entries must be a URL or None; an empty string is not a
        # valid URL. None hides the entry until a real link is configured.
        'Get Help': None,
        'Report a bug': None,
        'About': Site_Name,
    },
)

# Pull variables from a local .env file (if any) into the process environment
# before the API keys are looked up.
load_dotenv()
# 2. API and Client Setup | |
def _resolve_api_key(name):
    """Return the key called *name* from the environment, else from Streamlit secrets.

    The secrets lookup runs unconditionally and only serves as the default
    for the environment lookup; '' is returned when neither source has it.
    """
    secret_fallback = st.secrets.get(name, '')
    return os.getenv(name, secret_fallback)


openai_api_key = _resolve_api_key('OPENAI_API_KEY')
anthropic_key = _resolve_api_key('ANTHROPIC_API_KEY')
hf_key = _resolve_api_key('HF_KEY')

# Module-level SDK clients shared by the rest of the app.
openai_client = OpenAI(api_key=openai_api_key)
claude_client = anthropic.Anthropic(api_key=anthropic_key)
# 3. Session State Management | |
# Seed each session key with its default, but only when the key is not
# already present in st.session_state.
for _state_key, _initial_value in (
    ('chat_history', []),
    ('current_audio', None),
    ('autoplay_audio', True),
    ('last_search', None),
    ('file_content', None),
    ('current_file', None),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value
# 4. Utility Functions | |
def get_download_link(file_path):
    """Build an HTML anchor that downloads *file_path* through a data URI.

    Args:
        file_path: Path of the file to embed.

    Returns:
        An ``<a>`` tag whose ``href`` is a base64 ``data:`` URI containing the
        whole file and whose ``download`` attribute is the file's base name.
        Extensions outside the known table use ``application/octet-stream``.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    with open(file_path, "rb") as file:
        contents = file.read()
    b64 = base64.b64encode(contents).decode()
    file_name = os.path.basename(file_path)
    # Text after the last dot; a name without a dot yields the whole name,
    # which simply misses the table below.
    file_type = file_name.split('.')[-1]
    mime_types = {
        'md': 'text/markdown',
        'mp3': 'audio/mpeg',
        'mp4': 'video/mp4',
        'pdf': 'application/pdf',
        'txt': 'text/plain',
    }
    mime_type = mime_types.get(file_type, 'application/octet-stream')
    return f'<a href="data:{mime_type};base64,{b64}" download="{file_name}">⬇️ Download {file_name}</a>'
def generate_filename(content, file_type="md"):
    """Build a timestamped file name from the start of *content*.

    Args:
        content: Text whose first 50 characters seed the name.
        file_type: Extension to append, without the dot.

    Returns:
        ``"<YYYYmmdd_HHMMSS>_<sanitized content>.<file_type>"``, where the
        sanitized part keeps only word characters, whitespace and hyphens.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # Truncate first, then strip everything that is not \w, \s or '-'.
    safe_content = re.sub(r'[^\w\s-]', '', content[:50])
    return f"{timestamp}_{safe_content}.{file_type}"
def get_autoplay_audio_html(audio_path, width="100%"):
    """Return HTML for an autoplaying audio player plus a download link.

    Args:
        audio_path: Path of the audio file to embed.
        width: CSS width applied to the ``<audio>`` element.

    Returns:
        Markup embedding the file as a base64 ``audio/mpeg`` data URI, or the
        string ``"Error loading audio: <reason>"`` if anything fails.
    """
    try:
        with open(audio_path, "rb") as audio_file:
            audio_bytes = audio_file.read()
        audio_b64 = base64.b64encode(audio_bytes).decode()
        # Markup lines are kept flush-left so the emitted HTML carries no
        # leading whitespace.
        return f'''
<audio controls autoplay style="width: {width};">
<source src="data:audio/mpeg;base64,{audio_b64}" type="audio/mpeg">
Your browser does not support the audio element.
</audio>
<div style="margin-top: 5px;">
<a href="data:audio/mpeg;base64,{audio_b64}"
download="{os.path.basename(audio_path)}"
style="text-decoration: none;">
⬇️ Download Audio
</a>
</div>
'''
    except Exception as e:
        # Best-effort by design: the caller renders whatever string comes back.
        return f"Error loading audio: {str(e)}"
def get_video_html(video_path, width="100%"):
    """Return HTML for a muted, looping, autoplaying video with controls.

    Args:
        video_path: Path of the video file to embed.
        width: Value for the ``<video>`` element's ``width`` attribute.

    Returns:
        Markup embedding the file as a base64 ``video/mp4`` data URI.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    # The context manager closes the handle as soon as the bytes are read.
    with open(video_path, 'rb') as video_file:
        video_b64 = base64.b64encode(video_file.read()).decode()
    video_url = f"data:video/mp4;base64,{video_b64}"
    return f'''
<video width="{width}" controls autoplay muted loop>
<source src="{video_url}" type="video/mp4">
Your browser does not support the video tag.
</video>
'''
# 5. Voice Recognition Component | |
def create_voice_component():
    """Embed the browser speech-recognition widget and return the component result.

    The widget shows a start button, a status line and a live transcript.
    When the first recognition result is final, the script posts
    ``{type: 'voice_search', query: <transcript>}`` to the parent window.
    Nothing is wired up when ``webkitSpeechRecognition`` is unavailable.
    """
    # Markup is kept flush-left so the string handed to the component is
    # exactly the text below.
    widget_markup = """
<div style="padding: 20px; border-radius: 10px; background: #f0f2f6;">
<button id="startBtn" class="streamlit-button">Start Voice Search</button>
<p id="status">Click to start speaking</p>
<div id="result"></div>
<script>
if ('webkitSpeechRecognition' in window) {
const recognition = new webkitSpeechRecognition();
recognition.continuous = false;
recognition.interimResults = true;
const startBtn = document.getElementById('startBtn');
const status = document.getElementById('status');
const result = document.getElementById('result');
startBtn.onclick = () => {
recognition.start();
status.textContent = 'Listening...';
};
recognition.onresult = (event) => {
const transcript = Array.from(event.results)
.map(result => result[0].transcript)
.join('');
result.textContent = transcript;
if (event.results[0].isFinal) {
window.parent.postMessage({
type: 'voice_search',
query: transcript
}, '*');
}
};
recognition.onend = () => {
status.textContent = 'Click to start speaking';
};
}
</script>
</div>
"""
    return components.html(widget_markup, height=200)
# 6. Audio Processing Functions | |
async def generate_audio(text, voice="en-US-AriaNeural", rate="+0%", pitch="+0Hz"):
    """Synthesize *text* to a timestamped MP3 file with Edge TTS.

    Args:
        text: Text to speak.
        voice: Edge TTS voice name.
        rate: Speech-rate adjustment passed through to Edge TTS.
        pitch: Pitch adjustment passed through to Edge TTS.

    Returns:
        The name of the written file (``response_<YYYYmmdd_HHMMSS>.mp3``), or
        ``None`` when *text* is ``None``, empty or whitespace-only.
    """
    # Nothing to synthesize: skip the TTS round trip entirely.
    if not text or not text.strip():
        return None
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_file = f"response_{timestamp}.mp3"
    communicate = edge_tts.Communicate(text, voice, rate=rate, pitch=pitch)
    await communicate.save(output_file)
    return output_file
def render_audio_result(audio_file, title="Generated Audio"):
    """Show a heading and an autoplay audio player for *audio_file*.

    Does nothing when *audio_file* is falsy or does not exist on disk.
    """
    if not audio_file or not os.path.exists(audio_file):
        return
    heading = f"### {title}"
    st.markdown(heading)
    player_markup = get_autoplay_audio_html(audio_file)
    st.markdown(player_markup, unsafe_allow_html=True)
# 7. Search and Process Functions | |
def perform_arxiv_search(query, response_type="summary"):
    """Query the hosted Arxiv search Space for references and an LLM answer.

    Args:
        query: Search text sent to both endpoints.
        response_type: ``"summary"`` selects the LLM answer as the primary
            response; any other value selects the reference results.

    Returns:
        A ``(response_text, refs)`` tuple.
    """
    space = Client("awacke1/Arxiv-Paper-Search-And-QA-RAG-Pattern")
    llm_name = "mistralai/Mixtral-8x7B-Instruct-v0.1"

    # Reference lookup: only the first element of the endpoint result is kept.
    search_output = space.predict(
        query,
        20,
        "Semantic Search",
        llm_name,
        api_name="/update_with_rag_md",
    )
    refs = search_output[0]

    # LLM interpretation of the same query.
    summary = space.predict(
        query,
        llm_name,
        True,
        api_name="/ask_llm",
    )

    if response_type == "summary":
        return summary, refs
    return refs, refs
async def process_voice_search_with_autoplay(query):
    """Run a search for *query*, synthesize its summary and render the audio.

    The generated file is stored in ``st.session_state.current_audio`` and a
    record of the search (query, summary, full results, audio file, timestamp)
    in ``st.session_state.last_search``.

    Args:
        query: Search text.

    Returns:
        The audio file name from ``generate_audio``, or ``None`` when no
        audio was produced.
    """
    summary, full_results = perform_arxiv_search(query)
    audio_file = await generate_audio(summary)
    st.session_state.current_audio = audio_file
    st.session_state.last_search = {
        'query': query,
        'summary': summary,
        'full_results': full_results,
        'audio': audio_file,
        'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }
    # NOTE(review): nothing visible here appends this record to
    # st.session_state.chat_history — confirm that happens elsewhere.
    if audio_file:
        render_audio_result(audio_file, "Search Results")
    return audio_file
def display_search_results_with_audio():
    """Render the most recent search stored in session state, if there is one.

    Shows the summary, the full results inside an expander, and the current
    audio (when set) under an "Audio Summary" heading.
    """
    latest = st.session_state.last_search
    if not latest:
        return

    st.subheader("Latest Results")
    st.markdown(latest['summary'])

    with st.expander("View Full Results"):
        st.markdown(latest['full_results'])

    current_clip = st.session_state.current_audio
    if current_clip:
        render_audio_result(current_clip, "Audio Summary")
# 8. UI Components | |
def render_search_interface():
    """Render the voice widget, the typed-query box and the latest results.

    Pressing the search button with a non-blank typed query runs the search
    coroutine to completion; the stored results are then displayed below.
    """
    st.header("🔍 Voice Search")
    create_voice_component()
    col1, col2 = st.columns([3, 1])
    with col1:
        query = st.text_input("Or type your query:")
    with col2:
        if st.button("🔍 Search"):
            # Skip the remote search when nothing was typed.
            if query.strip():
                asyncio.run(process_voice_search_with_autoplay(query))
    display_search_results_with_audio()
def display_search_history():
    """List stored searches, newest first, each in a collapsed expander.

    Every entry shows its summary and, when it carries a truthy ``'audio'``
    value, an audio player titled "Recorded Response".
    """
    st.header("Search History")
    history = st.session_state.chat_history
    if not history:
        return

    for record in reversed(history):
        label = f"🔍 {record['timestamp']} - {record['query'][:50]}..."
        with st.expander(label, expanded=False):
            st.markdown(record['summary'])
            if record.get('audio'):
                render_audio_result(record['audio'], "Recorded Response")
def render_settings():
    """Draw the sidebar settings widgets and return their current values.

    Returns:
        A dict with keys ``'voice'``, ``'autoplay'``, ``'rate'`` and
        ``'pitch'`` holding the respective widget values.
    """
    sidebar = st.sidebar
    sidebar.title("⚙️ Settings")

    voice_options = [
        "en-US-AriaNeural",
        "en-US-GuyNeural",
        "en-GB-SoniaNeural",
        "en-AU-NatashaNeural",
    ]

    # Widgets are created in the same order as the keys of the returned dict.
    chosen_voice = sidebar.selectbox("Select Voice", voice_options)
    autoplay_on = sidebar.checkbox("Autoplay Responses", value=True)
    speech_rate = sidebar.slider("Speech Rate", -50, 50, 0, 5)
    speech_pitch = sidebar.slider("Pitch", -50, 50, 0, 5)

    return {
        'voice': chosen_voice,
        'autoplay': autoplay_on,
        'rate': speech_rate,
        'pitch': speech_pitch,
    }
def display_file_manager():
    """Render the sidebar file manager for .md, .mp3 and .mp4 files.

    Lists matching files from the current directory, newest first, with
    bulk "Delete All" / "Download All" actions and, per file, its
    modification time, a download link and a delete button.
    """
    st.sidebar.title("📁 File Manager")
    all_files = []
    for ext in ['.md', '.mp3', '.mp4']:
        all_files.extend(glob.glob(f"*{ext}"))
    all_files.sort(key=os.path.getmtime, reverse=True)

    col1, col2 = st.sidebar.columns(2)
    with col1:
        if st.button("🗑 Delete All"):
            for file in all_files:
                os.remove(file)
            # Rerun so the listing reflects the deletions.
            st.rerun()
    with col2:
        if st.button("⬇️ Download All"):
            zip_name = f"archive_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip"
            with zipfile.ZipFile(zip_name, 'w') as zipf:
                for file in all_files:
                    zipf.write(file)
            st.sidebar.markdown(get_download_link(zip_name), unsafe_allow_html=True)

    for file in all_files:
        with st.sidebar.expander(f"📄 {os.path.basename(file)}", expanded=False):
            st.write(f"Last modified: {datetime.fromtimestamp(os.path.getmtime(file)).strftime('%Y-%m-%d %H:%M:%S')}")
            # Distinct names so the bulk-action columns above are not shadowed.
            link_col, delete_col = st.columns(2)
            with link_col:
                st.markdown(get_download_link(file), unsafe_allow_html=True)
            with delete_col:
                # The file name in the key keeps each delete button unique.
                if st.button("🗑 Delete", key=f"del_{file}"):
                    os.remove(file)
                    st.rerun()
# 9. Main Application | |
def main(): | |
st.title("🔬 Research Assistant Pro") | |
settings = render_settings() | |
display_file_manager() | |
tabs = st.tabs(["🎤 Voice Search", "📚 History", "🎵 Media", "⚙️ Settings"]) | |
with tabs[0]: | |
render_search_interface() | |
with tabs[1]: | |
display_search_history() | |
with tabs[2]: | |
st.header("Media Gallery") | |
media_tabs = st.tabs(["🎵 Audio", "🎥 Video", "📷 Images"]) | |
with media_tabs[0]: | |
audio_files = glob.glob("*.mp3") | |
if audio_files: | |
for audio_file in audio_files: | |
st.markdown(get_autoplay_audio_html(audio_file), unsafe_allow_html=True) | |
else: | |
st.write("No audio files found") | |
with media_tabs[1]: | |
video_files = glob.glob("*.mp4") | |
if video_files: | |
for video_file in video_files: | |
st.markdown(get_video_html(video_file), unsafe_allow_html=True) | |
else: | |
st.write("No video files found") | |
with media_tabs[2]: | |
image_files = glob.glob("*.png") + glob. |