KKMS-KSSW-HF / src /data_loader.py
Chintan-Donda's picture
Update src/data_loader.py
f181070
import os
import re
import pandas as pd
from pathlib import Path
import glob
from llama_index import GPTVectorStoreIndex, download_loader, SimpleDirectoryReader, SimpleWebPageReader
from langchain.document_loaders import PyPDFLoader, TextLoader
from langchain.agents import initialize_agent, Tool
from langchain.llms import OpenAI
from langchain.chains.conversation.memory import ConversationBufferMemory
from langchain.docstore.document import Document
import src.utils as utils
import logging
# Module-level logger, named after this module so log lines show their origin.
logger = logging.getLogger(__name__)
# Configure the root logger at import time: INFO level, with lines formatted as
# "<timestamp> <level> [<logger name>] <message>".
logging.basicConfig(
format="%(asctime)s %(levelname)s [%(name)s] %(message)s", level=logging.INFO, datefmt="%Y-%m-%d %H:%M:%S"
)
import warnings
# Silence every warning for the whole process (no category or module filter is given).
warnings.filterwarnings('ignore')
class DATA_LOADER:
    """Load raw documents from local files, directories and URLs.

    The ``load_documents_from_*`` methods are thin wrappers around
    :meth:`load_document`, which dispatches on ``doc_type`` and passes whatever
    was loaded through :meth:`clean_documents`.
    """

    # Document types whose source is a local path (``doc_filepath``).
    _PATH_DOC_TYPES = ('directory', 'pdf', 'textfile')
    # Document types whose source is a list of URLs (``urls``).
    _URL_DOC_TYPES = ('online_pdf', 'urls', 'url-kb', 'url-chatgpt')

    def __init__(self):
        """Create the loader together with the ``UTILS`` helper it relies on."""
        # Instantiate UTILS class object
        self.utils_obj = utils.UTILS()

    def load_documents_from_urls(self, urls=None, doc_type='urls'):
        """Load documents from web pages; see :meth:`load_document`."""
        return self.load_document(doc_type=doc_type, urls=urls)

    def load_documents_from_pdf(self, doc_filepath='', urls=None, doc_type='pdf'):
        """Load PDF documents from a local path or from URLs.

        Args:
            doc_filepath: PDF file or directory of PDFs (used for ``'pdf'``).
            urls: URLs of PDFs (used for ``'online_pdf'``).
            doc_type: ``'pdf'`` or ``'online_pdf'``.

        Returns:
            The loaded documents, or an empty list when ``doc_type`` is not
            one of the two supported values.
        """
        if doc_type == 'pdf':
            return self.load_document(doc_type=doc_type, doc_filepath=doc_filepath)
        if doc_type == 'online_pdf':
            return self.load_document(doc_type=doc_type, urls=urls)
        # Follow the module's "warn and return nothing" convention instead of
        # failing on an unbound local variable.
        logger.warning(f"Unsupported PDF doc_type '{doc_type}', nothing can be loaded!")
        return []

    def load_documents_from_directory(self, doc_filepath='', doc_type='directory'):
        """Load documents from a local directory or file; see :meth:`load_document`."""
        return self.load_document(doc_type=doc_type, doc_filepath=doc_filepath)

    def load_documents_from_text(self, doc_filepath='', doc_type='textfile'):
        """Load documents from text file(s); see :meth:`load_document`."""
        return self.load_document(doc_type=doc_type, doc_filepath=doc_filepath)

    def pdf_loader(self, filepath):
        """Load the PDF at ``filepath`` with ``PyPDFLoader`` and split it."""
        return PyPDFLoader(filepath).load_and_split()

    def text_loader(self, filepath):
        """Load the text file at ``filepath`` with ``TextLoader``."""
        return TextLoader(filepath).load()

    def _load_local_files(self, doc_filepath, extension, loader, label):
        """Load one ``extension`` file, or every such file directly inside a directory."""
        loaded = []
        if os.path.isdir(doc_filepath):
            # Escape the directory part so characters such as "[" in its name
            # are not interpreted as glob wildcards; sort for a stable order.
            pattern = f"{glob.escape(str(doc_filepath))}/*{extension}"
            filepaths = sorted(glob.glob(pattern))
            logger.info(f'Total {label} files to load: {len(filepaths)}')
            for filepath in filepaths:
                loaded.extend(loader(filepath))
        elif os.path.isfile(doc_filepath) and doc_filepath.endswith(extension):
            loaded.extend(loader(doc_filepath))
        else:
            logger.warning(
                f"{doc_filepath} is neither a directory nor a {extension} file, nothing can be loaded!"
            )
        return loaded

    def load_document(self,
                      doc_type='pdf',
                      doc_filepath='',
                      urls=None
                      ):
        """Load documents of the given type and clean their text.

        Supported ``doc_type`` values:

        * ``'pdf'`` - a ``.pdf`` file, or all ``*.pdf`` files in a directory.
        * ``'textfile'`` - a ``.txt`` file, or all ``*.txt`` files in a directory.
        * ``'directory'`` - a directory or single file, read with
          ``SimpleDirectoryReader``.
        * ``'online_pdf'`` - PDFs located at ``urls``.
        * ``'urls'`` - web pages at ``urls``, converted to text.
        * ``'url-kb'`` - knowledge-base sites read with ``KnowledgeBaseWebReader``.
        * ``'url-chatgpt'`` - pages read with ``BeautifulSoupWebReader``; also
          builds a vector index and runs one agent query against it.

        Args:
            doc_type: One of the values listed above.
            doc_filepath: Local path, used by the path-based types.
            urls: List of URLs, used by the URL-based types.

        Returns:
            A list of cleaned documents. The list is empty when the path does
            not exist, the URL list is empty, or ``doc_type`` is unsupported.
        """
        # Fresh list per call: a mutable default would be shared between calls.
        urls = [] if urls is None else urls
        logger.info(f'Loading {doc_type} in raw format from: {doc_filepath}')
        documents = []

        # Validation checks
        if doc_type in self._PATH_DOC_TYPES:
            if not os.path.exists(doc_filepath):
                logger.warning(f"{doc_filepath} does not exist, nothing can be loaded!")
                return documents
        elif doc_type in self._URL_DOC_TYPES:
            # len() rather than truthiness so array-like URL containers work too.
            if len(urls) == 0:
                logger.warning("URLs list empty, nothing can be loaded!")
                return documents
        else:
            logger.warning(f"Unsupported doc_type '{doc_type}', nothing can be loaded!")
            return documents

        ######### Load documents #########
        # Load PDF file(s) from the local filesystem
        if doc_type == 'pdf':
            documents.extend(
                self._load_local_files(doc_filepath, '.pdf', self.pdf_loader, 'PDF')
            )

        # Load PDFs from online (urls). Can read multiple PDFs from multiple URLs in one-shot
        elif doc_type == 'online_pdf':
            logger.info(f'URLs to load Online PDFs are from: {urls}')
            # NOTE(review): validate_url_format presumably returns only the
            # well-formed URLs - confirm against src/utils.py.
            valid_urls = self.utils_obj.validate_url_format(
                urls=urls,
                url_type=doc_type
            )
            for url in valid_urls:
                # Load and split PDF pages per document
                documents.extend(self.pdf_loader(url))

        # Load data from URLs (can load data from multiple URLs)
        elif doc_type == 'urls':
            logger.info(f'URLs to load data from are: {urls}')
            valid_urls = self.utils_obj.validate_url_format(
                urls=urls,
                url_type=doc_type
            )
            # Load the pages and wrap their text in langchain Documents
            pages = SimpleWebPageReader(html_to_text=True).load_data(valid_urls)
            documents.extend(Document(page_content=page.text) for page in pages)

        # Load data from text file(s)
        elif doc_type == 'textfile':
            documents.extend(
                self._load_local_files(doc_filepath, '.txt', self.text_loader, 'text')
            )

        # Load data from files on the local directory (files may be of type .pdf, .txt, .doc, etc.)
        elif doc_type == 'directory':
            if os.path.isdir(doc_filepath):
                reader = SimpleDirectoryReader(input_dir=doc_filepath)
                documents.extend(reader.load_data())
            elif os.path.isfile(doc_filepath):
                reader = SimpleDirectoryReader(input_files=[doc_filepath])
                documents.extend(reader.load_data())

        # Load data from URLs in Knowledge Base format
        elif doc_type == 'url-kb':
            KnowledgeBaseWebReader = download_loader("KnowledgeBaseWebReader")
            loader = KnowledgeBaseWebReader()
            for url in urls:
                documents.extend(loader.load_data(
                    root_url=url,
                    link_selectors=['.article-list a', '.article-list a'],
                    article_path='/articles',
                    body_selector='.article-body',
                    title_selector='.article-title',
                    subtitle_selector='.article-subtitle',
                ))

        # Load data from URLs and create an agent chain using ChatGPT
        elif doc_type == 'url-chatgpt':
            BeautifulSoupWebReader = download_loader("BeautifulSoupWebReader")
            loader = BeautifulSoupWebReader()
            # Load data from URLs
            documents = loader.load_data(urls=urls)
            # Build the Vector database
            index = GPTVectorStoreIndex(documents)
            tools = [
                Tool(
                    name="Website Index",
                    func=lambda q: index.query(q),
                    description="Useful when you want answer questions about the text retrieved from websites.",
                ),
            ]

            # Call ChatGPT API
            llm = OpenAI(temperature=0)  # Keep temperature=0 to search from the given urls only
            memory = ConversationBufferMemory(memory_key="chat_history")
            agent_chain = initialize_agent(
                tools, llm, agent="zero-shot-react-description", memory=memory
            )
            # NOTE(review): the agent's answer is not used by anything below;
            # the call is kept because the original code makes it.
            agent_chain.run(input="What language is on this website?")

        # Clean documents
        documents = self.clean_documents(documents)
        logger.info(f'{doc_type} in raw format from: {doc_filepath} loaded successfully!')
        return documents

    def clean_documents(
        self,
        documents
    ):
        """Clean the text of every document with ``replace_newlines_and_spaces``.

        Objects exposing ``page_content`` or ``text`` are updated in place;
        anything else is passed to the helper directly and replaced by its
        return value.

        Returns:
            A new list holding the cleaned documents in their original order.
        """
        cleaned_documents = []
        for document in documents:
            if hasattr(document, 'page_content'):
                document.page_content = self.utils_obj.replace_newlines_and_spaces(document.page_content)
            elif hasattr(document, 'text'):
                document.text = self.utils_obj.replace_newlines_and_spaces(document.text)
            else:
                document = self.utils_obj.replace_newlines_and_spaces(document)
            cleaned_documents.append(document)
        return cleaned_documents

    def load_external_links_used_by_FTAs(self,
                                         sheet_filepath='./data/urls_used_by_ftas/external_links_used_by_FTAs.xlsx'
                                         ):
        """Collect the links listed on every sheet of an Excel workbook.

        Args:
            sheet_filepath: Path of the workbook to read.

        Returns:
            A DataFrame with the columns ``'Link used for'``, ``'Link type'``
            and ``'Link'``, passed through ``UTILS.clean_df``.
        """
        link_columns = ['Link used for', 'Link type', 'Link']
        # The leading empty frame guarantees the expected columns exist even
        # when no sheet provides them.
        frames = [pd.DataFrame(columns=['S.No.'] + link_columns)]

        # The context manager closes the workbook handle once it has been read.
        with pd.ExcelFile(sheet_filepath) as xls:
            for sheet_name in xls.sheet_names:
                sheet = pd.read_excel(xls, sheet_name)
                if sheet.shape[0] > 0:
                    frames.append(sheet)
                else:
                    logger.info(f'{sheet_name} has no content.')

        # Concatenate once instead of growing the frame sheet by sheet.
        df = pd.concat(frames)[link_columns]

        # Clean df
        df = self.utils_obj.clean_df(df)
        logger.info(f'Total links available across all cities: {df.shape[0]}')
        return df