from typing import List, Dict, Optional, Tuple
from datetime import datetime, timedelta
import feedparser
from bs4 import BeautifulSoup
import pytz
import os
import json
from transformers import T5Tokenizer, T5ForConditionalGeneration
from fuzzywuzzy import fuzz
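

# EventMatcher pulls events from the ExperienceBU RSS feed, caches them to
# disk as JSON, and answers free-text questions by fuzzy-matching the query
# against event titles, descriptions, locations, and categories.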
class EventMatcher:
    def __init__(self):
        """Initialize the event matcher."""
        print("Initializing Event Matcher...")
        self.eastern = pytz.timezone('America/New_York')
        self.events = []
        self.last_update = None
        self.cache_file = "events_cache.json"

        # Initialize T5 model for response generation. Note that query()
        # currently formats responses without calling the model, so this is
        # loaded for future use only.
        self.tokenizer = T5Tokenizer.from_pretrained("t5-small")
        self.model = T5ForConditionalGeneration.from_pretrained("t5-small")

        # Load initial events
        self.load_from_cache()
        self.update_events()

    def query(self, user_query: str) -> str:
        """Main query method called by the app."""
        try:
            # Refresh events at most once an hour. Use total_seconds() so
            # the staleness check still works after more than a day.
            if self.last_update is None or (datetime.now() - self.last_update).total_seconds() >= 3600:
                self.update_events()

            # Quick response for an empty query
            if not user_query.strip():
                return "Please ask me about events at Brock University!"

            # Find matching events - optimize by limiting the initial search
            matched_events = self.find_matching_events(user_query)
            if not matched_events:
                return "I couldn't find any events matching your query. Try asking in a different way!"

            # Format the response without T5 for a faster response
            events_text = ""
            for event, score in matched_events:
                events_text += f"""
📌 {event['title']}
📅 {event['start_time'].strftime('%A, %B %d, %Y')}
⏰ {event['start_time'].strftime('%I:%M %p')}
📍 {event['location']}
🏷️ {event['categories']}
👥 {event['hosts']}
{event['description'][:200]}...
🔗 {event['link']}
"""

            # Create a simple response prefix based on the query type
            if "today" in user_query.lower():
                prefix = "Here are today's events:"
            elif "week" in user_query.lower():
                prefix = "Here are the events happening this week:"
            elif any(word in user_query.lower() for word in ["workshop", "training", "seminar"]):
                prefix = "I found these workshops and seminars:"
            elif any(word in user_query.lower() for word in ["faculty", "department", "school"]):
                prefix = "Here are the faculty-related events:"
            else:
                prefix = "Here are some events that match your query:"

            return f"{prefix}\n{events_text}"
        except Exception as e:
            print(f"Error in query: {e}")
            return "I encountered an error processing your query. Please try again!"

    def find_matching_events(self, query: str) -> List[Tuple[Dict, float]]:
        """Find events matching the query - optimized version."""
        matched_events = []
        query_lower = query.lower()

        for event in self.events:
            # Quick initial filter: some query term must appear somewhere in
            # the event before we pay for the fuzzy scoring below
            if any(term in event['title'].lower() or
                   term in event['description'].lower()[:200] or
                   term in event['location'].lower() or
                   term in event['categories'].lower()
                   for term in query_lower.split()):
                # Calculate similarity scores only for potentially matching events
                title_score = fuzz.token_set_ratio(query_lower, event['title'].lower()) / 100
                desc_score = fuzz.token_set_ratio(query_lower, event['description'].lower()) / 100
                location_score = fuzz.token_set_ratio(query_lower, event['location'].lower()) / 100
                categories_score = fuzz.token_set_ratio(query_lower, event['categories'].lower()) / 100

                # Weight the scores: titles count most, categories least
                total_score = (
                    title_score * 0.4 +
                    desc_score * 0.3 +
                    location_score * 0.2 +
                    categories_score * 0.1
                )

                if total_score > 0.3:  # Threshold for relevance
                    matched_events.append((event, total_score))

        # Sort by score and return the top matches
        matched_events.sort(key=lambda x: x[1], reverse=True)
        return matched_events[:3]
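
    # Illustrative example of the scoring above (values assumed, not from the
    # source): for the query "yoga workshop" and an event titled
    # "Beginner Yoga Workshop", fuzz.token_set_ratio scores the title near 100
    # because the query's tokens are a subset of the title's, so the event
    # clears the 0.3 threshold on the title weight alone (0.4 * ~1.0).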

    def parse_event_datetime(self, entry) -> Tuple[Optional[datetime], Optional[datetime]]:
        """Parse event dates from the RSS feed."""
        try:
            start_time = entry.get('start', None)
            end_time = entry.get('end', None)

            # Try the RSS feed times first
            if start_time:
                start_dt = datetime.strptime(start_time, '%a, %d %b %Y %H:%M:%S %Z')
                start_dt = pytz.UTC.localize(start_dt).astimezone(self.eastern)
            else:
                # Fall back to HTML parsing if RSS times are not available
                soup = BeautifulSoup(entry.description, 'html.parser')
                start_elem = soup.find('time', class_='dt-start')
                if start_elem and 'datetime' in start_elem.attrs:
                    dt_str = start_elem['datetime'].split('.')[0]
                    start_dt = datetime.strptime(dt_str, '%Y-%m-%dT%H:%M:%S')
                    start_dt = self.eastern.localize(start_dt)
                else:
                    return None, None

            if end_time:
                end_dt = datetime.strptime(end_time, '%a, %d %b %Y %H:%M:%S %Z')
                end_dt = pytz.UTC.localize(end_dt).astimezone(self.eastern)
            else:
                end_dt = None

            return start_dt, end_dt
        except Exception as e:
            print(f"Error parsing dates: {e}")
            return None, None

    def update_events(self) -> None:
        """Update events from the RSS feed, with caching."""
        try:
            # Skip the fetch if the cache is less than an hour old
            if self.last_update and (datetime.now() - self.last_update).total_seconds() < 3600:
                return

            print("Fetching events from RSS feed...")
            feed = feedparser.parse("https://experiencebu.brocku.ca/events.rss")

            new_events = []
            for entry in feed.entries:
                event = self.process_event_entry(entry)
                if event:
                    new_events.append(event)

            if new_events:
                self.events = new_events
                self.last_update = datetime.now()
                print(f"Updated {len(self.events)} events")
                # Save to cache
                self.save_to_cache()
        except Exception as e:
            print(f"Error updating events: {e}")
            self.load_from_cache()

    def process_event_entry(self, entry) -> Optional[Dict]:
        """Process a single event entry."""
        try:
            # Parse dates
            start_time, end_time = self.parse_event_datetime(entry)
            if not self.is_event_valid(start_time):
                return None

            # Extract event details
            categories = self.extract_categories(entry)
            hosts = self.extract_hosts(entry)
            description = self.clean_description(entry.description)

            return {
                'title': entry.title,
                'description': description,
                'start_time': start_time,
                'end_time': end_time,
                'location': entry.get('location', 'Location not specified'),
                'categories': ';'.join(categories),
                'hosts': ';'.join(hosts),
                'link': entry.link,
                'guid': entry.guid
            }
        except Exception as e:
            print(f"Error processing event entry: {e}")
            return None
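
    # For reference, the feed fields this parser relies on (inferred from the
    # code above, not verified against the live feed): title, description
    # (HTML), link, guid, start/end (RFC 822 date strings), location, host,
    # and tags.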

    # Helper methods
    def extract_categories(self, entry) -> List[str]:
        """Return the feed entry's tag terms, or an empty list."""
        try:
            return [tag.term for tag in entry.get('tags', [])]
        except Exception:
            return []

    def extract_hosts(self, entry) -> List[str]:
        """Return the entry's hosts, normalizing a single host to a list."""
        try:
            hosts = entry.get('host', [])
            if not isinstance(hosts, list):
                hosts = [hosts]
            return [h for h in hosts if h]
        except Exception:
            return []

    def clean_description(self, description: str) -> str:
        """Strip HTML tags and collapse whitespace in a description."""
        try:
            soup = BeautifulSoup(description, 'html.parser')
            return ' '.join(soup.get_text().split())
        except Exception:
            return description

    def is_event_valid(self, start_time: Optional[datetime]) -> bool:
        """Keep only events starting within the next two weeks."""
        if not start_time:
            return False
        now = datetime.now(self.eastern)
        two_weeks = now + timedelta(days=14)
        return now <= start_time <= two_weeks

    # Cache handling methods
    def save_to_cache(self) -> None:
        """Serialize events to the JSON cache, converting datetimes to ISO strings."""
        try:
            cache_data = {
                'last_update': self.last_update.isoformat(),
                'events': []
            }
            for event in self.events:
                event_copy = event.copy()
                event_copy['start_time'] = event_copy['start_time'].isoformat()
                if event_copy.get('end_time'):
                    event_copy['end_time'] = event_copy['end_time'].isoformat()
                cache_data['events'].append(event_copy)

            with open(self.cache_file, 'w') as f:
                json.dump(cache_data, f)
        except Exception as e:
            print(f"Error saving to cache: {e}")

    def load_from_cache(self) -> None:
        """Load cached events, converting ISO strings back to datetimes."""
        try:
            if not os.path.exists(self.cache_file):
                return

            with open(self.cache_file, 'r') as f:
                cache_data = json.load(f)

            self.last_update = datetime.fromisoformat(cache_data['last_update'])
            self.events = []
            for event in cache_data['events']:
                event['start_time'] = datetime.fromisoformat(event['start_time'])
                if event.get('end_time'):
                    event['end_time'] = datetime.fromisoformat(event['end_time'])
                self.events.append(event)
            print(f"Loaded {len(self.events)} events from cache")
        except Exception as e:
            print(f"Error loading from cache: {e}")