Spaces:
Sleeping
Sleeping
from bs4 import BeautifulSoup | |
import re | |
import requests as r | |
from html2text import html2text | |
import tqdm | |
import time | |
from selenium import webdriver | |
from selenium.webdriver.common.by import By | |
from selenium.webdriver.chrome.service import Service | |
from selenium.webdriver.support.wait import WebDriverWait | |
from selenium.webdriver.support import expected_conditions as EC | |
from webdriver_manager.chrome import ChromeDriverManager | |
import multiprocessing | |
# def from_desktop_to_mobile_version(url): | |
# """Convert a desktop URL to its mobile version.""" | |
# return url.replace("https://kin.naver.com", "https://m.kin.naver.com") | |
def initialize_webdriver():
    """Create a Chrome WebDriver configured to run without a visible window.

    The driver binary is resolved through ``ChromeDriverManager().install()``
    and handed to Selenium via a ``Service`` object.

    Returns:
        A ``webdriver.Chrome`` instance. The caller is responsible for
        calling ``quit()`` on it.
    """
    chrome_flags = (
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--headless=new",
        "--disable-gpu",
    )

    chrome_options = webdriver.ChromeOptions()
    for flag in chrome_flags:
        chrome_options.add_argument(flag)

    driver_path = ChromeDriverManager().install()
    return webdriver.Chrome(options=chrome_options, service=Service(driver_path))
def _close_promotion_popup(driver):
    """Dismiss the promotion layer if its close button becomes clickable within 5 s."""
    # Local import: selenium is already a dependency of this file, and this
    # keeps the block self-contained without touching the import section.
    from selenium.common.exceptions import TimeoutException

    try:
        close_btn = WebDriverWait(driver, 5).until(
            EC.element_to_be_clickable(
                (By.CSS_SELECTOR, ".layer_promotion_choice_inner > .ico_close_layer")
            ),
            message="Close button not found.",
        )
    except TimeoutException:
        # No popup on this page: that is not an error, carry on scraping.
        print("Close button not found.")
        return

    print("Closing the popup")
    close_btn.click()
    time.sleep(0.2)  # give the layer a moment to disappear
    print("CLOSED")


def _expand_answer_list(driver):
    """Click the 'nextPageButton' element once if it exists and is displayed."""
    # find_elements returns an empty list instead of raising when the button
    # is absent, so pages without the button are handled without an exception.
    buttons = driver.find_elements(By.ID, 'nextPageButton')
    if not buttons:
        return

    expand_btn = buttons[0]
    print("Expand button: ", expand_btn)
    if expand_btn.is_displayed():
        WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable(expand_btn),
            message="Expand button wasn't loaded in time.",
        )
        expand_btn.click()
        print("Clicked the expand button")
        time.sleep(0.5)  # let the newly revealed content render


def _parse_question_page(html_content, url):
    """Extract title, question body and answers from the rendered page HTML."""
    soup = BeautifulSoup(html_content, "html.parser")

    answers = [
        html2text(str(answer.prettify()))
        for answer in soup.find_all('div', {'class': 'answerDetail'})
    ]
    # If either section is missing, .find() returns None and the attribute
    # access raises; process_url turns that into its error result.
    title = soup.find('div', {'class': 'endTitleSection'}).text.strip()
    question_details = soup.find('div', {'class': 'questionDetail'}).text.strip()
    # "질문" is Korean for "question".
    # NOTE(review): this removes every occurrence, including any inside the
    # actual title text - confirm that is intended.
    title = title.replace("질문", '').strip()

    return {
        "title": title,
        "questionDetails": question_details,
        "url": url,
        "answers": answers,
    }


def process_url(url):
    """Scrape a single question page with a fresh headless browser.

    Loads ``url``, dismisses the promotion popup when present, clicks the
    'nextPageButton' element when present and displayed, then parses the page.

    Args:
        url: Address of the question page to scrape.

    Returns:
        A dict with keys ``"title"``, ``"questionDetails"``, ``"url"`` and
        ``"answers"``. On success ``"answers"`` is a list of strings produced
        by ``html2text``. On any failure the URL is appended to
        ``error_urls.txt`` and the dict has empty strings for ``"title"``,
        ``"questionDetails"`` and ``"answers"``.
    """
    driver = initialize_webdriver()
    try:
        print("Processing URL:", url)
        driver.get(url)

        _close_promotion_popup(driver)
        _expand_answer_list(driver)

        result = _parse_question_page(driver.page_source, url)

        print("Answers extracted from: \n", url)
        print(len(result["answers"]))
        print('-' * 60)
        return result
    except Exception as e:
        # Per-URL boundary: one bad page must not abort the whole run.
        print(f"Error processing URL {url} \n\n\n{e}")
        # Append so earlier failures are kept, not overwritten.
        with open('error_urls.txt', 'a', encoding='utf-8') as f:
            f.write(url + '\n')
        # "answers" stays an empty string here (not a list) so existing
        # callers that compare against '' keep working.
        return {"title": '', "questionDetails": '', "url": url, "answers": ''}
    finally:
        driver.quit()
def get_answers(results_a_elements, query):
    """Scrape every question URL in turn and collect the results.

    Args:
        results_a_elements: Iterable of question-page URLs. ``None`` or an
            empty collection yields an empty list.
        query: The search query that produced the URLs. Currently unused;
            kept so existing callers do not need to change.

    Returns:
        A list with one ``process_url`` result dict per input URL, in input
        order.
    """
    if not results_a_elements:
        print("No results found.")
        return []

    print("Result links extracted: ", len(results_a_elements))

    # URLs are handled one at a time; each process_url call starts and quits
    # its own browser instance.
    results = []
    for url in tqdm.tqdm(results_a_elements):
        results.append(process_url(url))
    return results
def get_search_results(query, num_pages):
    """Collect question links from Naver 지식iN search result pages.

    Requests pages ``1..num_pages`` of the search listing for ``query`` and
    gathers the ``href`` of every ``a._searchListTitleAnchor`` inside the
    ``ul.basic1`` list. A page that fails (network error, HTTP error status,
    missing result list) is reported on stdout and skipped; the remaining
    pages are still fetched.

    Args:
        query: Search text, inserted into the URL as-is.
        num_pages: Number of result pages to fetch. Values <= 0 fetch nothing.

    Returns:
        A flat list of href strings across all successfully parsed pages.
    """
    results = []
    for page in range(1, num_pages + 1):
        # NOTE(review): query is interpolated without URL-escaping, so
        # characters such as '&' or '#' would change the request. Confirm
        # whether callers pre-encode before switching to an escaped form.
        url = f"https://kin.naver.com/search/list.naver?query={query}&page={page}"
        print("Starting the scraping process for:\n", url)
        try:
            # Without a timeout a stalled connection would block the run forever.
            response = r.get(url, timeout=10)
            response.raise_for_status()

            soup = BeautifulSoup(response.text, "html.parser")
            result_list = soup.find("ul", {"class": "basic1"})
            if result_list is None:
                print(
                    "Error while fetching search results: "
                    f"result list not found on page {page}"
                )
                continue

            anchors = result_list.find_all("a", {"class": "_searchListTitleAnchor"})
            results.extend(a.get("href") for a in anchors if a.get("href"))
        except Exception as e:
            # Best effort: report the failed page and continue with the next one.
            print(f"Error while fetching search results: {e}")
    return results
def extract_data(query, num_pages=150) -> list[dict[str, object]]:
    """Search for ``query`` and scrape every question page that turns up.

    Args:
        query: Search text passed to ``get_search_results``.
        num_pages: How many search result pages to walk (default 150).

    Returns:
        The list returned by ``get_answers`` for the collected links.
    """
    question_links = get_search_results(query, num_pages)
    print(question_links)

    scraped = get_answers(question_links, query)
    print("Total answers collected:", len(scraped))
    return scraped
# if __name__ == "__main__": | |
#     process_url("https://kin.naver.com/qna/detail.naver?d1id=4&dirId=401030203&docId=478845808&qb=67O07ZeYIOyImOyIoOu5hA==&enc=utf8&section=kin.qna_ency&rank=1&search_sort=0&spq=0")