📜  霍夫曼编码算法(1)

📅  最后修改于: 2023-12-03 15:12:50.900000             🧑  作者: Mango

霍夫曼编码算法

霍夫曼编码算法是一种数据压缩算法,在通信与存储应用中广泛使用,能够将数据压缩至原始数据的几分之一。

原理

霍夫曼编码算法的原理是通过建立霍夫曼树,将出现频率较高的字符使用较短的编码,出现频率较低的字符使用较长的编码,从而实现数据的压缩。

具体过程如下:

  1. 统计原始数据中每个字符出现的频率,并将每个字符与其对应的频率构建成一个节点;
  2. 选取出现频率最低的两个节点作为新的叶节点,将它们的权值相加作为父节点的权值,并将它们作为父节点的两个子节点;
  3. 将新生成的节点重新加入节点集合中,并从中选择出现频率最低的两个节点,重复上述过程,直至最后只剩下一个根节点;
  4. 根据构建出来的霍夫曼树,给霍夫曼树中的每个字符分配编码,即出现频率高的字符编码短,出现频率低的字符编码长,且不存在一个字符的编码是另一个字符的编码前缀;
  5. 使用分配好的编码对原始数据进行压缩。
实现

以下是 Python 代码片段,实现了霍夫曼编码的压缩和解压缩功能。

压缩
import heapq
import os


class HuffmanCoding:
    def __init__(self, file_path):
        self.file_path = file_path
        self.heap = []
        self.codes = {}
        self.rev_codes = {}

    class HeapNode:
        def __init__(self, freq, char=None):
            self.freq = freq
            self.char = char
            self.left = None
            self.right = None

        def __lt__(self, other):
            return self.freq < other.freq

    def make_freq_dict(self, text):
        freq_dict = {}
        for char in text:
            freq_dict[char] = freq_dict.get(char, 0) + 1
        return freq_dict

    def create_heap(self, freq_dict):
        for char in freq_dict:
            node = self.HeapNode(freq_dict[char], char)
            heapq.heappush(self.heap, node)

    def merge_nodes(self):
        while len(self.heap) > 1:
            node1 = heapq.heappop(self.heap)
            node2 = heapq.heappop(self.heap)

            merged_node = self.HeapNode(node1.freq + node2.freq)
            merged_node.left = node1
            merged_node.right = node2

            heapq.heappush(self.heap, merged_node)

    def make_codes_helper(self, root, current_code):
        if root is None:
            return

        if root.char is not None:
            self.codes[root.char] = current_code
            self.rev_codes[current_code] = root.char
            return

        self.make_codes_helper(root.left, current_code + "0")
        self.make_codes_helper(root.right, current_code + "1")

    def make_codes(self):
        root = heapq.heappop(self.heap)
        current_code = ""
        self.make_codes_helper(root, current_code)

    def get_encoded_text(self, text):
        encoded_text = ""
        for char in text:
            encoded_text += self.codes[char]
        return encoded_text

    def pad_encoded_text(self, encoded_text):
        padding_len = 8 - len(encoded_text) % 8
        for i in range(padding_len):
            encoded_text += "0"
        padding_info = "{0:08b}".format(padding_len)
        encoded_text = padding_info + encoded_text
        return encoded_text

    def get_byte_array(self, padded_encoded_text):
        byte_array = bytearray()
        for i in range(0, len(padded_encoded_text), 8):
            byte = padded_encoded_text[i:i + 8]
            byte_array.append(int(byte, 2))
        return byte_array

    def compress(self):
        filename, file_extension = os.path.splitext(self.file_path)
        output_file_path = filename + ".bin"

        with open(self.file_path, "r") as file, open(output_file_path, "wb") as output:
            text = file.read()
            freq_dict = self.make_freq_dict(text)
            self.create_heap(freq_dict)
            self.merge_nodes()
            self.make_codes()

            encoded_text = self.get_encoded_text(text)
            padded_encoded_text = self.pad_encoded_text(encoded_text)

            byte_array = self.get_byte_array(padded_encoded_text)
            output.write(bytes(byte_array))

        print("Compressed!")
        return output_file_path
解压缩
class HuffmanCoding:
    # ...

    def decode_text(self, encoded_text):
        current_code = ""
        decoded_text = ""

        for bit in encoded_text:
            current_code += bit
            if current_code in self.rev_codes:
                char = self.rev_codes[current_code]
                decoded_text += char
                current_code = ""

        return decoded_text

    def unpad_encoded_text(self, padded_encoded_text):
        padding_info = padded_encoded_text[:8]
        padding_len = int(padding_info, 2)
        padded_encoded_text = padded_encoded_text[8:]
        unpadded_encoded_text = padded_encoded_text[:-1 * padding_len]
        return unpadded_encoded_text

    def decompress(self, input_file_path):
        filename, file_extension = os.path.splitext(input_file_path)
        output_file_path = filename + "_decompressed" + ".txt"

        with open(input_file_path, "rb") as file, open(output_file_path, "w") as output:
            bit_string = ""
            byte = file.read(1)
            while byte:
                byte = ord(byte)
                bits = bin(byte)[2:].rjust(8, '0')
                bit_string += bits
                byte = file.read(1)

            unpadded_encoded_text = self.unpad_encoded_text(bit_string)
            decoded_text = self.decode_text(unpadded_encoded_text)

            output.write(decoded_text)

        print("Decompressed!")
        return output_file_path
性能

霍夫曼编码算法的压缩率与数据出现的频率分布有关,对于出现频率较高的字符,霍夫曼编码能够显著地压缩数据,而对于出现频率很低的字符,霍夫曼编码所得的编码则可能会比原始编码更长。

在通常情况下,使用霍夫曼编码算法可以将数据压缩至原始数据的约 $\frac{1}{3}$。