Spaces:
Runtime error
Runtime error
from contextlib import contextmanager | |
import gradio as gr | |
from database.operation import * | |
from memorize import * | |
from database import SessionLocal, engine, Base | |
import database.schema as schema | |
from database import constant | |
import time | |
import asyncio | |
import pandas as pd | |
from collections import defaultdict | |
from datetime import datetime | |
Base.metadata.create_all(bind=engine) | |
db = SessionLocal() | |
def session_scope(): | |
try: | |
yield db | |
db.commit() | |
except Exception: | |
db.rollback() | |
raise | |
finally: | |
db.close() | |
intro = """\ | |
目标场景:只考虑记住单词及其意思,使得能无障碍阅读,不考虑用于写作。 | |
主要想法:批量记单词,每批 n 个单词,这 n 个单词用 AI 生成故事,复述故事即可记住单词。 | |
为什么? | |
- 批量记单词,一次可以记住 n 个单词,而不是一个一个记,效率高。 | |
- 复述故事,即费曼学习法,故事是单词的记忆之锚。 | |
- 复述故事而不是复述单词,故事具有连续性,更符合人类天性,容易记。 | |
### 使用建议 | |
1. 记单词前,先完整过一遍全部单词,剔除已记住的单词,从而提高新词密度 | |
2. 先看单词表格,然后看英文故事,对照中文完成记忆 | |
3. 记忆完成后需要一个一个勾选已记住的单词,勾选时尝试复述单词意思,以此来检验记忆效果 | |
> 本项目基于[开源数据集](https://github.com/LinXueyuanStdio/DictionaryData),并且[开源代码](https://github.com/LinXueyuanStdio/oh-my-words),欢迎大家贡献代码~ | |
""" | |
with gr.Blocks(title="批量记单词") as demo: | |
# gr.Markdown("# 批量记单词") | |
gr.HTML("<h1 align=\"center\">批量记单词</h1>") | |
user = gr.State(value={}) | |
# 0. 登录 | |
with gr.Tab("主页"): | |
gr.Markdown(intro) | |
gr.Markdown(f"共 {get_book_count(db)} 本书") | |
gr.HTML("""<iframe src="https://ghbtns.com/github-btn.html?user=LinXueyuanStdio&repo=oh-my-words&type=star&count=true&size=small" frameborder="0" scrolling="0" width="170" height="30" title="GitHub"></iframe>""") | |
with gr.Row(): | |
with gr.Column(): | |
email = gr.TextArea(value=constant.email, lines=1, label="邮箱") | |
password = gr.TextArea(value=constant.password, lines=1, label="密码") | |
login_btn = gr.Button("登录") | |
with gr.Column(): | |
register_email = gr.TextArea(value='', lines=1, label="邮箱") | |
register_password = gr.TextArea(value='', lines=1, label="密码") | |
register_btn = gr.Button("立即注册", variant="primary") | |
user_status = gr.Textbox("", lines=1, label="用户状态") | |
# 1. 创建记忆计划 | |
tab1 = gr.Tab("创建记忆计划", visible=False) | |
with tab1: | |
select_book = gr.Dropdown([], label="单词书", info="选择一本单词书") | |
batch_size = gr.Number(value=10, label="批次大小") | |
randomize = gr.Checkbox(value=True, label="以单词乱序进行记忆") | |
title = gr.TextArea(value='单词书', lines=1, label="记忆计划的名称") | |
btn = gr.Button("创建记忆计划") | |
status = gr.Textbox("", lines=1, label="状态") | |
def submit(user: Dict[str, str], book, title, randomize, batch_size): | |
user_id = user.get("id", None) | |
if user_id is None: | |
gr.Error("请先登录") | |
return "请先登录" | |
book_id = book.split(" [")[1][:-1] | |
user_book = create_user_book(db, UserBookCreate( | |
owner_id=user_id, | |
book_id=book_id, | |
title=title, | |
random=randomize, | |
batch_size=batch_size | |
)) | |
if user_book is not None: | |
return "成功" | |
else: | |
return "失败" | |
btn.click(submit, [user, select_book, title, randomize, batch_size], [status]) | |
def on_select(user: Dict[str, str], evt: gr.SelectData): | |
user_id = user.get("id", None) | |
new_options = [] | |
if user_id is None: | |
return gr.Dropdown(choices=new_options), "请先登录" | |
books = get_all_books_for_user(db, user_id) | |
new_options = [f"{'⭐ ' if book.permission == 'private' else ''}{book.bk_name} (共 {book.bk_item_num} 词) [{book.bk_id}]" for book in books] | |
return gr.Dropdown(choices=new_options), f"您好,{user['email']}" | |
tab1.select(on_select, [user], [select_book, status]) | |
# 2. 选择单词分批 | |
with gr.Tab("选择单词分批") as tab2: | |
select_user_book = gr.Dropdown( | |
[], label="记忆计划", info="请选择记忆计划" | |
) | |
word_count = gr.Number(value=0, label="单词个数") | |
known_words = gr.CheckboxGroup( | |
[], label="已学会的单词", info="正式记忆前将去除已学会的单词,提高每个批次的新词密度,进而提高效率" | |
) | |
btn = gr.Button("生成批次") | |
status = gr.Textbox("3000 词大概要 2 小时才能写完所有的故事", lines=1, label="生成结果") | |
def on_select_user(user): | |
user_id = user.get("id", None) | |
if user_id is None: | |
gr.Error("请先登录") | |
return gr.Dropdown(choices=[]), "请先登录" | |
new_options = [] | |
user_book = get_user_books_by_owner_id(db, user_id) | |
new_options = [f"{book.title} | {book.batch_size}个单词一组 [{book.id}]" for book in user_book] | |
return gr.Dropdown(choices=new_options), "3000 词大概要 2 小时才能写完所有的故事" | |
def on_select_user_book(user_book): | |
logger.debug(f'user_book {user_book}') | |
if user_book is None: | |
return 0, gr.CheckboxGroup(choices=[]) | |
new_options = [] | |
user_book_id = user_book.split(" [")[1][:-1] | |
user_book = get_user_book(db, user_book_id) | |
book_id = user_book.book_id | |
book = get_book(db, book_id) | |
if book is None: | |
return 0, gr.CheckboxGroup(choices=[]) | |
words = get_words_for_book(db, user_book) | |
new_options = [f"{word.vc_vocabulary}" for word in words] | |
return len(words), gr.CheckboxGroup(choices=new_options) | |
select_user_book.select(on_select_user_book, inputs=[select_user_book], outputs=[word_count, known_words]) | |
tab2.select(on_select_user, [user], [select_user_book, status]) | |
def submit(user_book, known_words): | |
start_time = time.time() | |
user_book_id = user_book.split(" [")[1][:-1] | |
user_book = get_user_book(db, user_book_id) | |
all_words = get_words_for_book(db, user_book) | |
unknown_words = [] | |
for w in all_words: | |
if w.vc_vocabulary not in known_words: | |
unknown_words.append(w) | |
track(db, user_book, unknown_words) | |
end_time = time.time() | |
duration = end_time - start_time | |
return f"成功!分为 {len(unknown_words) // user_book.batch_size} 个批次,共 {len(unknown_words)} 个单词,耗时 {duration:.2f} 秒" | |
btn.click(submit, [select_user_book, known_words], [status]) | |
# 3. 记忆 | |
with gr.Tab("记忆") as tab3: | |
select_user_book = gr.Dropdown( | |
[], label="记忆计划", info="请选择记忆计划" | |
) | |
info = gr.Accordion(f"新词", open=False) | |
with info: | |
gr.Markdown(f"新词") | |
dataframe_header = ["单词", "中文词意", "英式音标", "美式音标", "记忆量"] | |
memorizing_dataframe = gr.Dataframe( | |
headers=dataframe_header, | |
datatype=["str"] * len(dataframe_header), | |
col_count=(len(dataframe_header), "fixed"), | |
wrap=True, | |
) | |
batches = gr.State(value=[]) | |
current_batch_index = gr.State(value=-1) | |
user_book_id = gr.State(value=None) | |
with gr.Row(): | |
# story = gr.HighlightedText([]) | |
# translated_story = gr.HighlightedText([]) | |
# story = gr.Textbox() | |
# translated_story = gr.Textbox() | |
story = gr.Markdown() | |
translated_story = gr.Markdown() | |
# 试了一下,还是 markdown 的显示效果好 | |
memorize_action = gr.CheckboxGroup(choices=[], label="记住的单词", info="能够复述出意思才算记住") | |
with gr.Row(): | |
previous_batch_btn = gr.Button("上一批") | |
regenerate_btn = gr.Button("重新生成故事") | |
next_batch_btn = gr.Button("下一批", variant="primary") | |
progress = gr.Slider(1, 1, value=1, step=1, label="进度", info="") | |
def on_select_user(user): | |
user_id = user.get("id", None) | |
if user_id is None: | |
gr.Error("请先登录") | |
return gr.Dropdown(choices=[]) | |
new_options = [] | |
user_book = get_user_books_by_owner_id(db, user_id) | |
new_options = [f"{book.title} | {book.batch_size}个单词一组 [{book.id}]" for book in user_book] | |
return gr.Dropdown(choices=new_options) | |
def update_from_batch(memorizing_batch: UserMemoryBatch): | |
new_options = [] | |
word_df = [] | |
# logger.debug(get_user_memory_batch(db, memorizing_batch.id)) | |
# logger.debug(memorizing_batch.id) | |
# logger.debug(get_user_memory_words_by_batch_id(db, memorizing_batch.id)) | |
# logger.debug(get_words_by_ids(db, [w.word_id for w in get_user_memory_words_by_batch_id(db, memorizing_batch.id)])) | |
# words = get_words_in_batch(db, memorizing_batch.id) | |
# words = get_words_by_ids(db, [w.word_id for w in memorizing_words]) | |
memorizing_words = get_user_memory_words_by_batch_id(db, memorizing_batch.id) | |
words = get_words_by_ids(db, [w.word_id for w in memorizing_words]) | |
# 统计记忆量 | |
actions = get_actions_at_each_word(db, [w.word_id for w in memorizing_words]) | |
remember_count = defaultdict(int) | |
forget_count = defaultdict(int) | |
for a in actions: | |
if a.action == "remember": | |
remember_count[a.word_id] += 1 | |
else: | |
forget_count[a.word_id] += 1 | |
# 统计记忆效率 | |
batch_actions = get_user_memory_batch_actions_by_user_memory_batch_id(db, memorizing_batch.id) | |
batch_actions.sort(key=lambda x: x.create_time) | |
start, end = None, None | |
total_duration = None | |
for a in batch_actions: | |
if a.action == "start": | |
start: datetime = a.create_time | |
elif a.action == "end": | |
end: datetime = a.create_time | |
if start is None: | |
continue | |
if total_duration is None: | |
total_duration = end - start | |
else: | |
total_duration += end - start | |
memory_speed = f"{memorizing_batch.batch_type}" | |
if total_duration is not None: | |
sec = total_duration.total_seconds() | |
minutes = sec / 60 | |
memory_speed += f":当前批次记忆效率 {len(memorizing_words) / minutes:.2f} 词/分钟,{minutes:.2f} 分钟/批次" | |
# 单词信息表格与勾选 | |
for w in words: | |
new_options.append(f"{w.vc_vocabulary}") | |
word_df.append([ | |
w.vc_vocabulary, # 单词 | |
w.vc_translation, # 中文词意 | |
w.vc_phonetic_uk, # 英式音标 | |
w.vc_phonetic_us, # 美式音标 | |
f"{remember_count[w.vc_id]} / {remember_count[w.vc_id] + forget_count[w.vc_id]}", # 记忆量 | |
]) | |
df = pd.DataFrame(word_df, columns=dataframe_header) | |
if memorizing_batch.batch_type == "回忆": | |
df = pd.DataFrame([[row[0], "", row[2], row[3], row[4]] for row in word_df], columns=dataframe_header) | |
# 故事 | |
story = memorizing_batch.story | |
translated_story = memorizing_batch.translated_story | |
if len(story) == 0 or len(translated_story) == 0: | |
story, translated_story = regenerate_for_batch(memorizing_batch, words) | |
logger.info("计算批次信息") | |
logger.info(new_options) | |
logger.info(story) | |
logger.info(translated_story) | |
logger.info("=" * 8) | |
return (gr.Accordion(label=memory_speed), df, story, translated_story, gr.CheckboxGroup(choices=new_options)) | |
def on_select_user_book(user_book_id: str): | |
""" | |
1. 当前单词 | |
2. 对当前单词的操作 | |
3. 故事 | |
""" | |
logger.debug(f'user_book {user_book_id}') | |
if user_book_id is None: | |
# 为什么会空?这里返回的东西可能会爆炸,但好像执行不到这里 | |
# 不管了,放个告示牌在这里,大家看见这个坑请绕着走 | |
return [], gr.CheckboxGroup(choices=[]) | |
user_book_id: str = user_book_id.split(" [")[1][:-1] | |
user_book = get_user_book(db, user_book_id) | |
batches = get_new_user_memory_batches_by_user_book_id(db, user_book_id) # 只缓存新词 | |
batch_id = user_book.memorizing_batch | |
memorizing_batch = get_user_memory_batch(db, batch_id) | |
current_batch_index = -1 | |
if memorizing_batch is not None: | |
for index, b in enumerate(batches): | |
if b.id == memorizing_batch.id: | |
current_batch_index = index | |
break | |
if current_batch_index == -1: | |
# 当前还没开始记忆,或者当前批次不是新词批次 | |
current_batch_index = 0 | |
memorizing_batch = batches[0] | |
batch_id = memorizing_batch.id | |
user_book.memorizing_batch = batch_id | |
update_user_book(db, user_book_id, UserBookUpdate( | |
owner_id=user_book.owner_id, | |
book_id=user_book.book_id, | |
title=user_book.title, | |
random=user_book.random, | |
batch_size=user_book.batch_size, | |
memorizing_batch=batch_id | |
)) | |
updates = update_from_batch(memorizing_batch) | |
on_batch_start(db, memorizing_batch.id) | |
asyncio.run(pregenerate(batches, current_batch_index)) | |
return (batches, current_batch_index, user_book) + updates + ( | |
gr.Slider( | |
minimum=1, | |
maximum=len(batches), | |
value=current_batch_index, | |
),) | |
batch_widget = [info, memorizing_dataframe, story, translated_story, memorize_action] | |
tab3.select(on_select_user, inputs=[user], outputs=[select_user_book]) | |
select_user_book.select( | |
on_select_user_book, | |
inputs=[select_user_book], | |
outputs=[batches, current_batch_index, user_book_id] + batch_widget + [progress] | |
) | |
async def worker_regenerate_for_batch(batches: List[UserMemoryBatch], index: int): | |
started_at = time.monotonic() | |
logger.info(f"started {index}") | |
# start | |
batch = batches[index] | |
story = batch.story | |
translated_story = batch.translated_story | |
if len(story) == 0 or len(translated_story) == 0: | |
batch_words = get_words_in_batch(db, batch.id) | |
regenerate_for_batch(batch, batch_words) | |
# end | |
total = time.monotonic() - started_at | |
logger.info(f'completed in {total:.2f} seconds') | |
async def pregenerate(batches: List[UserMemoryBatch], current_batch_index: int): | |
logger.info("开始预生成故事") | |
indexes = [current_batch_index+i+1 for i in range(3)]+[current_batch_index-i-1 for i in range(3)] | |
indexes = [i for i in indexes if 0 <= i < len(batches)] | |
for index in indexes: | |
asyncio.ensure_future(worker_regenerate_for_batch(batches, index)) | |
logger.info("结束预生成故事") | |
def submit_batch(batches: List[UserMemoryBatch], current_batch_index: int): | |
memorizing_batch = batches[current_batch_index] | |
return set_memorizing_batch(batches, current_batch_index, memorizing_batch) | |
def set_memorizing_batch(batches: List[UserMemoryBatch], current_batch_index: int, memorizing_batch: UserMemoryBatch): | |
updates = update_from_batch(memorizing_batch) | |
asyncio.run(pregenerate(batches, current_batch_index)) | |
logger.info("pregenerated") | |
return updates + (gr.Slider(value=current_batch_index+1), current_batch_index) | |
def save_progress(old_batch: UserMemoryBatch, memorize_action: List[str]): | |
# 保存单词记忆进度 | |
actions = [] | |
words = get_words_in_batch(db, old_batch.id) | |
for word in words: | |
if word.vc_vocabulary in memorize_action: | |
actions.append((word.vc_id, "remember")) | |
else: | |
actions.append((word.vc_id, "forget")) | |
save_memorizing_word_action(db, old_batch.id, actions) | |
def previous_batch(batches: List[UserMemoryBatch], current_batch_index: int, user_book: schema.UserBook, memorize_action: List[str]): | |
old_index = current_batch_index | |
if current_batch_index <= 0: | |
current_batch_index = 0 | |
elif current_batch_index > 0: | |
current_batch_index -= 1 | |
if current_batch_index != old_index: | |
# 下一页之前需要保存记忆进度 | |
# logger.info("下一页之前需要保存记忆进度") | |
# logger.info(memorize_action) | |
# 保存批次进度 | |
old_batch = batches[old_index] | |
current_batch = batches[current_batch_index] | |
save_progress(old_batch, memorize_action) | |
on_batch_end(db, old_batch.id) | |
on_batch_start(db, current_batch.id) | |
user_book_id = user_book.id | |
update_user_book_memorizing_batch(db, user_book_id, current_batch.id) | |
return submit_batch(batches, current_batch_index) | |
def next_batch(batches: List[UserMemoryBatch], current_batch_index: int, user_book: schema.UserBook, memorize_action: List[str]): | |
old_index = current_batch_index | |
if current_batch_index >= len(batches)-1: | |
current_batch_index = len(batches)-1 | |
elif current_batch_index < len(batches) - 1: | |
current_batch_index += 1 | |
if current_batch_index != old_index: | |
# 下一页之前需要保存记忆进度 | |
# logger.info("下一页之前需要保存记忆进度") | |
# logger.info(memorize_action) | |
# 保存批次进度 | |
old_batch = batches[old_index] | |
memorizing_batch = get_user_memory_batch(db, user_book.memorizing_batch) | |
if memorizing_batch is not None: | |
old_batch = memorizing_batch | |
current_batch = batches[current_batch_index] | |
save_progress(old_batch, memorize_action) | |
on_batch_end(db, old_batch.id) | |
next_batch = generate_next_batch(db, user_book, minutes=60, k=3) | |
if next_batch is not None: | |
current_batch = next_batch | |
on_batch_start(db, current_batch.id) | |
user_book_id = user_book.id | |
update_user_book_memorizing_batch(db, user_book_id, current_batch.id) | |
if next_batch is not None: | |
return set_memorizing_batch(batches, old_index, current_batch) | |
else: | |
return set_memorizing_batch(batches, current_batch_index, current_batch) | |
else: | |
memorizing_batch = get_user_memory_batch(db, user_book.memorizing_batch) | |
current_batch = batches[current_batch_index] | |
save_progress(memorizing_batch, memorize_action) | |
on_batch_end(db, memorizing_batch.id) | |
next_batch = generate_next_batch(db, user_book, minutes=60, k=3) | |
if next_batch is not None: | |
current_batch = next_batch | |
on_batch_start(db, current_batch.id) | |
user_book_id = user_book.id | |
update_user_book_memorizing_batch(db, user_book_id, current_batch.id) | |
if next_batch is not None: | |
return set_memorizing_batch(batches, old_index, current_batch) | |
else: | |
return set_memorizing_batch(batches, current_batch_index, current_batch) | |
previous_batch_btn.click( | |
previous_batch, | |
inputs=[batches, current_batch_index, user_book_id, memorize_action], | |
outputs=batch_widget + [progress, current_batch_index] | |
) | |
next_batch_btn.click( | |
next_batch, | |
inputs=[batches, current_batch_index, user_book_id, memorize_action], | |
outputs=batch_widget + [progress, current_batch_index] | |
) | |
def regenerate_for_batch(memorizing_batch: UserMemoryBatch, batch_words: List[Word]): | |
batch_words_str_list = [word.vc_vocabulary for word in batch_words] | |
logger.info(f"生成故事 {batch_words_str_list}") | |
story, translated_story = generate_story_and_translated_story(batch_words_str_list) | |
memorizing_batch.story = story | |
memorizing_batch.translated_story = translated_story | |
db.commit() | |
db.refresh(memorizing_batch) | |
create_user_memory_batch_generation_history(db, UserMemoryBatchGenerationHistoryCreate( | |
batch_id=memorizing_batch.id, | |
story=story, | |
translated_story=translated_story | |
)) | |
logger.info(story) | |
logger.info(translated_story) | |
return story, translated_story | |
def regenerate(batches: List[UserMemoryBatch], current_batch_index: int): | |
# 重新生成故事 | |
memorizing_batch = batches[current_batch_index] | |
batch_words = get_words_in_batch(db, memorizing_batch.id) | |
story, translated_story = regenerate_for_batch(memorizing_batch, batch_words) | |
return story, translated_story | |
regenerate_btn.click(regenerate, inputs=[batches, current_batch_index], outputs=[story, translated_story]) | |
# 4. 从记忆计划中创建单词书 | |
with gr.Tab("从记忆计划中创建单词书") as tab4: | |
select_user_book = gr.Dropdown( | |
[], label="记忆计划", info="请选择记忆计划" | |
) | |
word_count = gr.Number(value=0, label="单词个数") | |
known_words = gr.CheckboxGroup( | |
[], label="已学会的单词", info="请检查已学会的单词,这些单词将不会被包含在新的单词书中" | |
) | |
title = gr.TextArea(value='单词书', lines=1, label="单词书的名称") | |
btn = gr.Button("从记忆计划中创建单词书") | |
status = gr.Textbox("", lines=1, label="状态") | |
def on_select_user(user): | |
user_id = user.get("id", None) | |
if user_id is None: | |
gr.Error("请先登录") | |
return gr.Dropdown(choices=[]) | |
new_options = [] | |
user_book = get_user_books_by_owner_id(db, user_id) | |
new_options = [f"{book.title} | {book.batch_size}个单词一组 [{book.id}]" for book in user_book] | |
return gr.Dropdown(choices=new_options) | |
def on_select_user_book(user_book): | |
logger.debug(f'user_book {user_book}') | |
if user_book is None: | |
return 0, gr.CheckboxGroup(choices=[]) | |
new_options = [] | |
user_book_id = user_book.split(" [")[1][:-1] | |
words = get_words_in_user_book(db, user_book_id) | |
new_options = [f"{word.vc_vocabulary}" for word in words] | |
return len(words), gr.CheckboxGroup(choices=new_options) | |
tab4.select(on_select_user, inputs=[user], outputs=[select_user_book]) | |
select_user_book.select(on_select_user_book, inputs=[select_user_book], outputs=[word_count, known_words]) | |
def submit(user, user_book, known_words, title): | |
user_id = user.get("id", None) | |
if user_id is None: | |
gr.Error("请先登录") | |
return "请先登录" | |
user_book_id = user_book.split(" [")[1][:-1] | |
all_words = get_words_in_user_book(db, user_book_id) | |
unknown_words = [] | |
for w in all_words: | |
if w.vc_vocabulary not in known_words: | |
unknown_words.append(w) | |
# all_words = get_words_by_vocabulary(db, known_words) | |
book = save_words_as_book(db, user_id, unknown_words, title) | |
if book is not None: | |
return f"成功生成一本单词书:{book.bk_name}" | |
else: | |
return "失败" | |
btn.click(submit, [user, select_user_book, known_words, title], [status]) | |
# 5. 统计 | |
with gr.Tab("统计") as tab5: | |
# 5.1. 故事生成历史 | |
with gr.Tab("AI 历史记录") as tab51: | |
select_user_book = gr.Dropdown( | |
[], label="记忆计划", info="请选择记忆计划" | |
) | |
history_header = ["单词", "故事", "中文故事", "生成时间"] | |
history_dataframe = gr.Dataframe( | |
headers=history_header, | |
datatype=["str"] * len(history_header), | |
col_count=(len(history_header), "fixed"), | |
wrap=True, | |
min_width=320, | |
height=800, | |
) | |
def on_select_user(user): | |
user_id = user.get("id", None) | |
if user_id is None: | |
gr.Error("请先登录") | |
return gr.Dropdown(choices=[]) | |
new_options = [] | |
user_book = get_user_books_by_owner_id(db, user_id) | |
new_options = [f"{book.title} | {book.batch_size}个单词一组 [{book.id}]" for book in user_book] | |
return gr.Dropdown(choices=new_options) | |
def on_select_user_book(user_book_id): | |
logger.debug(f'user_book {user_book_id}') | |
if user_book_id is None: | |
return 0, gr.CheckboxGroup(choices=[]) | |
user_book_id = user_book_id.split(" [")[1][:-1] | |
batch_id_to_words_and_history = get_generation_hostorys_by_user_book_id(db, user_book_id) | |
data = [] | |
for batch_id, (words, histories) in batch_id_to_words_and_history.items(): | |
for history in histories: | |
word = ", ".join([w.vc_vocabulary for w in words]) | |
story = history.story | |
translated_story = history.translated_story | |
create_time = history.create_time | |
data.append([word, story, translated_story, create_time]) | |
df = pd.DataFrame(data, columns=history_header) | |
return df | |
tab51.select(on_select_user, inputs=[user], outputs=[select_user_book]) | |
select_user_book.select(on_select_user_book, inputs=[select_user_book], outputs=[history_dataframe]) | |
# 5.2. 记忆历史记录 | |
with gr.Tab("记忆历史记录") as tab52: | |
select_user_book = gr.Dropdown( | |
[], label="记忆计划", info="请选择记忆计划" | |
) | |
batch_history_header = ["单词", "故事", "中文故事", "批次类型", "记忆情况", "生成时间"] | |
batch_history_dataframe = gr.Dataframe( | |
headers=batch_history_header, | |
datatype=["str"] * len(batch_history_header), | |
col_count=(len(batch_history_header), "fixed"), | |
wrap=True, | |
min_width=320, | |
height=800, | |
) | |
def on_select_user(user): | |
user_id = user.get("id", None) | |
if user_id is None: | |
gr.Error("请先登录") | |
return gr.Dropdown(choices=[]) | |
new_options = [] | |
user_book = get_user_books_by_owner_id(db, user_id) | |
new_options = [f"{book.title} | {book.batch_size}个单词一组 [{book.id}]" for book in user_book] | |
return gr.Dropdown(choices=new_options) | |
def on_select_user_book(user_book_id): | |
logger.debug(f'user_book {user_book_id}') | |
if user_book_id is None: | |
return 0, gr.CheckboxGroup(choices=[]) | |
user_book_id = user_book_id.split(" [")[1][:-1] | |
actions, batch_id_to_batch, batch_id_to_words, batch_id_to_actions = get_user_memory_batch_history(db, user_book_id) | |
data = [] | |
for action in actions: | |
batch_id = action.batch_id | |
words = batch_id_to_words[batch_id] | |
word = ", ".join([w.vc_vocabulary for w in words]) | |
batch = batch_id_to_batch[batch_id] | |
story = batch.story | |
translated_story = batch.translated_story | |
batch_type = batch.batch_type | |
memory_actions = batch_id_to_actions.get(batch_id, []) | |
remember_word_ids = {a.word_id for a in memory_actions if a.action == "remember"} | |
remember_words = [] | |
forget_words = [] | |
for w in words: | |
if w.vc_id in remember_word_ids: | |
remember_words.append(w.vc_vocabulary) | |
else: | |
forget_words.append(w.vc_vocabulary) | |
memory_status = f"记住 {len(remember_words)} 个单词,忘记 {len(forget_words)} 个单词" | |
memory_status += f",记住的单词:{', '.join(remember_words)}" | |
memory_status += f",忘记的单词:{', '.join(forget_words)}" | |
create_time = action.create_time | |
data.append([word, story, translated_story, batch_type, memory_status, create_time]) | |
df = pd.DataFrame(data, columns=batch_history_header) | |
return df | |
tab52.select(on_select_user, inputs=[user], outputs=[select_user_book]) | |
select_user_book.select(on_select_user_book, inputs=[select_user_book], outputs=[batch_history_dataframe]) | |
on_login_success_ui = [email, password, login_btn, register_email, register_password, register_btn] | |
on_login_success_ui += [tab1] | |
def on_login(login_success): | |
return ( | |
gr.TextArea(visible=not login_success), | |
gr.TextArea(visible=not login_success), | |
gr.Button(visible=not login_success), | |
gr.TextArea(visible=not login_success), | |
gr.TextArea(visible=not login_success), | |
gr.Button(visible=not login_success), | |
# gr.Accordion(visible=not login_success), | |
) + ( | |
gr.Tab(visible=login_success), | |
) | |
def login(email, password): | |
user = get_user_by_email(db, email) | |
if password is None or len(password) == 0: | |
return { | |
"id": "", | |
"email": "", | |
}, "登录失败", *on_login(False) | |
if user is None or not user.verify_password(password): | |
return { | |
"id": "", | |
"email": "", | |
}, "登录失败", *on_login(False) | |
return { | |
"id": user.id, | |
"email": user.email, | |
}, "登录成功", *on_login(True) | |
login_btn.click(login, [email, password], [user, user_status] + on_login_success_ui) | |
def register(email, password): | |
user = get_user_by_email(db, email) | |
if user is not None: | |
return { | |
"id": "", | |
"email": "", | |
}, "注册失败,该邮箱已被注册", *on_login(False) | |
else: | |
user = create_user(db, email=email, password=password) | |
return { | |
"id": user.id, | |
"email": user.email, | |
}, "注册并登录成功", *on_login(True) | |
register_btn.click(register, [register_email, register_password], [user, user_status] + on_login_success_ui) | |
if __name__ == "__main__": | |
# import os | |
# os.environ["no_proxy"] = "localhost,127.0.0.1,::1" | |
# demo.launch(server_name="127.0.0.1", server_port=8090, debug=True) | |
logger.add(f"output/logs/web_{date_str}.log", rotation="1 day", retention="7 days", level="INFO") | |
demo.launch() | |