import asyncio
import csv
import datetime
import hashlib
import logging
import os
import xml.etree.ElementTree as ET
from typing import List, Optional, Tuple

import aiohttp
import gradio as gr
import validators
from bs4 import BeautifulSoup
from huggingface_hub import AsyncInferenceClient
from sqlalchemy import create_engine, text, Column, Integer, String, Text, DateTime
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import declarative_base, sessionmaker

# Configure logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Configuration
HUGGINGFACE_API_KEY = os.getenv("HUGGINGFACE_API_KEY")
DEFAULT_MONITORING_INTERVAL = 300  # seconds
MAX_MONITORING_INTERVAL = 600      # seconds
CHANGE_FREQUENCY_THRESHOLD = 3     # changes before polling speeds up

# Global state
monitoring_tasks = {}          # url -> asyncio.Task
url_monitoring_intervals = {}  # url -> current polling interval (seconds)
change_counts = {}             # url -> consecutive change count
history = []                   # human-readable event log shown in the UI
db_session = None              # shared SQLAlchemy session, set in main()

# Database setup
Base = declarative_base()


class Article(Base):
    __tablename__ = 'articles'

    id = Column(Integer, primary_key=True)
    url = Column(String(255), nullable=False)
    title = Column(String(255))
    content = Column(Text)
    hash = Column(String(32))
    timestamp = Column(DateTime, default=datetime.datetime.utcnow)


def create_db_engine(db_url):
    """Create the engine and session factory (synchronous SQLAlchemy)."""
    try:
        engine = create_engine(db_url)
        Base.metadata.create_all(engine)
        return engine, sessionmaker(bind=engine)
    except SQLAlchemyError as e:
        logger.error(f"Database error: {e}")
        raise


def sanitize_url(url: str) -> Optional[str]:
    return url if validators.url(url) else None


async def fetch_url_content(url: str,
                            session: aiohttp.ClientSession) -> Tuple[str, str]:
    async with session.get(url) as response:
        content = await response.text()
    soup = BeautifulSoup(content, 'html.parser')
    title = soup.title.string if soup.title else "No Title"
    return title, content


def save_to_database(session, url: str, title: str, content: str,
                     content_hash: str):
    # The session comes from a synchronous engine, so commit/rollback
    # must not be awaited.
    try:
        article = Article(url=url, title=title, content=content,
                          hash=content_hash)
        session.add(article)
        session.commit()
    except SQLAlchemyError as e:
        logger.error(f"Database error: {e}")
        session.rollback()


def save_to_csv(storage_location: str, url: str, title: str, content: str,
                timestamp: datetime.datetime):
    try:
        directory = os.path.dirname(storage_location)
        if directory:  # makedirs("") raises, so guard bare filenames
            os.makedirs(directory, exist_ok=True)
        with open(storage_location, "a", newline='', encoding="utf-8") as csvfile:
            csv_writer = csv.writer(csvfile)
            csv_writer.writerow([timestamp.strftime("%Y-%m-%d %H:%M:%S"),
                                 url, title, content])
    except IOError as e:
        logger.error(f"IOError saving to CSV: {e}")
    except Exception as e:
        logger.error(f"Unexpected error saving to CSV: {e}")


async def monitor_url(url: str, interval: int, storage_location: str,
                      feed_rss: bool, db):
    """Poll a URL for changes and adapt the polling interval: frequent
    changes halve it (floor 60 s); quiet periods double it (ceiling
    MAX_MONITORING_INTERVAL)."""
    previous_hash = ""
    async with aiohttp.ClientSession() as http_session:
        while True:
            try:
                title, content = await fetch_url_content(url, http_session)
                current_hash = hashlib.md5(content.encode('utf-8')).hexdigest()

                if current_hash != previous_hash:
                    previous_hash = current_hash
                    timestamp = datetime.datetime.now()

                    if feed_rss:
                        save_to_database(db, url, title, content, current_hash)
                    if storage_location:
                        save_to_csv(storage_location, url, title, content,
                                    timestamp)

                    history.append(f"Change detected at {url} on "
                                   f"{timestamp.strftime('%Y-%m-%d %H:%M:%S')}")
                    logger.info(f"Change detected at {url}")

                    change_counts[url] = change_counts.get(url, 0) + 1
                    if change_counts[url] >= CHANGE_FREQUENCY_THRESHOLD:
                        interval = max(60, interval // 2)
                else:
                    change_counts[url] = 0
                    interval = min(interval * 2, MAX_MONITORING_INTERVAL)

                url_monitoring_intervals[url] = interval
            except aiohttp.ClientError as e:
                logger.error(f"Network error monitoring {url}: {e}")
                history.append(f"Network error monitoring {url}: {e}")
            except Exception as e:
                logger.error(f"Unexpected error monitoring {url}: {e}")
                history.append(f"Unexpected error monitoring {url}: {e}")

            await asyncio.sleep(interval)
async def start_monitoring(urls_input: str, storage_location: str,
                           feed_rss: bool):
    # The Gradio textbox delivers one comma-separated string, not a list.
    urls: List[str] = [u.strip() for u in urls_input.split(",") if u.strip()]
    for url in urls:
        if url in monitoring_tasks:
            continue
        sanitized_url = sanitize_url(url)
        if sanitized_url:
            task = asyncio.create_task(
                monitor_url(sanitized_url, DEFAULT_MONITORING_INTERVAL,
                            storage_location, feed_rss, db_session))
            monitoring_tasks[sanitized_url] = task
        else:
            logger.warning(f"Invalid URL: {url}")
            history.append(f"Invalid URL: {url}")
    return "Monitoring started"


async def cleanup_resources(url: str):
    # Add any cleanup logic here, e.g., closing database connections.
    pass


async def stop_monitoring(urls_input: str):
    # Accepts the same comma-separated string as start_monitoring; async so
    # it runs on the event loop where the monitoring tasks were created.
    for url in [u.strip() for u in urls_input.split(",") if u.strip()]:
        if url in monitoring_tasks:
            monitoring_tasks[url].cancel()
            await cleanup_resources(url)
            del monitoring_tasks[url]
    return "Monitoring stopped"


def generate_rss_feed():
    # feedparser only parses feeds, it cannot generate them, so the RSS
    # XML is assembled with the standard library instead.
    try:
        articles = (db_session.query(Article)
                    .order_by(Article.timestamp.desc()).limit(20).all())
        rss = ET.Element('rss', version='2.0')
        channel = ET.SubElement(rss, 'channel')
        ET.SubElement(channel, 'title').text = 'Website Changes Feed'
        ET.SubElement(channel, 'link').text = 'http://yourwebsite.com/feed'  # Replace if needed
        ET.SubElement(channel, 'description').text = (
            'Feed of changes detected on monitored websites.')
        for article in articles:
            item = ET.SubElement(channel, 'item')
            ET.SubElement(item, 'title').text = article.title
            ET.SubElement(item, 'link').text = article.url
            ET.SubElement(item, 'description').text = article.content
            ET.SubElement(item, 'pubDate').text = (
                article.timestamp.strftime('%a, %d %b %Y %H:%M:%S GMT'))
        return ET.tostring(rss, encoding='unicode')
    except SQLAlchemyError as e:
        logger.error(f"Database error: {e}")
        return None


async def chatbot_response(message: str, chat_history: List[dict]):
    # text_generation returns a plain string; AsyncInferenceClient keeps
    # the Gradio handler non-blocking.
    try:
        client = AsyncInferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1",
                                      token=HUGGINGFACE_API_KEY)
        response = await client.text_generation(message, max_new_tokens=100)
    except Exception as e:
        logger.error(f"Chatbot error: {e}")
        response = "Error: Could not get a response from the chatbot."
    # gr.Chatbot(type='messages') expects role/content dicts.
    chat_history.append({"role": "user", "content": message})
    chat_history.append({"role": "assistant", "content": response})
    return chat_history, ""  # the empty string clears the input box


def check_db_status() -> str:
    # Polled by a gr.Timer below; SQLAlchemy 2.x needs text() for raw SQL.
    try:
        db_session.execute(text("SELECT 1"))
        return "Connected"
    except SQLAlchemyError:
        return "Disconnected"


def update_feed_content():
    try:
        articles = (db_session.query(Article)
                    .order_by(Article.timestamp.desc()).limit(20).all())
        return {
            'title': 'Website Changes Feed',
            'link': 'http://yourwebsite.com/feed',
            'description': 'Feed of changes detected on monitored websites.',
            'items': [{
                'title': article.title,
                'link': article.url,
                'description': article.content,
                'pubDate': str(article.timestamp),  # keep JSON-serializable
            } for article in articles],
        }
    except SQLAlchemyError as e:
        logger.error(f"Database error: {e}")
        return None
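# Minimal usage sketch (an assumption, not wired into the UI below): the
# RSS string from generate_rss_feed() could be written to disk so an
# external web server can serve it; "feed.xml" is a hypothetical path.
#
#     with open("feed.xml", "w", encoding="utf-8") as f:
#         f.write(generate_rss_feed() or "")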
def main():
    global db_session
    try:
        engine, SessionFactory = create_db_engine("sqlite:///monitoring.db")
        db_session = SessionFactory()
    except SQLAlchemyError as e:
        logger.error(f"Failed to connect to database: {e}")
        return

    with gr.Blocks() as demo:
        gr.Markdown("# Website Monitor and Chatbot")
        with gr.Row():
            with gr.Column():
                db_url = gr.Textbox(label="Database URL",
                                    value="sqlite:///monitoring.db")
                db_status = gr.Textbox(label="Database Status",
                                       interactive=False, value="Connected")
            with gr.Column():
                with gr.Tab("Configuration"):
                    target_urls = gr.Textbox(
                        label="Target URLs (comma-separated)",
                        placeholder="https://example.com, https://another-site.com")
                    storage_location = gr.Textbox(
                        label="Storage Location (CSV file path)",
                        placeholder="/path/to/your/file.csv")
                    feed_rss_checkbox = gr.Checkbox(label="Enable RSS Feed")
                    start_button = gr.Button("Start Monitoring")
                    stop_button = gr.Button("Stop Monitoring")
                    status_text = gr.Textbox(label="Status", interactive=False)
                    history_text = gr.Textbox(label="History", lines=10,
                                              interactive=False)
                with gr.Tab("User-End View"):
                    feed_content = gr.JSON(label="RSS Feed Content")
                with gr.Tab("Chatbot"):
                    chatbot_interface = gr.Chatbot(type='messages')
                    message_input = gr.Textbox(
                        placeholder="Type your message here...")
                    send_button = gr.Button("Send")

        start_button.click(start_monitoring,
                           inputs=[target_urls, storage_location,
                                   feed_rss_checkbox],
                           outputs=status_text)
        stop_button.click(stop_monitoring, inputs=target_urls,
                          outputs=status_text)
        send_button.click(chatbot_response,
                          inputs=[message_input, chatbot_interface],
                          outputs=[chatbot_interface, message_input])

        # Background asyncio tasks cannot push values into Gradio
        # components, so the periodic refreshes are driven by gr.Timer
        # instead (requires a recent Gradio release).
        status_timer = gr.Timer(60)   # check DB connectivity every minute
        status_timer.tick(check_db_status, outputs=db_status)
        feed_timer = gr.Timer(300)    # refresh the feed every 5 minutes
        feed_timer.tick(update_feed_content, outputs=feed_content)
        history_timer = gr.Timer(5)   # surface the event log in the UI
        history_timer.tick(lambda: "\n".join(history), outputs=history_text)

    try:
        # launch() blocks until the server stops; it is not awaitable.
        demo.launch()
    finally:
        if db_session:
            db_session.close()
        engine.dispose()


if __name__ == "__main__":
    main()
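# Example invocation (assumptions: the script is saved as monitor.py and the
# dependencies are installed under their default PyPI names):
#
#     pip install aiohttp gradio huggingface_hub sqlalchemy validators beautifulsoup4
#     HUGGINGFACE_API_KEY=hf_... python monitor.py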