# Source: article_writer / plagiarism.py (3.76 kB)
# Last commit: "added RAG" (03fd59b), eljanmahammadli
import time
from googleapiclient.discovery import build
import asyncio
import httpx
from bs4 import BeautifulSoup
import justext
import newspaper
def clean_html(text):
    """Extract the title and the non-boilerplate paragraph text of a page.

    ``newspaper`` parses ``text`` to obtain the title, and ``justext`` (with
    its English stoplist) classifies paragraphs so boilerplate is dropped.

    Args:
        text: Page content handed to both ``newspaper`` and ``justext``.

    Returns:
        The title followed by a newline, then the text of every
        non-boilerplate paragraph concatenated with no separator.
    """
    # A placeholder URL is passed because the content is injected through
    # set_html() instead of being downloaded.
    parsed = newspaper.Article(url=" ")
    parsed.set_html(text)
    parsed.parse()
    pieces = [parsed.title + "\n"]

    stoplist = justext.get_stoplist("English")
    pieces.extend(
        block.text
        for block in justext.justext(text, stoplist)
        if not block.is_boilerplate
    )
    return "".join(pieces)
# Full English month name -> two-digit month number.
months = {
    "January": "01",
    "February": "02",
    "March": "03",
    "April": "04",
    "May": "05",
    "June": "06",
    "July": "07",
    "August": "08",
    "September": "09",
    "October": "10",
    "November": "11",
    "December": "12",
}

# Top-level domain suffixes. Not referenced inside this module; presumably
# imported by callers as the choices for `domains_to_include` -- confirm.
domain_list = ["com", "org", "net", "int", "edu", "gov", "mil"]


def build_date(year=2024, month="March", day=1):
    """Return the given date as a compact, fixed-width ``YYYYMMDD`` string.

    Args:
        year: Calendar year (inserted as-is).
        month: Full English month name; must be a key of ``months``.
        day: Day of the month, as an int or a numeric string.

    Returns:
        The year, the two-digit month and the two-digit day concatenated,
        e.g. ``"20240301"``.

    Raises:
        KeyError: If ``month`` is not a key of ``months``.
        ValueError: If ``day`` is a string that is not an integer.
        TypeError: If ``day`` is of a type ``int()`` cannot convert.
    """
    # Zero-pad the day: the month is always two digits, but an unpadded
    # single-digit day produced a seven-character string ("2024031").
    # NOTE(review): presumably consumed by a Custom Search date-range sort,
    # which uses YYYYMMDD -- confirm against the caller.
    return f"{year}{months[month]}{int(day):02d}"
async def get_url_data(url, client):
    """Request ``url`` through ``client`` and parse the body on success.

    Args:
        url: Address to request.
        client: Object whose awaitable ``get(url)`` yields a response with
            ``status_code`` and ``content`` attributes.

    Returns:
        A ``BeautifulSoup`` tree (built with ``html.parser``) of the response
        body when the status code is 200; ``None`` for any other status or
        when the request or the parsing raises.
    """
    try:
        response = await client.get(url)
        if response.status_code != 200:
            return None
        return BeautifulSoup(response.content, "html.parser")
    except Exception:
        # Best effort: a page that cannot be fetched or parsed is reported
        # as None instead of propagating the error to the caller.
        return None
async def parallel_scrap(urls):
    """Fetch every URL concurrently through one shared ``httpx.AsyncClient``.

    Args:
        urls: Iterable of addresses to request.

    Returns:
        A list, in the same order as ``urls``, holding what ``get_url_data``
        produced for each address. ``gather`` runs with
        ``return_exceptions=True``, so an exception escaping a task is
        placed in its slot instead of being raised.
    """
    # The client is created with a 30-second timeout and is closed when the
    # ``async with`` block exits, after all requests have completed.
    async with httpx.AsyncClient(timeout=30) as session:
        pending = [
            get_url_data(url=address, client=session) for address in urls
        ]
        return await asyncio.gather(*pending, return_exceptions=True)
def scrap(urls):
    """Fetch every URL one after another and parse each successful response.

    This is the blocking variant of ``parallel_scrap``. It performs the
    requests itself with a synchronous ``httpx.Client``: the previous
    version called the ``async`` ``get_url_data`` without awaiting it, so it
    returned un-awaited coroutine objects rather than parsed pages, and it
    never closed the client.

    Args:
        urls: Iterable of addresses to request.

    Returns:
        A list aligned with ``urls``; each entry is a ``BeautifulSoup`` tree
        (built with ``html.parser``) when the response status is 200, or
        ``None`` when the status differs or the request/parsing raises.
    """
    soups = []
    # The context manager closes the client once all URLs are processed.
    with httpx.Client() as client:
        for url in urls:
            soup = None
            try:
                response = client.get(url)
                if response.status_code == 200:
                    soup = BeautifulSoup(response.content, "html.parser")
            except Exception:
                # Best effort, same policy as get_url_data: one bad page
                # must not abort the remaining downloads.
                soup = None
            soups.append(soup)
    return soups
def google_search_urls(
    text,
    sorted_date,
    domains_to_include,
    api_key,
    cse_id,
    **kwargs,
):
    """Query Google Custom Search and return the unique, domain-filtered URLs.

    Args:
        text: Search query, sent as ``q``.
        sorted_date: Value sent as the ``sort`` parameter of the request.
        domains_to_include: Iterable of domain suffixes without a leading
            dot (e.g. ``"com"``, ``"edu"``). Only results whose host name
            ends with ``"." + suffix`` are kept. ``None`` disables the
            filter and keeps every result; an empty iterable keeps none.
        api_key: Developer key used to build the ``customsearch`` service.
        cse_id: Search engine identifier, sent as ``cx``.
        **kwargs: Extra keyword arguments forwarded to ``cse().list()``.

    Returns:
        The result URLs in the order returned by the API, without
        duplicates. Empty when the response has no ``"items"``.
    """
    # Local import keeps this block self-contained (stdlib only).
    from urllib.parse import urlparse

    service = build("customsearch", "v1", developerKey=api_key)
    results = (
        service.cse()
        .list(q=text, cx=cse_id, sort=sorted_date, **kwargs)
        .execute()
    )

    # NOTE: None used to discard every result, which made the function
    # return an empty list whenever no filter was supplied.
    if domains_to_include is None:
        suffixes = None
    else:
        suffixes = tuple("." + domain.lower() for domain in domains_to_include)

    url_list = []
    seen = set()
    for item in results.get("items") or []:
        url = item["link"]
        if suffixes is not None:
            # Compare against the host name only. A substring test on the
            # whole URL also matched paths and hosts such as
            # "www.internships.com" for the suffix "int".
            host = urlparse(url).hostname or ""
            if not host.endswith(suffixes):
                continue
        if url not in seen:
            seen.add(url)
            url_list.append(url)
    return url_list
def google_search(
    topic,
    sorted_date,
    domains_to_include,
    num_pages=3,
):
    """Search Google for ``topic``, scrape the hits and return their text.

    The function prints the elapsed time of the search step and of the
    scraping step. It drives the scraping with ``asyncio.run``, so it has to
    be called from synchronous code (not from inside a running event loop).

    Args:
        topic: Search query.
        sorted_date: Forwarded to ``google_search_urls`` as the sort value.
        domains_to_include: Forwarded to ``google_search_urls`` as the
            domain filter.
        num_pages: Maximum number of successfully scraped pages to return.
            Defaults to 3, the value that used to be hard-coded.

    Returns:
        A dict mapping each kept URL to the text ``clean_html`` extracted
        from it, holding at most ``num_pages`` entries in search order.
    """
    # Local import keeps this block self-contained (stdlib only).
    import os

    # SECURITY: credentials should not live in source control. They can now
    # be supplied through the environment; the literals below remain only as
    # a backward-compatible fallback and should be rotated and removed.
    api_key = (
        os.environ.get("GOOGLE_SEARCH_API_KEY")
        or "AIzaSyCLyCCpOPLZWuptuPAPSg8cUIZhdEMVf6g"
    )
    cse_id = os.environ.get("GOOGLE_SEARCH_CSE_ID") or "851813e81162b4ed4"

    # Get the list of URLs to check.
    start_time = time.perf_counter()
    url_list = google_search_urls(
        topic,
        sorted_date,
        domains_to_include,
        api_key,
        cse_id,
    )
    print("GOOGLE SEARCH PROCESSING TIME: ", time.perf_counter() - start_time)

    # Scrape all URLs concurrently.
    start_time = time.perf_counter()
    soups = asyncio.run(parallel_scrap(url_list))
    print("SCRAPING PROCESSING TIME: ", time.perf_counter() - start_time)

    result_content = {}
    for url, soup in zip(url_list, soups):
        if len(result_content) >= num_pages:
            break
        # Skip pages that failed to download. parallel_scrap gathers with
        # return_exceptions=True, so a slot may also hold an exception
        # object, which is truthy but has no ``.text``.
        if not soup or isinstance(soup, BaseException):
            continue
        # NOTE(review): ``soup.text`` is the page with markup already
        # stripped, while clean_html feeds its argument to HTML parsers.
        # Passing ``str(soup)`` may be what was intended -- confirm.
        result_content[url] = clean_html(soup.text)
    return result_content