Update fiber.py
Browse files
fiber.py
CHANGED
@@ -2,6 +2,7 @@ import re
|
|
2 |
from typing import List, Dict
|
3 |
from datetime import datetime
|
4 |
from collections import Counter
|
|
|
5 |
|
6 |
class FiberDBMS:
|
7 |
def __init__(self):
|
@@ -27,16 +28,13 @@ class FiberDBMS:
|
|
27 |
self.content_index[word].append(entry_index)
|
28 |
|
29 |
def load_or_create(self, filename: str) -> None:
|
30 |
-
"""Load the database from a file or create a new one if the file does not exist."""
|
31 |
try:
|
32 |
self.load_from_file(filename)
|
33 |
print(f"Loaded {len(self.database)} entries from {filename}.")
|
34 |
except FileNotFoundError:
|
35 |
print(f"{filename} not found. Creating a new database.")
|
36 |
-
# Optionally, you can add default entries here if needed.
|
37 |
|
38 |
def query(self, query: str, top_n: int) -> List[Dict[str, str]]:
|
39 |
-
"""Query the database for entries matching the query."""
|
40 |
query_words = self._tokenize(query)
|
41 |
matching_indices = set()
|
42 |
for word in query_words:
|
@@ -64,7 +62,6 @@ class FiberDBMS:
|
|
64 |
return results
|
65 |
|
66 |
def save(self, filename: str) -> None:
|
67 |
-
"""Save the current database to a file."""
|
68 |
with open(filename, 'w', encoding='utf-8') as f:
|
69 |
for entry in self.database:
|
70 |
line = f"{entry['name']}\t{entry['timestamp']}\t{entry['content']}\t{entry['tags']}\n"
|
@@ -72,39 +69,42 @@ class FiberDBMS:
|
|
72 |
print(f"Updated database saved to {filename}.")
|
73 |
|
74 |
def _rate_result(self, entry: Dict[str, str], query_words: List[str]) -> float:
|
75 |
-
|
76 |
-
|
77 |
tags = entry['tags'].split(',')
|
78 |
|
79 |
-
unique_matches = sum(1 for word in set(query_words) if word in
|
80 |
-
content_score = sum(
|
81 |
-
name_score = sum(3 for word in query_words if word in
|
82 |
-
phrase_score = 5 if
|
83 |
unique_match_score = unique_matches * 10
|
84 |
|
85 |
-
|
86 |
-
tag_score = sum(2 for tag in tags if any(word in tag.lower() for word in query_words))
|
87 |
|
88 |
-
length_penalty = min(1, len(
|
89 |
|
90 |
return (content_score + name_score + phrase_score + unique_match_score + tag_score) * length_penalty
|
91 |
|
92 |
def _tokenize(self, text: str) -> List[str]:
|
93 |
-
|
|
|
|
|
|
|
|
|
94 |
|
95 |
def _get_snippet(self, content: str, query_words: List[str], max_length: int = 200) -> str:
|
96 |
-
|
97 |
best_start = 0
|
98 |
max_score = 0
|
99 |
|
100 |
-
for i in range(len(
|
101 |
-
snippet =
|
102 |
score = sum(snippet.count(word) * (len(word) ** 0.5) for word in query_words)
|
103 |
if score > max_score:
|
104 |
max_score = score
|
105 |
best_start = i
|
106 |
|
107 |
-
snippet =
|
108 |
return snippet + "..." if len(content) > max_length else snippet
|
109 |
|
110 |
def _update_tags(self, original_tags: str, content: str, query_words: List[str]) -> str:
|
@@ -165,3 +165,5 @@ def main():
|
|
165 |
# Save updated database with new tags
|
166 |
dbms.save("Celsiaaa.txt")
|
167 |
|
|
|
|
|
|
2 |
from typing import List, Dict
|
3 |
from datetime import datetime
|
4 |
from collections import Counter
|
5 |
+
import jieba # For Chinese word segmentation
|
6 |
|
7 |
class FiberDBMS:
|
8 |
def __init__(self):
|
|
|
28 |
self.content_index[word].append(entry_index)
|
29 |
|
30 |
def load_or_create(self, filename: str) -> None:
|
|
|
31 |
try:
|
32 |
self.load_from_file(filename)
|
33 |
print(f"Loaded {len(self.database)} entries from {filename}.")
|
34 |
except FileNotFoundError:
|
35 |
print(f"{filename} not found. Creating a new database.")
|
|
|
36 |
|
37 |
def query(self, query: str, top_n: int) -> List[Dict[str, str]]:
|
|
|
38 |
query_words = self._tokenize(query)
|
39 |
matching_indices = set()
|
40 |
for word in query_words:
|
|
|
62 |
return results
|
63 |
|
64 |
def save(self, filename: str) -> None:
|
|
|
65 |
with open(filename, 'w', encoding='utf-8') as f:
|
66 |
for entry in self.database:
|
67 |
line = f"{entry['name']}\t{entry['timestamp']}\t{entry['content']}\t{entry['tags']}\n"
|
|
|
69 |
print(f"Updated database saved to {filename}.")
|
70 |
|
71 |
def _rate_result(self, entry: Dict[str, str], query_words: List[str]) -> float:
|
72 |
+
content_tokens = self._tokenize(entry['content'])
|
73 |
+
name_tokens = self._tokenize(entry['name'])
|
74 |
tags = entry['tags'].split(',')
|
75 |
|
76 |
+
unique_matches = sum(1 for word in set(query_words) if word in content_tokens)
|
77 |
+
content_score = sum(content_tokens.count(word) for word in query_words)
|
78 |
+
name_score = sum(3 for word in query_words if word in name_tokens)
|
79 |
+
phrase_score = 5 if all(word in content_tokens for word in query_words) else 0
|
80 |
unique_match_score = unique_matches * 10
|
81 |
|
82 |
+
tag_score = sum(2 for tag in tags if any(word in self._tokenize(tag) for word in query_words))
|
|
|
83 |
|
84 |
+
length_penalty = min(1, len(content_tokens) / 100)
|
85 |
|
86 |
return (content_score + name_score + phrase_score + unique_match_score + tag_score) * length_penalty
|
87 |
|
88 |
def _tokenize(self, text: str) -> List[str]:
|
89 |
+
# Check if the text contains Chinese characters
|
90 |
+
if re.search(r'[\u4e00-\u9fff]', text):
|
91 |
+
return list(jieba.cut(text))
|
92 |
+
else:
|
93 |
+
return re.findall(r'\w+', text.lower())
|
94 |
|
95 |
def _get_snippet(self, content: str, query_words: List[str], max_length: int = 200) -> str:
|
96 |
+
content_tokens = self._tokenize(content)
|
97 |
best_start = 0
|
98 |
max_score = 0
|
99 |
|
100 |
+
for i in range(len(content_tokens) - max_length):
|
101 |
+
snippet = content_tokens[i:i+max_length]
|
102 |
score = sum(snippet.count(word) * (len(word) ** 0.5) for word in query_words)
|
103 |
if score > max_score:
|
104 |
max_score = score
|
105 |
best_start = i
|
106 |
|
107 |
+
snippet = ''.join(content_tokens[best_start:best_start+max_length])
|
108 |
return snippet + "..." if len(content) > max_length else snippet
|
109 |
|
110 |
def _update_tags(self, original_tags: str, content: str, query_words: List[str]) -> str:
|
|
|
165 |
# Save updated database with new tags
|
166 |
dbms.save("Celsiaaa.txt")
|
167 |
|
168 |
+
if __name__ == "__main__":
|
169 |
+
main()
|