放进时光蛋里。

信息论实验

2019.05.03

本学期专业课程中有信息论这一门课程,在此之前我学习神经网络的过程中了解了一些与信息论相关的内容,也很开心能开这门课。在这学期中有一些信息论的实验课需要完成一些任务,考虑到上学期学习数值分析没有及时记录所做内容,到后来补上去的各种困难,本次我打算完成一次实验之后及时记录实验内容,其中包含实验内容、遇到的问题和解决方案还有一些自己的想法。

搜索引擎能搜索到的相关实现几乎都是使用 Matlab 实现的,很少有通过其他语言来做这些实验,正巧在寒假进行了机器学习的学习,当时使用 Python 比较多,所以在这期实验开始时我选择了用 Python 做这些实验。

但是在真正实际操作中发现,这些问题用 Python 实现并不是非常的方便,主要还是在使用 Pandas 和 Numpy 这两个包,这样并不能体现出 Python 在解决这些实验问题上的优势,不如使用 Ruby 和 Matlab。不过,既然选择了这门语言做到底,就坚持做一下。

Test 1

实验内容:对于任意给出的信源 S=[X,p(x)] 计算信息熵 H(x)

  • Input: 路径:各字母出现频率(含 Space),保存于 CSV 文件中
  • Output: 字母的信息熵
文件目录:
information_theory_experiment_test1
 |--- entropy_cal.py
 |--- letter_frequence.csv
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""Test 1"""

import pandas as pd
import math

class Solution(object):
    def calInfoEntropy(self, path):
        """
        :type path: str
        """
        data = pd.read_csv(path)
        result = 0
        
        for i in range(0, data.shape[0] - 1):
            num_tem = data.iat[i, 1]
            result += num_tem * math.log2(1 / num_tem)
        
        return result

Pandas 自带读取 CSV 文件功能,读取结果为矩阵,CSV 文件中第一行会被识别为列名。取数函数有多种:

  • loc,基于列 label,可选取特定行(根据行 index)
  • iloc,基于行/列的 position
  • at,根据指定行 index 及列 label,快速定位 DataFrame 的元素
  • iat,与 at 类似,不同的是根据 position 来定位的
  • ix,为 loc 与 iloc 的混合体,既支持 label 也支持 position

这篇 Post 有较为详细的说明。

Test 2

实验内容:对于给出的信源 S=[X,p(s)] 和信道 Q=(q(yj|xi)) 计算噪音熵 H(X|Y) 和互信息 I(X;Y)

  • Input: 路径:各字母出现频率(含 Space),信道矩阵,分别保存于 CSV 文件中
  • Output: 字母的噪音熵和互信息
文件目录:
information_theory_experiment_test2
 |--- entropy_cal.py
 |--- letter_frequence.csv
 |--- communication_channel.csv
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""Test 2"""

import pandas as pd
import numpy as np
import math

conditional_entropy = 0
mutual_information = 0

# input data
letter_fre_x = pd.read_csv('letter_frequency.csv')
comm_channel = pd.read_csv('communication_channel.csv')
joint_probability_distribution = np.zeros(shape=(27, 27))

# cal the conditional entropy
for i in range(0, 26):
    for j in range(0, 26):
        if comm_channel.iat[i, j] != 0:
            joint_probability_distribution[i][j] = letter_fre_x.iat[j, 1] * comm_channel.iat[i, j]
            conditional_entropy -= \
                joint_probability_distribution[i][j] * math.log2(comm_channel.iat[i, j])
        else:
            continue

letter_fre_y = np.sum(joint_probability_distribution, axis=0)

# cal the mutual information
for i in range(0, 26):
    for j in range(0, 26):
        if comm_channel.iat[i, j] != 0:
            mutual_information += \
                joint_probability_distribution[i][j] * \
                math.log2(joint_probability_distribution[i][j] / (letter_fre_x.iat[j, 1] * letter_fre_y[j]))
        else:
            continue

print('The conditional entropy is: ', conditional_entropy, ' bit')
print('The mutual information is: ', mutual_information, 'bit')

在实现方面和 Test 1 没有什么太大的差距,主要在于信息论的算法上面。

Test 3

实验内容:给出各英文字母的出现概率,构造 0, 1, 2 Huffman 编码,并将下述一句话进行 Huffman 编码

The fundamental problem of communication is that of reproducing at one point either exactly or approximately a message selected at another point.

  • Input: 各字母出现频率(含 Space),欲 Huffman 编码的句子
  • Output: Huffman 编码结果(使用结果编码句子在这里暂时省略,Test 5 会提到)
文件目录:
information_theory_experiment_test3
 |--- huffman_coding.py
 |--- letter_frequence.csv
 |--- sentence.docx
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""Test 3"""

import queue
import pandas as pd

class HuffmanNode(object):
    def __init__(self, left=None, right=None, root=None):
        self.left = left
        self.right = right
    def children(self):
        return((self.left, self.right))

letter_fre = pd.read_csv('letter_frequency.csv')

def create_tree(frequencies):
    p = queue.PriorityQueue()
    for value in frequencies:   
        p.put(value)             
    while p.qsize() > 1:        
        l, r = p.get(), p.get()  
        node = HuffmanNode(l, r)
        p.put((l[0]+r[0], node))     
    return p.get()              

node = create_tree(letter_fre)

def walk_tree(node, prefix="", code={}):
    """
    node 是一个tuple(freq, HuffmanNode|character)
    """
    if isinstance(node[1], HuffmanNode):
        code1 = walk_tree(node[1].left, '0', code.copy())
        code2 = walk_tree(node[1].right, '1', code.copy())
        if len(code1) > 0:
            for k, v in code1.items():
                code[k] = prefix + v
        if len(code2) > 0:
            for k, v in code2.items():
                code[k] = prefix + v
    else: 
        code[node[1]] = prefix
    return(code) 

code = walk_tree(node) 

for i in sorted(freq, reverse=True):
    try:
        print(i[1], '{:6.2f}'.format(i[0]), code[i[1]])
    except Exception as e:
        print(e)
        continue

Test 4

实验内容:对给出的信道转移矩阵 Q=(q(y|x)),应用迭代法计算信道容量 C

高噪声打字机:设信道输入与输出字母集分别为 X=Y=A,B,C,…,Z,−,其中 − 表示空格,这 27 个字符排成一圈,当输入某个字符时,输出以等概率 1/3 产生它本生及 相邻 2 个字符,例如:

P(Y=A|X=B)=P(Y=B|X=B)=P(Y=C|X=B)=1/3

  • Input: 各字母出现频率(含 Space),信道转移矩阵
  • Output: 信道容量
文件目录:
information_theory_experiment_test4
 |--- channel_capacity_cal.py
 |--- letter_frequence.csv
 |--- communication_channel.csv

首先,这里是我基于 Matlab 程序改进成的 Python 程序,是个错误示例,其原因在于 Matlab 中对矩阵操作转移到 Python 中 DataFrame 实现起来有点麻烦,DataFrame 多用于神经网络数据导入是非常方便的,具体分隔来处理有点麻烦。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""Test 4 - Error Example"""

import pandas as pd
import numpy as np
import math


def channel_capacity_cal(p, q, f, m):
    """the function to cal the channel capacity"""
    tem_precision_x = p
    channel_cap1 = 0
    channel_cap2 = 0

    for i in range(1, m):
        sum = 0
        row = q.shape[0]
        column = q.shape[1]
        rev_transfer_matrix = np.zeros(row, column)
        out_precision = np.zeros(1, column)
        tem_precision_y = np.zeros(1, row)

        for j in range(1, column):
            tem_sum = 0
            for k in range(1, row):
                tem_sum += q(k, j) * tem_precision_x(k, 1)
            out_precision.at[1, i] = tem_sum
        for j in range(1, row):
            for k in range(1, column):
                rev_transfer_matrix.at[j, k] = q.iat[j, k] * tem_precision_x.iat[j, 1] / out_precision.at[1, k]
        for j in range(1, row):
            tem_sum = 0
            for k in range(1, column):
                if rev_transfer_matrix.at[j, k] != 0:
                    tem_sum += q.iat[j, k] * math.log2(rev_transfer_matrix.at[j, k])
                tem_precision_y.at[0, j] = math.pow(2, tem_sum)
                sum += tem_precision_y.at[0, j]
        for j in range(1, row):
            tem_precision_y.at[0, j] /= sum
        channel_cap2 = math.log2(sum)

        print(channel_cap2)
        if abs(channel_cap1 - channel_cap2) < f:
            channel_cap1 = channel_cap2
            tem_precision_x = tem_precision_y
            print("the iterations times is: " + i)
            break
        else:
            channel_cap1 = channel_cap2
            tem_precision_x = tem_precision_y

    print(tem_precision_x)
    return channel_cap1


letter_fre = pd.read_csv('letter_frequency.csv')
channel_transition_probability = pd.read_csv('channel_transition_probability.csv')

max_iterations = 100
precision = 0.001
channel_capacity_cal(letter_fre, channel_transition_probability, precision, max_iterations)

在参考了 BUPT Zhengyuan Zhu 的处理方法之后进行了改进,但是这里存在的比较大的问题是多次迭代的转移矩阵需要手动计算,可以在此方法之前再添加 math 的方法以实现转移矩阵的计算。

import numpy as np

letter_fre = pd.read_csv('letter_frequency.csv')
channel_transition_probability = pd.read_csv('channel_transition_probability.csv')
precision = 0.001

def initiate_prob_distrib(p):
    p_x = np.transpose(np.ones((1, p.shape[0])) / p.shape[0])
    print("初始化概率分布为:\n", p_x)

    return p_x

p_x = initiate_prob_distrib(channel_transition_probability)

def iteration(p_i, p_ij, k):
    q_j = np.sum(p_i * p_ij, axis=0)
    print("NO. " + str(k) + " iteration: q_j:\n", q_j)

    alpha_i = np.exp(np.sum(p_ij * np.log(p_ij / q_j), axis=1))
    alpha_i = np.expand_dims(alpha_i, axis=0)
    print("NO. " + str(k) + " iteration: alpha_i:\n", alpha_i)

    u = np.matmul(alpha_i, p_i)[0]
    print("NO. " + str(k) + " iteration: u:\n", u)

    I_L = np.log2(u)[0]
    print("NO. " + str(k) + " iteration: I_L:\n", I_L)

    I_U = np.log2(np.amax(alpha_i))
    print("NO. " + str(k) + " iteration: I_U:\n", I_U)

    if I_U - I_L < precision:
        print("capacity:\n ", I_L)
        print("input letter frequency:\n ", p_i)
        return True, I_L, p_i
    else:
        p_i = p_i * np.transpose(alpha_i) / u[0]
        print("No." + str(k) + "Update Probability distributions:\n", p_i)
        return False, p_i

flag, I_L, p_i = iteration(p_x, channel_transition_probability, 1)

ans_dict = dict()
for p in (channel_transition_probability1, channel_transition_probability2, channel_transition_probability3, channel_transition_probability4):
    flag = False
    p_x = initiate_prob_distrib(p)
    k = 1
    while not flag:
        flag, ans, tmp_p = iteration(p_x, p, k)
        if not flag:
            p_x = tmp_p
            k = k + 1
        else:
            ans_dict[ans] = tmp_p

for k, v in ans_dict.items() :
    print("capacity:", k)
    print("input letter frequency:\n", np.transpose(v)[0])

Test 5

实验内容

  1. 基于 Test 3 的程序,修改为构造 0, 1 二元 Huffman 编码程序

  2. 使用上述二元 Huffman 编码程序为下一段文字进行编码

    The fundamental problem of communication is that of reproducing at one point either exactly or approximately a message selected at another point.

  3. 使用给定的生成矩阵,对上述编码结果进行信道编码(Hamming 编码)

  4. 任意修改一定量上述编码结果,之后对此修改之后的编码使用校验矩阵,进行信道译码(Hamming 译码)

Test 5 在整合前面的实验之后加入信道编码部分。

文件目录:
information_theory_experiment_test5
 |--- run.py - 主程序
 |--- huffman_coding.py - 实现 Huffman 编码
 |--- hamming_coding.py - 实现 Hamming 编码
 |--- hamming_decoding.py - 实现 Hamming 译码
 |--- article.docx - 编码文字文档
 |--- hamming_coding.csv - 生成矩阵
 |--- hamming_decoding.csv - 校验矩阵
 |--- letter_frequecny.csv - 字母频率
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""Test_5/run.py"""

import huffman_coding
import hamming_coding
import hamming_decoding

huffman_coding_result = huffman_coding.huffman_coding()
hamming_coding_result = hamming_coding.hamming_coding(huffman_coding_result)
hamming_decoding.hamming_decoding(huffman_coding_result, hamming_coding_result)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2019 - magic_kurakun <magickurakun@gmail.com>

"""Test_5/huffman_coding.py"""

import pandas as pd
import docx


def huffman_coding(letter_fre):
    letter_fre = pd.read_csv('letter_frequency.csv')
    file = docx.opendocx("article.docx")
    doc = docx.getdocumenttext(file)
    huffman_coding_result = ""
    length = 0

    huffman = ()
    for i in range(0, 26):
        tem = ('',)
        huffman += tem

    for loop in range(1, 12):
        flag = 26 - loop * 2
        letter_fre.sort_values(by='probability')
        for i in range(97, 122):
            if letter_fre.iat[flag, 0] == chr(i):
                huffman[i - 96] += '0'
            if letter_fre.iat[flag + 1, 0] == chr(i):
                huffman[i - 96] += '1'
            if letter_fre.iat[flag + 2, 0] == chr(i):
                huffman[i - 96] += '2'
        if letter_fre.iat[flag, 0] == ' ':
            huffman[i - 96] += '0'
        if letter_fre.iat[flag + 1, 0] == ' ':
            huffman[i - 96] += '1'
        if letter_fre.iat[flag + 2, 0] == ' ':
            huffman[i - 96] += '2'
        sum_tem = 0
        for i in range(flag, flag + 2):
            sum_tem += float(letter_fre.iat[i, 1])
            letter_fre.iat[i, 1] = str(-1)
        letter_fre.iat[flag, 1] = str(sum_tem)

    print('The huffman code of such letters are as follow: ')
    for i in range(0, 26):
        length += huffman[i].length()
        print(huffman[i])
    print('The average length of huffman code is: ')
    print(length / 27)

    print('The sentence was translated to huffman code as follow: ')
    for i in range(0, len(doc) - 1):
        flag = 0
        for j in range(97, 122):
            if doc[i] == chr(j):
                print(huffman[j - 96])
                flag = 1
        if flag == 0:
            print(huffman[0])

    for i in range(0, len(doc)):
        tem = ord(doc[i])
        if tem == 32:
            huffman_coding_result += huffman[0]
        elif tem >= 65 & tem <= 90:
            huffman_coding_result += huffman[tem - 64]
        elif tem >= 97 & tem <= 122:
            huffman_coding_result += huffman[tem - 96]

    print("The result of huffman coding: " + huffman_coding_result)
    return huffman_coding_result
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2019 - magic_kurakun <magickurakun@gmail.com>

"""Test_5/hamming_coding.py"""

import pandas as pd
import numpy as np
import docx


def hamming_coding(huffman_coding_result):
    code_length = len(huffman_coding_result)
    remainder = code_length % 4
    b = [0, 0, 0, 0]
    hamming_coding_list = pd.read_csv('hamming_coding.csv')
    hamming_coding_result = ""
    s = np.zeros(int((code_length - remainder) / 4 + 1), 7)

    if remainder != 0:
        for i in range(0, 3 - remainder):
            huffman_coding_result += '0'
    for i in range(0, int((code_length - remainder) / 4) - 1):
        b[0] = ord(huffman_coding_result[4 * i]) - 48
        b[1] = ord(huffman_coding_result[4 * i + 1]) - 48
        b[2] = ord(huffman_coding_result[4 * i + 2]) - 48
        b[3] = ord(huffman_coding_result[4 * i + 3]) - 48
        c = np.dot(b, hamming_coding_list)
        for j in range(0, 6):
            if c[j] == 0 | c[j] == 2:
                s[i, j] = 0
            if c[j] == 1 | c[j] == 3:
                s[i, j] = 1
    s[51, 2] -= 1
    s[34, 2] -= 1

    for i in range(0, int((code_length - remainder) / 4) - 1):
        for j in range(0, 3):
            hamming_coding_result += chr(s[i, j])
    for i in range(0, remainder - 1):
        hamming_coding_result += chr(s[int((code_length - remainder) / 4), i])

    print(hamming_coding_result)
    return s
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2019 - magic_kurakun <magickurakun@gmail.com>

"""Test_5/hamming_decoding.py"""

import pandas as pd
import numpy as np
import docx


def hamming_decoding(huffman_coding_result, s):
    code_length = len(huffman_coding_result)
    remainder = code_length % 4
    hamming_decoding_list = pd.read_csv('hamming_decoding.csv')
    d = np.zeros(int((code_length - remainder) / 4), 3)
    hamming_decoding_result = ""
    hamming_decoding_result_tem = ""

    for i in range(0, int((code_length - remainder) / 4) - 1):
        c = np.dot(s[i, :], hamming_decoding_list)
        for j in range(0, 2):
            if c[j] == 0 | c[j] == 2:
                d[i, j] = 0
            elif c[j] == 1 | c[j] == 3:
                d[i, j] = 1
        for j in range(0, 3):
            if d[i, 0] == hamming_decoding_list[j, 0] & d[i, 1] == hamming_decoding_list[j, 2] &\
                    d[i, 2] == hamming_decoding_list[j, 3]:
                s[i, j] -= 1
    for i in range(0, int((code_length - remainder) / 4) - 1):
        for j in range(0, 3):
            hamming_decoding_result_tem += chr(s[i, j])
    for i in range(0, remainder - 1):
        hamming_decoding_result_tem += chr(s[int((code_length - remainder) / 4), i])

    decode_length = len(hamming_decoding_result_tem)
    flag = 1
    break_while = 0
    while flag < decode_length - 3 + remainder:
        for i in range(0, 26):
            str_tem = hamming_decoding_result_tem[flag:flag + i - 1]
            for j in range(0, 26):
                if str_tem == huffman_coding_list[j, :]:
                    if j == 1:
                        hamming_decoding_result += chr(32)
                    else:
                        hamming_decoding_result += chr(j + 96)
                    flag += len(str_tem)
                    break_while = 1
                    break
            if break_while == 1:
                break

    print(hamming_decoding_result)

经过测试之后发现经过校验矩阵之后,传输信息错误率在 0.01 之下,根据传输量趋于 0,即是信道编码的意义。

最后

实验拿了满分,快乐。

发表评论