# article_writer/google_search.py
import html
import os
import time
from googleapiclient.discovery import build
import asyncio
import httpx
from dotenv import load_dotenv
import requests
import fitz
from trafilatura import extract
from bs4 import BeautifulSoup
load_dotenv()
API_KEY = os.environ.get("GOOGLE_SEARCH_API_KEY")
CSE_KEY = os.environ.get("GOOGLE_SEARCH_CSE_ID")
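# Environment variables used in this module (loaded from .env via python-dotenv):
#   GOOGLE_SEARCH_API_KEY, GOOGLE_SEARCH_CSE_ID  - Google Custom Search credentials
#   EXTRACTOR_API_KEY                            - extractorapi.com fallback extractor
#   SEMANTIC_SCHOLAR_API_KEY                     - Semantic Scholar Graph API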
# Number of pages to scrape
NUM_PAGES = 20
def build_results_beautifulsoup(url_list, scholar_abstracts: dict[str, str] = None):
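    """Scrape every URL in url_list concurrently and extract readable text.

    Pages are fetched with parallel_scrap and parsed with trafilatura; only
    texts longer than 500 characters are kept, and processing stops after
    NUM_PAGES successful pages. If a page cannot be scraped but a Semantic
    Scholar abstract is available in scholar_abstracts, the abstract is used
    instead. Returns a dict mapping URL -> text.
    """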
print("Starting to scrape URLs...")
start_time = time.perf_counter()
# scrape URLs in list
soups = asyncio.run(parallel_scrap(url_list))
scraping_time = time.perf_counter() - start_time
print(f"Scraping processing time: {scraping_time:.2f} seconds")
result_content = {}
count = 0
print("Starting to process each URL...")
for url, soup in zip(url_list, soups):
if count >= NUM_PAGES:
print(f"Reached the limit of {NUM_PAGES} pages. Stopping processing.")
break
        if soup and not isinstance(soup, Exception):
print(f"Processing URL: {url}")
text = extract(
soup,
include_tables=False,
include_comments=False,
output_format="txt",
)
# If text is None or empty, log a warning and skip
if text is None:
print(f"Warning: Extraction returned None for URL: {url}")
elif len(text) > 500:
print(f"Adding content from URL: {url}, content length: {len(text)}")
result_content[url] = text
count += 1
else:
print(f"Skipped URL: {url}, content too short (length: {len(text)})")
elif scholar_abstracts and scholar_abstracts.get(url):
print(f"Skipped URL: {url}, no soup content available. Returning scholar abstract instead.")
result_content[url] = scholar_abstracts.get(url)
else:
print(f"Skipped URL: {url}, no soup content available.")
print("Finished processing URLs.")
return result_content
def build_results_extractor(url_list):
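    """Extract page text through the extractorapi.com service.

    Keeps texts longer than 500 characters, up to NUM_PAGES pages. On any
    error (e.g. a 403 from the API) it falls back to
    build_results_beautifulsoup. Returns a dict mapping URL -> text.
    """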
try:
endpoint = "https://extractorapi.com/api/v1/extractor"
result_content = {}
count = 0
for url in url_list:
if count >= NUM_PAGES:
break
params = {"apikey": os.environ.get("EXTRACTOR_API_KEY"), "url": url}
            r = requests.get(endpoint, params=params, timeout=30)
if r.status_code == 200:
text = r.json()["text"]
if len(text) > 500:
result_content[url] = text
count += 1
            if r.status_code == 403:
                raise Exception("Error with Extractor API; falling back to default implementation")
return result_content
except Exception as e:
print(e)
return build_results_beautifulsoup(url_list)
months = {
"January": "01",
"February": "02",
"March": "03",
"April": "04",
"May": "05",
"June": "06",
"July": "07",
"August": "08",
"September": "09",
"October": "10",
"November": "11",
"December": "12",
}
domain_list = ["com", "org", "net", "int", "edu", "gov", "mil"]
skip_urls = ["youtube.com", "twitter.com", "facebook.com", "instagram.com", "x.com"]
def build_date(year=2024, month="March", day=1):
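    """Build a YYYYMMDD string (e.g. 20240301) from a year, month name, and day."""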
return f"{year}{months[month]}{day}"
async def get_url_data(url, client):
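    """Fetch a single URL with the shared async client.

    Returns the raw response body for regular pages, PDF text wrapped in
    minimal HTML for PDF responses (so trafilatura can parse it), or None on
    errors and non-200 responses.
    """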
try:
r = await client.get(url, follow_redirects=True)
print(f"URL: {url}, Response Code: {r.status_code}")
if r.status_code == 200:
content_type = r.headers.get("Content-Type", "").lower()
# Improved PDF detection using Content-Type and file extension
if "application/pdf" in content_type or url.lower().endswith(".pdf"):
print(f"Detected PDF content via Content-Type or file extension at URL: {url}")
pdf_content = await extract_pdf_text(r.content)
return pdf_content
else:
return r.content
else:
print(f"Non-200 response for URL: {url}, status code: {r.status_code}")
return None
except Exception as e:
print(f"Error fetching URL: {url}, Error: {str(e)}")
return None
async def extract_pdf_text(content):
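    """Extract text from PDF bytes with PyMuPDF and wrap it in minimal HTML.

    The HTML wrapper keeps the return value in the same shape as a scraped
    web page so that trafilatura's extract() can process it downstream.
    """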
try:
with fitz.open(stream=content, filetype="pdf") as doc:
text = ""
for page in doc:
text += page.get_text()
html_content = f"""
<!DOCTYPE html>
<html>
<body>
        <p>{html.escape(text)}</p>
</body>
</html>
"""
        html_bytes = html_content.encode("utf-8")
        return html_bytes  # Return bytes in a format that trafilatura can parse
except Exception as e:
print(f"Error extracting PDF text: {str(e)}")
return None
async def parallel_scrap(urls):
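    """Fetch all URLs concurrently and return their contents in input order.

    Failed fetches come back as None (or as the raised exception, since
    asyncio.gather is called with return_exceptions=True).
    """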
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
}
async with httpx.AsyncClient(timeout=30, headers=headers) as client:
tasks = []
for url in urls:
tasks.append(get_url_data(url=url, client=client))
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
def scrap(urls):
    """Synchronous wrapper around parallel_scrap for non-async callers."""
    return asyncio.run(parallel_scrap(urls))
def google_search_urls(
text,
sorted_date,
domains_to_include,
api_key,
cse_id,
num_results=10, # Number of results to fetch per page
total_results=30, # Total number of results to fetch
skip_urls=None, # List of URLs to skip
**kwargs,
):
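    """Query Google Custom Search and collect up to total_results result URLs.

    Results are fetched page by page (num_results per request), filtered
    against skip_urls and domains_to_include, and de-duplicated before being
    returned.
    """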
if skip_urls is None:
skip_urls = [] # Initialize as empty list if not provided
service = build("customsearch", "v1", developerKey=api_key)
url_list = []
start_index = 1 # Initial index for the search results
while len(url_list) < total_results:
# Fetch a page of results
results = (
service.cse()
.list(
q=text,
cx=cse_id,
sort=sorted_date,
start=start_index,
num=min(num_results, total_results - len(url_list)),
**kwargs,
)
.execute()
)
if "items" in results and len(results["items"]) > 0:
            for item in results["items"]:
                url = item["link"]
# Skip if the URL is in the skip_urls list or doesn't match the domain filter
if url in skip_urls:
continue
if (domains_to_include is None) or any(("." + domain) in url for domain in domains_to_include):
if url not in url_list:
url_list.append(url)
else:
# No more results
break
# Move to the next page of results
start_index += num_results
return url_list[:total_results]
def scrape_abstract(url, title):
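    """Scrape the TLDR/abstract block from a Semantic Scholar paper page.

    Returns the title and abstract joined by a newline, or None if no
    abstract section is found.
    """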
    response = requests.get(url, timeout=30)
soup = BeautifulSoup(response.content, "html.parser")
abstract_section = soup.find("div", class_="tldr-abstract-replacement paper-detail-page__tldr-abstract")
abstract = abstract_section.get_text().strip() if abstract_section else ""
return title + "\n" + abstract if abstract != "" else None
def semantic_scholar_urls(
text,
sorted_date,
total_results=30, # Total number of results to fetch
skip_urls=None, # List of URLs to skip
**kwargs,
):
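    """Search the Semantic Scholar Graph API for papers matching the query.

    The year range is parsed from a Google-style date string such as
    "date:r:20240101:20241231". Returns (url_list, scholar_abstracts), where
    scholar_abstracts maps a paper URL to its abstract when one is available;
    open-access PDF links are appended to url_list as well.
    """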
ss_api_key = os.environ.get("SEMANTIC_SCHOLAR_API_KEY")
    semantic_scholar_endpoint = "https://api.semanticscholar.org/graph/v1/paper/search/"
date_from, date_to = sorted_date.split(":r:")[1].split(":")
year_from = date_from[:4]
year_to = date_to[:4]
success_count = 0
print(f"Dates: {year_from}-{year_to}")
query_params = {
"query": text,
"fields": "title,abstract,url,publicationTypes,publicationDate,openAccessPdf,fieldsOfStudy",
"year": f"{year_from}-{year_to}",
"limit": 3 * total_results,
}
headers = {"x-api-key": ss_api_key}
    response = requests.get(semantic_scholar_endpoint, params=query_params, headers=headers, timeout=30).json()
url_list = []
scholar_abstracts = {}
for row in response.get("data", []):
if success_count >= total_results:
break
url = row.get("url")
if isinstance(url, dict) and url.get("url"):
url = url.get("url")
url_list.append(url)
abstract = row.get("abstract")
if abstract:
scholar_abstracts[url] = abstract
success_count += 1
if row.get("openAccessPdf") and row.get("url"):
url_list.append(row.get("openAccessPdf").get("url"))
success_count += 1
return url_list, scholar_abstracts
def google_search(topic, sorted_date, domains_to_include, scholar_mode_check):
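    """Entry point: collect source URLs and return their extracted content.

    Uses Google Custom Search by default, or Semantic Scholar when
    scholar_mode_check is True, then scrapes the URLs with
    build_results_beautifulsoup.
    """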
api_key = os.environ.get("GOOGLE_SEARCH_API_KEY")
cse_id = os.environ.get("GOOGLE_SEARCH_CSE_ID")
start_time = time.perf_counter()
# if scholar_mode_check:
# topic += " -filetype:pdf"
scholar_abstracts = None
if not scholar_mode_check:
url_list = google_search_urls(
topic,
sorted_date,
domains_to_include,
api_key,
cse_id,
)
else:
url_list, scholar_abstracts = semantic_scholar_urls(topic, sorted_date)
print("---")
print(len(url_list))
print(url_list)
print("---")
if scholar_mode_check:
print("Semantic Scholar processing time: ", time.perf_counter() - start_time)
else:
print("Google Search processing time: ", time.perf_counter() - start_time)
result_content = build_results_beautifulsoup(url_list, scholar_abstracts)
return result_content
if __name__ == "__main__":
res = google_search("Low Resource ", "date:r:20240101:20241231", domain_list, True)
print(res.keys())
print(len(res))
print(res)