代码清除
让我们尝试利用Python标准库提供的算法对代码进行一些重构,同时保持霍夫曼树计算和图像编码方法的精髓。
计算符号计数
首先,我们可以将符号计数重构为一个函数,并以更简洁的方式将其重写:
此外,我们可以更改它以返回(symbol,count)
的列表,列表按(count,symbol)
升序排列。为此,我们可以将其与您的sortFreq(...)
函数的重写版本结合使用,
实施:
from collections import Counter
from itertools import chain
def count_symbols(image):
pixels = image.getdata()
values = chain.from_iterable(pixels)
counts = Counter(values).items()
return sorted(counts,key=lambda x:x[::-1])
建造树
这里只需要进行很小的更改-由于我们已经对符号计数进行了排序,因此我们只需要反转元组即可使您现有的树构建算法起作用。我们可以结合使用list comprehension和元组切片来简洁地表达这一点。
实施:
def build_tree(counts) :
nodes = [entry[::-1] for entry in counts] # Reverse each (symbol,count) tuple
while len(nodes) > 1 :
leastTwo = tuple(nodes[0:2]) # get the 2 to combine
theRest = nodes[2:] # all the others
combFreq = leastTwo[0][0] + leastTwo[1][0] # the branch points freq
nodes = theRest + [(combFreq,leastTwo)] # add branch point to the end
nodes.sort() # sort it into place
return nodes[0] # Return the single tree inside the list
修剪树
同样,与原始实现相比,只有两个小变化:
- 更改测试以检查
tuple
(节点),以独立于符号的表示方式。
- 摆脱不必要的
else
实施:
def trim_tree(tree) :
p = tree[1] # Ignore freq count in [0]
if type(p) is tuple: # Node,trim left then right and recombine
return (trim_tree(p[0]),trim_tree(p[1]))
return p # Leaf,just return it
分配代码
这里最重要的更改是消除对全局codes
变量的依赖。要解决该问题,我们可以将实现分为两个函数,一个处理递归代码分配,一个包装器创建一个新的本地codes
字典,在其上分派递归函数,然后返回输出。>
我们还将代码的表示形式从字符串切换为位列表([0,1]
范围内的整数),其作用将在稍后显现。
再一次,我们将更改测试以检查tuple
(与修整时的原因相同)。
实施:
def assign_codes_impl(codes,node,pat):
if type(node) == tuple:
assign_codes_impl(codes,node[0],pat + [0]) # Branch point. Do the left branch
assign_codes_impl(codes,node[1],pat + [1]) # then do the right branch.
else:
codes[node] = pat # A leaf. set its code
def assign_codes(tree):
codes = {}
assign_codes_impl(codes,tree,[])
return codes
编码
让我们绕道走一下,谈谈数据编码。
首先,让我们观察一下,原始RGB像素由3个字节表示(每个颜色通道一个。每个像素24位,构成了我们的基线。
现在,您当前的算法将第一个像素编码为以下ASCII字符串:
['000','0010','0011']
总共23个字节(或184位)。那比未加工的要差得多。让我们检查一下原因:
- 有两个空格,这使人们更容易理解。这些没有信息。 (2个字节)
三个编码中的每一个都由两个撇号定界。由于代码仅由0
和1
组成,因此省略号对于解析来说是不必要的,因此也不携带任何信息。 (6个字节)
- 每个代码都是prefix code,因此可以明确地解析它们,因此也不需要用于代码分离的两个逗号。 (2个字节)
- 我们知道每个像素有三个代码,因此我们不需要大括号(
[
,]
)来分隔像素(与上述原因相同)。 (2个字节)
总共,每个像素12个字节,根本不包含任何信息。剩下的11个字节(在这种情况下)确实包含一些信息...但是多少?
请注意,输出字母中仅有的两个可能的符号是0
和1
。这意味着每个符号携带1位信息。由于将每个符号存储为ASCII字符(一个字节),因此每1位信息使用8位。
在这种特殊情况下,您使用184位表示11位信息-比所需的多了约16.7倍,比仅以原始格式存储像素差了约7.67倍。
显然,使用朴素的文本表示形式的编码数据不会产生任何压缩。我们将需要更好的方法。
比特流
从我们先前的分析来看,很明显,为了有效地执行压缩(和解压缩),我们需要能够将输出(或输入)视为单个位的流。标准的Python库没有提供直接的解决方案,以最低的粒度,我们一次只能读取或写入一个字节的文件。
由于我们要对可能包含多个位的值进行编码,因此必须根据重要性对它们的排序方式进行解码。让我们按从最高到最低的顺序对其进行排序。
位I / O实用程序
如前所述,我们将比特序列表示为[0,1]
范围内的整数列表。让我们从编写一些简单的实用程序功能开始:
- 将整数转换为唯一表示该整数的最短位序列的函数(即至少1位,否则没有前导零)。
- 将位序列转换为整数的函数。
- 一种将位序列零扩展(在最高有效位置添加零)的功能(以允许定长编码)。
实施:
def to_binary_list(n):
"""Convert integer into a list of bits"""
return [n] if (n <= 1) else to_binary_list(n >> 1) + [n & 1]
def from_binary_list(bits):
"""Convert list of bits into an integer"""
result = 0
for bit in bits:
result = (result << 1) | bit
return result
def pad_bits(bits,n):
"""Prefix list of bits with enough zeros to reach n digits"""
assert(n >= len(bits))
return ([0] * (n - len(bits)) + bits)
用法示例:
>>> to_binary_list(14)
[1,1,0]
>>> from_binary_list([1,0])
14
>>> pad_bits(to_binary_list(14),8)
[0,0]
输出比特流
由于文件I / O API只允许保存整个字节,因此我们需要创建一个包装器类,该类将缓冲写入流中的位到内存中。
让我们提供一种方法来写入单个位以及一系列位。
每个写命令(1位或更多位)将首先将这些位添加到缓冲区中。一旦缓冲区包含8位以上,就会从前端删除8位组,将其转换为[0-255]范围内的整数,并保存到输出文件中。直到缓冲区包含少于8位为止。
最后,让我们提供一种“刷新”流的方法-当缓冲区为非空但没有足够的位构成整个字节时,将0加到最低有效位置,直到有8位为止,然后写入字节。当我们关闭比特流时,我们需要这个(还有其他好处,我们将在后面看到)。
实施:
class OutputBitStream(object):
def __init__(self,file_name):
self.file_name = file_name
self.file = open(self.file_name,'wb')
self.bytes_written = 0
self.buffer = []
def write_bit(self,value):
self.write_bits([value])
def write_bits(self,values):
self.buffer += values
while len(self.buffer) >= 8:
self._save_byte()
def flush(self):
if len(self.buffer) > 0: # Add trailing zeros to complete a byte and write it
self.buffer += [0] * (8 - len(self.buffer))
self._save_byte()
assert(len(self.buffer) == 0)
def _save_byte(self):
bits = self.buffer[:8]
self.buffer[:] = self.buffer[8:]
byte_value = from_binary_list(bits)
self.file.write(bytes([byte_value]))
self.bytes_written += 1
def close(self):
self.flush()
self.file.close()
输入比特流
输入比特流遵循类似的主题。我们希望一次读取1个或更多位。为此,我们从文件中加载字节,将每个字节转换为位列表,然后将其添加到缓冲区中,直到有足够的空间满足读取请求为止。
在这种情况下,flush命令清除缓冲区(确保缓冲区仅包含零)。
实施:
class InputBitStream(object):
def __init__(self,'rb')
self.bytes_read = 0
self.buffer = []
def read_bit(self):
return self.read_bits(1)[0]
def read_bits(self,count):
while len(self.buffer) < count:
self._load_byte()
result = self.buffer[:count]
self.buffer[:] = self.buffer[count:]
return result
def flush(self):
assert(not any(self.buffer))
self.buffer[:] = []
def _load_byte(self):
value = ord(self.file.read(1))
self.buffer += pad_bits(to_binary_list(value),8)
self.bytes_read += 1
def close(self):
self.file.close()
压缩格式
接下来,我们需要定义压缩比特流的格式。解码图像需要三个基本信息块:
- 图像的形状(高度和宽度),并假设它是3通道RGB图像。
- 在解码端重建霍夫曼码所需的信息
- 霍夫曼编码的像素数据
让我们将压缩格式制作如下:
- 标题
- 图像高度(16位,无符号)
- 图像宽度(16位,无符号)
- 霍夫曼表(开始与整个字节对齐)
- 像素代码(开始与整个字节对齐)
-
width * height * 3
霍夫曼编码顺序
压缩
实施:
from PIL import Image
def compressed_size(counts,codes):
header_size = 2 * 16 # height and width as 16 bit values
tree_size = len(counts) * (1 + 8) # Leafs: 1 bit flag,8 bit symbol each
tree_size += len(counts) - 1 # Nodes: 1 bit flag each
if tree_size % 8 > 0: # Padding to next full byte
tree_size += 8 - (tree_size % 8)
# Sum for each symbol of count * code length
pixels_size = sum([count * len(codes[symbol]) for symbol,count in counts])
if pixels_size % 8 > 0: # Padding to next full byte
pixels_size += 8 - (pixels_size % 8)
return (header_size + tree_size + pixels_size) / 8
def encode_header(image,bitstream):
height_bits = pad_bits(to_binary_list(image.height),16)
bitstream.write_bits(height_bits)
width_bits = pad_bits(to_binary_list(image.width),16)
bitstream.write_bits(width_bits)
def encode_tree(tree,bitstream):
if type(tree) == tuple: # Note - write 0 and encode children
bitstream.write_bit(0)
encode_tree(tree[0],bitstream)
encode_tree(tree[1],bitstream)
else: # Leaf - write 1,followed by 8 bit symbol
bitstream.write_bit(1)
symbol_bits = pad_bits(to_binary_list(tree),8)
bitstream.write_bits(symbol_bits)
def encode_pixels(image,codes,bitstream):
for pixel in image.getdata():
for value in pixel:
bitstream.write_bits(codes[value])
def compress_image(in_file_name,out_file_name):
print('Compressing "%s" -> "%s"' % (in_file_name,out_file_name))
image = Image.open(in_file_name)
print('Image shape: (height=%d,width=%d)' % (image.height,image.width))
size_raw = raw_size(image.height,image.width)
print('RAW image size: %d bytes' % size_raw)
counts = count_symbols(image)
print('Counts: %s' % counts)
tree = build_tree(counts)
print('Tree: %s' % str(tree))
trimmed_tree = trim_tree(tree)
print('Trimmed tree: %s' % str(trimmed_tree))
codes = assign_codes(trimmed_tree)
print('Codes: %s' % codes)
size_estimate = compressed_size(counts,codes)
print('Estimated size: %d bytes' % size_estimate)
print('Writing...')
stream = OutputBitStream(out_file_name)
print('* Header offset: %d' % stream.bytes_written)
encode_header(image,stream)
stream.flush() # Ensure next chunk is byte-aligned
print('* Tree offset: %d' % stream.bytes_written)
encode_tree(trimmed_tree,stream)
stream.flush() # Ensure next chunk is byte-aligned
print('* Pixel offset: %d' % stream.bytes_written)
encode_pixels(image,stream)
stream.close()
size_real = stream.bytes_written
print('Wrote %d bytes.' % size_real)
print('Estimate is %scorrect.' % ('' if size_estimate == size_real else 'in'))
print('Compression ratio: %0.2f' % (float(size_raw) / size_real))
减压
实施:
from PIL import Image
def decode_header(bitstream):
height = from_binary_list(bitstream.read_bits(16))
width = from_binary_list(bitstream.read_bits(16))
return (height,width)
# https://stackoverflow.com/a/759766/3962537
def decode_tree(bitstream):
flag = bitstream.read_bits(1)[0]
if flag == 1: # Leaf,read and return symbol
return from_binary_list(bitstream.read_bits(8))
left = decode_tree(bitstream)
right = decode_tree(bitstream)
return (left,right)
def decode_value(tree,bitstream):
bit = bitstream.read_bits(1)[0]
node = tree[bit]
if type(node) == tuple:
return decode_value(node,bitstream)
return node
def decode_pixels(height,width,bitstream):
pixels = bytearray()
for i in range(height * width * 3):
pixels.append(decode_value(tree,bitstream))
return Image.frombytes('RGB',(width,height),bytes(pixels))
def decompress_image(in_file_name,out_file_name):
print('Decompressing "%s" -> "%s"' % (in_file_name,out_file_name))
print('Reading...')
stream = InputBitStream(in_file_name)
print('* Header offset: %d' % stream.bytes_read)
height,width = decode_header(stream)
stream.flush() # Ensure next chunk is byte-aligned
print('* Tree offset: %d' % stream.bytes_read)
trimmed_tree = decode_tree(stream)
stream.flush() # Ensure next chunk is byte-aligned
print('* Pixel offset: %d' % stream.bytes_read)
image = decode_pixels(height,trimmed_tree,stream)
stream.close()
print('Read %d bytes.' % stream.bytes_read)
print('Image size: (height=%d,width=%d)' % (height,width))
print('Trimmed tree: %s' % str(trimmed_tree))
image.save(out_file_name)
测试运行
from PIL import ImageChops
def raw_size(width,height):
header_size = 2 * 16 # height and width as 16 bit values
pixels_size = 3 * 8 * width * height # 3 channels,8 bits per channel
return (header_size + pixels_size) / 8
def images_equal(file_name_a,file_name_b):
image_a = Image.open(file_name_a)
image_b = Image.open(file_name_b)
diff = ImageChops.difference(image_a,image_b)
return diff.getbbox() is None
if __name__ == '__main__':
start = time.time()
compress_image('flag.png','answer.txt')
print('-' * 40)
decompress_image('answer.txt','flag_out.png')
stop = time.time()
times = (stop - start) * 1000
print('-' * 40)
print('Run time takes %d miliseconds' % times)
print('Images equal = %s' % images_equal('flag.png','flag_out.png'))
我使用您提供的示例图像运行了脚本。
控制台输出:
Compressing "flag.png" -> "answer.txt"
Image shape: (height=18,width=23)
RAW image size: 1246 bytes
Counts: [(24,90),(131,(215,(59,324),(60,(110,324)]
Tree: (1242,((594,((270,((90,215),(180,24),(90,131))))),(324,59))),(648,((324,60),110)))))
Trimmed tree: (((215,(24,131)),59),110))
Codes: {215: [0,0],24: [0,131: [0,1],59: [0,60: [1,110: [1,1]}
Estimated size: 379 bytes
Writing...
* Header offset: 0
* Tree offset: 4
* Pixel offset: 12
Wrote 379 bytes.
Estimate is correct.
Compression ratio: 3.29
----------------------------------------
Decompressing "answer.txt" -> "flag_out.png"
Reading...
* Header offset: 0
* Tree offset: 4
* Pixel offset: 12
Read 379 bytes.
Image size: (height=18,width=23)
Trimmed tree: (((215,110))
----------------------------------------
Run time takes 32 miliseconds
Images equal = True
潜在的改进
- 每个颜色通道的霍夫曼表
- 调色板图像支持
- 变换过滤器(每个通道的增量编码,或更复杂的预测器)
- 处理重复的模型(RLE,LZ ...)
- 霍夫曼表