forked from snowflakedb/snowflake-connector-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
gzip_decoder.py
110 lines (92 loc) · 2.92 KB
/
gzip_decoder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2019 Snowflake Computing Inc. All rights reserved.
#
import io
import subprocess
import zlib
from logging import getLogger
# Number of compressed bytes requested from the input per read() call.
CHUNK_SIZE = 16384
# Added to zlib.MAX_WBITS when creating a decompressor; per the zlib docs a
# wbits value of 16 + (8..15) makes zlib expect a gzip header and trailer.
MAGIC_NUMBER = 16 # magic number from requests/packages/urllib3/response.py
# Module-level logger named after this module.
logger = getLogger(__name__)
def decompress_raw_data(raw_data_fd, add_bracket=True):
    """
    Decompresses raw data read from a file like object and returns the
    decompressed payload as a single bytes object.

    The input is read CHUNK_SIZE bytes at a time. It may consist of
    several compressed members concatenated back to back; every member is
    decompressed and the results are joined in order.

    :param raw_data_fd: file like object whose ``read(size)`` returns
        compressed bytes, and an empty value at end of input
    :param add_bracket: if true, the payload is wrapped in ``[`` and ``]``
    :return: the decompressed bytes, optionally wrapped in brackets
    """
    wbits = MAGIC_NUMBER + zlib.MAX_WBITS
    decompressor = zlib.decompressobj(wbits)
    writer = io.BytesIO()
    if add_bracket:
        writer.write(b'[')
    chunk = raw_data_fd.read(CHUNK_SIZE)
    while chunk:
        writer.write(decompressor.decompress(chunk))
        # Bytes left over once a compressed member has ended belong to the
        # next member, which needs a fresh decompressor.
        while decompressor.unused_data:
            remainder = decompressor.unused_data
            decompressor = zlib.decompressobj(wbits)
            writer.write(decompressor.decompress(remainder))
        chunk = raw_data_fd.read(CHUNK_SIZE)
    # Flush exactly once, after all input has been consumed: zlib does not
    # support calling decompress() again on an object that has been flushed.
    writer.write(decompressor.flush())
    if add_bracket:
        writer.write(b']')
    return writer.getvalue()
def decompress_raw_data_by_zcat(raw_data_fd, add_bracket=True):
    """
    Experiment: decompresses raw data from a file like object by piping
    all of it through an external ``zcat`` process, and returns whatever
    the process wrote to its standard output as bytes.

    When add_bracket is true the output is wrapped in ``[`` and ``]``.
    """
    zcat = subprocess.Popen(
        ["zcat"], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    # communicate() feeds the whole input to zcat and waits for it to exit.
    stdout_data, _ = zcat.communicate(input=raw_data_fd.read())
    if add_bracket:
        opening, closing = b'[', b']'
    else:
        opening, closing = b'', b''
    return opening + stdout_data + closing
class IterStreamer(object):
    """
    File-like streaming iterator.

    Wraps an iterable of text (or bytes) chunks and offers a ``read(size)``
    method that re-slices those chunks into pieces of at most ``size``
    items, buffering whatever was pulled from the iterable but not yet
    returned.
    """

    def __init__(self, generator):
        """
        :param generator: iterable yielding the chunks to stream
        """
        self.generator = generator
        self.iterator = iter(generator)
        # Data already taken from the iterator but not yet handed out by
        # read().
        self.leftover = ''

    def __len__(self):
        """Delegates to the wrapped object's own ``__len__``."""
        return self.generator.__len__()

    def __iter__(self):
        return self.iterator

    def next(self):
        """Returns the next raw chunk; raises StopIteration at the end."""
        return next(self.iterator)

    # Python 3 names the iterator protocol method ``__next__``; without this
    # alias the builtin next() rejects instances of this class.
    __next__ = next

    def read(self, size):
        """
        Returns up to ``size`` items of data.

        Chunks are pulled from the wrapped iterator until at least ``size``
        items are buffered or the iterator is exhausted. Anything beyond
        ``size`` is kept for the following call. An empty value is returned
        once all data has been consumed.

        :param size: maximum number of items to return
        :return: the next slice of the stream, of the chunks' own type
        """
        pieces = [self.leftover] if self.leftover else []
        count = len(self.leftover)
        while count < size:
            try:
                chunk = self.next()
            except StopIteration:
                break
            pieces.append(chunk)
            count += len(chunk)
        # Join with an empty value of the chunks' own type so text and
        # bytes streams both work, and so the cost stays linear.
        data = pieces[0][:0].join(pieces) if pieces else self.leftover
        # Always replace the buffer, including when exactly ``size`` items
        # were buffered; otherwise the same data would be returned twice.
        self.leftover = data[size:]
        return data[:size]
def decompress_raw_data_to_unicode_stream(raw_data_fd):
    """
    Decompresses raw data from a file like object and yields it as a
    sequence of Unicode strings.

    The first string yielded is ``[`` and the last one ends with ``]``.
    The input is read CHUNK_SIZE bytes at a time and may consist of several
    compressed members concatenated back to back. The decompressed bytes
    are decoded as UTF-8.

    :param raw_data_fd: file like object whose ``read(size)`` returns
        compressed bytes, and an empty value at end of input
    :return: generator of Unicode strings
    """
    import codecs  # local import: only this function needs it

    wbits = MAGIC_NUMBER + zlib.MAX_WBITS
    obj = zlib.decompressobj(wbits)
    # A multi-byte UTF-8 character can be split across two decompressed
    # pieces. The incremental decoder buffers the partial sequence until
    # the rest arrives instead of raising UnicodeDecodeError.
    decoder = codecs.getincrementaldecoder('utf-8')()
    yield u'['
    d = raw_data_fd.read(CHUNK_SIZE)
    while d:
        yield decoder.decode(obj.decompress(d))
        # Bytes left over once a compressed member has ended belong to the
        # next member, which needs a fresh decompressor.
        while obj.unused_data:
            unused_data = obj.unused_data
            obj = zlib.decompressobj(wbits)
            yield decoder.decode(obj.decompress(unused_data))
        d = raw_data_fd.read(CHUNK_SIZE)
    # final=True makes a truncated trailing sequence an error rather than
    # silently dropping it.
    yield decoder.decode(obj.flush(), final=True) + u']'