Arcana / fiber.py
Ocillus's picture
Update fiber.py
b6d4bd1 verified
import re
from typing import List, Dict
from datetime import datetime
from collections import Counter
import jieba # For Chinese word segmentation
class FiberDBMS:
def __init__(self):
self.database: List[Dict[str, str]] = []
self.content_index: Dict[str, List[int]] = {}
def add_entry(self, name: str, content: str, tags: str) -> None:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
entry = {
"name": name,
"timestamp": timestamp,
"content": content,
"tags": tags
}
self.database.append(entry)
self._index_content(len(self.database) - 1, content)
def _index_content(self, entry_index: int, content: str) -> None:
words = self._tokenize(content)
for word in words:
if word not in self.content_index:
self.content_index[word] = []
self.content_index[word].append(entry_index)
def load_or_create(self, filename: str) -> None:
try:
self.load_from_file(filename)
print(f"Loaded {len(self.database)} entries from {filename}.")
except FileNotFoundError:
print(f"{filename} not found. Creating a new database.")
def query(self, query: str, top_n: int) -> List[Dict[str, str]]:
query_words = self._tokenize(query)
matching_indices = set()
for word in query_words:
if word in self.content_index:
matching_indices.update(self.content_index[word])
sorted_results = sorted(
matching_indices,
key=lambda idx: self._rate_result(self.database[idx], query_words),
reverse=True
)
results = []
for idx in sorted_results[:top_n]:
entry = self.database[idx]
snippet = self._get_snippet(entry['content'], query_words)
updated_tags = self._update_tags(entry['tags'], entry['content'], query_words)
results.append({
'name': entry['name'],
'content': snippet,
'tags': updated_tags,
'index': idx
})
return results
def save(self, filename: str) -> None:
with open(filename, 'w', encoding='utf-8') as f:
for entry in self.database:
line = f"{entry['name']}\t{entry['timestamp']}\t{entry['content']}\t{entry['tags']}\n"
f.write(line)
print(f"Updated database saved to {filename}.")
def _rate_result(self, entry: Dict[str, str], query_words: List[str]) -> float:
content_tokens = self._tokenize(entry['content'])
name_tokens = self._tokenize(entry['name'])
tags = entry['tags'].split(',')
unique_matches = sum(1 for word in set(query_words) if word in content_tokens)
content_score = sum(content_tokens.count(word) for word in query_words)
name_score = sum(3 for word in query_words if word in name_tokens)
phrase_score = 5 if all(word in content_tokens for word in query_words) else 0
unique_match_score = unique_matches * 10
tag_score = sum(2 for tag in tags if any(word in self._tokenize(tag) for word in query_words))
length_penalty = min(1, len(content_tokens) / 100)
return (content_score + name_score + phrase_score + unique_match_score + tag_score) * length_penalty
def _tokenize(self, text: str) -> List[str]:
# Check if the text contains Chinese characters
if re.search(r'[\u4e00-\u9fff]', text):
return list(jieba.cut(text))
else:
return re.findall(r'\w+', text.lower())
def _get_snippet(self, content: str, query_words: List[str], max_length: int = 200) -> str:
content_tokens = self._tokenize(content)
best_start = 0
max_score = 0
for i in range(len(content_tokens) - max_length):
snippet = content_tokens[i:i+max_length]
score = sum(snippet.count(word) * (len(word) ** 0.5) for word in query_words)
if score > max_score:
max_score = score
best_start = i
snippet = ''.join(content_tokens[best_start:best_start+max_length])
return snippet + "..." if len(content) > max_length else snippet
def _update_tags(self, original_tags: str, content: str, query_words: List[str]) -> str:
tags = original_tags.split(',')
original_tag = tags[0] # Keep the first tag unchanged
words = self._tokenize(content)
word_counts = Counter(words)
relevant_keywords = [word for word in query_words if word in word_counts and word not in tags]
relevant_keywords += [word for word, count in word_counts.most_common(5) if word not in tags and word not in query_words]
updated_tags = [original_tag] + tags[1:] + relevant_keywords
return ','.join(updated_tags)
def load_from_file(self, filename: str) -> None:
self.database.clear()
self.content_index.clear()
with open(filename, 'r', encoding='utf-8') as f:
for idx, line in enumerate(f):
name, timestamp, content, tags = line.strip().split('\t')
self.database.append({
"name": name,
"timestamp": timestamp,
"content": content,
"tags": tags
})
self._index_content(idx, content)
def main():
dbms = FiberDBMS()
# Load or create the database
dbms.load_or_create("Celsiaaa.txt")
while True:
query = input("\nEnter your search query (or 'quit' to exit): ")
if query.lower() == 'quit':
break
try:
top_n = int(input("Enter the number of top results to display: "))
except ValueError:
print("Invalid input. Using default value of 5.")
top_n = 5
results = dbms.query(query, top_n)
if results:
print(f"\nTop {len(results)} results for '{query}':")
for idx, result in enumerate(results, 1):
print(f"\nResult {idx}:")
print(f"Name: {result['name']}")
print(f"Content: {result['content']}")
print(f"Tags: {result['tags']}")
else:
print(f"No results found for '{query}'.")
# Save updated database with new tags
dbms.save("Celsiaaa.txt")
if __name__ == "__main__":
main()