Swarms / swarms /utils /add_docs_to_agents.py
harshalmore31's picture
Synced repo using 'sync_with_huggingface' Github Action
d8d14f1 verified
raw
history blame
4.67 kB
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any, List, Optional, Union
from doc_master import doc_master
from tenacity import retry, stop_after_attempt, wait_exponential
from swarms.utils.loguru_logger import initialize_logger
# Module-level logger shared by the helpers below; initialize_logger is given
# "add_docs_to_agents" as its log folder name.
logger = initialize_logger(log_folder="add_docs_to_agents")
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    # Without this, tenacity raises tenacity.RetryError after the last attempt,
    # hiding the real cause; re-raise the final underlying exception instead.
    reraise=True,
)
def _process_document(doc_path: Union[str, Path]) -> str:
    """Convert a single document to text via ``doc_master``, with retries.

    The conversion is attempted up to 3 times, with exponential backoff
    between attempts clamped to the 4-10 second range.

    Args:
        doc_path: Path to the document to process.

    Returns:
        The document content as returned by ``doc_master`` with
        ``output_type="string"``.

    Raises:
        Exception: Whatever ``doc_master`` raised on the final attempt.
    """
    try:
        return doc_master(
            file_path=str(doc_path), output_type="string"
        )
    except Exception as e:
        # Logged on every failed attempt, including ones that will be retried.
        logger.error(f"Error processing document {doc_path}: {e}")
        raise
def handle_input_docs(
    agents: Any,
    docs: Optional[List[Union[str, Path]]] = None,
    doc_folder: Optional[Union[str, Path]] = None,
    max_workers: int = 4,
    chunk_size: int = 1000000,
) -> Any:
    """
    Append document content to the system prompt of every agent.

    Documents come either from the explicit ``docs`` list or, when ``docs``
    is empty/None, from ``doc_folder``.
    - ``docs`` are processed in parallel, each via ``_process_document``.
    - ``doc_folder`` is processed by a single ``doc_master`` call.
    If both are given, ``docs`` takes precedence and ``doc_folder`` is
    ignored (a warning is logged).

    The resulting texts are joined with newlines, in the same order as
    ``docs``. The result is appended to each agent's ``system_prompt``,
    preceded by a ``Documents:`` header line. Nothing is appended if the
    combined text is empty.

    Args:
        agents: Dictionary mapping agent names to Agent objects. Each value
            needs a ``system_prompt`` attribute that supports ``+=`` with a
            string.
        docs: List of document paths.
        doc_folder: Path to folder containing documents.
        max_workers: Maximum number of parallel document processing workers.
        chunk_size: Retained for backward compatibility; it has no effect.
            The combined text is already fully in memory, so appending it in
            pieces never reduced memory use.

    Returns:
        The same ``agents`` mapping, mutated in place, on success.
        ``None`` when ``agents`` is empty or when neither ``docs`` nor
        ``doc_folder`` is provided. A warning is logged in those cases, and
        no exception is raised.

    Raises:
        RuntimeError: If reading a document or the folder fails, or if
            updating an agent's prompt fails. The original error is chained
            as ``__cause__``. Agents updated before a prompt-update failure
            keep their appended text.
    """
    if not agents:
        logger.warning(
            "No agents provided, skipping document distribution"
        )
        return None
    if not docs and not doc_folder:
        logger.warning(
            "No documents or folder provided, skipping document distribution"
        )
        return None
    if docs and doc_folder:
        # Historical precedence: explicit docs win. Make that visible.
        logger.warning(
            f"Both docs and doc_folder provided; ignoring doc_folder {doc_folder}"
        )

    logger.info("Starting document distribution to agents")
    try:
        processed_docs: List[str] = []

        if docs:
            with ThreadPoolExecutor(
                max_workers=max_workers
            ) as executor:
                futures = [
                    executor.submit(_process_document, doc)
                    for doc in docs
                ]
                # Collect in submission order, not completion order, so the
                # combined prompt text is deterministic across runs.
                for doc, future in zip(docs, futures):
                    try:
                        processed_docs.append(future.result())
                    except Exception as e:
                        logger.error(
                            f"Failed to process document {doc}: {e}"
                        )
                        # Skip queued work whose result would be discarded.
                        # The executor still waits for documents already
                        # being processed before the error propagates.
                        for pending in futures:
                            pending.cancel()
                        raise RuntimeError(
                            f"Document processing failed: {e}"
                        ) from e
        else:
            # Guards above ensure doc_folder is set when docs is empty.
            try:
                folder_content = doc_master(
                    folder_path=str(doc_folder), output_type="string"
                )
            except Exception as e:
                logger.error(
                    f"Failed to process folder {doc_folder}: {e}"
                )
                raise RuntimeError(
                    f"Folder processing failed: {e}"
                ) from e
            processed_docs.append(folder_content)

        combined_data = "\n".join(processed_docs)

        # Build the addition once and append it in a single operation per agent.
        if combined_data:
            addition = "\nDocuments:\n" + combined_data
            for name, agent in agents.items():
                try:
                    agent.system_prompt += addition
                except Exception as e:
                    logger.error(
                        f"Failed to update prompt of agent {name}: {e}"
                    )
                    raise RuntimeError(
                        f"Agent prompt update failed: {e}"
                    ) from e

        logger.info(
            f"Successfully added documents to {len(agents)} agents"
        )
        return agents
    except RuntimeError:
        # Phase-specific failures above are already logged and wrapped;
        # don't wrap them a second time.
        raise
    except Exception as e:
        logger.error(f"Document distribution failed: {e}")
        raise RuntimeError(
            f"Document distribution failed: {e}"
        ) from e