diff --git a/refinery/units/compression/decompress.py b/refinery/units/compression/decompress.py index 43a0913da8..74e2529811 100644 --- a/refinery/units/compression/decompress.py +++ b/refinery/units/compression/decompress.py @@ -118,7 +118,7 @@ def decompress(engine: Unit, cutoff: int = 0, prefix: Optional[int] = None): if engine.handles(ingest) is False: return Decompression(engine, None, cutoff, prefix) try: - result = engine.process(ingest) + result = next(engine.act(ingest)) except RefineryPartialResult as pr: result = pr.partial except Exception: diff --git a/refinery/units/compression/zl.py b/refinery/units/compression/zl.py index 1588a89cca..753de32f47 100644 --- a/refinery/units/compression/zl.py +++ b/refinery/units/compression/zl.py @@ -1,8 +1,10 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- import zlib +import itertools -from refinery.units import Arg, Unit +from refinery.units import Arg, Unit, RefineryPartialResult +from refinery.lib.tools import exception_to_string class zl(Unit): @@ -14,40 +16,64 @@ def __init__( self, level : Arg.Number('-l', bound=(0, 0X9), help='Specify a compression level between 0 and 9.') = 9, window : Arg.Number('-w', bound=(8, 0XF), help='Manually specify the window size between 8 and 15.') = 15, - force : Arg.Switch('-f', help='Decompress as far as possible, even if all known methods fail.') = False, zlib_header: Arg.Switch('-z', group='MODE', help='Use a ZLIB header.') = False, gzip_header: Arg.Switch('-g', group='MODE', help='Use a GZIP header.') = False ): if zlib_header and gzip_header: raise ValueError('You can only specify one header type (ZLIB or GZIP).') - return super().__init__(level=level, window=window, force=force, zlib_header=zlib_header, gzip_header=gzip_header) + return super().__init__(level=level, window=window, zlib_header=zlib_header, gzip_header=gzip_header) - def _force_decompress(self, data, mode): - z = zlib.decompressobj(mode) - - def as_many_as_possible(): - for k in range(len(data)): - try: yield z.decompress(data[k : k + 1]) - except zlib.error: break - - return B''.join(as_many_as_possible()) + def _decompress_data(self, data, mode: int, step: int): + zl = zlib.decompressobj(mode) + memory = memoryview(data) + result = bytearray() + while not zl.eof: + read = min(step, len(memory)) + try: + chunk = zl.decompress(memory[:read]) + except zlib.error as e: + raise RefineryPartialResult(exception_to_string(e), result) from e + else: + result.extend(chunk) + consumed = read - len(zl.unused_data) + if not memory or consumed == 0: + break + memory = memory[consumed:] + return result, memory def process(self, data): if data[0] == 0x78 or data[0:2] == B'\x1F\x8B' or self.args.zlib_header or self.args.gzip_header: - mode_candidates = [self.args.window | 0x20, -self.args.window] + modes = [self.args.window | 0x20, -self.args.window] else: - mode_candidates = [-self.args.window, self.args.window | 0x20] - mode_candidates.extend([0x10 | self.args.window, 0]) - for mode in mode_candidates: - self.log_debug(F'using mode {mode} for decompression') - try: - z = zlib.decompressobj(mode) - return z.decompress(data) - except zlib.error: - pass - if self.args.force: - return self._force_decompress(data, mode_candidates[0]) - raise ValueError('could not detect any zlib stream.') + modes = [-self.args.window, self.args.window | 0x20] + modes.extend([0x10 | self.args.window, 0]) + view = memoryview(data) + step = 32 if self.args.lenient else len(data) + for k in itertools.count(1): + error = None + rest = view + for mode in modes: + try: + out, rest = self._decompress_data(view, mode, step) + except Exception as e: + error = error or e + else: + self.log_info(F'used mode {mode} to decompress chunk {k}') + yield out + error = None + break + if error: + raise error + if not rest: + break + if len(rest) == len(view): + break + if len(rest) > len(view): + raise RuntimeError('Decompressor returned more tail data than input data.') + yield out + view = rest + if k <= 0: + raise ValueError('Could not detect any zlib stream.') def reverse(self, data): mode = -self.args.window