import csv
import logging
import os
from typing import List, Optional, Tuple
import asyncio
import datetime
import hashlib

import aiohttp
import feedparser
import gradio as gr
import validators
from bs4 import BeautifulSoup
from huggingface_hub import AsyncInferenceClient
from sqlalchemy import create_engine, text, Column, Integer, String, Text, DateTime
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import declarative_base, sessionmaker

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Configuration
HUGGINGFACE_API_KEY = os.getenv("HUGGINGFACE_API_KEY")
DEFAULT_MONITORING_INTERVAL = 300   # seconds between checks for a newly added URL
MAX_MONITORING_INTERVAL = 600       # ceiling for the adaptive back-off
CHANGE_FREQUENCY_THRESHOLD = 3      # consecutive changes before polling speeds up

# Global state
monitoring_tasks = {}            # url -> asyncio.Task
url_monitoring_intervals = {}    # url -> current polling interval
change_counts = {}               # url -> consecutive change count
history = []                     # human-readable event log

# Database setup
Base = declarative_base()


class Article(Base):
    __tablename__ = 'articles'

    id = Column(Integer, primary_key=True)
    url = Column(String(255), nullable=False)
    title = Column(String(255))
    content = Column(Text)
    hash = Column(String(32))
    timestamp = Column(DateTime, default=datetime.datetime.utcnow)


async def create_db_engine(db_url):
    try:
        engine = create_engine(db_url)
        Base.metadata.create_all(engine)
        return engine, sessionmaker(bind=engine)
    except SQLAlchemyError as e:
        logger.error(f"Database error: {e}")
        raise


def sanitize_url(url: str) -> Optional[str]:
    return url if validators.url(url) else None


async def fetch_url_content(url: str, session: aiohttp.ClientSession) -> Tuple[str, str]:
    async with session.get(url) as response:
        content = await response.text()
        soup = BeautifulSoup(content, 'html.parser')
        title = soup.title.string if soup.title else "No Title"
        return title, content


async def save_to_database(session, url: str, title: str, content: str, content_hash: str):
    # The session comes from a synchronous sessionmaker, so its methods are not awaited.
    try:
        article = Article(url=url, title=title, content=content, hash=content_hash)
        session.add(article)
        session.commit()
    except SQLAlchemyError as e:
        logger.error(f"Database error: {e}")
        session.rollback()


async def save_to_csv(storage_location: str, url: str, title: str, content: str, timestamp: datetime.datetime):
    try:
        directory = os.path.dirname(storage_location)
        if directory:
            os.makedirs(directory, exist_ok=True)
        with open(storage_location, "a", newline='', encoding="utf-8") as csvfile:
            csv_writer = csv.writer(csvfile)
            csv_writer.writerow([timestamp.strftime("%Y-%m-%d %H:%M:%S"), url, title, content])
    except IOError as e:
        logger.error(f"IOError saving to CSV: {e}")
    except Exception as e:
        logger.error(f"Unexpected error saving to CSV: {e}")


async def monitor_url(url: str, interval: int, storage_location: str, feed_rss: bool, db_session):
    previous_hash = ""
    async with aiohttp.ClientSession() as session:
        while True:
            try:
                title, content = await fetch_url_content(url, session)
                current_hash = hashlib.md5(content.encode('utf-8')).hexdigest()
                if current_hash != previous_hash:
                    previous_hash = current_hash
                    timestamp = datetime.datetime.now()
                    if feed_rss:
                        try:
                            await save_to_database(db_session, url, title, content, current_hash)
                        except SQLAlchemyError as e:
                            logger.error(f"Database error while saving {url}: {e}")
                    if storage_location:
                        await save_to_csv(storage_location, url, title, content, timestamp)
                    history.append(f"Change detected at {url} on {timestamp.strftime('%Y-%m-%d %H:%M:%S')}")
                    logger.info(f"Change detected at {url}")
                    change_counts[url] = change_counts.get(url, 0) + 1
                    if change_counts[url] >= CHANGE_FREQUENCY_THRESHOLD:
                        # Frequent changes: poll more often, with a 60-second floor.
                        interval = max(60, interval // 2)
                else:
                    change_counts[url] = 0
                    # No change detected: back off, capped at the maximum interval.
                    interval = min(interval * 2, MAX_MONITORING_INTERVAL)
                url_monitoring_intervals[url] = interval
            except aiohttp.ClientError as e:
                logger.error(f"Network error monitoring {url}: {e}")
                history.append(f"Network error monitoring {url}: {e}")
            except Exception as e:
                logger.error(f"Unexpected error monitoring {url}: {e}")
                history.append(f"Unexpected error monitoring {url}: {e}")
            await asyncio.sleep(interval)


async def start_monitoring(urls: str, storage_location: str, feed_rss: bool):
    # The Gradio textbox delivers a single comma-separated string, so split it here.
    for url in (u.strip() for u in urls.split(",") if u.strip()):
        if url not in monitoring_tasks:
            sanitized_url = sanitize_url(url)
            if sanitized_url:
                task = asyncio.create_task(
                    monitor_url(sanitized_url, DEFAULT_MONITORING_INTERVAL, storage_location, feed_rss, db_session))
                monitoring_tasks[sanitized_url] = task
            else:
                logger.warning(f"Invalid URL: {url}")
                history.append(f"Invalid URL: {url}")
    return "Monitoring started"


async def cleanup_resources(url: str):
    # Add any per-URL cleanup logic here, e.g., closing open handles.
    pass


async def stop_monitoring(urls: str):
    for url in (u.strip() for u in urls.split(",") if u.strip()):
        task = monitoring_tasks.pop(url, None)
        if task:
            task.cancel()
            await cleanup_resources(url)
    return "Monitoring stopped"


async def chatbot_response(message: str, history: List[Tuple[str, str]]):
    try:
        client = AsyncInferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1", token=HUGGINGFACE_API_KEY)
        # text_generation returns the generated text as a plain string.
        response = await client.text_generation(message, max_new_tokens=100)
        history.append((message, response))
        # Return the updated chat history and clear the input box.
        return history, ""
    except Exception as e:
        logger.error(f"Chatbot error: {e}")
        history.append((message, "Error: Could not get a response from the chatbot."))
        return history, ""


async def update_db_status(db_status):
    # Gradio components cannot be pushed to from a background task, so this loop
    # only verifies connectivity and logs the result; db_status keeps its initial value.
    while True:
        try:
            db_session.execute(text("SELECT 1"))
            logger.debug("Database connection OK")
        except SQLAlchemyError as e:
            logger.error(f"Database connection lost: {e}")
        await asyncio.sleep(60)  # Check every minute


async def update_feed_content(db_session):
    try:
        articles = db_session.query(Article).order_by(Article.timestamp.desc()).limit(20).all()
        feed = {
            'title': 'Website Changes Feed',
            'link': 'http://yourwebsite.com/feed',
            'description': 'Feed of changes detected on monitored websites.',
            'items': [{
                'title': article.title,
                'link': article.url,
                'description': article.content,
                'pubDate': article.timestamp
            } for article in articles]
        }
        return feed
    except SQLAlchemyError as e:
        logger.error(f"Database error: {e}")
        return None


async def periodic_update_with_error_handling(db_session):
    while True:
        try:
            await asyncio.sleep(300)  # Wait for 5 minutes
            await update_feed_content(db_session)
        except Exception as e:
            logger.error(f"Error in periodic update: {e}")


async def main():
    global db_session
    try:
        engine, Session = await create_db_engine("sqlite:///monitoring.db")
        db_session = Session()
    except SQLAlchemyError as e:
        logger.error(f"Failed to connect to database: {e}")
        return

    demo = gr.Blocks()
    with demo:
        gr.Markdown("# Website Monitor and Chatbot")
        with gr.Row():
            with gr.Column():
                db_url = gr.Textbox(label="Database URL", value="sqlite:///monitoring.db")
                db_status = gr.Textbox(label="Database Status", interactive=False, value="Connected")
            with gr.Column():
                with gr.Tab("Configuration"):
                    target_urls = gr.Textbox(label="Target URLs (comma-separated)",
                                             placeholder="https://example.com, https://another-site.com")
                    storage_location = gr.Textbox(label="Storage Location (CSV file path)",
                                                  placeholder="/path/to/your/file.csv")
                    feed_rss_checkbox = gr.Checkbox(label="Enable RSS Feed")
                    start_button = gr.Button("Start Monitoring")
                    stop_button = gr.Button("Stop Monitoring")
                    status_text = gr.Textbox(label="Status", interactive=False)
                    history_text = gr.Textbox(label="History", lines=10, interactive=False)
                with gr.Tab("User-End View"):
                    feed_content = gr.JSON(label="RSS Feed Content")
                with gr.Tab("Chatbot"):
                    chatbot_interface = gr.Chatbot()
                    message_input = gr.Textbox(placeholder="Type your message here...")
                    send_button = gr.Button("Send")

        start_button.click(
            start_monitoring,
            inputs=[target_urls, storage_location, feed_rss_checkbox],
            outputs=status_text
        )
        stop_button.click(
            stop_monitoring,
            inputs=target_urls,
            outputs=status_text
        )
        send_button.click(
            chatbot_response,
            inputs=[message_input, chatbot_interface],
            outputs=[chatbot_interface, message_input]
        )

    # Start the background tasks on the running event loop.
    asyncio.create_task(periodic_update_with_error_handling(db_session))
    asyncio.create_task(update_db_status(db_status))

    # launch() normally blocks the thread; prevent_thread_lock keeps the asyncio
    # loop free so the monitoring and update tasks can keep running.
    demo.launch(prevent_thread_lock=True)
    try:
        while True:
            await asyncio.sleep(3600)
    finally:
        if db_session:
            db_session.close()
        engine.dispose()


if __name__ == "__main__":
    asyncio.run(main())