import time
from urllib.parse import urljoin, urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed
from bs4 import BeautifulSoup, Comment, element, Tag, NavigableString
import json
import html
import re
import os
import platform
from .prompts import PROMPT_EXTRACT_BLOCKS
from .config import *
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
import requests
from requests.exceptions import InvalidSchema
import xxhash
from colorama import Fore, Style, init
import textwrap
import cProfile
import pstats
from functools import wraps
import asyncio
class InvalidCSSSelectorError(Exception): | |
pass | |
def create_box_message(message: str, type: str = "info", width: int = 120, add_newlines: bool = True, double_line: bool = False) -> str: | |
""" | |
Create a styled message box with colored borders and formatted text. | |
How it works: | |
1. Determines box style and colors based on the message type (e.g., info, warning). | |
2. Wraps text to fit within the specified width. | |
3. Constructs a box using characters (single or double lines) with appropriate formatting. | |
4. Adds optional newlines before and after the box. | |
Args: | |
message (str): The message to display inside the box. | |
type (str): Type of the message (e.g., "info", "warning", "error", "success"). Defaults to "info". | |
width (int): Width of the box. Defaults to 120. | |
add_newlines (bool): Whether to add newlines before and after the box. Defaults to True. | |
double_line (bool): Whether to use double lines for the box border. Defaults to False. | |
Returns: | |
str: A formatted string containing the styled message box. | |
""" | |
init() | |
# Define border and text colors for different types | |
styles = { | |
"warning": (Fore.YELLOW, Fore.LIGHTYELLOW_EX, "β "), | |
"info": (Fore.BLUE, Fore.LIGHTBLUE_EX, "βΉ"), | |
"success": (Fore.GREEN, Fore.LIGHTGREEN_EX, "β"), | |
"error": (Fore.RED, Fore.LIGHTRED_EX, "Γ"), | |
} | |
border_color, text_color, prefix = styles.get(type.lower(), styles["info"]) | |
# Define box characters based on line style | |
box_chars = { | |
"single": ("β", "β", "β", "β", "β", "β"), | |
"double": ("β", "β", "β", "β", "β", "β") | |
} | |
line_style = "double" if double_line else "single" | |
h_line, v_line, tl, tr, bl, br = box_chars[line_style] | |
# Process lines with lighter text color | |
formatted_lines = [] | |
raw_lines = message.split('\n') | |
if raw_lines: | |
first_line = f"{prefix} {raw_lines[0].strip()}" | |
wrapped_first = textwrap.fill(first_line, width=width-4) | |
formatted_lines.extend(wrapped_first.split('\n')) | |
for line in raw_lines[1:]: | |
if line.strip(): | |
wrapped = textwrap.fill(f" {line.strip()}", width=width-4) | |
formatted_lines.extend(wrapped.split('\n')) | |
else: | |
formatted_lines.append("") | |
# Create the box with colored borders and lighter text | |
horizontal_line = h_line * (width - 1) | |
box = [ | |
f"{border_color}{tl}{horizontal_line}{tr}", | |
*[f"{border_color}{v_line}{text_color} {line:<{width-2}}{border_color}{v_line}" for line in formatted_lines], | |
f"{border_color}{bl}{horizontal_line}{br}{Style.RESET_ALL}" | |
] | |
result = "\n".join(box) | |
if add_newlines: | |
result = f"\n{result}\n" | |
return result | |
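# Example usage (illustrative sketch; actual colours depend on the terminal's
# ANSI support):
#
#   print(create_box_message("Crawl completed for 12 URLs", type="success", width=80))
#   print(create_box_message("Rate limit reached, backing off", type="warning"))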
def calculate_semaphore_count(): | |
""" | |
Calculate the optimal semaphore count based on system resources. | |
How it works: | |
1. Determines the number of CPU cores and total system memory. | |
2. Sets a base count as half of the available CPU cores. | |
3. Limits the count based on memory, assuming 2GB per semaphore instance. | |
4. Returns the minimum value between CPU and memory-based limits. | |
Returns: | |
int: The calculated semaphore count. | |
""" | |
cpu_count = os.cpu_count() | |
memory_gb = get_system_memory() / (1024 ** 3) # Convert to GB | |
base_count = max(1, cpu_count // 2) | |
memory_based_cap = int(memory_gb / 2) # Assume 2GB per instance | |
return min(base_count, memory_based_cap) | |
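# Worked example (assumed hardware, for illustration): on a machine with 8 CPU
# cores and 16 GB of RAM, base_count = max(1, 8 // 2) = 4 and
# memory_based_cap = int(16 / 2) = 8, so calculate_semaphore_count() returns
# min(4, 8) = 4.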
def get_system_memory(): | |
""" | |
Get the total system memory in bytes. | |
How it works: | |
1. Detects the operating system. | |
2. Reads memory information from system-specific commands or files. | |
3. Converts the memory to bytes for uniformity. | |
Returns: | |
int: The total system memory in bytes. | |
Raises: | |
OSError: If the operating system is unsupported. | |
""" | |
system = platform.system() | |
if system == "Linux": | |
with open('/proc/meminfo', 'r') as mem: | |
for line in mem: | |
if line.startswith('MemTotal:'): | |
return int(line.split()[1]) * 1024 # Convert KB to bytes | |
elif system == "Darwin": # macOS | |
import subprocess | |
output = subprocess.check_output(['sysctl', '-n', 'hw.memsize']).decode('utf-8') | |
return int(output.strip()) | |
elif system == "Windows": | |
import ctypes | |
kernel32 = ctypes.windll.kernel32 | |
c_ulonglong = ctypes.c_ulonglong | |
class MEMORYSTATUSEX(ctypes.Structure): | |
_fields_ = [ | |
('dwLength', ctypes.c_ulong), | |
('dwMemoryLoad', ctypes.c_ulong), | |
('ullTotalPhys', c_ulonglong), | |
('ullAvailPhys', c_ulonglong), | |
('ullTotalPageFile', c_ulonglong), | |
('ullAvailPageFile', c_ulonglong), | |
('ullTotalVirtual', c_ulonglong), | |
('ullAvailVirtual', c_ulonglong), | |
('ullAvailExtendedVirtual', c_ulonglong), | |
] | |
memoryStatus = MEMORYSTATUSEX() | |
memoryStatus.dwLength = ctypes.sizeof(MEMORYSTATUSEX) | |
kernel32.GlobalMemoryStatusEx(ctypes.byref(memoryStatus)) | |
return memoryStatus.ullTotalPhys | |
else: | |
raise OSError("Unsupported operating system") | |
def get_home_folder(): | |
""" | |
Get or create the home folder for Crawl4AI configuration and cache. | |
How it works: | |
1. Uses environment variables or defaults to the user's home directory. | |
2. Creates `.crawl4ai` and its subdirectories (`cache`, `models`) if they don't exist. | |
3. Returns the path to the home folder. | |
Returns: | |
str: The path to the Crawl4AI home folder. | |
""" | |
    home_folder = os.path.join(os.getenv("CRAWL4_AI_BASE_DIRECTORY", Path.home()), ".crawl4ai")
os.makedirs(home_folder, exist_ok=True) | |
os.makedirs(f"{home_folder}/cache", exist_ok=True) | |
os.makedirs(f"{home_folder}/models", exist_ok=True) | |
return home_folder | |
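# Example usage (illustrative sketch): with CRAWL4_AI_BASE_DIRECTORY unset this
# resolves to ~/.crawl4ai and guarantees the cache/ and models/ subfolders exist.
#
#   home = get_home_folder()
#   cache_dir = os.path.join(home, "cache")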
def beautify_html(escaped_html): | |
""" | |
Beautifies an escaped HTML string. | |
Parameters: | |
escaped_html (str): A string containing escaped HTML. | |
Returns: | |
str: A beautifully formatted HTML string. | |
""" | |
# Unescape the HTML string | |
unescaped_html = html.unescape(escaped_html) | |
# Use BeautifulSoup to parse and prettify the HTML | |
soup = BeautifulSoup(unescaped_html, 'html.parser') | |
pretty_html = soup.prettify() | |
return pretty_html | |
def split_and_parse_json_objects(json_string): | |
""" | |
Splits a JSON string which is a list of objects and tries to parse each object. | |
Parameters: | |
json_string (str): A string representation of a list of JSON objects, e.g., '[{...}, {...}, ...]'. | |
Returns: | |
tuple: A tuple containing two lists: | |
- First list contains all successfully parsed JSON objects. | |
- Second list contains the string representations of all segments that couldn't be parsed. | |
""" | |
# Trim the leading '[' and trailing ']' | |
if json_string.startswith('[') and json_string.endswith(']'): | |
json_string = json_string[1:-1].strip() | |
# Split the string into segments that look like individual JSON objects | |
segments = [] | |
depth = 0 | |
start_index = 0 | |
for i, char in enumerate(json_string): | |
if char == '{': | |
if depth == 0: | |
start_index = i | |
depth += 1 | |
elif char == '}': | |
depth -= 1 | |
if depth == 0: | |
segments.append(json_string[start_index:i+1]) | |
# Try parsing each segment | |
parsed_objects = [] | |
unparsed_segments = [] | |
for segment in segments: | |
try: | |
obj = json.loads(segment) | |
parsed_objects.append(obj) | |
except json.JSONDecodeError: | |
unparsed_segments.append(segment) | |
return parsed_objects, unparsed_segments | |
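# Example usage (illustrative sketch): the second object below is malformed
# JSON, so it is returned in the unparsed list instead of raising.
#
#   parsed, unparsed = split_and_parse_json_objects('[{"a": 1}, {"b": }]')
#   # parsed   -> [{'a': 1}]
#   # unparsed -> ['{"b": }']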
def sanitize_html(html): | |
""" | |
Sanitize an HTML string by escaping quotes. | |
How it works: | |
    1. Escapes double and single quotes so the HTML can be safely embedded in JSON or JavaScript strings.
Args: | |
html (str): The HTML string to sanitize. | |
Returns: | |
str: The sanitized HTML string. | |
""" | |
    # Character stripping is currently disabled; only quote escaping is applied below
sanitized_html = html | |
# sanitized_html = re.sub(r'[^\w\s.,;:!?=\[\]{}()<>\/\\\-"]', '', html) | |
# Escape all double and single quotes | |
sanitized_html = sanitized_html.replace('"', '\\"').replace("'", "\\'") | |
return sanitized_html | |
def sanitize_input_encode(text: str) -> str: | |
"""Sanitize input to handle potential encoding issues.""" | |
try: | |
try: | |
if not text: | |
return '' | |
# Attempt to encode and decode as UTF-8 to handle potential encoding issues | |
return text.encode('utf-8', errors='ignore').decode('utf-8') | |
except UnicodeEncodeError as e: | |
print(f"Warning: Encoding issue detected. Some characters may be lost. Error: {e}") | |
# Fall back to ASCII if UTF-8 fails | |
return text.encode('ascii', errors='ignore').decode('ascii') | |
except Exception as e: | |
raise ValueError(f"Error sanitizing input: {str(e)}") from e | |
def escape_json_string(s): | |
""" | |
Escapes characters in a string to be JSON safe. | |
Parameters: | |
s (str): The input string to be escaped. | |
Returns: | |
str: The escaped string, safe for JSON encoding. | |
""" | |
# Replace problematic backslash first | |
s = s.replace('\\', '\\\\') | |
# Replace the double quote | |
s = s.replace('"', '\\"') | |
# Escape control characters | |
s = s.replace('\b', '\\b') | |
s = s.replace('\f', '\\f') | |
s = s.replace('\n', '\\n') | |
s = s.replace('\r', '\\r') | |
s = s.replace('\t', '\\t') | |
# Additional problematic characters | |
# Unicode control characters | |
s = re.sub(r'[\x00-\x1f\x7f-\x9f]', lambda x: '\\u{:04x}'.format(ord(x.group())), s) | |
return s | |
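# Example usage (illustrative sketch): quotes and control characters are
# escaped so the result can be embedded inside a JSON string literal.
#
#   escape_json_string('He said "hi"\nBye')   # -> 'He said \\"hi\\"\\nBye'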
def replace_inline_tags(soup, tags, only_text=False): | |
""" | |
Replace inline HTML tags with Markdown-style equivalents. | |
How it works: | |
1. Maps specific tags (e.g., <b>, <i>) to Markdown syntax. | |
2. Finds and replaces all occurrences of these tags in the provided BeautifulSoup object. | |
3. Optionally replaces tags with their text content only. | |
Args: | |
soup (BeautifulSoup): Parsed HTML content. | |
tags (List[str]): List of tags to replace. | |
only_text (bool): Whether to replace tags with plain text. Defaults to False. | |
Returns: | |
BeautifulSoup: Updated BeautifulSoup object with replaced tags. | |
""" | |
tag_replacements = { | |
'b': lambda tag: f"**{tag.text}**", | |
'i': lambda tag: f"*{tag.text}*", | |
'u': lambda tag: f"__{tag.text}__", | |
'span': lambda tag: f"{tag.text}", | |
'del': lambda tag: f"~~{tag.text}~~", | |
'ins': lambda tag: f"++{tag.text}++", | |
'sub': lambda tag: f"~{tag.text}~", | |
'sup': lambda tag: f"^^{tag.text}^^", | |
'strong': lambda tag: f"**{tag.text}**", | |
'em': lambda tag: f"*{tag.text}*", | |
'code': lambda tag: f"`{tag.text}`", | |
'kbd': lambda tag: f"`{tag.text}`", | |
'var': lambda tag: f"_{tag.text}_", | |
's': lambda tag: f"~~{tag.text}~~", | |
'q': lambda tag: f'"{tag.text}"', | |
'abbr': lambda tag: f"{tag.text} ({tag.get('title', '')})", | |
'cite': lambda tag: f"_{tag.text}_", | |
'dfn': lambda tag: f"_{tag.text}_", | |
'time': lambda tag: f"{tag.text}", | |
'small': lambda tag: f"<small>{tag.text}</small>", | |
'mark': lambda tag: f"=={tag.text}==" | |
} | |
replacement_data = [(tag, tag_replacements.get(tag, lambda t: t.text)) for tag in tags] | |
for tag_name, replacement_func in replacement_data: | |
for tag in soup.find_all(tag_name): | |
replacement_text = tag.text if only_text else replacement_func(tag) | |
tag.replace_with(replacement_text) | |
return soup | |
# for tag_name in tags: | |
# for tag in soup.find_all(tag_name): | |
# if not only_text: | |
# replacement_text = tag_replacements.get(tag_name, lambda t: t.text)(tag) | |
# tag.replace_with(replacement_text) | |
# else: | |
# tag.replace_with(tag.text) | |
# return soup | |
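# Example usage (illustrative sketch): supported inline tags are converted to
# Markdown-style text inside the parsed tree.
#
#   soup = BeautifulSoup("<p>This is <b>bold</b> and <code>x = 1</code></p>", "html.parser")
#   replace_inline_tags(soup, ["b", "code"])
#   # str(soup) -> '<p>This is **bold** and `x = 1`</p>'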
def get_content_of_website(url, html, word_count_threshold = MIN_WORD_THRESHOLD, css_selector = None, **kwargs): | |
""" | |
Extract structured content, media, and links from website HTML. | |
How it works: | |
1. Parses the HTML content using BeautifulSoup. | |
2. Extracts internal/external links and media (images, videos, audios). | |
3. Cleans the content by removing unwanted tags and attributes. | |
4. Converts cleaned HTML to Markdown. | |
5. Collects metadata and returns the extracted information. | |
Args: | |
url (str): The website URL. | |
html (str): The HTML content of the website. | |
word_count_threshold (int): Minimum word count for content inclusion. Defaults to MIN_WORD_THRESHOLD. | |
css_selector (Optional[str]): CSS selector to extract specific content. Defaults to None. | |
Returns: | |
Dict[str, Any]: Extracted content including Markdown, cleaned HTML, media, links, and metadata. | |
""" | |
try: | |
if not html: | |
return None | |
# Parse HTML content with BeautifulSoup | |
soup = BeautifulSoup(html, 'html.parser') | |
# Get the content within the <body> tag | |
body = soup.body | |
# If css_selector is provided, extract content based on the selector | |
if css_selector: | |
selected_elements = body.select(css_selector) | |
if not selected_elements: | |
raise InvalidCSSSelectorError(f"Invalid CSS selector , No elements found for CSS selector: {css_selector}") | |
div_tag = soup.new_tag('div') | |
for el in selected_elements: | |
div_tag.append(el) | |
body = div_tag | |
links = { | |
'internal': [], | |
'external': [] | |
} | |
# Extract all internal and external links | |
for a in body.find_all('a', href=True): | |
href = a['href'] | |
url_base = url.split('/')[2] | |
if href.startswith('http') and url_base not in href: | |
links['external'].append({ | |
'href': href, | |
'text': a.get_text() | |
}) | |
else: | |
links['internal'].append( | |
{ | |
'href': href, | |
'text': a.get_text() | |
} | |
) | |
# Remove script, style, and other tags that don't carry useful content from body | |
for tag in body.find_all(['script', 'style', 'link', 'meta', 'noscript']): | |
tag.decompose() | |
# Remove all attributes from remaining tags in body, except for img tags | |
for tag in body.find_all(): | |
if tag.name != 'img': | |
tag.attrs = {} | |
        # Extract all img tags into [{src: '', alt: ''}]
media = { | |
'images': [], | |
'videos': [], | |
'audios': [] | |
} | |
for img in body.find_all('img'): | |
media['images'].append({ | |
'src': img.get('src'), | |
'alt': img.get('alt'), | |
"type": "image" | |
}) | |
# Extract all video tags into [{src: '', alt: ''}] | |
for video in body.find_all('video'): | |
media['videos'].append({ | |
'src': video.get('src'), | |
'alt': video.get('alt'), | |
"type": "video" | |
}) | |
# Extract all audio tags into [{src: '', alt: ''}] | |
for audio in body.find_all('audio'): | |
media['audios'].append({ | |
'src': audio.get('src'), | |
'alt': audio.get('alt'), | |
"type": "audio" | |
}) | |
# Replace images with their alt text or remove them if no alt text is available | |
for img in body.find_all('img'): | |
alt_text = img.get('alt') | |
if alt_text: | |
img.replace_with(soup.new_string(alt_text)) | |
else: | |
img.decompose() | |
        # Replace the content of each "pre" tag with its inner text
def replace_pre_tags_with_text(node): | |
for child in node.find_all('pre'): | |
# set child inner html to its text | |
child.string = child.get_text() | |
return node | |
# Replace all "pre" tags with their inner text | |
body = replace_pre_tags_with_text(body) | |
# Replace inline tags with their text content | |
body = replace_inline_tags( | |
body, | |
['b', 'i', 'u', 'span', 'del', 'ins', 'sub', 'sup', 'strong', 'em', 'code', 'kbd', 'var', 's', 'q', 'abbr', 'cite', 'dfn', 'time', 'small', 'mark'], | |
only_text=kwargs.get('only_text', False) | |
) | |
# Recursively remove empty elements, their parent elements, and elements with word count below threshold | |
def remove_empty_and_low_word_count_elements(node, word_count_threshold): | |
for child in node.contents: | |
if isinstance(child, element.Tag): | |
remove_empty_and_low_word_count_elements(child, word_count_threshold) | |
word_count = len(child.get_text(strip=True).split()) | |
if (len(child.contents) == 0 and not child.get_text(strip=True)) or word_count < word_count_threshold: | |
child.decompose() | |
return node | |
body = remove_empty_and_low_word_count_elements(body, word_count_threshold) | |
def remove_small_text_tags(body: Tag, word_count_threshold: int = MIN_WORD_THRESHOLD): | |
# We'll use a list to collect all tags that don't meet the word count requirement | |
tags_to_remove = [] | |
# Traverse all tags in the body | |
for tag in body.find_all(True): # True here means all tags | |
# Check if the tag contains text and if it's not just whitespace | |
if tag.string and tag.string.strip(): | |
# Split the text by spaces and count the words | |
word_count = len(tag.string.strip().split()) | |
# If the word count is less than the threshold, mark the tag for removal | |
if word_count < word_count_threshold: | |
tags_to_remove.append(tag) | |
# Remove all marked tags from the tree | |
for tag in tags_to_remove: | |
tag.decompose() # or tag.extract() to remove and get the element | |
return body | |
# Remove small text tags | |
body = remove_small_text_tags(body, word_count_threshold) | |
def is_empty_or_whitespace(tag: Tag): | |
if isinstance(tag, NavigableString): | |
return not tag.strip() | |
# Check if the tag itself is empty or all its children are empty/whitespace | |
if not tag.contents: | |
return True | |
return all(is_empty_or_whitespace(child) for child in tag.contents) | |
def remove_empty_tags(body: Tag): | |
# Continue processing until no more changes are made | |
changes = True | |
while changes: | |
changes = False | |
# Collect all tags that are empty or contain only whitespace | |
empty_tags = [tag for tag in body.find_all(True) if is_empty_or_whitespace(tag)] | |
for tag in empty_tags: | |
# If a tag is empty, decompose it | |
tag.decompose() | |
changes = True # Mark that a change was made | |
return body | |
# Remove empty tags | |
body = remove_empty_tags(body) | |
# Flatten nested elements with only one child of the same type | |
def flatten_nested_elements(node): | |
for child in node.contents: | |
if isinstance(child, element.Tag): | |
flatten_nested_elements(child) | |
if len(child.contents) == 1 and child.contents[0].name == child.name: | |
# print('Flattening:', child.name) | |
child_content = child.contents[0] | |
child.replace_with(child_content) | |
return node | |
body = flatten_nested_elements(body) | |
# Remove comments | |
for comment in soup.find_all(string=lambda text: isinstance(text, Comment)): | |
comment.extract() | |
# Remove consecutive empty newlines and replace multiple spaces with a single space | |
        cleaned_html = str(body).replace('\n\n', '\n').replace('  ', ' ')
# Sanitize the cleaned HTML content | |
cleaned_html = sanitize_html(cleaned_html) | |
# sanitized_html = escape_json_string(cleaned_html) | |
# Convert cleaned HTML to Markdown | |
        h = CustomHTML2Text()
h.ignore_links = True | |
markdown = h.handle(cleaned_html) | |
markdown = markdown.replace(' ```', '```') | |
try: | |
meta = extract_metadata(html, soup) | |
except Exception as e: | |
print('Error extracting metadata:', str(e)) | |
meta = {} | |
# Return the Markdown content | |
        return {
'markdown': markdown, | |
'cleaned_html': cleaned_html, | |
'success': True, | |
'media': media, | |
'links': links, | |
'metadata': meta | |
} | |
except Exception as e: | |
print('Error processing HTML content:', str(e)) | |
raise InvalidCSSSelectorError(f"Invalid CSS selector: {css_selector}") from e | |
def get_content_of_website_optimized(url: str, html: str, word_count_threshold: int = MIN_WORD_THRESHOLD, css_selector: str = None, **kwargs) -> Dict[str, Any]:
    """
    Extract optimized content, media, and links from website HTML.
    How it works:
    1. Similar to `get_content_of_website`, but optimized for performance.
    2. Filters and scores images for usefulness.
    3. Extracts contextual descriptions for media files.
    4. Handles excluded tags and CSS selectors.
    5. Cleans HTML and converts it to Markdown.
    Args:
        url (str): The website URL.
        html (str): The HTML content of the website.
        word_count_threshold (int): Minimum word count for content inclusion. Defaults to MIN_WORD_THRESHOLD.
        css_selector (Optional[str]): CSS selector to extract specific content. Defaults to None.
        **kwargs: Additional options for customization.
    Returns:
        Dict[str, Any]: Extracted content including Markdown, cleaned HTML, media, links, and metadata.
    """
if not html: | |
return None | |
soup = BeautifulSoup(html, 'html.parser') | |
body = soup.body | |
image_description_min_word_threshold = kwargs.get('image_description_min_word_threshold', IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD) | |
for tag in kwargs.get('excluded_tags', []) or []: | |
for el in body.select(tag): | |
el.decompose() | |
if css_selector: | |
selected_elements = body.select(css_selector) | |
if not selected_elements: | |
raise InvalidCSSSelectorError(f"Invalid CSS selector, No elements found for CSS selector: {css_selector}") | |
body = soup.new_tag('div') | |
for el in selected_elements: | |
body.append(el) | |
links = {'internal': [], 'external': []} | |
media = {'images': [], 'videos': [], 'audios': []} | |
# Extract meaningful text for media files from closest parent | |
def find_closest_parent_with_useful_text(tag): | |
current_tag = tag | |
while current_tag: | |
current_tag = current_tag.parent | |
# Get the text content from the parent tag | |
if current_tag: | |
text_content = current_tag.get_text(separator=' ',strip=True) | |
# Check if the text content has at least word_count_threshold | |
if len(text_content.split()) >= image_description_min_word_threshold: | |
return text_content | |
return None | |
def process_image(img, url, index, total_images): | |
        # Check whether an image is actually displayed and not inside an undesired HTML element
def is_valid_image(img, parent, parent_classes): | |
style = img.get('style', '') | |
src = img.get('src', '') | |
classes_to_check = ['button', 'icon', 'logo'] | |
tags_to_check = ['button', 'input'] | |
return all([ | |
'display:none' not in style, | |
src, | |
not any(s in var for var in [src, img.get('alt', ''), *parent_classes] for s in classes_to_check), | |
parent.name not in tags_to_check | |
]) | |
        # Score an image for its usefulness
def score_image_for_usefulness(img, base_url, index, images_count): | |
# Function to parse image height/width value and units | |
def parse_dimension(dimension): | |
if dimension: | |
match = re.match(r"(\d+)(\D*)", dimension) | |
if match: | |
number = int(match.group(1)) | |
unit = match.group(2) or 'px' # Default unit is 'px' if not specified | |
return number, unit | |
return None, None | |
# Fetch image file metadata to extract size and extension | |
def fetch_image_file_size(img, base_url): | |
                # If src is a relative path, construct the full URL; otherwise it may already be an absolute/CDN URL
img_url = urljoin(base_url,img.get('src')) | |
try: | |
response = requests.head(img_url) | |
if response.status_code == 200: | |
return response.headers.get('Content-Length',None) | |
else: | |
print(f"Failed to retrieve file size for {img_url}") | |
return None | |
except InvalidSchema as e: | |
return None | |
image_height = img.get('height') | |
height_value, height_unit = parse_dimension(image_height) | |
image_width = img.get('width') | |
width_value, width_unit = parse_dimension(image_width) | |
image_size = 0 #int(fetch_image_file_size(img,base_url) or 0) | |
image_format = os.path.splitext(img.get('src',''))[1].lower() | |
# Remove . from format | |
image_format = image_format.strip('.') | |
score = 0 | |
if height_value: | |
if height_unit == 'px' and height_value > 150: | |
score += 1 | |
if height_unit in ['%','vh','vmin','vmax'] and height_value >30: | |
score += 1 | |
if width_value: | |
if width_unit == 'px' and width_value > 150: | |
score += 1 | |
if width_unit in ['%','vh','vmin','vmax'] and width_value >30: | |
score += 1 | |
if image_size > 10000: | |
score += 1 | |
            if img.get('alt'):
score+=1 | |
if any(image_format==format for format in ['jpg','png','webp']): | |
score+=1 | |
if index/images_count<0.5: | |
score+=1 | |
return score | |
if not is_valid_image(img, img.parent, img.parent.get('class', [])): | |
return None | |
score = score_image_for_usefulness(img, url, index, total_images) | |
if score <= IMAGE_SCORE_THRESHOLD: | |
return None | |
return { | |
'src': img.get('src', '').replace('\\"', '"').strip(), | |
'alt': img.get('alt', ''), | |
'desc': find_closest_parent_with_useful_text(img), | |
'score': score, | |
'type': 'image' | |
} | |
def process_element(element: element.PageElement) -> bool: | |
try: | |
if isinstance(element, NavigableString): | |
if isinstance(element, Comment): | |
element.extract() | |
return False | |
if element.name in ['script', 'style', 'link', 'meta', 'noscript']: | |
element.decompose() | |
return False | |
keep_element = False | |
if element.name == 'a' and element.get('href'): | |
href = element['href'] | |
url_base = url.split('/')[2] | |
link_data = {'href': href, 'text': element.get_text()} | |
if href.startswith('http') and url_base not in href: | |
links['external'].append(link_data) | |
else: | |
links['internal'].append(link_data) | |
keep_element = True | |
elif element.name == 'img': | |
return True # Always keep image elements | |
elif element.name in ['video', 'audio']: | |
media[f"{element.name}s"].append({ | |
'src': element.get('src'), | |
'alt': element.get('alt'), | |
'type': element.name, | |
'description': find_closest_parent_with_useful_text(element) | |
}) | |
source_tags = element.find_all('source') | |
for source_tag in source_tags: | |
media[f"{element.name}s"].append({ | |
'src': source_tag.get('src'), | |
'alt': element.get('alt'), | |
'type': element.name, | |
'description': find_closest_parent_with_useful_text(element) | |
}) | |
return True # Always keep video and audio elements | |
if element.name != 'pre': | |
if element.name in ['b', 'i', 'u', 'span', 'del', 'ins', 'sub', 'sup', 'strong', 'em', 'code', 'kbd', 'var', 's', 'q', 'abbr', 'cite', 'dfn', 'time', 'small', 'mark']: | |
if kwargs.get('only_text', False): | |
element.replace_with(element.get_text()) | |
else: | |
element.unwrap() | |
elif element.name != 'img': | |
element.attrs = {} | |
# Process children | |
for child in list(element.children): | |
if isinstance(child, NavigableString) and not isinstance(child, Comment): | |
if len(child.strip()) > 0: | |
keep_element = True | |
else: | |
if process_element(child): | |
keep_element = True | |
# Check word count | |
if not keep_element: | |
word_count = len(element.get_text(strip=True).split()) | |
keep_element = word_count >= word_count_threshold | |
if not keep_element: | |
element.decompose() | |
return keep_element | |
except Exception as e: | |
print('Error processing element:', str(e)) | |
return False | |
    # Process images by filtering them and extracting contextual text from the page
imgs = body.find_all('img') | |
media['images'] = [ | |
result for result in | |
(process_image(img, url, i, len(imgs)) for i, img in enumerate(imgs)) | |
if result is not None | |
] | |
process_element(body) | |
def flatten_nested_elements(node): | |
if isinstance(node, NavigableString): | |
return node | |
if len(node.contents) == 1 and isinstance(node.contents[0], element.Tag) and node.contents[0].name == node.name: | |
return flatten_nested_elements(node.contents[0]) | |
node.contents = [flatten_nested_elements(child) for child in node.contents] | |
return node | |
body = flatten_nested_elements(body) | |
base64_pattern = re.compile(r'data:image/[^;]+;base64,([^"]+)') | |
for img in imgs: | |
try: | |
src = img.get('src', '') | |
if base64_pattern.match(src): | |
img['src'] = base64_pattern.sub('', src) | |
        except Exception:
pass | |
    cleaned_html = str(body).replace('\n\n', '\n').replace('  ', ' ')
cleaned_html = sanitize_html(cleaned_html) | |
h = CustomHTML2Text() | |
h.ignore_links = True | |
markdown = h.handle(cleaned_html) | |
markdown = markdown.replace(' ```', '```') | |
try: | |
meta = extract_metadata(html, soup) | |
except Exception as e: | |
print('Error extracting metadata:', str(e)) | |
meta = {} | |
return { | |
'markdown': markdown, | |
'cleaned_html': cleaned_html, | |
'success': True, | |
'media': media, | |
'links': links, | |
'metadata': meta | |
} | |
def extract_metadata(html, soup=None):
    """
    Extract page metadata from the HTML <head>.
    How it works:
    1. Parses the HTML (or reuses a provided BeautifulSoup object).
    2. Reads the <title> and the standard meta tags (description, keywords, author).
    3. Collects Open Graph (og:*) and Twitter Card (twitter:*) properties.
    Args:
        html (str): The HTML content of the page.
        soup (Optional[BeautifulSoup]): A pre-parsed BeautifulSoup object. Defaults to None.
    Returns:
        Dict[str, Any]: A dictionary of extracted metadata fields.
    """
metadata = {} | |
if not html and not soup: | |
return {} | |
if not soup: | |
soup = BeautifulSoup(html, 'lxml') | |
head = soup.head | |
if not head: | |
return metadata | |
# Title | |
title_tag = head.find('title') | |
metadata['title'] = title_tag.string.strip() if title_tag and title_tag.string else None | |
# Meta description | |
description_tag = head.find('meta', attrs={'name': 'description'}) | |
metadata['description'] = description_tag.get('content', '').strip() if description_tag else None | |
# Meta keywords | |
keywords_tag = head.find('meta', attrs={'name': 'keywords'}) | |
metadata['keywords'] = keywords_tag.get('content', '').strip() if keywords_tag else None | |
# Meta author | |
author_tag = head.find('meta', attrs={'name': 'author'}) | |
metadata['author'] = author_tag.get('content', '').strip() if author_tag else None | |
# Open Graph metadata | |
og_tags = head.find_all('meta', attrs={'property': re.compile(r'^og:')}) | |
for tag in og_tags: | |
property_name = tag.get('property', '').strip() | |
content = tag.get('content', '').strip() | |
if property_name and content: | |
metadata[property_name] = content | |
# Twitter Card metadata | |
twitter_tags = head.find_all('meta', attrs={'name': re.compile(r'^twitter:')}) | |
for tag in twitter_tags: | |
property_name = tag.get('name', '').strip() | |
content = tag.get('content', '').strip() | |
if property_name and content: | |
metadata[property_name] = content | |
return metadata | |
def extract_xml_tags(string): | |
""" | |
Extracts XML tags from a string. | |
Args: | |
string (str): The input string containing XML tags. | |
Returns: | |
List[str]: A list of XML tags extracted from the input string. | |
""" | |
tags = re.findall(r'<(\w+)>', string) | |
return list(set(tags)) | |
def extract_xml_data(tags, string): | |
""" | |
Extract data for specified XML tags from a string. | |
How it works: | |
1. Searches the string for each tag using regex. | |
2. Extracts the content within the tags. | |
3. Returns a dictionary of tag-content pairs. | |
Args: | |
tags (List[str]): The list of XML tags to extract. | |
string (str): The input string containing XML data. | |
Returns: | |
Dict[str, str]: A dictionary with tag names as keys and extracted content as values. | |
""" | |
data = {} | |
for tag in tags: | |
pattern = f"<{tag}>(.*?)</{tag}>" | |
match = re.search(pattern, string, re.DOTALL) | |
if match: | |
data[tag] = match.group(1).strip() | |
else: | |
data[tag] = "" | |
return data | |
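# Example usage (illustrative sketch): pulls the inner content of each
# requested tag; tags that are not present map to an empty string.
#
#   extract_xml_data(["blocks"], 'Result: <blocks>[{"index": 0}]</blocks>')
#   # -> {'blocks': '[{"index": 0}]'}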
def perform_completion_with_backoff( | |
provider, | |
prompt_with_variables, | |
api_token, | |
json_response = False, | |
base_url=None, | |
**kwargs | |
): | |
""" | |
Perform an API completion request with exponential backoff. | |
How it works: | |
1. Sends a completion request to the API. | |
2. Retries on rate-limit errors with exponential delays. | |
3. Returns the API response or an error after all retries. | |
Args: | |
provider (str): The name of the API provider. | |
prompt_with_variables (str): The input prompt for the completion request. | |
api_token (str): The API token for authentication. | |
json_response (bool): Whether to request a JSON response. Defaults to False. | |
base_url (Optional[str]): The base URL for the API. Defaults to None. | |
**kwargs: Additional arguments for the API request. | |
Returns: | |
dict: The API response or an error message after all retries. | |
""" | |
from litellm import completion | |
from litellm.exceptions import RateLimitError | |
max_attempts = 3 | |
base_delay = 2 # Base delay in seconds, you can adjust this based on your needs | |
extra_args = { | |
"temperature": 0.01, | |
'api_key': api_token, | |
'base_url': base_url | |
} | |
if json_response: | |
extra_args["response_format"] = { "type": "json_object" } | |
if kwargs.get("extra_args"): | |
extra_args.update(kwargs["extra_args"]) | |
for attempt in range(max_attempts): | |
try: | |
            response = completion(
model=provider, | |
messages=[ | |
{"role": "user", "content": prompt_with_variables} | |
], | |
**extra_args | |
) | |
return response # Return the successful response | |
except RateLimitError as e: | |
print("Rate limit error:", str(e)) | |
# Check if we have exhausted our max attempts | |
if attempt < max_attempts - 1: | |
# Calculate the delay and wait | |
delay = base_delay * (2 ** attempt) # Exponential backoff formula | |
print(f"Waiting for {delay} seconds before retrying...") | |
time.sleep(delay) | |
else: | |
# Return an error response after exhausting all retries | |
return [{ | |
"index": 0, | |
"tags": ["error"], | |
"content": ["Rate limit error. Please try again later."] | |
}] | |
def extract_blocks(url, html, provider = DEFAULT_PROVIDER, api_token = None, base_url = None): | |
""" | |
Extract content blocks from website HTML using an AI provider. | |
How it works: | |
1. Prepares a prompt by sanitizing and escaping HTML. | |
2. Sends the prompt to an AI provider with optional retries. | |
3. Parses the response to extract structured blocks or errors. | |
Args: | |
url (str): The website URL. | |
html (str): The HTML content of the website. | |
provider (str): The AI provider for content extraction. Defaults to DEFAULT_PROVIDER. | |
api_token (Optional[str]): The API token for authentication. Defaults to None. | |
base_url (Optional[str]): The base URL for the API. Defaults to None. | |
Returns: | |
List[dict]: A list of extracted content blocks. | |
""" | |
# api_token = os.getenv('GROQ_API_KEY', None) if not api_token else api_token | |
api_token = PROVIDER_MODELS.get(provider, None) if not api_token else api_token | |
variable_values = { | |
"URL": url, | |
"HTML": escape_json_string(sanitize_html(html)), | |
} | |
prompt_with_variables = PROMPT_EXTRACT_BLOCKS | |
for variable in variable_values: | |
prompt_with_variables = prompt_with_variables.replace( | |
"{" + variable + "}", variable_values[variable] | |
) | |
response = perform_completion_with_backoff(provider, prompt_with_variables, api_token, base_url=base_url) | |
try: | |
blocks = extract_xml_data(["blocks"], response.choices[0].message.content)['blocks'] | |
blocks = json.loads(blocks) | |
## Add error: False to the blocks | |
for block in blocks: | |
block['error'] = False | |
except Exception as e: | |
parsed, unparsed = split_and_parse_json_objects(response.choices[0].message.content) | |
blocks = parsed | |
        # Append all unparsed segments as one error block whose content is the list of unparsed segments
if unparsed: | |
blocks.append({ | |
"index": 0, | |
"error": True, | |
"tags": ["error"], | |
"content": unparsed | |
}) | |
return blocks | |
def extract_blocks_batch(batch_data, provider = "groq/llama3-70b-8192", api_token = None): | |
""" | |
Extract content blocks from a batch of website HTMLs. | |
How it works: | |
1. Prepares prompts for each URL and HTML pair. | |
2. Sends the prompts to the AI provider in a batch request. | |
3. Parses the responses to extract structured blocks or errors. | |
Args: | |
batch_data (List[Tuple[str, str]]): A list of (URL, HTML) pairs. | |
provider (str): The AI provider for content extraction. Defaults to "groq/llama3-70b-8192". | |
api_token (Optional[str]): The API token for authentication. Defaults to None. | |
Returns: | |
List[dict]: A list of extracted content blocks from all batch items. | |
""" | |
api_token = os.getenv('GROQ_API_KEY', None) if not api_token else api_token | |
from litellm import batch_completion | |
messages = [] | |
for url, html in batch_data: | |
variable_values = { | |
"URL": url, | |
"HTML": html, | |
} | |
prompt_with_variables = PROMPT_EXTRACT_BLOCKS | |
for variable in variable_values: | |
prompt_with_variables = prompt_with_variables.replace( | |
"{" + variable + "}", variable_values[variable] | |
) | |
messages.append([{"role": "user", "content": prompt_with_variables}]) | |
responses = batch_completion( | |
model = provider, | |
messages = messages, | |
temperature = 0.01 | |
) | |
all_blocks = [] | |
for response in responses: | |
try: | |
blocks = extract_xml_data(["blocks"], response.choices[0].message.content)['blocks'] | |
blocks = json.loads(blocks) | |
except Exception as e: | |
blocks = [{ | |
"index": 0, | |
"tags": ["error"], | |
"content": ["Error extracting blocks from the HTML content. Choose another provider/model or try again."], | |
"questions": ["What went wrong during the block extraction process?"] | |
}] | |
all_blocks.append(blocks) | |
return sum(all_blocks, []) | |
def merge_chunks_based_on_token_threshold(chunks, token_threshold): | |
""" | |
Merges small chunks into larger ones based on the total token threshold. | |
:param chunks: List of text chunks to be merged based on token count. | |
:param token_threshold: Max number of tokens for each merged chunk. | |
:return: List of merged text chunks. | |
""" | |
merged_sections = [] | |
current_chunk = [] | |
total_token_so_far = 0 | |
for chunk in chunks: | |
chunk_token_count = len(chunk.split()) * 1.3 # Estimate token count with a factor | |
if total_token_so_far + chunk_token_count < token_threshold: | |
current_chunk.append(chunk) | |
total_token_so_far += chunk_token_count | |
else: | |
if current_chunk: | |
merged_sections.append('\n\n'.join(current_chunk)) | |
current_chunk = [chunk] | |
total_token_so_far = chunk_token_count | |
# Add the last chunk if it exists | |
if current_chunk: | |
merged_sections.append('\n\n'.join(current_chunk)) | |
return merged_sections | |
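# Worked example (illustration only): with token_threshold=500 and the rough
# estimate of 1.3 tokens per word, two chunks of about 200 and 180 words
# (~260 and ~234 estimated tokens) are merged into one section, while a
# following 300-word chunk (~390 tokens) starts a new section.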
def process_sections(url: str, sections: list, provider: str, api_token: str, base_url=None) -> list: | |
""" | |
Process sections of HTML content sequentially or in parallel. | |
How it works: | |
1. Sequentially processes sections with delays for "groq/" providers. | |
2. Uses ThreadPoolExecutor for parallel processing with other providers. | |
3. Extracts content blocks for each section. | |
Args: | |
url (str): The website URL. | |
sections (List[str]): The list of HTML sections to process. | |
provider (str): The AI provider for content extraction. | |
api_token (str): The API token for authentication. | |
base_url (Optional[str]): The base URL for the API. Defaults to None. | |
Returns: | |
List[dict]: The list of extracted content blocks from all sections. | |
""" | |
extracted_content = [] | |
if provider.startswith("groq/"): | |
# Sequential processing with a delay | |
for section in sections: | |
extracted_content.extend(extract_blocks(url, section, provider, api_token, base_url=base_url)) | |
time.sleep(0.5) # 500 ms delay between each processing | |
else: | |
# Parallel processing using ThreadPoolExecutor | |
with ThreadPoolExecutor() as executor: | |
futures = [executor.submit(extract_blocks, url, section, provider, api_token, base_url=base_url) for section in sections] | |
for future in as_completed(futures): | |
extracted_content.extend(future.result()) | |
return extracted_content | |
def wrap_text(draw, text, font, max_width): | |
""" | |
Wrap text to fit within a specified width for rendering. | |
How it works: | |
1. Splits the text into words. | |
2. Constructs lines that fit within the maximum width using the provided font. | |
3. Returns the wrapped text as a single string. | |
Args: | |
draw (ImageDraw.Draw): The drawing context for measuring text size. | |
text (str): The text to wrap. | |
font (ImageFont.FreeTypeFont): The font to use for measuring text size. | |
max_width (int): The maximum width for each line. | |
Returns: | |
str: The wrapped text. | |
""" | |
# Wrap the text to fit within the specified width | |
    lines = []
    words = text.split()
    while words:
        line = ''
        while words and draw.textbbox((0, 0), line + words[0], font=font)[2] <= max_width:
            line += (words.pop(0) + ' ')
        if not line:
            # Guard against an infinite loop when a single word is wider than max_width
            line = words.pop(0) + ' '
        lines.append(line)
    return '\n'.join(lines)
def format_html(html_string): | |
""" | |
Prettify an HTML string using BeautifulSoup. | |
How it works: | |
1. Parses the HTML string with BeautifulSoup. | |
2. Formats the HTML with proper indentation. | |
3. Returns the prettified HTML string. | |
Args: | |
html_string (str): The HTML string to format. | |
Returns: | |
str: The prettified HTML string. | |
""" | |
    soup = BeautifulSoup(html_string, 'lxml')
return soup.prettify() | |
def fast_format_html(html_string): | |
""" | |
A fast HTML formatter that uses string operations instead of parsing. | |
Args: | |
html_string (str): The HTML string to format | |
Returns: | |
str: The formatted HTML string | |
""" | |
# Initialize variables | |
indent = 0 | |
indent_str = " " # Two spaces for indentation | |
formatted = [] | |
in_content = False | |
# Split by < and > to separate tags and content | |
parts = html_string.replace('>', '>\n').replace('<', '\n<').split('\n') | |
for part in parts: | |
if not part.strip(): | |
continue | |
# Handle closing tags | |
if part.startswith('</'): | |
indent -= 1 | |
formatted.append(indent_str * indent + part) | |
# Handle self-closing tags | |
elif part.startswith('<') and part.endswith('/>'): | |
formatted.append(indent_str * indent + part) | |
# Handle opening tags | |
elif part.startswith('<'): | |
formatted.append(indent_str * indent + part) | |
indent += 1 | |
# Handle content between tags | |
else: | |
content = part.strip() | |
if content: | |
formatted.append(indent_str * indent + content) | |
return '\n'.join(formatted) | |
def normalize_url(href, base_url): | |
"""Normalize URLs to ensure consistent format""" | |
from urllib.parse import urljoin, urlparse | |
# Parse base URL to get components | |
parsed_base = urlparse(base_url) | |
if not parsed_base.scheme or not parsed_base.netloc: | |
raise ValueError(f"Invalid base URL format: {base_url}") | |
# Use urljoin to handle all cases | |
normalized = urljoin(base_url, href.strip()) | |
return normalized | |
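# Example usage (illustrative sketch): relative and root-relative links are
# resolved against the page URL via urljoin.
#
#   normalize_url("/docs/intro", "https://example.com/blog/post")
#   # -> 'https://example.com/docs/intro'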
def normalize_url_tmp(href, base_url): | |
"""Normalize URLs to ensure consistent format""" | |
# Extract protocol and domain from base URL | |
try: | |
base_parts = base_url.split('/') | |
protocol = base_parts[0] | |
domain = base_parts[2] | |
except IndexError: | |
raise ValueError(f"Invalid base URL format: {base_url}") | |
# Handle special protocols | |
special_protocols = {'mailto:', 'tel:', 'ftp:', 'file:', 'data:', 'javascript:'} | |
if any(href.lower().startswith(proto) for proto in special_protocols): | |
return href.strip() | |
# Handle anchor links | |
if href.startswith('#'): | |
return f"{base_url}{href}" | |
# Handle protocol-relative URLs | |
if href.startswith('//'): | |
return f"{protocol}{href}" | |
# Handle root-relative URLs | |
if href.startswith('/'): | |
return f"{protocol}//{domain}{href}" | |
# Handle relative URLs | |
if not href.startswith(('http://', 'https://')): | |
# Remove leading './' if present | |
href = href.lstrip('./') | |
return f"{protocol}//{domain}/{href}" | |
return href.strip() | |
def get_base_domain(url: str) -> str: | |
""" | |
Extract the base domain from a given URL, handling common edge cases. | |
How it works: | |
1. Parses the URL to extract the domain. | |
2. Removes the port number and 'www' prefix. | |
3. Handles special domains (e.g., 'co.uk') to extract the correct base. | |
Args: | |
url (str): The URL to extract the base domain from. | |
Returns: | |
str: The extracted base domain or an empty string if parsing fails. | |
""" | |
try: | |
# Get domain from URL | |
domain = urlparse(url).netloc.lower() | |
if not domain: | |
return "" | |
# Remove port if present | |
domain = domain.split(':')[0] | |
# Remove www | |
domain = re.sub(r'^www\.', '', domain) | |
# Extract last two parts of domain (handles co.uk etc) | |
parts = domain.split('.') | |
if len(parts) > 2 and parts[-2] in { | |
'co', 'com', 'org', 'gov', 'edu', 'net', | |
'mil', 'int', 'ac', 'ad', 'ae', 'af', 'ag' | |
}: | |
return '.'.join(parts[-3:]) | |
return '.'.join(parts[-2:]) | |
except Exception: | |
return "" | |
def is_external_url(url: str, base_domain: str) -> bool:
    """
    Determine whether a URL points outside the given base domain.
    How it works:
    1. Treats special schemes (mailto:, tel:, ftp:, file:, data:, javascript:) as external.
    2. Treats relative URLs (no netloc) as internal.
    3. Otherwise compares the URL's domain with the base domain, ignoring any 'www.' prefix.
    Args:
        url (str): The URL to check.
        base_domain (str): The base domain of the site being crawled.
    Returns:
        bool: True if the URL is external, False otherwise.
    """
special = {'mailto:', 'tel:', 'ftp:', 'file:', 'data:', 'javascript:'} | |
if any(url.lower().startswith(p) for p in special): | |
return True | |
try: | |
parsed = urlparse(url) | |
if not parsed.netloc: # Relative URL | |
return False | |
# Strip 'www.' from both domains for comparison | |
url_domain = parsed.netloc.lower().replace('www.', '') | |
base = base_domain.lower().replace('www.', '') | |
# Check if URL domain ends with base domain | |
return not url_domain.endswith(base) | |
except Exception: | |
return False | |
def clean_tokens(tokens: list[str]) -> list[str]: | |
""" | |
Clean a list of tokens by removing noise, stop words, and short tokens. | |
How it works: | |
1. Defines a set of noise words and stop words. | |
2. Filters tokens based on length and exclusion criteria. | |
    3. Excludes tokens starting with certain symbols (e.g., "↑", "▲", "⬆").
Args: | |
tokens (list[str]): The list of tokens to clean. | |
Returns: | |
list[str]: The cleaned list of tokens. | |
""" | |
# Set of tokens to remove | |
    noise = {'ccp', 'up', '↑', '▲', '⬆️', 'a', 'an', 'at', 'by', 'in', 'of', 'on', 'to', 'the'}
STOP_WORDS = { | |
'a', 'an', 'and', 'are', 'as', 'at', 'be', 'by', 'for', 'from', | |
'has', 'he', 'in', 'is', 'it', 'its', 'of', 'on', 'that', 'the', | |
'to', 'was', 'were', 'will', 'with', | |
# Pronouns | |
'i', 'you', 'he', 'she', 'it', 'we', 'they', | |
'me', 'him', 'her', 'us', 'them', | |
'my', 'your', 'his', 'her', 'its', 'our', 'their', | |
'mine', 'yours', 'hers', 'ours', 'theirs', | |
'myself', 'yourself', 'himself', 'herself', 'itself', 'ourselves', 'themselves', | |
# Common verbs | |
'am', 'is', 'are', 'was', 'were', 'be', 'been', 'being', | |
'have', 'has', 'had', 'having', 'do', 'does', 'did', 'doing', | |
# Prepositions | |
'about', 'above', 'across', 'after', 'against', 'along', 'among', 'around', | |
'at', 'before', 'behind', 'below', 'beneath', 'beside', 'between', 'beyond', | |
'by', 'down', 'during', 'except', 'for', 'from', 'in', 'inside', 'into', | |
'near', 'of', 'off', 'on', 'out', 'outside', 'over', 'past', 'through', | |
'to', 'toward', 'under', 'underneath', 'until', 'up', 'upon', 'with', 'within', | |
# Conjunctions | |
'and', 'but', 'or', 'nor', 'for', 'yet', 'so', | |
'although', 'because', 'since', 'unless', | |
# Articles | |
'a', 'an', 'the', | |
# Other common words | |
'this', 'that', 'these', 'those', | |
'what', 'which', 'who', 'whom', 'whose', | |
'when', 'where', 'why', 'how', | |
'all', 'any', 'both', 'each', 'few', 'more', 'most', 'other', 'some', 'such', | |
'can', 'cannot', "can't", 'could', "couldn't", | |
'may', 'might', 'must', "mustn't", | |
'shall', 'should', "shouldn't", | |
'will', "won't", 'would', "wouldn't", | |
'not', "n't", 'no', 'nor', 'none' | |
} | |
# Single comprehension, more efficient than multiple passes | |
return [token for token in tokens | |
if len(token) > 2 | |
and token not in noise | |
and token not in STOP_WORDS | |
            and not token.startswith('↑')
            and not token.startswith('▲')
            and not token.startswith('⬆')]
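# Example usage (illustrative sketch): stop words, short tokens, and arrow
# symbols are filtered out (tokens are expected to be lower-cased already).
#
#   clean_tokens(["the", "crawler", "↑", "extracts", "to", "markdown"])
#   # -> ['crawler', 'extracts', 'markdown']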
def profile_and_time(func): | |
""" | |
Decorator to profile a function's execution time and performance. | |
How it works: | |
1. Records the start time before executing the function. | |
2. Profiles the function's execution using `cProfile`. | |
3. Prints the elapsed time and profiling statistics. | |
Args: | |
func (Callable): The function to decorate. | |
Returns: | |
Callable: The decorated function with profiling and timing enabled. | |
""" | |
def wrapper(self, *args, **kwargs): | |
# Start timer | |
start_time = time.perf_counter() | |
# Setup profiler | |
profiler = cProfile.Profile() | |
profiler.enable() | |
# Run function | |
result = func(self, *args, **kwargs) | |
# Stop profiler | |
profiler.disable() | |
# Calculate elapsed time | |
elapsed_time = time.perf_counter() - start_time | |
# Print timing | |
print(f"[PROFILER] Scraping completed in {elapsed_time:.2f} seconds") | |
# Print profiling stats | |
stats = pstats.Stats(profiler) | |
stats.sort_stats('cumulative') # Sort by cumulative time | |
stats.print_stats(20) # Print top 20 time-consuming functions | |
return result | |
return wrapper | |
def generate_content_hash(content: str) -> str: | |
"""Generate a unique hash for content""" | |
return xxhash.xxh64(content.encode()).hexdigest() | |
# return hashlib.sha256(content.encode()).hexdigest() | |
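# Example usage (illustrative sketch): xxhash gives a fast, non-cryptographic
# 64-bit digest (16 hex characters) that is stable across runs, which makes it
# suitable as a cache key.
#
#   cache_key = generate_content_hash("<html>...</html>")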
def ensure_content_dirs(base_path: str) -> Dict[str, str]: | |
"""Create content directories if they don't exist""" | |
dirs = { | |
'html': 'html_content', | |
'cleaned': 'cleaned_html', | |
'markdown': 'markdown_content', | |
'extracted': 'extracted_content', | |
'screenshots': 'screenshots', | |
'screenshot': 'screenshots' | |
} | |
content_paths = {} | |
for key, dirname in dirs.items(): | |
path = os.path.join(base_path, dirname) | |
os.makedirs(path, exist_ok=True) | |
content_paths[key] = path | |
return content_paths | |
def configure_windows_event_loop(): | |
""" | |
Configure the Windows event loop to use ProactorEventLoop. | |
This resolves the NotImplementedError that occurs on Windows when using asyncio subprocesses. | |
This function should only be called on Windows systems and before any async operations. | |
On non-Windows systems, this function does nothing. | |
Example: | |
```python | |
from crawl4ai.async_configs import configure_windows_event_loop | |
# Call this before any async operations if you're on Windows | |
configure_windows_event_loop() | |
``` | |
""" | |
if platform.system() == 'Windows': | |
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) | |
def get_error_context(exc_info, context_lines: int = 5): | |
""" | |
Extract error context with more reliable line number tracking. | |
Args: | |
exc_info: The exception info from sys.exc_info() | |
context_lines: Number of lines to show before and after the error | |
Returns: | |
dict: Error context information | |
""" | |
import traceback | |
import linecache | |
import os | |
# Get the full traceback | |
tb = traceback.extract_tb(exc_info[2]) | |
# Get the last frame (where the error occurred) | |
last_frame = tb[-1] | |
filename = last_frame.filename | |
line_no = last_frame.lineno | |
func_name = last_frame.name | |
# Get the source code context using linecache | |
# This is more reliable than inspect.getsourcelines | |
context_start = max(1, line_no - context_lines) | |
context_end = line_no + context_lines + 1 | |
# Build the context lines with line numbers | |
context_lines = [] | |
for i in range(context_start, context_end): | |
line = linecache.getline(filename, i) | |
if line: | |
# Remove any trailing whitespace/newlines and add the pointer for error line | |
line = line.rstrip() | |
            pointer = '→' if i == line_no else ' '
context_lines.append(f"{i:4d} {pointer} {line}") | |
# Join the lines with newlines | |
code_context = '\n'.join(context_lines) | |
# Get relative path for cleaner output | |
try: | |
rel_path = os.path.relpath(filename) | |
except ValueError: | |
# Fallback if relpath fails (can happen on Windows with different drives) | |
rel_path = filename | |
return { | |
"filename": rel_path, | |
"line_no": line_no, | |
"function": func_name, | |
"code_context": code_context | |
} | |