Spaces:
Running
Running
from server.db.models.knowledge_base_model import KnowledgeBaseModel | |
from server.db.models.knowledge_file_model import KnowledgeFileModel, FileDocModel | |
from server.db.session import with_session | |
from server.knowledge_base.utils import KnowledgeFile | |
from typing import List, Dict | |
def list_docs_from_db(session, | |
kb_name: str, | |
file_name: str = None, | |
metadata: Dict = {}, | |
) -> List[Dict]: | |
''' | |
列出某知识库某文件对应的所有Document。 | |
返回形式:[{"id": str, "metadata": dict}, ...] | |
''' | |
docs = session.query(FileDocModel).filter(FileDocModel.kb_name.ilike(kb_name)) | |
if file_name: | |
docs = docs.filter(FileDocModel.file_name.ilike(file_name)) | |
for k, v in metadata.items(): | |
docs = docs.filter(FileDocModel.meta_data[k].as_string()==str(v)) | |
return [{"id": x.doc_id, "metadata": x.metadata} for x in docs.all()] | |
def delete_docs_from_db(session, | |
kb_name: str, | |
file_name: str = None, | |
) -> List[Dict]: | |
''' | |
删除某知识库某文件对应的所有Document,并返回被删除的Document。 | |
返回形式:[{"id": str, "metadata": dict}, ...] | |
''' | |
docs = list_docs_from_db(kb_name=kb_name, file_name=file_name) | |
query = session.query(FileDocModel).filter(FileDocModel.kb_name.ilike(kb_name)) | |
if file_name: | |
query = query.filter(FileDocModel.file_name.ilike(file_name)) | |
query.delete(synchronize_session=False) | |
session.commit() | |
return docs | |
def add_docs_to_db(session, | |
kb_name: str, | |
file_name: str, | |
doc_infos: List[Dict]): | |
''' | |
将某知识库某文件对应的所有Document信息添加到数据库。 | |
doc_infos形式:[{"id": str, "metadata": dict}, ...] | |
''' | |
#! 这里会出现doc_infos为None的情况,需要进一步排查 | |
if doc_infos is None: | |
print("输入的server.db.repository.knowledge_file_repository.add_docs_to_db的doc_infos参数为None") | |
return False | |
for d in doc_infos: | |
obj = FileDocModel( | |
kb_name=kb_name, | |
file_name=file_name, | |
doc_id=d["id"], | |
meta_data=d["metadata"], | |
) | |
session.add(obj) | |
return True | |
def count_files_from_db(session, kb_name: str) -> int: | |
return session.query(KnowledgeFileModel).filter(KnowledgeFileModel.kb_name.ilike(kb_name)).count() | |
def list_files_from_db(session, kb_name): | |
files = session.query(KnowledgeFileModel).filter(KnowledgeFileModel.kb_name.ilike(kb_name)).all() | |
docs = [f.file_name for f in files] | |
return docs | |
def add_file_to_db(session, | |
kb_file: KnowledgeFile, | |
docs_count: int = 0, | |
custom_docs: bool = False, | |
doc_infos: List[Dict] = [], # 形式:[{"id": str, "metadata": dict}, ...] | |
): | |
kb = session.query(KnowledgeBaseModel).filter_by(kb_name=kb_file.kb_name).first() | |
if kb: | |
# 如果已经存在该文件,则更新文件信息与版本号 | |
existing_file: KnowledgeFileModel = (session.query(KnowledgeFileModel) | |
.filter(KnowledgeFileModel.kb_name.ilike(kb_file.kb_name), | |
KnowledgeFileModel.file_name.ilike(kb_file.filename)) | |
.first()) | |
mtime = kb_file.get_mtime() | |
size = kb_file.get_size() | |
if existing_file: | |
existing_file.file_mtime = mtime | |
existing_file.file_size = size | |
existing_file.docs_count = docs_count | |
existing_file.custom_docs = custom_docs | |
existing_file.file_version += 1 | |
# 否则,添加新文件 | |
else: | |
new_file = KnowledgeFileModel( | |
file_name=kb_file.filename, | |
file_ext=kb_file.ext, | |
kb_name=kb_file.kb_name, | |
document_loader_name=kb_file.document_loader_name, | |
text_splitter_name=kb_file.text_splitter_name or "SpacyTextSplitter", | |
file_mtime=mtime, | |
file_size=size, | |
docs_count = docs_count, | |
custom_docs=custom_docs, | |
) | |
kb.file_count += 1 | |
session.add(new_file) | |
add_docs_to_db(kb_name=kb_file.kb_name, file_name=kb_file.filename, doc_infos=doc_infos) | |
return True | |
def delete_file_from_db(session, kb_file: KnowledgeFile): | |
existing_file = (session.query(KnowledgeFileModel) | |
.filter(KnowledgeFileModel.file_name.ilike(kb_file.filename), | |
KnowledgeFileModel.kb_name.ilike(kb_file.kb_name)) | |
.first()) | |
if existing_file: | |
session.delete(existing_file) | |
delete_docs_from_db(kb_name=kb_file.kb_name, file_name=kb_file.filename) | |
session.commit() | |
kb = session.query(KnowledgeBaseModel).filter(KnowledgeBaseModel.kb_name.ilike(kb_file.kb_name)).first() | |
if kb: | |
kb.file_count -= 1 | |
session.commit() | |
return True | |
def delete_files_from_db(session, knowledge_base_name: str): | |
session.query(KnowledgeFileModel).filter(KnowledgeFileModel.kb_name.ilike(knowledge_base_name)).delete(synchronize_session=False) | |
session.query(FileDocModel).filter(FileDocModel.kb_name.ilike(knowledge_base_name)).delete(synchronize_session=False) | |
kb = session.query(KnowledgeBaseModel).filter(KnowledgeBaseModel.kb_name.ilike(knowledge_base_name)).first() | |
if kb: | |
kb.file_count = 0 | |
session.commit() | |
return True | |
def file_exists_in_db(session, kb_file: KnowledgeFile): | |
existing_file = (session.query(KnowledgeFileModel) | |
.filter(KnowledgeFileModel.file_name.ilike(kb_file.filename), | |
KnowledgeFileModel.kb_name.ilike(kb_file.kb_name)) | |
.first()) | |
return True if existing_file else False | |
def get_file_detail(session, kb_name: str, filename: str) -> dict: | |
file: KnowledgeFileModel = (session.query(KnowledgeFileModel) | |
.filter(KnowledgeFileModel.file_name.ilike(filename), | |
KnowledgeFileModel.kb_name.ilike(kb_name)) | |
.first()) | |
if file: | |
return { | |
"kb_name": file.kb_name, | |
"file_name": file.file_name, | |
"file_ext": file.file_ext, | |
"file_version": file.file_version, | |
"document_loader": file.document_loader_name, | |
"text_splitter": file.text_splitter_name, | |
"create_time": file.create_time, | |
"file_mtime": file.file_mtime, | |
"file_size": file.file_size, | |
"custom_docs": file.custom_docs, | |
"docs_count": file.docs_count, | |
} | |
else: | |
return {} | |