File size: 5,626 Bytes
118ceb7 1800539 118ceb7 15077ed 118ceb7 1800539 118ceb7 1800539 118ceb7 1800539 118ceb7 1800539 118ceb7 1800539 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
# _*_ coding:utf-8 _*_
"""
@Version : 1.2.0
@Time : 2024年12月29日
@Author : DuYu (@duyu09, [email protected])
@File : py2hz.py
@Describe : 基于隐马尔可夫模型(HMM)的拼音转汉字程序。
@Copyright: Copyright (c) 2024 DuYu (No.202103180009), Faculty of Computer Science & Technology, Qilu University of Technology (Shandong Academy of Sciences).
@Note : 训练集csv文件,要求第一列为由汉语拼音构成的句子,第二列为由汉字构成的句子。
"""
import re
import bz2
import pickle
import numpy as np
import pandas as pd
from hmmlearn import hmm
# 1. 数据预处理:加载CSV数据集
def load_dataset(file_path):
data = pd.read_csv(file_path)
sentences = data.iloc[:, 0].tolist() # 第一列:汉字句子
pinyins = data.iloc[:, 1].tolist() # 第二列:拼音句子
return sentences, pinyins
# 分词函数,确保英文单词保持完整
def segment_sentence(sentence):
tokens = re.findall(r'[a-zA-Z]+|[一-鿿]', sentence) # 使用正则表达式分割句子,确保英文单词保持完整
return tokens
# 2. 构建字典和状态观测集合
def build_vocab(sentences, pinyins):
hanzi_set = set()
pinyin_set = set()
for sentence, pinyin in zip(sentences, pinyins):
hanzi_set.update(segment_sentence(sentence))
pinyin_set.update(pinyin.split())
hanzi_list = list(hanzi_set)
pinyin_list = list(pinyin_set)
hanzi2id = {h: i for i, h in enumerate(hanzi_list)}
id2hanzi = {i: h for i, h in enumerate(hanzi_list)}
pinyin2id = {p: i for i, p in enumerate(pinyin_list)}
id2pinyin = {i: p for i, p in enumerate(pinyin_list)}
return hanzi2id, id2hanzi, pinyin2id, id2pinyin
# 3. 模型训练
def train_hmm(sentences, pinyins, hanzi2id, pinyin2id):
n_states = len(hanzi2id)
n_observations = len(pinyin2id)
model = hmm.MultinomialHMM(n_components=n_states, n_iter=100, tol=1e-4)
# 统计初始状态概率、转移概率和发射概率
start_prob = np.zeros(n_states)
trans_prob = np.zeros((n_states, n_states))
emit_prob = np.zeros((n_states, n_observations))
for sentence, pinyin in zip(sentences, pinyins):
# print(sentence, pinyin)
hanzi_seq = [hanzi2id[h] for h in segment_sentence(sentence)]
pinyin_seq = [pinyin2id[p] for p in pinyin.split()]
# 初始状态概率
if len(hanzi_seq) == 0:
continue
start_prob[hanzi_seq[0]] += 1
# 转移概率
for i in range(len(hanzi_seq) - 1):
trans_prob[hanzi_seq[i], hanzi_seq[i + 1]] += 1
# 发射概率
for h, p in zip(hanzi_seq, pinyin_seq):
emit_prob[h, p] += 1
# 确保矩阵行和为1,并处理全零行
if start_prob.sum() == 0:
start_prob += 1
start_prob /= start_prob.sum()
row_sums = trans_prob.sum(axis=1, keepdims=True)
zero_rows = (row_sums == 0).flatten() # 修复索引错误
trans_prob[zero_rows, :] = 1.0 / n_states # 用均匀分布填充全零行
trans_prob /= trans_prob.sum(axis=1, keepdims=True)
emit_sums = emit_prob.sum(axis=1, keepdims=True)
zero_emit_rows = (emit_sums == 0).flatten()
emit_prob[zero_emit_rows, :] = 1.0 / n_observations # 均匀填充
emit_prob /= emit_prob.sum(axis=1, keepdims=True)
model.startprob_ = start_prob
model.transmat_ = trans_prob
model.emissionprob_ = emit_prob
return model
# 4. 保存和加载模型
def save_model(model, filepath, mode='compress'): # mode='normal'意味着不使用压缩
if mode == 'normal':
with open(filepath, 'wb') as f:
pickle.dump(model, f)
else:
with bz2.BZ2File(filepath, 'wb') as f:
pickle.dump(model, f)
def load_model(filepath, mode='compress'): # mode='normal'意味着不使用压缩
if mode == 'normal':
with open(filepath, 'rb') as f:
return pickle.load(f)
else:
with bz2.BZ2File(filepath, 'rb') as f:
return pickle.load(f)
def train(dataset_path='train.csv', model_path='hmm_model.pkl.bz2'):
sentences, pinyins = load_dataset(dataset_path) # 加载数据集
hanzi2id, id2hanzi, pinyin2id, id2pinyin = build_vocab(sentences, pinyins) # 构建字典
model = train_hmm(sentences, pinyins, hanzi2id, pinyin2id) # 训练模型
model.pinyin2id = pinyin2id
model.id2hanzi = id2hanzi
model.hanzi2id = hanzi2id
model.id2pinyin = id2pinyin
save_model(model, model_path) # 保存模型
def pred(model_path='hmm_model.pkl.bz2', pinyin_str='ce4 shi4', n_trials=3):
model = load_model(model_path)
pinyin_list = pinyin_str.split()
pinyin2id, id2hanzi = model.pinyin2id, model.id2hanzi
obs_seq = np.zeros((len(pinyin_list), len(pinyin2id))) # 转换观测序列为 one-hot 格式
for t, p in enumerate(pinyin_list):
if p in pinyin2id:
obs_seq[t, pinyin2id[p]] = 1
else:
obs_seq[t, 0] = 1 # 未知拼音默认处理
# 解码预测
model.n_trials = n_trials
log_prob, state_seq = model.decode(obs_seq, algorithm=model.algorithm)
result = ''.join([id2hanzi[s] for s in state_seq])
print('预测结果:', result)
if __name__ == '__main__':
# train(dataset_path='train.csv', model_path='hmm_model_large.pkl.bz2')
pred(model_path='hmm_model_large.pkl.bz2', pinyin_str='hong2 yan2 bo2 ming4') # 预测结果:红颜薄命
|