File size: 5,320 Bytes
118ceb7
 
15077ed
 
118ceb7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
15077ed
 
118ceb7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
15077ed
118ceb7
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# _*_ coding:utf-8 _*_
"""

@Version  : 1.1.0

@Time     : 2024年12月28日

@Author   : DuYu (@duyu09, [email protected])

@File     : py2hz.py

@Describe : 基于隐马尔可夫模型(HMM)的拼音转汉字程序。

@Copyright: Copyright (c) 2024 DuYu (No.202103180009), Faculty of Computer Science & Technology, Qilu University of Technology (Shandong Academy of Sciences).

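@Note     : The training CSV file must contain the Chinese-character sentence in its first column and the matching pinyin sentence (syllables separated by spaces) in its second column; this is the order in which load_dataset reads them.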

"""

import re
import bz2
import pickle
import numpy as np
import pandas as pd
from hmmlearn import hmm


# 1. Data preprocessing: load the CSV dataset
def load_dataset(file_path):
    """Read the training CSV and return its first two columns as lists.

    Args:
        file_path: Location of the CSV file, passed straight to
            ``pandas.read_csv``.

    Returns:
        A ``(sentences, pinyins)`` tuple. ``sentences`` holds the values of
        the first column (used by the rest of this file as Chinese-character
        sentences) and ``pinyins`` the values of the second column (used as
        whitespace-separated pinyin sentences).
    """
    frame = pd.read_csv(file_path)
    first_column, second_column = frame.iloc[:, 0], frame.iloc[:, 1]
    return first_column.tolist(), second_column.tolist()

# Tokenizer that keeps runs of Latin letters together as whole words
def segment_sentence(sentence):
    """Split *sentence* into a list of tokens.

    Every maximal run of ASCII letters becomes one token, and every single
    character in the range U+4E00..U+9FFF becomes its own token. Any other
    character (digits, punctuation, whitespace, ...) is dropped.
    """
    token_pattern = re.compile(r'[a-zA-Z]+|[一-鿿]')
    return token_pattern.findall(sentence)

# 2. Build the vocabularies (hidden-state set and observation set)
def build_vocab(sentences, pinyins):
    """Collect every character token and pinyin token and assign integer ids.

    Args:
        sentences: Iterable of Chinese-character sentences; each one is
            tokenised with ``segment_sentence``.
        pinyins: Iterable of pinyin sentences, paired position-by-position
            with *sentences*; each one is split on whitespace.

    Returns:
        A ``(hanzi2id, id2hanzi, pinyin2id, id2pinyin)`` tuple of dicts.
        Ids are consecutive integers starting at 0, assigned in sorted token
        order.
    """
    hanzi_set = set()
    pinyin_set = set()

    for sentence, pinyin in zip(sentences, pinyins):
        hanzi_set.update(segment_sentence(sentence))
        pinyin_set.update(pinyin.split())

    # Sort before numbering: the iteration order of a set of strings varies
    # between interpreter runs (hash randomization), which would otherwise
    # give the same dataset a different id layout on every run.
    id2hanzi = dict(enumerate(sorted(hanzi_set)))
    id2pinyin = dict(enumerate(sorted(pinyin_set)))
    hanzi2id = {token: idx for idx, token in id2hanzi.items()}
    pinyin2id = {token: idx for idx, token in id2pinyin.items()}

    return hanzi2id, id2hanzi, pinyin2id, id2pinyin

# 3. Model training
def train_hmm(sentences, pinyins, hanzi2id, pinyin2id):
    """Create an HMM whose parameters are relative frequencies counted from the data.

    Hidden states are the ids of *hanzi2id* and observation symbols are the
    ids of *pinyin2id*. No iterative fitting is run in this function: the
    start, transition and emission counts are accumulated, normalised, and
    assigned directly to the model's ``startprob_``, ``transmat_`` and
    ``emissionprob_`` attributes.

    Args:
        sentences: Iterable of Chinese-character sentences.
        pinyins: Iterable of whitespace-separated pinyin sentences, paired
            position-by-position with *sentences*.
        hanzi2id: Mapping from character token to state id.
        pinyin2id: Mapping from pinyin token to observation id.

    Returns:
        The ``hmm.MultinomialHMM`` instance with its three probability
        attributes set.

    Raises:
        KeyError: If a token found in the data is missing from the
            corresponding mapping.
    """
    state_count = len(hanzi2id)
    symbol_count = len(pinyin2id)

    model = hmm.MultinomialHMM(n_components=state_count, n_iter=100, tol=1e-4)

    def fill_and_normalize(counts):
        """Make every row of *counts* sum to 1, in place.

        Rows that are entirely zero are first replaced by a uniform row.
        """
        totals = counts.sum(axis=1, keepdims=True)
        empty_rows = (totals == 0).flatten()
        counts[empty_rows, :] = 1.0 / counts.shape[1]
        counts /= counts.sum(axis=1, keepdims=True)

    # Raw counts for initial states, state transitions and emissions.
    start_prob = np.zeros(state_count)
    trans_prob = np.zeros((state_count, state_count))
    emit_prob = np.zeros((state_count, symbol_count))

    for sentence, pinyin in zip(sentences, pinyins):
        states = [hanzi2id[token] for token in segment_sentence(sentence)]
        symbols = [pinyin2id[token] for token in pinyin.split()]

        # A sentence without any recognised character contributes nothing.
        if not states:
            continue

        # Initial-state count
        start_prob[states[0]] += 1

        # Transition counts between consecutive characters
        for current_state, next_state in zip(states, states[1:]):
            trans_prob[current_state, next_state] += 1

        # Emission counts; zip stops at the shorter of the two sequences
        for state, symbol in zip(states, symbols):
            emit_prob[state, symbol] += 1

    # Fall back to a uniform start distribution when nothing was counted.
    if start_prob.sum() == 0:
        start_prob += 1
    start_prob /= start_prob.sum()

    fill_and_normalize(trans_prob)
    fill_and_normalize(emit_prob)

    model.startprob_ = start_prob
    model.transmat_ = trans_prob
    model.emissionprob_ = emit_prob

    return model

# 4. Saving and loading the model
def save_model(model, filepath):
    """Pickle *model* and write it, bzip2-compressed, to *filepath*.

    Args:
        model: Any picklable object.
        filepath: Destination file; it is opened for binary writing.
    """
    with bz2.open(filepath, 'wb') as sink:
        pickle.dump(model, sink)

def load_model(filepath):
    """Read a bzip2-compressed pickle from *filepath* and return the object.

    Warning:
        Unpickling can execute arbitrary code; only load files that come
        from a trusted source.
    """
    with bz2.open(filepath, 'rb') as source:
        model = pickle.load(source)
    return model

def predict(model, pinyin_seq, pinyin2id, id2hanzi):
    """Decode a sequence of pinyin tokens into a string of characters.

    Args:
        model: Object exposing ``decode(X, algorithm=...)``; in this file it
            is the model produced by ``train_hmm``. Its ``n_trials``
            attribute is overwritten by this call.
        pinyin_seq: Sequence of pinyin tokens to convert.
        pinyin2id: Mapping from pinyin token to observation id.
        id2hanzi: Mapping from state id to character.

    Returns:
        The characters of the Viterbi state path joined into one string, or
        ``''`` when *pinyin_seq* is empty.
    """
    # An empty sequence has nothing to decode; return the empty string rather
    # than handing a zero-row observation matrix to the decoder.
    if len(pinyin_seq) == 0:
        return ''

    # One row per token, one-hot over the pinyin vocabulary.
    obs_seq = np.zeros((len(pinyin_seq), len(pinyin2id)))
    for t, p in enumerate(pinyin_seq):
        # Tokens missing from the vocabulary fall back to symbol id 0.
        obs_seq[t, pinyin2id.get(p, 0)] = 1

    # NOTE(review): n_trials is a model attribute, not a number of decoding
    # runs. Every row built above sums to 1, so 1 may be the value that
    # matches the data; 3 is kept to preserve existing behaviour -- confirm
    # against the hmmlearn MultinomialHMM documentation.
    model.n_trials = 3
    _, state_seq = model.decode(obs_seq, algorithm='viterbi')
    return ''.join(id2hanzi[s] for s in state_seq)

    
def train(dataset_path='train.csv', model_path='hmm_model.pkl.bz2'):
    """Train a model from a CSV dataset and save it to disk.

    Args:
        dataset_path: CSV file handed to ``load_dataset``.
        model_path: Destination file handed to ``save_model``.
    """
    sentences, pinyins = load_dataset(dataset_path)
    vocab = build_vocab(sentences, pinyins)
    hanzi2id, id2hanzi, pinyin2id, id2pinyin = vocab
    model = train_hmm(sentences, pinyins, hanzi2id, pinyin2id)

    # Attach the vocabularies to the model object so they are saved with it
    # and can be read back as attributes after loading.
    attached = (
        ('pinyin2id', pinyin2id),
        ('id2hanzi', id2hanzi),
        ('hanzi2id', hanzi2id),
        ('id2pinyin', id2pinyin),
    )
    for attr_name, mapping in attached:
        setattr(model, attr_name, mapping)

    save_model(model, model_path)
    
    
def pred(model_path='hmm_model.pkl.bz2', pinyin_str='ce4 shi4'):
    """Load a saved model, convert *pinyin_str* and print the result.

    Args:
        model_path: File handed to ``load_model``; the loaded object must
            carry ``pinyin2id`` and ``id2hanzi`` attributes.
        pinyin_str: Pinyin syllables separated by whitespace.
    """
    model = load_model(model_path)
    tokens = pinyin_str.split()
    decoded = predict(model, tokens, model.pinyin2id, model.id2hanzi)
    print('预测结果:', decoded)

if __name__ == '__main__':
    # Script entry point: convert a sample pinyin phrase with a model file
    # that already exists on disk. To (re)train first, run:
    #     train(dataset_path='train_o.csv', model_path='hmm_model.pkl.bz2')
    pred(pinyin_str='hong2 yan2 bo2 ming4', model_path='hmm_model.pkl.bz2')