Spaces:
Runtime error
Runtime error
import os | |
import time | |
from googleapiclient.discovery import build | |
import asyncio | |
import httpx | |
from bs4 import BeautifulSoup | |
from dotenv import load_dotenv | |
import html2text | |
import requests | |
# Load variables from a .env file (if present) before reading them below.
load_dotenv()

API_KEY = os.environ.get("GOOGLE_SEARCH_API_KEY")
CSE_KEY = os.environ.get("GOOGLE_SEARCH_CSE_ID")

# Maximum number of pages whose content is kept per result set
NUM_PAGES = 10

# Shared html2text converter used by clean_html().
# The body width must be given through the constructor (it is stored as
# `body_width`); assigning `h2t.bodywidth` afterwards creates an attribute the
# converter never reads, which left wrapping enabled and made
# `single_line_break` ineffective.
h2t = html2text.HTML2Text(bodywidth=0)  # 0 = no wrapping
h2t.ignore_links = True  # Ignore hyperlinks
h2t.ignore_images = True  # Ignore images
h2t.ignore_emphasis = True  # Ignore emphasis
h2t.ignore_tables = False  # Include tables
h2t.skip_internal_links = True  # Skip internal links
# NOTE(review): html2text does not appear to define `skip_external_links`;
# kept as-is because setting it is harmless — confirm and remove if unused.
h2t.skip_external_links = True  # Skip external links
h2t.single_line_break = True  # Use single line breaks (needs body width 0)
h2t.protect_links = True  # Protect links from being split
h2t.default_image_alt = "[image]"  # Default alt text for images
def clean_html(text):
    """Convert *text* with the module-level html2text converter and return the result."""
    converted = h2t.handle(text)
    return converted
def build_results_beautifulsoup(url_list):
    """Scrape *url_list* concurrently and collect the text of usable pages.

    Every URL is fetched via ``parallel_scrap``; each page that was fetched is
    passed through ``clean_html`` and kept only when the cleaned text is longer
    than 500 characters. Collection stops once ``NUM_PAGES`` pages were kept.

    Returns:
        dict mapping URL -> cleaned page text.
    """
    print("Starting to scrape URLs...")
    started_at = time.perf_counter()
    # scrape URLs in list
    soups = asyncio.run(parallel_scrap(url_list))
    elapsed = time.perf_counter() - started_at
    print(f"Scraping processing time: {elapsed:.2f} seconds")

    result_content = {}
    kept = 0
    print("Starting to process each URL...")
    for url, soup in zip(url_list, soups):
        if kept >= NUM_PAGES:
            print(f"Reached the limit of {NUM_PAGES} pages. Stopping processing.")
            break

        # Nothing was fetched for this URL.
        if not soup:
            print(f"Skipped URL: {url}, no soup content available.")
            continue

        print(f"Processing URL: {url}")
        text = clean_html(soup.text)

        # Pages with very little text are not worth keeping.
        if len(text) <= 500:
            print(f"Skipped URL: {url}, content too short (length: {len(text)})")
            continue

        print(f"Adding content from URL: {url}, content length: {len(text)}")
        result_content[url] = text
        kept += 1

    print("Finished processing URLs.")
    return result_content
def build_results_extractor(url_list):
    """Extract page text for *url_list* through the extractorapi.com service.

    At most ``NUM_PAGES`` pages are kept, and only those whose extracted text
    is longer than 500 characters. If anything goes wrong (HTTP 403 from the
    service, a network error or timeout, an unexpected response body) the
    error is printed and the work is redone with
    ``build_results_beautifulsoup`` instead.

    Returns:
        dict mapping URL -> extracted text.
    """
    try:
        endpoint = "https://extractorapi.com/api/v1/extractor"
        # Read the key once rather than on every loop iteration.
        api_key = os.environ.get("EXTRACTOR_API_KEY")
        result_content = {}
        count = 0
        for url in url_list:
            if count >= NUM_PAGES:
                break
            params = {"apikey": api_key, "url": url}
            # Without a timeout a stalled connection would block forever; a
            # timeout error is caught below and triggers the fallback.
            r = requests.get(endpoint, params=params, timeout=30)
            if r.status_code == 200:
                text = r.json()["text"]
                if len(text) > 500:
                    result_content[url] = text
                    count += 1
            elif r.status_code == 403:
                # The service rejected the request; abandon it for the fallback.
                raise Exception("Error with API; using default implementaion instead")
        return result_content
    except Exception as e:
        # Deliberate best-effort boundary: any failure falls back to scraping.
        print(e)
        return build_results_beautifulsoup(url_list)
# English month name -> two-digit month number.
months = {
    "January": "01",
    "February": "02",
    "March": "03",
    "April": "04",
    "May": "05",
    "June": "06",
    "July": "07",
    "August": "08",
    "September": "09",
    "October": "10",
    "November": "11",
    "December": "12",
}

# Top-level domains offered for filtering.
domain_list = ["com", "org", "net", "int", "edu", "gov", "mil"]


def build_date(year=2024, month="March", day=1):
    """Return the date as a compact ``YYYYMMDD`` string.

    Args:
        year: Year number, used as given.
        month: English month name; must be a key of ``months``.
        day: Day of the month as an int or numeric string.

    Raises:
        KeyError: if *month* is not a known month name.
        ValueError: if *day* cannot be converted to an integer.
    """
    # Zero-pad the day so single-digit days still give an 8-character date
    # (e.g. 1 March 2024 -> "20240301", not "2024031").
    return f"{year}{months[month]}{int(day):02d}"
async def get_url_data(url, client):
    """Fetch *url* with the async *client* and return the parsed page.

    Returns a ``BeautifulSoup`` object for a 200 response, and ``None`` for any
    other status code or when fetching/parsing raises an exception.
    """
    try:
        response = await client.get(url)
        if response.status_code != 200:
            return None
        return BeautifulSoup(response.content, "html.parser")
    except Exception:
        # Best effort: a page that cannot be fetched is simply reported as missing.
        return None
async def parallel_scrap(urls):
    """Fetch all *urls* concurrently over one shared ``httpx.AsyncClient``.

    Returns the list produced by ``asyncio.gather`` for one ``get_url_data``
    call per URL. Because ``return_exceptions=True`` is used, an entry may be
    an exception instance instead of a result.
    """
    async with httpx.AsyncClient(timeout=30) as client:
        pending = [get_url_data(url=target, client=client) for target in urls]
        gathered = await asyncio.gather(*pending, return_exceptions=True)
    return gathered
def scrap(urls):
    """Fetch *urls* one after another and return one entry per URL.

    This is the sequential counterpart of ``parallel_scrap``. Each entry is a
    ``BeautifulSoup`` object for a 200 response, or ``None`` when the status
    code differs or fetching/parsing raised an exception.

    The previous implementation called the async ``get_url_data`` without
    awaiting it (and with a synchronous client), so it returned un-awaited
    coroutine objects instead of pages and never closed the client.
    """
    soups = []
    # The context manager closes the client's connections when done.
    # The timeout matches the one used by parallel_scrap.
    with httpx.Client(timeout=30) as client:
        for url in urls:
            try:
                response = client.get(url)
                if response.status_code == 200:
                    soups.append(BeautifulSoup(response.content, "html.parser"))
                else:
                    soups.append(None)
            except Exception:
                # Best effort, same as get_url_data: a failed page becomes None.
                soups.append(None)
    return soups
def google_search_urls(
    text,
    sorted_date,
    domains_to_include,
    api_key,
    cse_id,
    **kwargs,
):
    """Run a Google Custom Search query and return the unique result URLs.

    Args:
        text: Query string, sent as ``q``.
        sorted_date: Value sent as the ``sort`` parameter of the request.
        domains_to_include: Iterable of domain suffixes without the leading dot
            (e.g. ``["com", "org"]``). A URL is kept only if it contains
            ``"." + domain`` for at least one of them. ``None`` disables the
            filter; previously ``None`` caused every result to be dropped.
        api_key: Developer key for the Custom Search service.
        cse_id: Search engine id, sent as ``cx``.
        **kwargs: Extra parameters forwarded to ``cse().list``.

    Returns:
        List of result URLs in the order returned, without duplicates.
    """
    service = build("customsearch", "v1", developerKey=api_key)
    results = service.cse().list(q=text, cx=cse_id, sort=sorted_date, **kwargs).execute()
    url_list = []
    for item in results.get("items", []):
        url = item["link"]
        # Keep only URLs from the requested domains (substring match on ".<domain>").
        if domains_to_include is not None and not any(
            ("." + domain) in url for domain in domains_to_include
        ):
            continue
        if url not in url_list:
            url_list.append(url)
    return url_list
def google_search(
    topic,
    sorted_date,
    domains_to_include,
):
    """Search Google for *topic* and return the scraped content of the hits.

    Credentials are read from the ``GOOGLE_SEARCH_API_KEY`` and
    ``GOOGLE_SEARCH_CSE_ID`` environment variables. The URLs found by
    ``google_search_urls`` are handed to ``build_results_beautifulsoup``.

    Returns:
        dict mapping URL -> cleaned page text.
    """
    credentials = (
        os.environ.get("GOOGLE_SEARCH_API_KEY"),
        os.environ.get("GOOGLE_SEARCH_CSE_ID"),
    )
    search_started = time.perf_counter()
    url_list = google_search_urls(topic, sorted_date, domains_to_include, *credentials)
    print("Google Search processing time: ", time.perf_counter() - search_started)
    return build_results_beautifulsoup(url_list)