import os
import time
import asyncio
import html
import requests
import httpx
import fitz
from googleapiclient.discovery import build
from dotenv import load_dotenv
from trafilatura import extract
from bs4 import BeautifulSoup
load_dotenv()

API_KEY = os.environ.get("GOOGLE_SEARCH_API_KEY")
CSE_KEY = os.environ.get("GOOGLE_SEARCH_CSE_ID")

# Number of pages to scrape
NUM_PAGES = 20


def build_results_beautifulsoup(url_list, scholar_abstracts: dict[str, str] = None):
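    """Scrape every URL in url_list concurrently and return a dict mapping URL to
    extracted text. Only pages whose extracted text is longer than 500 characters
    are kept, up to NUM_PAGES scraped pages; when a page cannot be scraped, its
    Semantic Scholar abstract from scholar_abstracts is used instead, if available."""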
print("Starting to scrape URLs...") | |
start_time = time.perf_counter() | |
# scrape URLs in list | |
soups = asyncio.run(parallel_scrap(url_list)) | |
scraping_time = time.perf_counter() - start_time | |
print(f"Scraping processing time: {scraping_time:.2f} seconds") | |
result_content = {} | |
count = 0 | |
print("Starting to process each URL...") | |
for url, soup in zip(url_list, soups): | |
if count >= NUM_PAGES: | |
print(f"Reached the limit of {NUM_PAGES} pages. Stopping processing.") | |
break | |
        # gather() may return an exception object for a failed task, so filter those out
        if soup and not isinstance(soup, Exception):
print(f"Processing URL: {url}") | |
text = extract( | |
soup, | |
include_tables=False, | |
include_comments=False, | |
output_format="txt", | |
) | |
# If text is None or empty, log a warning and skip | |
if text is None: | |
print(f"Warning: Extraction returned None for URL: {url}") | |
elif len(text) > 500: | |
print(f"Adding content from URL: {url}, content length: {len(text)}") | |
result_content[url] = text | |
count += 1 | |
else: | |
print(f"Skipped URL: {url}, content too short (length: {len(text)})") | |
        elif scholar_abstracts and scholar_abstracts.get(url):
            print(f"Skipped URL: {url}, no soup content available. Returning scholar abstract instead.")
            result_content[url] = scholar_abstracts.get(url)
        else:
            print(f"Skipped URL: {url}, no soup content available.")

    print("Finished processing URLs.")
    return result_content


def build_results_extractor(url_list):
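    """Fetch article text for each URL through the ExtractorAPI service, keeping
    texts longer than 500 characters up to NUM_PAGES entries. On any error (e.g. a
    403 from the API) it falls back to build_results_beautifulsoup."""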
    try:
        endpoint = "https://extractorapi.com/api/v1/extractor"
        result_content = {}
        count = 0
        for url in url_list:
            if count >= NUM_PAGES:
                break
            params = {"apikey": os.environ.get("EXTRACTOR_API_KEY"), "url": url}
            r = requests.get(endpoint, params=params)
            if r.status_code == 200:
                text = r.json()["text"]
                if len(text) > 500:
                    result_content[url] = text
                    count += 1
            if r.status_code == 403:
                raise Exception("Error with ExtractorAPI; using default implementation instead")
        return result_content
    except Exception as e:
        print(e)
        return build_results_beautifulsoup(url_list)


months = {
    "January": "01",
    "February": "02",
    "March": "03",
    "April": "04",
    "May": "05",
    "June": "06",
    "July": "07",
    "August": "08",
    "September": "09",
    "October": "10",
    "November": "11",
    "December": "12",
}

domain_list = ["com", "org", "net", "int", "edu", "gov", "mil"]

skip_urls = ["youtube.com", "twitter.com", "facebook.com", "instagram.com", "x.com"]


def build_date(year=2024, month="March", day=1):
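    """Return a YYYYMMDD string (e.g. 20240301) from a year, month name, and day,
    matching the format used in the 'date:r:YYYYMMDD:YYYYMMDD' sort parameter."""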
return f"{year}{months[month]}{day}" | |


async def get_url_data(url, client):
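    """Fetch a single URL with the shared async client. PDF responses (detected via
    Content-Type or a .pdf extension) are converted to HTML-wrapped text; other 200
    responses return the raw body. Returns None on any failure."""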
    try:
        r = await client.get(url, follow_redirects=True)
        print(f"URL: {url}, Response Code: {r.status_code}")
        if r.status_code == 200:
            content_type = r.headers.get("Content-Type", "").lower()
            # Improved PDF detection using Content-Type and file extension
            if "application/pdf" in content_type or url.lower().endswith(".pdf"):
                print(f"Detected PDF content via Content-Type or file extension at URL: {url}")
                pdf_content = await extract_pdf_text(r.content)
                return pdf_content
            else:
                return r.content
        else:
            print(f"Non-200 response for URL: {url}, status code: {r.status_code}")
            return None
    except Exception as e:
        print(f"Error fetching URL: {url}, Error: {str(e)}")
        return None


async def extract_pdf_text(content):
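    """Extract the text of a PDF (given as raw bytes) with PyMuPDF and wrap it in a
    minimal HTML document, returned as UTF-8 bytes so it can be processed by
    trafilatura like any other scraped page. Returns None if parsing fails."""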
    try:
        with fitz.open(stream=content, filetype="pdf") as doc:
            text = ""
            for page in doc:
                text += page.get_text()
html_content = f""" | |
<!DOCTYPE html> | |
<html> | |
<body> | |
<p>{text}</p> | |
</body> | |
</html> | |
""" | |
html_bytes = html_content.encode("utf-8") | |
return html_bytes # Return in such a format that is parsable by trafilatura | |
except Exception as e: | |
print(f"Error extracting PDF text: {str(e)}") | |
return None | |


async def parallel_scrap(urls):
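    """Fetch all URLs concurrently and return their contents in input order; failed
    fetches come back as None (or as exception objects, since gather is called with
    return_exceptions=True)."""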
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
    }
    async with httpx.AsyncClient(timeout=30, headers=headers) as client:
        tasks = []
        for url in urls:
            tasks.append(get_url_data(url=url, client=client))
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results


def scrap(urls):
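    """Synchronous convenience wrapper around parallel_scrap."""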
    # get_url_data is a coroutine, so it must be awaited; run the async scraper to
    # completion instead of collecting unawaited coroutine objects.
    return asyncio.run(parallel_scrap(urls))


def google_search_urls(
    text,
    sorted_date,
    domains_to_include,
    api_key,
    cse_id,
    num_results=10,  # Number of results to fetch per page
    total_results=30,  # Total number of results to fetch
    skip_urls=None,  # List of URLs to skip
    **kwargs,
):
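    """Query the Google Custom Search API page by page until total_results URLs are
    collected, skipping any URL listed in skip_urls and keeping only URLs that
    contain one of the domains_to_include extensions (when a filter is given)."""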
    if skip_urls is None:
        skip_urls = []  # Initialize as empty list if not provided

    service = build("customsearch", "v1", developerKey=api_key)
    url_list = []
    start_index = 1  # Initial index for the search results

    while len(url_list) < total_results:
        # Fetch a page of results
        results = (
            service.cse()
            .list(
                q=text,
                cx=cse_id,
                sort=sorted_date,
                start=start_index,
                num=min(num_results, total_results - len(url_list)),
                **kwargs,
            )
            .execute()
        )

        if "items" in results and len(results["items"]) > 0:
            for item in results["items"]:
                url = item["link"]
                # Skip if the URL is in the skip_urls list or doesn't match the domain filter
                if url in skip_urls:
                    continue
                if (domains_to_include is None) or any(("." + domain) in url for domain in domains_to_include):
                    if url not in url_list:
                        url_list.append(url)
        else:
            # No more results
            break

        # Move to the next page of results
        start_index += num_results

    return url_list[:total_results]


def scrape_abstract(url, title):
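    """Fetch a paper page and return its title followed by the TLDR/abstract text
    scraped from the page, or None when no abstract block is found."""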
    response = requests.get(url)
    soup = BeautifulSoup(response.content, "html.parser")
    abstract_section = soup.find("div", class_="tldr-abstract-replacement paper-detail-page__tldr-abstract")
    abstract = abstract_section.get_text().strip() if abstract_section else ""
    return title + "\n" + abstract if abstract != "" else None


def semantic_scholar_urls(
    text,
    sorted_date,
    total_results=30,  # Total number of results to fetch
    skip_urls=None,  # List of URLs to skip
    **kwargs,
):
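    """Search the Semantic Scholar Graph API for papers matching `text` within the
    year range encoded in sorted_date ('date:r:YYYYMMDD:YYYYMMDD'). Returns a list
    of paper/open-access PDF URLs plus a dict mapping URL to abstract, for use as a
    fallback when scraping a URL fails."""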
    ss_api_key = os.environ.get("SEMANTIC_SCHOLAR_API_KEY")
    semantic_scholar_endpoint = "https://api.semanticscholar.org/graph/v1/paper/search/"
    date_from, date_to = sorted_date.split(":r:")[1].split(":")
    year_from = date_from[:4]
    year_to = date_to[:4]
    success_count = 0
    print(f"Dates: {year_from}-{year_to}")

    query_params = {
        "query": text,
        "fields": "title,abstract,url,publicationTypes,publicationDate,openAccessPdf,fieldsOfStudy",
        "year": f"{year_from}-{year_to}",
        "limit": 3 * total_results,
    }
    headers = {"x-api-key": ss_api_key}
    response = requests.get(semantic_scholar_endpoint, params=query_params, headers=headers).json()

    url_list = []
    scholar_abstracts = {}
    for row in response.get("data", []):
        if success_count >= total_results:
            break
        url = row.get("url")
        if isinstance(url, dict) and url.get("url"):
            url = url.get("url")
        url_list.append(url)
        abstract = row.get("abstract")
        if abstract:
            scholar_abstracts[url] = abstract
            success_count += 1
        if row.get("openAccessPdf") and row.get("url"):
            url_list.append(row.get("openAccessPdf").get("url"))
            success_count += 1
    return url_list, scholar_abstracts


def google_search(topic, sorted_date, domains_to_include, scholar_mode_check):
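    """Top-level entry point: collect URLs from Google Custom Search (or from
    Semantic Scholar when scholar_mode_check is True) and return the scraped page
    contents produced by build_results_beautifulsoup."""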
    api_key = os.environ.get("GOOGLE_SEARCH_API_KEY")
    cse_id = os.environ.get("GOOGLE_SEARCH_CSE_ID")
    start_time = time.perf_counter()
    # if scholar_mode_check:
    #     topic += " -filetype:pdf"
    scholar_abstracts = None
    if not scholar_mode_check:
        url_list = google_search_urls(
            topic,
            sorted_date,
            domains_to_include,
            api_key,
            cse_id,
        )
    else:
        url_list, scholar_abstracts = semantic_scholar_urls(topic, sorted_date)
    print("---")
    print(len(url_list))
    print(url_list)
    print("---")
    if scholar_mode_check:
        print("Semantic Scholar processing time: ", time.perf_counter() - start_time)
    else:
        print("Google Search processing time: ", time.perf_counter() - start_time)
    result_content = build_results_beautifulsoup(url_list, scholar_abstracts)
    return result_content


if __name__ == "__main__":
    res = google_search("Low Resource ", "date:r:20240101:20241231", domain_list, True)
    print(res.keys())
    print(len(res))
    print(res)