This repository has been archived by the owner on Aug 3, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 68
/
httpzlib.py
86 lines (74 loc) · 2.68 KB
/
httpzlib.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#!/usr/bin/env python
# Copyright 2011 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Apply gzip/deflate to separate chunks of data."""
import struct
import zlib
GZIP_HEADER = (
'\037\213' # magic header
'\010' # compression method
'\000' # flags (none)
'\000\000\000\000' # packed time (use zero)
'\002'
'\377')
def compress_chunks(uncompressed_chunks, use_gzip):
"""Compress a list of data with gzip or deflate.
The returned chunks may be used with HTTP chunked encoding.
Args:
uncompressed_chunks: a list of strings
(e.g. ["this is the first chunk", "and the second"])
use_gzip: if True, compress with gzip. Otherwise, use deflate.
Returns:
[compressed_chunk_1, compressed_chunk_2, ...]
"""
if use_gzip:
size = 0
crc = zlib.crc32("") & 0xffffffffL
compressor = zlib.compressobj(
6, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0)
else:
compressor = zlib.compressobj()
compressed_chunks = []
last_index = len(uncompressed_chunks) - 1
for index, data in enumerate(uncompressed_chunks):
chunk = ''
if use_gzip:
size += len(data)
crc = zlib.crc32(data, crc) & 0xffffffffL
if index == 0:
chunk += GZIP_HEADER
chunk += compressor.compress(data)
if index < last_index:
chunk += compressor.flush(zlib.Z_SYNC_FLUSH)
else:
chunk += (compressor.flush(zlib.Z_FULL_FLUSH) +
compressor.flush())
if use_gzip:
chunk += (struct.pack("<L", long(crc)) +
struct.pack("<L", long(size)))
compressed_chunks.append(chunk)
return compressed_chunks
def uncompress_chunks(compressed_chunks, use_gzip):
"""Uncompress a list of data compressed with gzip or deflate.
Args:
compressed_chunks: a list of compressed data
use_gzip: if True, uncompress with gzip. Otherwise, use deflate.
Returns:
[uncompressed_chunk_1, uncompressed_chunk_2, ...]
"""
if use_gzip:
decompress = zlib.decompressobj(16 + zlib.MAX_WBITS).decompress
else:
decompress = zlib.decompressobj(-zlib.MAX_WBITS).decompress
return [decompress(c) for c in compressed_chunks]