from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any, List, Optional, Union

from doc_master import doc_master
from tenacity import retry, stop_after_attempt, wait_exponential

from swarms.utils.loguru_logger import initialize_logger

logger = initialize_logger(log_folder="add_docs_to_agents")


# Retry transient document-processing failures with exponential backoff
# before giving up.
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
)
def _process_document(doc_path: Union[str, Path]) -> str:
    """Safely process a single document with retries.

    Args:
        doc_path: Path to the document to process

    Returns:
        Processed document text

    Raises:
        Exception: If document processing fails after all retries
    """
    try:
        return doc_master(
            file_path=str(doc_path), output_type="string"
        )
    except Exception as e:
        logger.error(
            f"Error processing document {doc_path}: {str(e)}"
        )
        raise


def handle_input_docs(
    agents: Any,
    docs: Optional[List[Union[str, Path]]] = None,
    doc_folder: Optional[Union[str, Path]] = None,
    max_workers: int = 4,
    chunk_size: int = 1000000,
) -> Any:
    """
    Distribute document content to agent system prompts, processing
    documents in parallel and appending the text in bounded chunks.

    Args:
        agents: Dictionary mapping agent names to Agent objects
        docs: List of paths to individual documents to process
        doc_folder: Path to a folder of documents to process as a single batch
        max_workers: Maximum number of parallel document-processing workers
        chunk_size: Maximum characters appended per string operation to
            avoid memory issues

    Returns:
        The agents dictionary with the combined document text appended to
        each agent's system prompt, or the input unchanged if there is
        nothing to process.

    Raises:
        RuntimeError: If document, folder, or prompt processing fails
    """
    if not agents:
        logger.warning(
            "No agents provided, skipping document distribution"
        )
        return agents

    if not docs and not doc_folder:
        logger.warning(
            "No documents or folder provided, skipping document distribution"
        )
        return agents
logger.info("Starting document distribution to agents") | |
try: | |
processed_docs = [] | |
# Process individual documents in parallel | |
if docs: | |
with ThreadPoolExecutor( | |
max_workers=max_workers | |
) as executor: | |
future_to_doc = { | |
executor.submit(_process_document, doc): doc | |
for doc in docs | |
} | |
for future in as_completed(future_to_doc): | |
doc = future_to_doc[future] | |
try: | |
processed_docs.append(future.result()) | |
except Exception as e: | |
logger.error( | |
f"Failed to process document {doc}: {str(e)}" | |
) | |
raise RuntimeError( | |
f"Document processing failed: {str(e)}" | |
) | |
# Process folder if specified | |
elif doc_folder: | |
try: | |
folder_content = doc_master( | |
folder_path=str(doc_folder), output_type="string" | |
) | |
processed_docs.append(folder_content) | |
except Exception as e: | |
logger.error( | |
f"Failed to process folder {doc_folder}: {str(e)}" | |
) | |
raise RuntimeError( | |
f"Folder processing failed: {str(e)}" | |
) | |
# Combine and chunk the processed documents | |
combined_data = "\n".join(processed_docs) | |
# Update agent prompts in chunks to avoid memory issues | |
for agent in agents.values(): | |
try: | |
for i in range(0, len(combined_data), chunk_size): | |
chunk = combined_data[i : i + chunk_size] | |
if i == 0: | |
agent.system_prompt += ( | |
"\nDocuments:\n" + chunk | |
) | |
else: | |
agent.system_prompt += chunk | |
except Exception as e: | |
logger.error( | |
f"Failed to update agent prompt: {str(e)}" | |
) | |
raise RuntimeError( | |
f"Agent prompt update failed: {str(e)}" | |
) | |
logger.info( | |
f"Successfully added documents to {len(agents)} agents" | |
) | |
return agents | |
except Exception as e: | |
logger.error(f"Document distribution failed: {str(e)}") | |
raise RuntimeError(f"Document distribution failed: {str(e)}") | |