diff --git a/refinery/units/formats/pym.py b/refinery/units/formats/pym.py index 5c228623e1..b5dec0694a 100644 --- a/refinery/units/formats/pym.py +++ b/refinery/units/formats/pym.py @@ -2,7 +2,6 @@ # -*- coding: utf-8 -*- from refinery.units import Unit -import importlib import marshal @@ -16,6 +15,44 @@ class pym(Unit): refer to the official Python documentation for more details. """ + def reverse(self, data): + return marshal.dumps(data) + def process(self, data): - # https://stackoverflow.com/a/73454818 - return importlib._bootstrap_external._code_to_timestamp_pyc(marshal.loads(data)) + data = marshal.loads(data) + code = (lambda: 0).__code__.__class__ + + def toblob(data): + if isinstance(data, (bytes, bytearray)): + self.log_info(U'unmarshalled a byte string, returning as is') + return data + if isinstance(data, str): + self.log_info(F'unmarshalled a string object, encoding as {self.codec}') + return data.encode(self.codec) + if isinstance(data, code): + self.log_info(U'unmarshalled a code object, converting to pyc') + import importlib + return importlib._bootstrap_external._code_to_timestamp_pyc(data) + if isinstance(data, int): + self.log_info(U'unmarshalled an integer, returning big endian encoding') + q, r = divmod(data.bit_length(), 8) + q += int(bool(r)) + return data.to_bytes(q, 'big') + if isinstance(data, dict): + try: + import json + serialized = json.dumps(data, indent=4) + except Exception: + pass + else: + self.log_info(U'unmarshalled a serializable dictionary, returning JSON') + return serialized.encode(self.codec) + raise NotImplementedError( + F'No serialization implemented for object of type {data.__class__.__name__}') + + if isinstance(data, list): + self.log_info('object is a list, converting each item individually') + for item in data: + yield toblob(item) + else: + yield toblob(data) diff --git a/test/units/formats/test_pym.py b/test/units/formats/test_pym.py new file mode 100644 index 0000000000..9cfe4d022f --- /dev/null +++ b/test/units/formats/test_pym.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +import marshal +import json + +from .. import TestUnitBase + + +class TestPyMarshal(TestUnitBase): + + def test_integer(self): + unit = self.load() + data = 935532112 + test = marshal.dumps(data) | unit | bytes + self.assertEqual(int.from_bytes(test, 'big'), data) + + def test_string(self): + unit = self.load() + data = 'The binary refinery refines the finest binaries.' + test = marshal.dumps(data) | unit | str + self.assertEqual(test, data) + + def test_strings(self): + unit = self.load() + data = 'The binary refinery refines the finest binaries.'.split() + test = marshal.dumps(data) | unit | [str] + self.assertEqual(test, data) + + def test_json(self): + unit = self.load() + data = { + 'foo': None, + 'bar': [1, 12, 7], + 'baz': { + 'x': 'refined', + 'y': 'binaries', + } + } + test = marshal.dumps(data) | unit | json.loads + self.assertEqual(test, data) + + def test_bytes(self): + unit = self.load() + for k in (1, 2, 12, 200, 353444): + t = self.generate_random_buffer(k) + self.assertEqual(marshal.dumps(t) | unit | bytes, t) + + def test_code(self): + def test_function(): + print('refine your binaries!') + + from refinery.units.formats.pyc import pyc + from refinery.units.formats.pym import pym + + data = marshal.dumps(test_function.__code__) + test = data | pym | pyc | str + self.assertIn('refine your binaries!', test)