diff --git a/Projectfile b/Projectfile
index d6ab52b4..e713eeae 100644
--- a/Projectfile
+++ b/Projectfile
@@ -57,6 +57,7 @@ python.add_requirements(
         'cookiecutter >=1.5,<1.6',
         'pytest-timeout >=1,<2',
         'sphinx-sitemap >=0.2,<0.3',
+        'fastavro >=0.22.9',
     ],
     docker=[
         'bonobo-docker ~=0.6.0a1',
diff --git a/bonobo/_api.py b/bonobo/_api.py
index 96310c7d..39b485ab 100644
--- a/bonobo/_api.py
+++ b/bonobo/_api.py
@@ -142,6 +142,8 @@ def open_fs(fs_url=None, *args, **kwargs):
 
 # standard transformations
 api.register_group(
+    AvroReader,
+    AvroWriter,
     CsvReader,
     CsvWriter,
     FileReader,
diff --git a/bonobo/nodes/io/__init__.py b/bonobo/nodes/io/__init__.py
index c375c917..48585ff0 100644
--- a/bonobo/nodes/io/__init__.py
+++ b/bonobo/nodes/io/__init__.py
@@ -4,6 +4,7 @@
 from .file import FileReader, FileWriter
 from .json import JsonReader, JsonWriter, LdjsonReader, LdjsonWriter
 from .pickle import PickleReader, PickleWriter
+from .avro import AvroReader, AvroWriter
 
 __all__ = [
     "CsvReader",
@@ -16,4 +17,6 @@
     "LdjsonWriter",
     "PickleReader",
     "PickleWriter",
+    "AvroReader",
+    "AvroWriter",
 ]
diff --git a/bonobo/nodes/io/avro.py b/bonobo/nodes/io/avro.py
new file mode 100644
index 00000000..a2cb031c
--- /dev/null
+++ b/bonobo/nodes/io/avro.py
@@ -0,0 +1,205 @@
+from datetime import datetime, date, timedelta, time
+from decimal import Decimal
+
+from bonobo.config import Option, use_context
+from bonobo.nodes.io.base import FileHandler
+from bonobo.nodes.io.file import FileReader, FileWriter
+
+# Avro support is optional and shouldn't have any impact when the lib is not installed.
+try:
+    import fastavro
+except ImportError:
+    pass
+
+# Helpers
+
+def assure_same_len(props, values):
+    if len(props) != len(values):
+        m = "Values length differs from input fields length. Expected: {}. Got: {}. Values: {!r}."
+        f = m.format(len(props), len(values), values)
+        raise ValueError(f)
+
+
+def build_schema_from_values(props, values):
+    # https://avro.apache.org/docs/current/spec.html#schema_primitive
+    # Note: check bool before int and datetime before date, since bool is a
+    # subclass of int and datetime is a subclass of date.
+    assure_same_len(props, values)
+    schema_fields = []
+    for p, v in zip(props, values):
+        if isinstance(v, bool):
+            f = {'name': p, 'type': 'boolean'}
+        elif isinstance(v, int):
+            f = {'name': p, 'type': 'long'}
+        elif isinstance(v, float):
+            f = {'name': p, 'type': 'double'}
+        elif isinstance(v, datetime):
+            f = {'name': p, 'type': 'long', 'logicalType': 'timestamp-micros'}
+        elif isinstance(v, date):
+            f = {'name': p, 'type': 'int', 'logicalType': 'date'}
+        elif isinstance(v, (timedelta, time)):
+            f = {'name': p, 'type': 'long', 'logicalType': 'time-micros'}
+        elif isinstance(v, Decimal):
+            f = {'name': p, 'type': 'double'}
+        elif isinstance(v, bytes):
+            f = {'name': p, 'type': 'bytes'}
+        else:
+            f = {'name': p, 'type': 'string'}
+        schema_fields.append(f)
+    return schema_fields
+
+
+def get_same_value(value):
+    return value
+
+
+def get_value_as_date(value):
+    # 'date' logical type: days since the unix epoch
+    diff = value - date(1970, 1, 1)
+    return diff.days
+
+
+def get_value_as_datetime(value):
+    # 'timestamp-micros' logical type: microseconds since the unix epoch
+    elapsed = value.timestamp()
+    return int(elapsed * 1000000)
+
+
+def get_value_as_timedelta(value):
+    # 'time-micros' logical type: total duration in microseconds
+    elapsed = (value.days * 86400000000) + (value.seconds * 1000000) + value.microseconds
+    return elapsed
+
+
+def get_value_as_time(value):
+    # 'time-micros' logical type: microseconds since midnight
+    elapsed = (value.hour * 3600000000) + (value.minute * 60000000) + (value.second * 1000000) + value.microsecond
+    return elapsed
+
+
+def get_value_as_float(value):
+    return float(value)
+
+
+def build_schema_from(props, values):
+    schema_fields = build_schema_from_values(props, values)
+    schema = {
+        'type': 'record',
+        'name': 'output',
+        'namespace': 'avro',
+        'doc': 'generated by bonobo',
+        'fields': schema_fields,
+    }
+    return schema
+
+
+def build_converters_from_values(values):
+    # Note: check datetime before date, since datetime is a subclass of date.
+    converters = []
+    for v in values:
+        if isinstance(v, datetime):
+            f = get_value_as_datetime
+        elif isinstance(v, time):
+            f = get_value_as_time
+        elif isinstance(v, timedelta):
+            f = get_value_as_timedelta
+        elif isinstance(v, date):
+            f = get_value_as_date
+        elif isinstance(v, Decimal):
+            f = get_value_as_float
+        else:
+            f = get_same_value
+        converters.append(f)
+    return converters
+
+
+# Exported classes
+
+
+class AvroHandler(FileHandler):
+    schema = Option(
+        dict,
+        required=False,
+        __doc__="""
+        A dict specifying the schema fields according to the Avro spec, e.g.:
+
+            schema = {
+                'doc': 'A weather reading.',
+                'name': 'Weather',
+                'namespace': 'test',
+                'type': 'record',
+                'fields': [
+                    {'name': 'station', 'type': 'string'},
+                    {'name': 'date', 'type': 'int', 'logicalType': 'date'},
+                    {'name': 'time', 'type': 'long', 'logicalType': 'time-micros'},
+                    {'name': 'temp', 'type': 'int'},
+                ],
+            }
+
+        See https://avro.apache.org/docs/current/spec.html#schema_primitive
+        and https://avro.apache.org/docs/current/spec.html#Logical+Types
+        """,
+    )
+
+    codec = Option(
+        str,
+        required=False,
+        default="null",
+        __doc__="The name of the compression codec used to compress blocks. "
+        "Can be 'null', 'deflate' or 'snappy' (if installed).",
+    )
+
+
+@use_context
+class AvroReader(FileReader, AvroHandler):
+    """
+    Reads records from an Avro file and yields each record's values as a row,
+    using the schema field names as the output fields.
+ """ + mode = Option(str, default="rb") + + def read(self, file, context, *, fs): + avro_reader = fastavro.reader(file) + if not context.output_type: + file_schema = avro_reader.writer_schema + schema_fields = file_schema['fields'] + field_names = [col['name'] for col in schema_fields] + col_names = tuple(field_names) + context.set_output_fields(col_names) + self.schema = file_schema + self.codec = avro_reader.codec + + for record in avro_reader: + row = tuple(record.values()) + yield row + + __call__ = read + + +@use_context +class AvroWriter(FileWriter, AvroHandler): + """ + Writes the values as records into a avro file according to the fields defined in schema + + When the schema is not specified, it tries to guess types from the values + of the fields present in the first record. + The type of values written follow the ones of python type system. Take + care when writing data extracted from sql databases as their types are + usually affected by factors like driver issues, type mismatch, incorrect + mapping, precision loss (specially float and decimals), SQLArchitect... + """ + compression_level = Option(int, required=False, + __doc__="Compression level to use when the specified codec supports it") + + mode = Option(str, default="wb+") + + def write(self, file, context, *values, fs): + """ + Write a record to the opened file using the defined schema + """ + context.setdefault("schema", None) + context.setdefault("converters", None) + + props = context.get_input_fields() + if not context.schema: + detected = build_schema_from(props, values) + parsed = fastavro.parse_schema(detected) + context.schema = parsed + if not context.converters: + context.converters = build_converters_from_values(values) + row = {k: conv(v) for k, v, conv in zip(props, values, context.converters)} + one_record = [row] + fastavro.writer( + fo=file, + schema=context.schema, + records=one_record, + codec=self.codec, + codec_compression_level=self.compression_level + ) + + __call__ = write + +# end of file # diff --git a/requirements-dev.txt b/requirements-dev.txt index 45fa213a..a892790a 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -12,6 +12,7 @@ click==7.0 cookiecutter==1.5.1 coverage==4.5.3 docutils==0.14 +fastavro>=0.22.9 future==0.17.1 idna==2.8 imagesize==1.1.0 diff --git a/tests/nodes/io/test_avro.py b/tests/nodes/io/test_avro.py new file mode 100644 index 00000000..8f721c58 --- /dev/null +++ b/tests/nodes/io/test_avro.py @@ -0,0 +1,105 @@ +import pytest +from datetime import datetime, date, timedelta + +from bonobo import AvroReader, AvroWriter +from bonobo.constants import EMPTY +from bonobo.execution.contexts.node import NodeExecutionContext +from bonobo.util.testing import BufferingNodeExecutionContext, FilesystemTester + +avro_tester = FilesystemTester("avro", mode="wb") + + +def is_fastavro_missing(): + try: + import fastavro + return False + except ImportError: + return True + + +def test_write_records_to_avro_file(tmpdir): + if is_fastavro_missing(): + return + fs, filename, services = avro_tester.get_services_for_writer(tmpdir) + writav = AvroWriter( + filename, + codec = 'deflate', + compression_level = 6 + ) + john = ("john", 7, 0.7, False, date(2012,10,11), datetime(2018,9,14,15,16,17)) + jane = ("jane", 17, 1.7, True, date(2002,11,12), datetime(2015,12,13,14,15,16)) + jack = ("jack", 27, 2.7, True, date(1992,12,13), datetime(2010,11,12,13,14,15)) + joel = ("joel", 37, 3.7, True, date(1982,12,25), datetime(2009,10,11,12,13,14)) + with NodeExecutionContext(writav, 
+        context.set_input_fields(["name", "age", "score", "smart", "birthday", "registered"])
+        context.write_sync(john, jane, jack, joel)
+
+
+@pytest.mark.skipif(is_fastavro_missing(), reason="fastavro is not installed")
+def test_write_with_schema_to_avro_file(tmpdir):
+    fs, filename, services = avro_tester.get_services_for_writer(tmpdir)
+    custom_schema = {
+        'doc': 'Some random people.',
+        'name': 'Crowd',
+        'namespace': 'test',
+        'type': 'record',
+        'fields': [
+            {'name': 'name', 'type': 'string'},
+            {'name': 'age', 'type': 'int'},
+            {'name': 'birthday', 'type': 'int', 'logicalType': 'date'},
+            {'name': 'registered', 'type': 'long', 'logicalType': 'timestamp-micros'},
+        ],
+    }
+    writav = AvroWriter(filename, schema=custom_schema, codec='deflate', compression_level=6)
+    pete = ("pete", 7, date(2012, 10, 11), datetime(2018, 9, 14, 15, 16, 17))
+    mike = ("mike", 17, date(2002, 11, 12), datetime(2015, 12, 13, 14, 15, 16))
+    zack = ("zack", 27, date(1992, 12, 13), datetime(2010, 11, 12, 13, 14, 15))
+    gene = ("gene", 37, date(1982, 12, 25), datetime(2009, 10, 11, 12, 13, 14))
+    with NodeExecutionContext(writav, services=services) as context:
+        context.set_input_fields(["name", "age", "birthday", "registered"])
+        context.write_sync(pete, mike, zack, gene)
+
+
+def create_avro_example(path):
+    import fastavro
+    schema = {
+        'doc': 'A weather reading.',
+        'name': 'Weather',
+        'namespace': 'test',
+        'type': 'record',
+        'fields': [
+            {'name': 'station', 'type': 'string'},
+            {'name': 'time', 'type': 'long'},
+            {'name': 'temp', 'type': 'int'},
+        ],
+    }
+    parsed_schema = fastavro.parse_schema(schema)
+    records = [
+        {'station': 'cold', 'temp': 0, 'time': 1433269388},
+        {'station': 'warm', 'temp': 22, 'time': 1433270389},
+        {'station': 'frozen', 'temp': -11, 'time': 1433273379},
+        {'station': 'hot', 'temp': 111, 'time': 1433275478},
+    ]
+    with open(path, 'wb') as out:
+        fastavro.writer(out, parsed_schema, records)
+
+
+@pytest.mark.skipif(is_fastavro_missing(), reason="fastavro is not installed")
+def test_read_records_from_avro_file(tmpdir):
+    dst = tmpdir.strpath + '/output.avro'
+    create_avro_example(dst)
+    fs, filename, services = avro_tester.get_services_for_writer(tmpdir)
+    readav = AvroReader(filename)
+    with BufferingNodeExecutionContext(readav, services=services) as context:
+        context.write_sync(EMPTY)
+    props = context.get_output_fields()
+    assert props == ("station", "time", "temp")
+
+# end of test file
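
For context, a minimal usage sketch of the new nodes (not taken from the patch): it assumes fastavro is installed, the file names are made up, and it relies only on existing bonobo API (CsvReader, open_fs, Graph, run). Since CsvReader yields string values, the guessed Avro schema will be all-string fields unless an explicit schema option is passed to AvroWriter.

import bonobo


def get_graph():
    # Convert a CSV file to Avro: CsvReader provides the field names from the
    # CSV header, and AvroWriter guesses (or is given) the matching Avro schema.
    return bonobo.Graph(
        bonobo.CsvReader('people.csv'),
        bonobo.AvroWriter('people.avro', codec='deflate'),
    )


def get_services():
    # filesystem service rooted at the current working directory
    return {'fs': bonobo.open_fs()}


if __name__ == '__main__':
    bonobo.run(get_graph(), services=get_services())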