CEEMEESEEK / app2.py
acecalisto3's picture
Update app2.py
10aaa36 verified
raw
history blame
11.3 kB
import asyncio
import csv
import logging
import os
from typing import List, Tuple
import aiohttp
import datetime
import hashlib
from pathlib import Path
import feedparser
import gradio as gr
from huggingface_hub import InferenceClient
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy.exc import SQLAlchemyError
import validators
from bs4 import BeautifulSoup
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Configuration
# Token handed to the Hugging Face InferenceClient; os.getenv yields None when unset.
HUGGINGFACE_API_KEY = os.getenv("HUGGINGFACE_API_KEY")
# Interval given to every newly started monitor task. The value ends up in
# asyncio.sleep(), so it is expressed in seconds.
DEFAULT_MONITORING_INTERVAL = 300
# Ceiling used when monitor_url doubles its interval after an unchanged fetch.
MAX_MONITORING_INTERVAL = 600
# Per-URL change count at which monitor_url starts halving its interval.
CHANGE_FREQUENCY_THRESHOLD = 3
# Global variables
# URL -> asyncio.Task running monitor_url for that URL.
monitoring_tasks = {}
# URL -> interval most recently chosen by monitor_url for that URL.
url_monitoring_intervals = {}
# URL -> number of detected changes; monitor_url resets it to 0 on an unchanged fetch.
change_counts = {}
# Human-readable event and error messages appended by the monitoring code.
history = []
# Database setup
# Declarative base class that the ORM models below inherit from.
Base = declarative_base()
class Article(Base):
    """ORM model for one captured snapshot of a monitored page.

    A row is inserted by ``save_to_database`` each time a monitor detects
    that the content of a URL changed.
    """

    __tablename__ = 'articles'

    # Surrogate primary key.
    id = Column(Integer, primary_key=True)
    # Address of the monitored page; the only column that must be present.
    url = Column(String(length=255), nullable=False)
    # Page title as extracted by fetch_url_content.
    title = Column(String(length=255))
    # Full response body of the page.
    content = Column(Text)
    # Hex digest of the content; 32 characters matches an MD5 hexdigest.
    hash = Column(String(length=32))
    # Insertion time; the default callable is evaluated for each new row.
    timestamp = Column(DateTime, default=datetime.datetime.utcnow)
async def create_db_engine(db_url):
    """Create the engine for ``db_url`` and make sure all tables exist.

    Args:
        db_url: SQLAlchemy database URL, e.g. ``sqlite:///monitoring.db``.

    Returns:
        A ``(engine, session_factory)`` tuple, where ``session_factory`` is a
        ``sessionmaker`` bound to the engine.

    Raises:
        SQLAlchemyError: re-raised after being logged when setup fails.
    """
    try:
        db_engine = create_engine(db_url)
        # Create every table declared on Base that does not exist yet.
        Base.metadata.create_all(db_engine)
        session_factory = sessionmaker(bind=db_engine)
    except SQLAlchemyError as e:
        logger.error(f"Database error: {e}")
        raise
    return db_engine, session_factory
def sanitize_url(url: str) -> str:
    """Return ``url`` unchanged when ``validators.url`` accepts it, else ``None``."""
    if validators.url(url):
        return url
    return None
async def fetch_url_content(url: str, session: aiohttp.ClientSession) -> Tuple[str, str]:
    """Download ``url`` and return its title together with the raw body.

    Args:
        url: Address to fetch.
        session: Open aiohttp client session used for the request.

    Returns:
        ``(title, content)`` where ``content`` is the decoded response body
        and ``title`` is the stripped text of the ``<title>`` element, or
        ``"No Title"`` when the page has no usable title.
    """
    async with session.get(url) as response:
        content = await response.text()
    # The body is fully read at this point, so parsing can happen after the
    # response has been released.
    soup = BeautifulSoup(content, 'html.parser')
    # ``.string`` is None for an empty <title> or one that contains nested
    # tags; fall back instead of returning None as the title.
    raw_title = soup.title.string if soup.title else None
    title = str(raw_title).strip() if raw_title else ""
    return title or "No Title", content
async def save_to_database(session, url: str, title: str, content: str, hash: str):
    """Insert one ``Article`` row and commit it.

    Works with the synchronous SQLAlchemy ``Session`` this module creates
    (``commit``/``rollback`` return ``None``) and also with a session whose
    ``commit``/``rollback`` return awaitables. Unconditionally awaiting the
    synchronous result raised ``TypeError`` after the commit had already
    happened.

    Args:
        session: SQLAlchemy session used for the insert.
        url: Address of the page the snapshot belongs to.
        title: Title of the page.
        content: Body of the page.
        hash: Hex digest of ``content``.

    Database errors are logged and the transaction is rolled back; they are
    not propagated to the caller.
    """
    import inspect

    try:
        session.add(Article(url=url, title=title, content=content, hash=hash))
        outcome = session.commit()
        if inspect.isawaitable(outcome):
            await outcome
    except SQLAlchemyError as e:
        logger.error(f"Database error: {e}")
        outcome = session.rollback()
        if inspect.isawaitable(outcome):
            await outcome
async def save_to_csv(storage_location: str, url: str, title: str, content: str, timestamp: datetime.datetime):
    """Append one ``[timestamp, url, title, content]`` row to a CSV file.

    Args:
        storage_location: Path of the CSV file. Missing parent directories
            are created; a bare file name is written to the current
            working directory.
        url: Address of the page.
        title: Title of the page.
        content: Body of the page.
        timestamp: Detection time, written as ``YYYY-MM-DD HH:MM:SS``.

    Errors are logged and swallowed so a failed write does not stop the
    caller.
    """
    try:
        parent_dir = os.path.dirname(storage_location)
        # os.makedirs("") raises FileNotFoundError, which previously made every
        # write to a bare file name fail; only create a directory if there is one.
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(storage_location, "a", newline='', encoding="utf-8") as csvfile:
            csv_writer = csv.writer(csvfile)
            csv_writer.writerow([timestamp.strftime("%Y-%m-%d %H:%M:%S"), url, title, content])
    except IOError as e:
        logger.error(f"IOError saving to CSV: {e}")
    except Exception as e:
        logger.error(f"Unexpected error saving to CSV: {e}")
async def monitor_url(url: str, interval: int, storage_location: str, feed_rss: bool, db_session):
    """Poll ``url`` forever and record every change of its content.

    Each round fetches the page and compares the MD5 digest of the body with
    the digest of the previous round. The very first fetch always counts as
    a change because the initial digest is the empty string.

    On a change the snapshot is stored in the database (when ``feed_rss`` is
    true) and appended to the CSV file (when ``storage_location`` is
    non-empty), a message is added to ``history`` and the per-URL change
    counter is incremented; once the counter reaches
    ``CHANGE_FREQUENCY_THRESHOLD`` the interval is halved, never below 60.
    Without a change the counter is reset and the interval is doubled, capped
    at ``MAX_MONITORING_INTERVAL``.

    Args:
        url: Address to monitor.
        interval: Initial delay between polls, passed to ``asyncio.sleep``.
        storage_location: CSV path, or a falsy value to skip CSV output.
        feed_rss: Whether snapshots are written to the database.
        db_session: Session forwarded to ``save_to_database``.
    """
    last_digest = ""
    async with aiohttp.ClientSession() as http_session:
        while True:
            try:
                title, content = await fetch_url_content(url, http_session)
                digest = hashlib.md5(content.encode('utf-8')).hexdigest()
                if digest == last_digest:
                    # Nothing new: reset the counter and back off.
                    change_counts[url] = 0
                    interval = min(interval * 2, MAX_MONITORING_INTERVAL)
                else:
                    # Remember the digest before persisting, so a failing save
                    # does not cause the same content to be reported again.
                    last_digest = digest
                    timestamp = datetime.datetime.now()
                    if feed_rss:
                        try:
                            await save_to_database(db_session, url, title, content, digest)
                        except SQLAlchemyError as e:
                            logger.error(f"Database error while saving {url}: {e}")
                    if storage_location:
                        await save_to_csv(storage_location, url, title, content, timestamp)
                    history.append(f"Change detected at {url} on {timestamp.strftime('%Y-%m-%d %H:%M:%S')}")
                    logger.info(f"Change detected at {url}")
                    change_counts[url] = change_counts.get(url, 0) + 1
                    if change_counts[url] >= CHANGE_FREQUENCY_THRESHOLD:
                        # Frequently changing page: poll more often.
                        interval = max(60, interval // 2)
                url_monitoring_intervals[url] = interval
            except aiohttp.ClientError as e:
                logger.error(f"Network error monitoring {url}: {e}")
                history.append(f"Network error monitoring {url}: {e}")
            except Exception as e:
                logger.error(f"Unexpected error monitoring {url}: {e}")
                history.append(f"Unexpected error monitoring {url}: {e}")
            # Sleep after every round, including rounds that failed.
            await asyncio.sleep(interval)
async def start_monitoring(urls: List[str], storage_location: str, feed_rss: bool):
    """Start one background monitor task per valid, not yet monitored URL.

    Args:
        urls: Either a list of URLs or a single comma-separated string, which
            is what the "Target URLs (comma-separated)" textbox delivers.
            Iterating such a string directly would yield single characters,
            so it is split first. Surrounding whitespace is removed and empty
            entries are ignored.
        storage_location: CSV path forwarded to each monitor.
        feed_rss: Whether monitors write snapshots to the database.

    Returns:
        The status string ``"Monitoring started"``.

    Invalid URLs are logged and recorded in ``history``.
    """
    global db_session
    if isinstance(urls, str):
        candidates = urls.split(",")
    else:
        candidates = list(urls or [])
    for candidate in candidates:
        url = candidate.strip()
        if not url or url in monitoring_tasks:
            continue
        sanitized_url = sanitize_url(url)
        if sanitized_url:
            task = asyncio.create_task(
                monitor_url(sanitized_url, DEFAULT_MONITORING_INTERVAL, storage_location, feed_rss, db_session))
            monitoring_tasks[sanitized_url] = task
        else:
            logger.warning(f"Invalid URL: {url}")
            history.append(f"Invalid URL: {url}")
    return "Monitoring started"
async def cleanup_resources(url: str):
    """Placeholder for per-URL cleanup; currently does nothing.

    Cleanup logic (for example closing database connections) can be added
    here.
    """
    return None
def stop_monitoring(url: str):
    """Cancel the monitor task(s) for ``url`` and schedule their cleanup.

    Args:
        url: A monitored URL, or a comma-separated string of URLs as
            delivered by the "Target URLs (comma-separated)" textbox. An
            exact match against the registry is tried first so a URL that
            itself contains a comma keeps working.

    Returns:
        The status string ``"Monitoring stopped"``.

    The registry entry is removed before anything else so a failure while
    scheduling cleanup cannot leave a cancelled task registered. The function
    may be called from a thread without a running event loop, where
    ``asyncio.create_task`` would raise ``RuntimeError``; in that case the
    cancellation and cleanup are handed to the loop that owns the task.
    """
    whole = url.strip()
    if whole in monitoring_tasks:
        targets = [whole]
    else:
        targets = [part.strip() for part in url.split(",")]

    try:
        current_loop = asyncio.get_running_loop()
    except RuntimeError:
        current_loop = None

    for target in targets:
        task = monitoring_tasks.pop(target, None)
        if task is None:
            continue
        task_loop = task.get_loop()
        if task_loop is current_loop:
            task.cancel()
            task_loop.create_task(cleanup_resources(target))
        elif not task_loop.is_closed():
            # Tasks are not thread-safe: schedule on the owning loop instead
            # of touching the task directly from this thread.
            task_loop.call_soon_threadsafe(task.cancel)
            asyncio.run_coroutine_threadsafe(cleanup_resources(target), task_loop)
    return "Monitoring stopped"
def generate_rss_feed():
    """Render the 20 most recent articles as an RSS 2.0 document.

    The document is built with the standard library because ``feedparser``
    only parses feeds and has no ``FeedGenerator``. A short-lived session is
    opened on the engine of the module-level ``db_session`` (assigned in
    ``main``), since no module-level ``Session`` factory exists.

    Returns:
        The RSS XML as a string, or ``None`` when the database query fails.
    """
    import xml.etree.ElementTree as ET
    from email.utils import format_datetime

    session = sessionmaker(bind=db_session.get_bind())()
    try:
        articles = session.query(Article).order_by(
            Article.timestamp.desc()).limit(20).all()

        rss = ET.Element("rss", version="2.0")
        channel = ET.SubElement(rss, "channel")
        ET.SubElement(channel, "title").text = 'Website Changes Feed'
        ET.SubElement(channel, "link").text = 'http://yourwebsite.com/feed'  # Replace if needed
        ET.SubElement(channel, "description").text = 'Feed of changes detected on monitored websites.'

        for article in articles:
            item = ET.SubElement(channel, "item")
            ET.SubElement(item, "title").text = article.title or ""
            ET.SubElement(item, "link").text = article.url
            ET.SubElement(item, "description").text = article.content or ""
            if article.timestamp is not None:
                # RSS expects RFC 822 style dates.
                ET.SubElement(item, "pubDate").text = format_datetime(article.timestamp)

        return ET.tostring(rss, encoding="unicode")
    except SQLAlchemyError as e:
        logger.error(f"Database error: {e}")
        return None
    finally:
        session.close()
async def chatbot_response(message: str, history: List[dict]):
    """Send ``message`` to the hosted model and extend the chat history.

    Args:
        message: Text typed by the user.
        history: Current chat history as a list of ``{"role", "content"}``
            dicts, the format used by ``gr.Chatbot(type='messages')``.
            ``None`` is treated as an empty history.

    Returns:
        ``(history, "")``: the extended history for the chatbot component
        and an empty string that clears the message textbox, matching the
        two outputs wired to the send button.

    Any failure is logged and reported to the user as an assistant message.
    """
    history = list(history or [])
    history.append({"role": "user", "content": message})
    try:
        client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1", token=HUGGINGFACE_API_KEY)
        # text_generation is a blocking call that returns the generated text;
        # run it in the default executor so the event loop stays responsive.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None, lambda: client.text_generation(message, max_new_tokens=100))
        if isinstance(response, str):
            reply = response
        else:
            reply = str(getattr(response, "generated_text", response))
    except Exception as e:
        logger.error(f"Chatbot error: {e}")
        reply = "Error: Could not get a response from the chatbot."
    history.append({"role": "assistant", "content": reply})
    return history, ""
async def update_db_status(db_status):
    """Probe the database once a minute and report the result to ``db_status``.

    The probe runs ``SELECT 1`` on the module-level ``db_session``. The
    statement is wrapped in ``text()`` because plain strings are not accepted
    as executable SQL by current SQLAlchemy, and results are only awaited
    when they are awaitable: the session created in ``main`` is synchronous,
    and awaiting its result raised ``TypeError`` and ended this task.

    Args:
        db_status: Component whose ``update(value=...)`` receives
            ``"Connected"`` or ``"Disconnected"``.
    """
    import inspect
    from sqlalchemy import text

    while True:
        try:
            probe = db_session.execute(text("SELECT 1"))
            if inspect.isawaitable(probe):
                await probe
            status = "Connected"
        except SQLAlchemyError:
            status = "Disconnected"
        try:
            # NOTE(review): whether calling update() from a background task
            # refreshes the UI depends on the Gradio version -- confirm.
            outcome = db_status.update(value=status)
            if inspect.isawaitable(outcome):
                await outcome
        except Exception as e:
            # Keep the health check alive even if the display cannot be updated.
            logger.error(f"Failed to update database status display: {e}")
        await asyncio.sleep(60)  # Check every minute
async def update_feed_content(db_session):
    """Build the feed dictionary from the 20 most recent articles.

    Args:
        db_session: Synchronous SQLAlchemy session. ``Query.all()`` returns a
            plain list, so the query is not awaited; awaiting it raised
            ``TypeError`` on every call.

    Returns:
        A dict with ``title``, ``link``, ``description`` and ``items`` (one
        dict per article with ``title``, ``link``, ``description`` and
        ``pubDate``), or ``None`` when the database query fails.
    """
    try:
        articles = db_session.query(Article).order_by(Article.timestamp.desc()).limit(20).all()
        feed = {
            'title': 'Website Changes Feed',
            'link': 'http://yourwebsite.com/feed',
            'description': 'Feed of changes detected on monitored websites.',
            'items': [{
                'title': article.title,
                'link': article.url,
                'description': article.content,
                'pubDate': article.timestamp
            } for article in articles]
        }
        return feed
    except SQLAlchemyError as e:
        logger.error(f"Database error: {e}")
        return None
async def periodic_update_with_error_handling(db_session):
    """Rebuild the feed content every five minutes, forever.

    Each cycle sleeps first and then calls ``update_feed_content``. Any
    ``Exception`` raised during a cycle is logged and the loop continues.

    Args:
        db_session: Session forwarded to ``update_feed_content``.
    """
    while True:
        try:
            # Wait for 5 minutes before each refresh, including the first one.
            await asyncio.sleep(300)
            await update_feed_content(db_session)
        except Exception as e:
            logger.error(f"Error in periodic update: {e}")
async def main():
    """Set up the database, build the Gradio UI and serve it until stopped.

    The database session is published as the module-level ``db_session``
    because other functions in this module read it. If the database cannot
    be set up, the error is logged and the function returns.

    The UI is launched without blocking and the coroutine then waits on an
    event that is never set. ``launch()`` does not return an awaitable, and
    a blocking launch would keep the event loop from running the background
    tasks. On shutdown the background tasks are cancelled, and the
    synchronous session and the engine are released.
    """
    global db_session
    try:
        engine, Session = await create_db_engine("sqlite:///monitoring.db")
        db_session = Session()
    except SQLAlchemyError as e:
        logger.error(f"Failed to connect to database: {e}")
        return

    demo = gr.Blocks()
    with demo:
        gr.Markdown("# Website Monitor and Chatbot")
        with gr.Row():
            with gr.Column():
                db_url = gr.Textbox(
                    label="Database URL", value="sqlite:///monitoring.db")
                db_status = gr.Textbox(label="Database Status",
                                       interactive=False,
                                       value="Connected")
            with gr.Column():
                with gr.Tab("Configuration"):
                    target_urls = gr.Textbox(
                        label="Target URLs (comma-separated)",
                        placeholder=
                        "https://example.com, https://another-site.com")
                    storage_location = gr.Textbox(
                        label="Storage Location (CSV file path)",
                        placeholder="/path/to/your/file.csv")
                    feed_rss_checkbox = gr.Checkbox(label="Enable RSS Feed")
                    start_button = gr.Button("Start Monitoring")
                    stop_button = gr.Button("Stop Monitoring")
                    status_text = gr.Textbox(label="Status",
                                             interactive=False)
                    history_text = gr.Textbox(label="History",
                                              lines=10,
                                              interactive=False)
                with gr.Tab("User-End View"):
                    feed_content = gr.JSON(label="RSS Feed Content")
                with gr.Tab("Chatbot"):
                    chatbot_interface = gr.Chatbot(type='messages')
                    message_input = gr.Textbox(
                        placeholder="Type your message here...")
                    send_button = gr.Button("Send")

        start_button.click(
            start_monitoring,
            inputs=[target_urls, storage_location, feed_rss_checkbox],
            outputs=status_text)
        stop_button.click(lambda url: stop_monitoring(url),
                          inputs=target_urls,
                          outputs=status_text)
        send_button.click(
            chatbot_response,
            inputs=[message_input, chatbot_interface],
            outputs=[chatbot_interface, message_input])

    # Keep references to the background tasks: the event loop itself only
    # holds weak references to them.
    background_tasks = [
        asyncio.create_task(periodic_update_with_error_handling(db_session)),
        asyncio.create_task(update_db_status(db_status)),
    ]
    try:
        # Serve without blocking this thread, then park the coroutine so the
        # event loop stays free for the background tasks.
        demo.launch(prevent_thread_lock=True)
        await asyncio.Event().wait()
    finally:
        for task in background_tasks:
            task.cancel()
        if db_session:
            # The session is synchronous; close() returns None and must not be awaited.
            db_session.close()
        engine.dispose()
if __name__ == "__main__":
    # Start the application only when the file is executed as a script.
    asyncio.run(main())