# article_writer / google_search.py
# NOTE: Hugging Face file-viewer chrome converted to comments so this file
# parses as Python (commit bf91121, "enable ai model selection and api key",
# raw / history / blame, 5.28 kB).
import os
import time
from googleapiclient.discovery import build
import asyncio
import httpx
from bs4 import BeautifulSoup
from dotenv import load_dotenv
import html2text
import requests
# Load variables from a local .env file into the process environment.
load_dotenv()

# Credentials for the Google Custom Search JSON API.
# NOTE(review): CSE_KEY is read from GOOGLE_SEARCH_CSE_ID — the name/env-var
# mismatch is intentional-looking but worth confirming.
API_KEY = os.environ.get("GOOGLE_SEARCH_API_KEY")
CSE_KEY = os.environ.get("GOOGLE_SEARCH_CSE_ID")

# Maximum number of scraped pages to keep per search.
NUM_PAGES = 10

# Shared html2text converter used by clean_html(); configured once at import time.
h2t = html2text.HTML2Text()
h2t.body_width = 0  # No wrapping (bug fix: html2text's attribute is body_width, not bodywidth)
h2t.ignore_links = True  # Ignore hyperlinks
h2t.ignore_images = True  # Ignore images
h2t.ignore_emphasis = True  # Ignore emphasis
h2t.ignore_tables = False  # Include tables
h2t.skip_internal_links = True  # Skip internal links
h2t.skip_external_links = True  # NOTE(review): may not be a recognized html2text option — verify
h2t.single_line_break = True  # Use single line breaks
h2t.protect_links = True  # Protect links from being split
h2t.default_image_alt = "[image]"  # Default alt text for images
def clean_html(text):
    """Convert an HTML (or markup-bearing) string to plain text via the shared converter."""
    cleaned = h2t.handle(text)
    return cleaned
def build_results_beautifulsoup(url_list):
    """Scrape the given URLs concurrently and return {url: cleaned_text}.

    Only pages whose extracted text exceeds 500 characters are kept, and at
    most NUM_PAGES entries are collected.
    """
    print("Starting to scrape URLs...")
    start_time = time.perf_counter()

    # Items are BeautifulSoup documents, None (failed fetch / non-200), or —
    # because parallel_scrap gathers with return_exceptions=True — exception
    # objects that must not be treated as soups.
    soups = asyncio.run(parallel_scrap(url_list))

    scraping_time = time.perf_counter() - start_time
    print(f"Scraping processing time: {scraping_time:.2f} seconds")

    result_content = {}
    count = 0
    print("Starting to process each URL...")
    for url, soup in zip(url_list, soups):
        if count >= NUM_PAGES:
            print(f"Reached the limit of {NUM_PAGES} pages. Stopping processing.")
            break
        # Bug fix: exception instances are truthy, so the original `if soup:`
        # let them through and `soup.text` raised AttributeError.
        if isinstance(soup, BaseException) or not soup:
            print(f"Skipped URL: {url}, no soup content available.")
            continue
        print(f"Processing URL: {url}")
        text = clean_html(soup.text)
        if len(text) > 500:
            print(f"Adding content from URL: {url}, content length: {len(text)}")
            result_content[url] = text
            count += 1
        else:
            print(f"Skipped URL: {url}, content too short (length: {len(text)})")
    print("Finished processing URLs.")
    return result_content
def build_results_extractor(url_list):
    """Fetch page text for up to NUM_PAGES URLs via extractorapi.com.

    Pages shorter than 500 characters are skipped. On any failure (including
    an HTTP 403, which suggests a bad or exhausted API key) it falls back to
    build_results_beautifulsoup for the whole URL list.
    """
    try:
        endpoint = "https://extractorapi.com/api/v1/extractor"
        api_key = os.environ.get("EXTRACTOR_API_KEY")
        result_content = {}
        count = 0
        for url in url_list:
            if count >= NUM_PAGES:
                break
            params = {"apikey": api_key, "url": url}
            # Bounded timeout so one hung request can't stall the whole batch.
            r = requests.get(endpoint, params=params, timeout=30)
            if r.status_code == 200:
                text = r.json()["text"]
                if len(text) > 500:
                    result_content[url] = text
                    count += 1
            elif r.status_code == 403:
                raise Exception("Error with API; using default implementation instead")
        return result_content
    except Exception as e:
        print(e)
        return build_results_beautifulsoup(url_list)
# Month name -> zero-padded month number, for building YYYYMMDD date strings.
months = {
    "January": "01",
    "February": "02",
    "March": "03",
    "April": "04",
    "May": "05",
    "June": "06",
    "July": "07",
    "August": "08",
    "September": "09",
    "October": "10",
    "November": "11",
    "December": "12",
}

# Common top-level domains (candidate values for domain filtering).
domain_list = ["com", "org", "net", "int", "edu", "gov", "mil"]


def build_date(year=2024, month="March", day=1):
    """Return the date as a YYYYMMDD string (e.g. for Google CSE sort ranges).

    Bug fix: the day is now zero-padded — previously build_date(2024, "March", 1)
    produced the ambiguous "2024031" instead of "20240301".
    """
    return f"{year}{months[month]}{int(day):02d}"
async def get_url_data(url, client):
    """Fetch *url* with the shared async client.

    Returns a parsed BeautifulSoup document on HTTP 200; returns None for any
    other status code or if the request/parsing raises.
    """
    try:
        response = await client.get(url)
        if response.status_code == 200:
            return BeautifulSoup(response.content, "html.parser")
        return None
    except Exception:
        return None
async def parallel_scrap(urls):
    """Fetch all *urls* concurrently.

    Returns one entry per URL in order: a BeautifulSoup document, None, or an
    exception object (gather runs with return_exceptions=True).
    """
    async with httpx.AsyncClient(timeout=30) as client:
        fetches = [get_url_data(url=url, client=client) for url in urls]
        return await asyncio.gather(*fetches, return_exceptions=True)
def scrap(urls):
    """Synchronously fetch each URL and return a list of parsed soups.

    Bug fix: the original called the async get_url_data without awaiting it,
    so it returned a list of never-run coroutine objects instead of soups
    (and leaked the httpx client). Each entry is a BeautifulSoup document on
    HTTP 200, or None on any other status or request error.
    """
    if not urls:
        return []
    soups = []
    # Context manager closes the client (and its connection pool) deterministically.
    with httpx.Client(timeout=30) as client:
        for url in urls:
            try:
                response = client.get(url)
                if response.status_code == 200:
                    soups.append(BeautifulSoup(response.content, "html.parser"))
                else:
                    soups.append(None)
            except Exception:
                soups.append(None)
    return soups
def google_search_urls(
    text,
    sorted_date,
    domains_to_include,
    api_key,
    cse_id,
    **kwargs,
):
    """Run a Google Custom Search query and return a deduplicated URL list.

    domains_to_include: iterable of domain suffixes (e.g. ["com", "org"]) to
    keep; None disables filtering. Bug fix: the original condition skipped
    EVERY result when domains_to_include was None, so an unfiltered search
    always returned [].
    """
    service = build("customsearch", "v1", developerKey=api_key)
    results = service.cse().list(q=text, cx=cse_id, sort=sorted_date, **kwargs).execute()
    url_list = []
    for item in results.get("items", []):
        url = item["link"]
        # Keep only URLs containing one of the requested ".domain" suffixes
        # (no filter means keep everything).
        if domains_to_include is not None and not any(
            ("." + domain) in url for domain in domains_to_include
        ):
            continue
        if url not in url_list:
            url_list.append(url)
    return url_list
def google_search(
    topic,
    sorted_date,
    domains_to_include,
):
    """Search Google for *topic* and return {url: scraped_page_text}."""
    # Credentials are read fresh from the environment on every call.
    api_key = os.environ.get("GOOGLE_SEARCH_API_KEY")
    cse_id = os.environ.get("GOOGLE_SEARCH_CSE_ID")

    search_start = time.perf_counter()
    urls = google_search_urls(
        topic,
        sorted_date,
        domains_to_include,
        api_key,
        cse_id,
    )
    elapsed = time.perf_counter() - search_start
    print("Google Search processing time: ", elapsed)

    return build_results_beautifulsoup(urls)