import shutil
import os
import gradio as gr
from uuid import uuid4
from huggingface_hub.file_download import http_get
from langchain.document_loaders import (
CSVLoader,
EverNoteLoader,
PDFMinerLoader,
TextLoader,
UnstructuredEmailLoader,
UnstructuredEPubLoader,
UnstructuredHTMLLoader,
UnstructuredMarkdownLoader,
UnstructuredODTLoader,
UnstructuredPowerPointLoader,
UnstructuredWordDocumentLoader,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.docstore.document import Document
from chromadb.config import Settings
from llama_cpp import Llama
SYSTEM_PROMPT = "Ты — Сайга, русскоязычный автоматический ассистент. Ты разговариваешь с людьми и помогаешь им."
LOADER_MAPPING = {
".csv": (CSVLoader, {}),
".doc": (UnstructuredWordDocumentLoader, {}),
".docx": (UnstructuredWordDocumentLoader, {}),
".enex": (EverNoteLoader, {}),
".epub": (UnstructuredEPubLoader, {}),
".html": (UnstructuredHTMLLoader, {}),
".md": (UnstructuredMarkdownLoader, {}),
".odt": (UnstructuredODTLoader, {}),
".pdf": (PDFMinerLoader, {}),
".ppt": (UnstructuredPowerPointLoader, {}),
".pptx": (UnstructuredPowerPointLoader, {}),
".txt": (TextLoader, {"encoding": "utf8"}),
}
def load_model(
directory: str = ".",
model_name: str = "model-q4_K.gguf",
model_url: str = "https://huggingface.co/IlyaGusev/saiga2_13b_gguf/resolve/main/model-q4_K.gguf"
):
final_model_path = os.path.join(directory, model_name)
print("Downloading all files...")
if not os.path.exists(final_model_path):
with open(final_model_path, "wb") as f:
http_get(model_url, f)
os.chmod(final_model_path, 0o777)
print("Files downloaded!")
model = Llama(
model_path=final_model_path,
n_ctx=2000,
n_parts=1,
)
print("Model loaded!")
return model
MAX_NEW_TOKENS = 1500
EMBEDDER_NAME = "sentence-transformers/paraphrase-multilingual-mpnet-base-v2"
EMBEDDER = HuggingFaceEmbeddings(model_name=EMBEDDER_NAME)
MODEL = load_model()
def get_uuid():
return str(uuid4())
def load_single_document(file_path: str) -> Document:
ext = "." + file_path.rsplit(".", 1)[-1]
assert ext in LOADER_MAPPING
loader_class, loader_args = LOADER_MAPPING[ext]
loader = loader_class(file_path, **loader_args)
return loader.load()[0]
def get_message_tokens(model, role, content):
content = f"{role}\n{content}\n"
content = content.encode("utf-8")
return model.tokenize(content, special=True)
def get_system_tokens(model):
system_message = {"role": "system", "content": SYSTEM_PROMPT}
return get_message_tokens(model, **system_message)
def upload_files(files, file_paths):
file_paths = [f.name for f in files]
return file_paths
def process_text(text):
lines = text.split("\n")
lines = [line for line in lines if len(line.strip()) > 2]
text = "\n".join(lines).strip()
if len(text) < 10:
return None
return text
def build_index(file_paths, db, chunk_size, chunk_overlap, file_warning):
documents = [load_single_document(path) for path in file_paths]
text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
documents = text_splitter.split_documents(documents)
fixed_documents = []
for doc in documents:
doc.page_content = process_text(doc.page_content)
if not doc.page_content:
continue
fixed_documents.append(doc)
db = Chroma.from_documents(
fixed_documents,
EMBEDDER,
client_settings=Settings(
anonymized_telemetry=False
)
)
file_warning = f"Загружено {len(fixed_documents)} фрагментов! Можно задавать вопросы."
return db, file_warning
def user(message, history, system_prompt):
new_history = history + [[message, None]]
return "", new_history
def retrieve(history, db, retrieved_docs, k_documents):
retrieved_docs = ""
if db:
last_user_message = history[-1][0]
retriever = db.as_retriever(search_kwargs={"k": k_documents})
docs = retriever.get_relevant_documents(last_user_message)
retrieved_docs = "\n\n".join([doc.page_content for doc in docs])
return retrieved_docs
def bot(
history,
system_prompt,
conversation_id,
retrieved_docs,
top_p,
top_k,
temp
):
if not history:
return
tokens = get_system_tokens(MODEL)[:]
tokens.append(LINEBREAK_TOKEN)
for user_message, bot_message in history[:-1]:
message_tokens = get_message_tokens(model=MODEL, role="user", content=user_message)
tokens.extend(message_tokens)
if bot_message:
message_tokens = get_message_tokens(model=MODEL, role="bot", content=bot_message)
tokens.extend(message_tokens)
last_user_message = history[-1][0]
if retrieved_docs:
last_user_message = f"Контекст: {retrieved_docs}\n\nИспользуя контекст, ответь на вопрос: {last_user_message}"
message_tokens = get_message_tokens(model=MODEL, role="user", content=last_user_message)
tokens.extend(message_tokens)
role_tokens = [MODEL.token_bos(), BOT_TOKEN, LINEBREAK_TOKEN]
tokens.extend(role_tokens)
generator = MODEL.generate(
tokens,
top_k=top_k,
top_p=top_p,
temp=temp
)
partial_text = ""
for i, token in enumerate(generator):
if token == MODEL.token_eos() or (MAX_NEW_TOKENS is not None and i >= MAX_NEW_TOKENS):
break
partial_text += MODEL.detokenize([token]).decode("utf-8", "ignore")
history[-1][1] = partial_text
yield history
with gr.Blocks(
theme=gr.themes.Soft()
) as demo:
db = gr.State(None)
conversation_id = gr.State(get_uuid)
favicon = ''
gr.Markdown(
f"""