搜索引擎能搜索到的相关实现几乎都是使用 Matlab 实现的,很少有通过其他语言来做这些实验,正巧在寒假进行了机器学习的学习,当时使用 Python 比较多,所以在这期实验开始时我选择了用 Python 做这些实验。
但是在真正实际操作中发现,这些问题用 Python 实现并不是非常的方便,主要还是在使用 Pandas 和 Numpy 这两个包,这样并不能体现出 Python 在解决这些实验问题上的优势,不如使用 Ruby 和 Matlab。不过,既然选择了这门语言做到底,就坚持做一下。
Test 1
实验内容:对于任意给出的信源 S=[X,p(x)] 计算信息熵 H(x)
- Input: 路径:各字母出现频率(含 Space),保存于 CSV 文件中
- Output: 字母的信息熵
|--- entropy_cal.py
|--- letter_frequence.csv
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Test 1"""
import pandas as pd
import math
class Solution(object):
def calInfoEntropy(self, path):
:type path: str
data = pd.read_csv(path)
result = 0
for i in range(0, data.shape[0] - 1):
num_tem = data.iat[i, 1]
result += num_tem * math.log2(1 / num_tem)
return result
Pandas 自带读取 CSV 文件功能,读取结果为矩阵,CSV 文件中第一行会被识别为列名。取数函数有多种:
,基于列 label,可选取特定行(根据行 index)iloc
,基于行/列的 positionat
,根据指定行 index 及列 label,快速定位 DataFrame 的元素iat
,与 at 类似,不同的是根据 position 来定位的ix
,为 loc 与 iloc 的混合体,既支持 label 也支持 position
这篇 Post 有较为详细的说明。
Test 2
实验内容:对于给出的信源 S=[X,p(s)] 和信道 Q=(q(yj|xi)) 计算噪音熵 H(X|Y) 和互信息 I(X;Y)
- Input: 路径:各字母出现频率(含 Space),信道矩阵,分别保存于 CSV 文件中
- Output: 字母的噪音熵和互信息
|--- entropy_cal.py
|--- letter_frequence.csv
|--- communication_channel.csv
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Test 2"""
import pandas as pd
import numpy as np
import math
conditional_entropy = 0
mutual_information = 0
# input data
letter_fre_x = pd.read_csv('letter_frequency.csv')
comm_channel = pd.read_csv('communication_channel.csv')
joint_probability_distribution = np.zeros(shape=(27, 27))
# cal the conditional entropy
for i in range(0, 26):
for j in range(0, 26):
if comm_channel.iat[i, j] != 0:
joint_probability_distribution[i][j] = letter_fre_x.iat[j, 1] * comm_channel.iat[i, j]
conditional_entropy -= \
joint_probability_distribution[i][j] * math.log2(comm_channel.iat[i, j])
letter_fre_y = np.sum(joint_probability_distribution, axis=0)
# cal the mutual information
for i in range(0, 26):
for j in range(0, 26):
if comm_channel.iat[i, j] != 0:
mutual_information += \
joint_probability_distribution[i][j] * \
math.log2(joint_probability_distribution[i][j] / (letter_fre_x.iat[j, 1] * letter_fre_y[j]))
print('The conditional entropy is: ', conditional_entropy, ' bit')
print('The mutual information is: ', mutual_information, 'bit')
在实现方面和 Test 1 没有什么太大的差距,主要在于信息论的算法上面。
Test 3
实验内容:给出各英文字母的出现概率,构造 0, 1, 2 Huffman 编码,并将下述一句话进行 Huffman 编码
The fundamental problem of communication is that of reproducing at one point either exactly or approximately a message selected at another point.
- Input: 各字母出现频率(含 Space),欲 Huffman 编码的句子
- Output: Huffman 编码结果(使用结果编码句子在这里暂时省略,Test 5 会提到)
|--- huffman_coding.py
|--- letter_frequence.csv
|--- sentence.docx
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Test 3"""
import queue
import pandas as pd
class HuffmanNode(object):
def __init__(self, left=None, right=None, root=None):
self.left = left
self.right = right
def children(self):
return((self.left, self.right))
letter_fre = pd.read_csv('letter_frequency.csv')
def create_tree(frequencies):
p = queue.PriorityQueue()
for value in frequencies:
while p.qsize() > 1:
l, r = p.get(), p.get()
node = HuffmanNode(l, r)
p.put((l[0]+r[0], node))
return p.get()
node = create_tree(letter_fre)
def walk_tree(node, prefix="", code={}):
node 是一个tuple(freq, HuffmanNode|character)
if isinstance(node[1], HuffmanNode):
code1 = walk_tree(node[1].left, '0', code.copy())
code2 = walk_tree(node[1].right, '1', code.copy())
if len(code1) > 0:
for k, v in code1.items():
code[k] = prefix + v
if len(code2) > 0:
for k, v in code2.items():
code[k] = prefix + v
code[node[1]] = prefix
code = walk_tree(node)
for i in sorted(freq, reverse=True):
print(i[1], '{:6.2f}'.format(i[0]), code[i[1]])
except Exception as e:
Test 4
实验内容:对给出的信道转移矩阵 Q=(q(y|x)),应用迭代法计算信道容量 C
高噪声打字机:设信道输入与输出字母集分别为 X=Y=A,B,C,…,Z,−,其中 − 表示空格,这 27 个字符排成一圈,当输入某个字符时,输出以等概率 1/3 产生它本生及 相邻 2 个字符,例如:
- Input: 各字母出现频率(含 Space),信道转移矩阵
- Output: 信道容量
|--- channel_capacity_cal.py
|--- letter_frequence.csv
|--- communication_channel.csv
首先,这里是我基于 Matlab 程序改进成的 Python 程序,是个错误示例,其原因在于 Matlab 中对矩阵操作转移到 Python 中 DataFrame 实现起来有点麻烦,DataFrame 多用于神经网络数据导入是非常方便的,具体分隔来处理有点麻烦。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Test 4 - Error Example"""
import pandas as pd
import numpy as np
import math
def channel_capacity_cal(p, q, f, m):
"""the function to cal the channel capacity"""
tem_precision_x = p
channel_cap1 = 0
channel_cap2 = 0
for i in range(1, m):
sum = 0
row = q.shape[0]
column = q.shape[1]
rev_transfer_matrix = np.zeros(row, column)
out_precision = np.zeros(1, column)
tem_precision_y = np.zeros(1, row)
for j in range(1, column):
tem_sum = 0
for k in range(1, row):
tem_sum += q(k, j) * tem_precision_x(k, 1)
out_precision.at[1, i] = tem_sum
for j in range(1, row):
for k in range(1, column):
rev_transfer_matrix.at[j, k] = q.iat[j, k] * tem_precision_x.iat[j, 1] / out_precision.at[1, k]
for j in range(1, row):
tem_sum = 0
for k in range(1, column):
if rev_transfer_matrix.at[j, k] != 0:
tem_sum += q.iat[j, k] * math.log2(rev_transfer_matrix.at[j, k])
tem_precision_y.at[0, j] = math.pow(2, tem_sum)
sum += tem_precision_y.at[0, j]
for j in range(1, row):
tem_precision_y.at[0, j] /= sum
channel_cap2 = math.log2(sum)
if abs(channel_cap1 - channel_cap2) < f:
channel_cap1 = channel_cap2
tem_precision_x = tem_precision_y
print("the iterations times is: " + i)
channel_cap1 = channel_cap2
tem_precision_x = tem_precision_y
return channel_cap1
letter_fre = pd.read_csv('letter_frequency.csv')
channel_transition_probability = pd.read_csv('channel_transition_probability.csv')
max_iterations = 100
precision = 0.001
channel_capacity_cal(letter_fre, channel_transition_probability, precision, max_iterations)
在参考了 BUPT Zhengyuan Zhu 的处理方法之后进行了改进,但是这里存在的比较大的问题是多次迭代的转移矩阵需要手动计算,可以在此方法之前再添加 math 的方法以实现转移矩阵的计算。
import numpy as np
letter_fre = pd.read_csv('letter_frequency.csv')
channel_transition_probability = pd.read_csv('channel_transition_probability.csv')
precision = 0.001
def initiate_prob_distrib(p):
p_x = np.transpose(np.ones((1, p.shape[0])) / p.shape[0])
print("初始化概率分布为:\n", p_x)
return p_x
p_x = initiate_prob_distrib(channel_transition_probability)
def iteration(p_i, p_ij, k):
q_j = np.sum(p_i * p_ij, axis=0)
print("NO. " + str(k) + " iteration: q_j:\n", q_j)
alpha_i = np.exp(np.sum(p_ij * np.log(p_ij / q_j), axis=1))
alpha_i = np.expand_dims(alpha_i, axis=0)
print("NO. " + str(k) + " iteration: alpha_i:\n", alpha_i)
u = np.matmul(alpha_i, p_i)[0]
print("NO. " + str(k) + " iteration: u:\n", u)
I_L = np.log2(u)[0]
print("NO. " + str(k) + " iteration: I_L:\n", I_L)
I_U = np.log2(np.amax(alpha_i))
print("NO. " + str(k) + " iteration: I_U:\n", I_U)
if I_U - I_L < precision:
print("capacity:\n ", I_L)
print("input letter frequency:\n ", p_i)
return True, I_L, p_i
p_i = p_i * np.transpose(alpha_i) / u[0]
print("No." + str(k) + "Update Probability distributions:\n", p_i)
return False, p_i
flag, I_L, p_i = iteration(p_x, channel_transition_probability, 1)
ans_dict = dict()
for p in (channel_transition_probability1, channel_transition_probability2, channel_transition_probability3, channel_transition_probability4):
flag = False
p_x = initiate_prob_distrib(p)
k = 1
while not flag:
flag, ans, tmp_p = iteration(p_x, p, k)
if not flag:
p_x = tmp_p
k = k + 1
ans_dict[ans] = tmp_p
for k, v in ans_dict.items() :
print("capacity:", k)
print("input letter frequency:\n", np.transpose(v)[0])
Test 5
基于 Test 3 的程序,修改为构造 0, 1 二元 Huffman 编码程序
使用上述二元 Huffman 编码程序为下一段文字进行编码
The fundamental problem of communication is that of reproducing at one point either exactly or approximately a message selected at another point.
使用给定的生成矩阵,对上述编码结果进行信道编码(Hamming 编码)
任意修改一定量上述编码结果,之后对此修改之后的编码使用校验矩阵,进行信道译码(Hamming 译码)
Test 5 在整合前面的实验之后加入信道编码部分。
|--- run.py - 主程序
|--- huffman_coding.py - 实现 Huffman 编码
|--- hamming_coding.py - 实现 Hamming 编码
|--- hamming_decoding.py - 实现 Hamming 译码
|--- article.docx - 编码文字文档
|--- hamming_coding.csv - 生成矩阵
|--- hamming_decoding.csv - 校验矩阵
|--- letter_frequecny.csv - 字母频率
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import huffman_coding
import hamming_coding
import hamming_decoding
huffman_coding_result = huffman_coding.huffman_coding()
hamming_coding_result = hamming_coding.hamming_coding(huffman_coding_result)
hamming_decoding.hamming_decoding(huffman_coding_result, hamming_coding_result)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2019 - magic_kurakun <magickurakun@gmail.com>
import pandas as pd
import docx
def huffman_coding(letter_fre):
letter_fre = pd.read_csv('letter_frequency.csv')
file = docx.opendocx("article.docx")
doc = docx.getdocumenttext(file)
huffman_coding_result = ""
length = 0
huffman = ()
for i in range(0, 26):
tem = ('',)
huffman += tem
for loop in range(1, 12):
flag = 26 - loop * 2
for i in range(97, 122):
if letter_fre.iat[flag, 0] == chr(i):
huffman[i - 96] += '0'
if letter_fre.iat[flag + 1, 0] == chr(i):
huffman[i - 96] += '1'
if letter_fre.iat[flag + 2, 0] == chr(i):
huffman[i - 96] += '2'
if letter_fre.iat[flag, 0] == ' ':
huffman[i - 96] += '0'
if letter_fre.iat[flag + 1, 0] == ' ':
huffman[i - 96] += '1'
if letter_fre.iat[flag + 2, 0] == ' ':
huffman[i - 96] += '2'
sum_tem = 0
for i in range(flag, flag + 2):
sum_tem += float(letter_fre.iat[i, 1])
letter_fre.iat[i, 1] = str(-1)
letter_fre.iat[flag, 1] = str(sum_tem)
print('The huffman code of such letters are as follow: ')
for i in range(0, 26):
length += huffman[i].length()
print('The average length of huffman code is: ')
print(length / 27)
print('The sentence was translated to huffman code as follow: ')
for i in range(0, len(doc) - 1):
flag = 0
for j in range(97, 122):
if doc[i] == chr(j):
print(huffman[j - 96])
flag = 1
if flag == 0:
for i in range(0, len(doc)):
tem = ord(doc[i])
if tem == 32:
huffman_coding_result += huffman[0]
elif tem >= 65 & tem <= 90:
huffman_coding_result += huffman[tem - 64]
elif tem >= 97 & tem <= 122:
huffman_coding_result += huffman[tem - 96]
print("The result of huffman coding: " + huffman_coding_result)
return huffman_coding_result
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2019 - magic_kurakun <magickurakun@gmail.com>
import pandas as pd
import numpy as np
import docx
def hamming_coding(huffman_coding_result):
code_length = len(huffman_coding_result)
remainder = code_length % 4
b = [0, 0, 0, 0]
hamming_coding_list = pd.read_csv('hamming_coding.csv')
hamming_coding_result = ""
s = np.zeros(int((code_length - remainder) / 4 + 1), 7)
if remainder != 0:
for i in range(0, 3 - remainder):
huffman_coding_result += '0'
for i in range(0, int((code_length - remainder) / 4) - 1):
b[0] = ord(huffman_coding_result[4 * i]) - 48
b[1] = ord(huffman_coding_result[4 * i + 1]) - 48
b[2] = ord(huffman_coding_result[4 * i + 2]) - 48
b[3] = ord(huffman_coding_result[4 * i + 3]) - 48
c = np.dot(b, hamming_coding_list)
for j in range(0, 6):
if c[j] == 0 | c[j] == 2:
s[i, j] = 0
if c[j] == 1 | c[j] == 3:
s[i, j] = 1
s[51, 2] -= 1
s[34, 2] -= 1
for i in range(0, int((code_length - remainder) / 4) - 1):
for j in range(0, 3):
hamming_coding_result += chr(s[i, j])
for i in range(0, remainder - 1):
hamming_coding_result += chr(s[int((code_length - remainder) / 4), i])
return s
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2019 - magic_kurakun <magickurakun@gmail.com>
import pandas as pd
import numpy as np
import docx
def hamming_decoding(huffman_coding_result, s):
code_length = len(huffman_coding_result)
remainder = code_length % 4
hamming_decoding_list = pd.read_csv('hamming_decoding.csv')
d = np.zeros(int((code_length - remainder) / 4), 3)
hamming_decoding_result = ""
hamming_decoding_result_tem = ""
for i in range(0, int((code_length - remainder) / 4) - 1):
c = np.dot(s[i, :], hamming_decoding_list)
for j in range(0, 2):
if c[j] == 0 | c[j] == 2:
d[i, j] = 0
elif c[j] == 1 | c[j] == 3:
d[i, j] = 1
for j in range(0, 3):
if d[i, 0] == hamming_decoding_list[j, 0] & d[i, 1] == hamming_decoding_list[j, 2] &\
d[i, 2] == hamming_decoding_list[j, 3]:
s[i, j] -= 1
for i in range(0, int((code_length - remainder) / 4) - 1):
for j in range(0, 3):
hamming_decoding_result_tem += chr(s[i, j])
for i in range(0, remainder - 1):
hamming_decoding_result_tem += chr(s[int((code_length - remainder) / 4), i])
decode_length = len(hamming_decoding_result_tem)
flag = 1
break_while = 0
while flag < decode_length - 3 + remainder:
for i in range(0, 26):
str_tem = hamming_decoding_result_tem[flag:flag + i - 1]
for j in range(0, 26):
if str_tem == huffman_coding_list[j, :]:
if j == 1:
hamming_decoding_result += chr(32)
hamming_decoding_result += chr(j + 96)
flag += len(str_tem)
break_while = 1
if break_while == 1:
经过测试之后发现经过校验矩阵之后,传输信息错误率在 0.01 之下,根据传输量趋于 0,即是信道编码的意义。