From db48ce882f05ab8e73c7c018e51790a394fce227 Mon Sep 17 00:00:00 2001 From: Juarez Rudsatz Date: Fri, 3 Jan 2020 19:35:46 -0300 Subject: [PATCH 01/13] add AvroReader, AvroWriter --- bonobo/_api.py | 2 + bonobo/nodes/io/__init__.py | 3 ++ bonobo/nodes/io/avro.py | 86 +++++++++++++++++++++++++++++++++++++ 3 files changed, 91 insertions(+) create mode 100644 bonobo/nodes/io/avro.py diff --git a/bonobo/_api.py b/bonobo/_api.py index 96310c7d..39b485ab 100644 --- a/bonobo/_api.py +++ b/bonobo/_api.py @@ -142,6 +142,8 @@ def open_fs(fs_url=None, *args, **kwargs): # standard transformations api.register_group( + AvroReader, + AvroWriter, CsvReader, CsvWriter, FileReader, diff --git a/bonobo/nodes/io/__init__.py b/bonobo/nodes/io/__init__.py index c375c917..48585ff0 100644 --- a/bonobo/nodes/io/__init__.py +++ b/bonobo/nodes/io/__init__.py @@ -4,6 +4,7 @@ from .file import FileReader, FileWriter from .json import JsonReader, JsonWriter, LdjsonReader, LdjsonWriter from .pickle import PickleReader, PickleWriter +from .avro import AvroReader, AvroWriter __all__ = [ "CsvReader", @@ -16,4 +17,6 @@ "LdjsonWriter", "PickleReader", "PickleWriter", + "AvroReader", + "AvroWriter", ] diff --git a/bonobo/nodes/io/avro.py b/bonobo/nodes/io/avro.py new file mode 100644 index 00000000..bf7fa5fd --- /dev/null +++ b/bonobo/nodes/io/avro.py @@ -0,0 +1,86 @@ +import fastavro +# from fastavro import writer, reader, parse_schema + +from bonobo.config import Option, use_context +from bonobo.constants import NOT_MODIFIED +from bonobo.nodes.io.base import FileHandler +from bonobo.nodes.io.file import FileReader, FileWriter +from bonobo.util import ensure_tuple +from bonobo.util.collections import coalesce, tuple_or_const + + +class AvroHandler(FileHandler): + """ + + .. attribute:: item_names + + The names of the items in the Avro, if it is not defined in the first item of the Avro. 
+ + """ + + fields = Option(tuple, required=False) + name = Option(str, required=False) + namespace = Option(str, required=False) + doc = Option(str, required=False) + + +@use_context +class AvroReader(FileReader, AvroHandler): + """ + Reads a record from avro file and yields the rows in dicts. + """ + + mode = Option(str, default="rb") + + def read(self, file, context, *, fs): + avro_reader = fastavro.reader(file) + + # TODO : fill the output_fields + if not context.output_type: + context.set_output_fields(self.fields) + + for row in avro_reader: + yield tuple(row) + + + __call__ = read + + +@use_context +class AvroWriter(FileWriter, AvroHandler): + mode = Option(str, default="wb") + + def build_schema(self, context): + schema_doc = coalesce(self.doc, "generated by bonobo") + schema_name = coalesce(self.doc, "output") + schema_namespace = coalesce(self.doc, "avro") + props = coalesce(self.fields, context.get_input_fields()) + + schema = { + 'type': 'record', + 'name': schema_name, + 'namespace': schema_namespace, + 'doc': schema_doc, + 'fields': [ + {'name': 'station', 'type': 'string'}, + {'name': 'time', 'type': 'long'}, + {'name': 'temp', 'type': 'int'}, + ], + } + return schema + + + def write(self, file, context, *values, fs): + """ + Write a record to the opened file. 
+ """ + context.setdefault("schema", None) + + if not context.schema: + schema = self.build_schema(context) + parsed_schema = fastavro.parse_schema(schema) + context.schema = parsed_schema + + fastavro.writer(file, context.schema, values) + + __call__ = write From ef3852d37d047c759da874dc17d2821ecdfeba52 Mon Sep 17 00:00:00 2001 From: Juarez Rudsatz Date: Fri, 3 Jan 2020 19:38:02 -0300 Subject: [PATCH 02/13] add fastavro as dev requirement --- Projectfile | 1 + requirements-dev.txt | 1 + 2 files changed, 2 insertions(+) diff --git a/Projectfile b/Projectfile index d6ab52b4..e713eeae 100644 --- a/Projectfile +++ b/Projectfile @@ -57,6 +57,7 @@ python.add_requirements( 'cookiecutter >=1.5,<1.6', 'pytest-timeout >=1,<2', 'sphinx-sitemap >=0.2,<0.3', + 'fastavro>=0.22.9', ], docker=[ 'bonobo-docker ~=0.6.0a1', diff --git a/requirements-dev.txt b/requirements-dev.txt index 45fa213a..a892790a 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -12,6 +12,7 @@ click==7.0 cookiecutter==1.5.1 coverage==4.5.3 docutils==0.14 +fastavro>=0.22.9 future==0.17.1 idna==2.8 imagesize==1.1.0 From 8c6ff1cb4fb43cb9fb56712ff376dd601a0623f1 Mon Sep 17 00:00:00 2001 From: Juarez Rudsatz Date: Mon, 13 Jan 2020 21:29:08 -0300 Subject: [PATCH 03/13] avro: basic read and write working --- bonobo/nodes/io/avro.py | 58 ++++++++++++++++++++++++++++------------- 1 file changed, 40 insertions(+), 18 deletions(-) diff --git a/bonobo/nodes/io/avro.py b/bonobo/nodes/io/avro.py index bf7fa5fd..b648f570 100644 --- a/bonobo/nodes/io/avro.py +++ b/bonobo/nodes/io/avro.py @@ -1,5 +1,5 @@ import fastavro -# from fastavro import writer, reader, parse_schema +from collections import OrderedDict from bonobo.config import Option, use_context from bonobo.constants import NOT_MODIFIED @@ -22,6 +22,7 @@ class AvroHandler(FileHandler): name = Option(str, required=False) namespace = Option(str, required=False) doc = Option(str, required=False) + codec = Option(str, required=False, default="null") 
@use_context @@ -32,17 +33,31 @@ class AvroReader(FileReader, AvroHandler): mode = Option(str, default="rb") + def load_schema(self, context, avro_reader): + aschema = avro_reader.schema + if 'doc' in aschema: + self.doc = aschema['doc'] + if 'name' in aschema: + self.name = aschema['name'] + if 'namespace' in aschema: + self.namespace = aschema['namespace'] + props = context.get_input_fields() + src = aschema['fields'] + dst = [] + for f in src: + fname = f['name'] + dst.append(fname) + tfields = tuple(dst) + context.set_output_fields(tfields) + def read(self, file, context, *, fs): avro_reader = fastavro.reader(file) - - # TODO : fill the output_fields if not context.output_type: - context.set_output_fields(self.fields) + self.load_schema(context, avro_reader) for row in avro_reader: yield tuple(row) - __call__ = read @@ -50,37 +65,44 @@ def read(self, file, context, *, fs): class AvroWriter(FileWriter, AvroHandler): mode = Option(str, default="wb") - def build_schema(self, context): + def build_schema(self, props): schema_doc = coalesce(self.doc, "generated by bonobo") - schema_name = coalesce(self.doc, "output") - schema_namespace = coalesce(self.doc, "avro") - props = coalesce(self.fields, context.get_input_fields()) + schema_name = coalesce(self.name, "output") + schema_namespace = coalesce(self.namespace, "avro") + + # TODO: discover types for fields + schema_fields = [] + for p in props: + f = {'name': p, 'type': 'string'} + schema_fields.append(f) schema = { 'type': 'record', 'name': schema_name, 'namespace': schema_namespace, 'doc': schema_doc, - 'fields': [ - {'name': 'station', 'type': 'string'}, - {'name': 'time', 'type': 'long'}, - {'name': 'temp', 'type': 'int'}, - ], + 'fields': schema_fields, } return schema + def get_write_fields(self, context): + props = coalesce(self.fields, context.get_input_fields()) + return props def write(self, file, context, *values, fs): """ Write a record to the opened file. 
""" - context.setdefault("schema", None) + props = self.get_write_fields(context) + context.setdefault("schema", None) if not context.schema: - schema = self.build_schema(context) - parsed_schema = fastavro.parse_schema(schema) + aschema = self.build_schema(props) + parsed_schema = fastavro.parse_schema(aschema) context.schema = parsed_schema - fastavro.writer(file, context.schema, values) + kv = {k: v for k, v in zip(props, values)} + row = [kv] + fastavro.writer(fo=file, schema=context.schema, records=row, codec=self.codec) __call__ = write From 4214a3c682fe4ab1e34727487e6186c2b07f578e Mon Sep 17 00:00:00 2001 From: Juarez Rudsatz Date: Mon, 13 Jan 2020 21:29:39 -0300 Subject: [PATCH 04/13] avro: basic test for read and write --- tests/nodes/io/test_avro.py | 64 +++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 tests/nodes/io/test_avro.py diff --git a/tests/nodes/io/test_avro.py b/tests/nodes/io/test_avro.py new file mode 100644 index 00000000..5ca24965 --- /dev/null +++ b/tests/nodes/io/test_avro.py @@ -0,0 +1,64 @@ +import pytest + +from bonobo import AvroReader, AvroWriter +from bonobo.constants import EMPTY +from bonobo.execution.contexts.node import NodeExecutionContext +from bonobo.util.testing import BufferingNodeExecutionContext, FilesystemTester + +avro_tester = FilesystemTester("avro", mode="wb") +# avro_tester.input_data = pickle.dumps([["a", "b", "c"], ["a foo", "b foo", "c foo"], ["a bar", "b bar", "c bar"]]) + + +def test_write_records_to_avro_file(tmpdir): + fs, filename, services = avro_tester.get_services_for_writer(tmpdir) + + writav = AvroWriter(filename) + with NodeExecutionContext(writav, services=services) as context: + context.set_input_fields(["foo", "bar"]) + context.write_sync(("a", "b"), ("c", "d")) + + # with fs.open(filename, "rb") as fp: + # assert pickle.loads(fp.read()) == {"foo": "bar"} + + +def create_avro_example(path): + import fastavro + + schema = { + 'doc': 'A weather reading.', + 
'name': 'Weather', + 'namespace': 'test', + 'type': 'record', + 'fields': [ + {'name': 'station', 'type': 'string'}, + {'name': 'time', 'type': 'long'}, + {'name': 'temp', 'type': 'int'}, + ], + } + parsed_schema = fastavro.parse_schema(schema) + + records = [ + {u'station': u'cold', u'temp': 0, u'time': 1433269388}, + {u'station': u'warm', u'temp': 22, u'time': 1433270389}, + {u'station': u'frozen', u'temp': -11, u'time': 1433273379}, + {u'station': u'hot', u'temp': 111, u'time': 1433275478}, + ] + + with open(path, 'wb') as out: + fastavro.writer(out, parsed_schema, records) + + +def test_read_records_from_avro_file(tmpdir): + dst = tmpdir.strpath + '/output.avro' + create_avro_example(dst) + + fs, filename, services = avro_tester.get_services_for_writer(tmpdir) + + readav = AvroReader(filename) + with BufferingNodeExecutionContext(readav, services=services) as context: + context.write_sync(EMPTY) + + output = context.get_buffer() + props = context.get_output_fields() + assert props == ("station", "time", "temp") + # assert output == [("a foo", "b foo", "c foo"), ("a bar", "b bar", "c bar")] From a0e44704576130a6b1ad4199c88875c7ce3ea546 Mon Sep 17 00:00:00 2001 From: Juarez Rudsatz Date: Wed, 22 Jan 2020 21:17:57 -0300 Subject: [PATCH 05/13] avro: type detection WIP --- bonobo/nodes/io/avro.py | 58 ++++++++++++++++++++++++++++++++--------- 1 file changed, 46 insertions(+), 12 deletions(-) diff --git a/bonobo/nodes/io/avro.py b/bonobo/nodes/io/avro.py index b648f570..8ce99663 100644 --- a/bonobo/nodes/io/avro.py +++ b/bonobo/nodes/io/avro.py @@ -1,5 +1,6 @@ import fastavro from collections import OrderedDict +from datetime import datetime, date from bonobo.config import Option, use_context from bonobo.constants import NOT_MODIFIED @@ -19,6 +20,7 @@ class AvroHandler(FileHandler): """ fields = Option(tuple, required=False) + types = Option(tuple, required=False) name = Option(str, required=False) namespace = Option(str, required=False) doc = Option(str, 
required=False) @@ -63,24 +65,56 @@ def read(self, file, context, *, fs): @use_context class AvroWriter(FileWriter, AvroHandler): - mode = Option(str, default="wb") - def build_schema(self, props): - schema_doc = coalesce(self.doc, "generated by bonobo") - schema_name = coalesce(self.name, "output") - schema_namespace = coalesce(self.namespace, "avro") + mode = Option(str, default="wb+") - # TODO: discover types for fields + def assure_same_len(self, props, values): + if len(props) != len(values): + raise ValueError( + "Values length differs from input fields length. Expected: {}. Got: {}. Values: {!r}.".format( + len(props), len(values), values + ) + ) + + def build_schema_from_types(self, props): + schema_fields = [] + for p, t in zip(props, self.types): + f = {'name': p, 'type': t} + schema_fields.append(f) + return schema_fields + + def build_schema_from_values(self, props, values): + # https://avro.apache.org/docs/current/spec.html#schema_primitive + self.assure_same_len(props, values) schema_fields = [] - for p in props: - f = {'name': p, 'type': 'string'} + for p, v in zip(props, values): + if isinstance(v, int): + f = {'name': p, 'type': 'int'} + elif isinstance(v, bool): + f = {'name': p, 'type': 'boolean'} + elif isinstance(v, float): + f = {'name': p, 'type': 'double'} + elif isinstance(v, datetime): + f = {'name': p, 'type': 'long', "logicalType": "timestamp-millis"} + elif isinstance(v, date): + f = {'name': p, 'type': 'int', "logicalType": "date"} + else: + f = {'name': p, 'type': 'string'} schema_fields.append(f) + return schema_fields + + def build_schema(self, props, values): + + if self.types is not None: + schema_fields = self.build_schema_from_types(props) + else: + schema_fields = self.build_schema_from_values(props, values) schema = { 'type': 'record', - 'name': schema_name, - 'namespace': schema_namespace, - 'doc': schema_doc, + 'name': coalesce(self.name, "output"), + 'namespace': coalesce(self.namespace, "avro"), + 'doc': coalesce(self.doc, 
"generated by bonobo"), 'fields': schema_fields, } return schema @@ -97,7 +131,7 @@ def write(self, file, context, *values, fs): context.setdefault("schema", None) if not context.schema: - aschema = self.build_schema(props) + aschema = self.build_schema(props, values) parsed_schema = fastavro.parse_schema(aschema) context.schema = parsed_schema From 078fcccc973c01a2ab9fb510fb6121e7e8f5af7c Mon Sep 17 00:00:00 2001 From: Juarez Rudsatz Date: Wed, 29 Jan 2020 21:43:27 -0300 Subject: [PATCH 06/13] avro: handle fieldtypes like int date time decimal --- bonobo/nodes/io/avro.py | 88 ++++++++++++++++++++++++++++++++----- tests/nodes/io/test_avro.py | 8 +++- 2 files changed, 83 insertions(+), 13 deletions(-) diff --git a/bonobo/nodes/io/avro.py b/bonobo/nodes/io/avro.py index 8ce99663..f044d012 100644 --- a/bonobo/nodes/io/avro.py +++ b/bonobo/nodes/io/avro.py @@ -1,13 +1,12 @@ -import fastavro -from collections import OrderedDict -from datetime import datetime, date +from datetime import datetime, date, timedelta, time +from decimal import Decimal from bonobo.config import Option, use_context -from bonobo.constants import NOT_MODIFIED from bonobo.nodes.io.base import FileHandler from bonobo.nodes.io.file import FileReader, FileWriter -from bonobo.util import ensure_tuple -from bonobo.util.collections import coalesce, tuple_or_const +from bonobo.util.collections import coalesce + +import fastavro class AvroHandler(FileHandler): @@ -79,7 +78,17 @@ def assure_same_len(self, props, values): def build_schema_from_types(self, props): schema_fields = [] for p, t in zip(props, self.types): - f = {'name': p, 'type': t} + if isinstance(t, dict): + f = t + f['name'] = p + elif t == "date": + f = {'name': p, 'type': 'int', "logicalType": "date"} + elif t == "time-micros": + f = {'name': p, 'type': 'long', "logicalType": "time-micros"} + elif t == "timestamp-micros": + f = {'name': p, 'type': 'long', "logicalType": "timestamp-micros"} + else: + f = {'name': p, 'type': t} 
schema_fields.append(f) return schema_fields @@ -89,20 +98,72 @@ def build_schema_from_values(self, props, values): schema_fields = [] for p, v in zip(props, values): if isinstance(v, int): - f = {'name': p, 'type': 'int'} + f = {'name': p, 'type': 'long'} elif isinstance(v, bool): f = {'name': p, 'type': 'boolean'} elif isinstance(v, float): f = {'name': p, 'type': 'double'} - elif isinstance(v, datetime): - f = {'name': p, 'type': 'long', "logicalType": "timestamp-millis"} elif isinstance(v, date): f = {'name': p, 'type': 'int', "logicalType": "date"} + elif isinstance(v, timedelta) or isinstance(v, time): + f = {'name': p, 'type': 'long', "logicalType": "time-micros"} + elif isinstance(v, datetime): + f = {'name': p, 'type': 'long', "logicalType": "timestamp-micros"} + elif isinstance(v, Decimal): + f = {'name': p, 'type': 'double'} + elif isinstance(v, bytes): + f = {'name': p, 'type': 'bytes'} else: f = {'name': p, 'type': 'string'} schema_fields.append(f) return schema_fields + def build_converters_from_values(self, values): + converters = [] + for v in values: + if isinstance(v, datetime): + f = AvroWriter.get_value_as_datetime + elif isinstance(v, time): + f = AvroWriter.get_value_as_time + elif isinstance(v, timedelta): + f = AvroWriter.get_value_as_timedelta + elif isinstance(v, date): + f = AvroWriter.get_value_as_date + elif isinstance(v, Decimal): + f = AvroWriter.get_value_as_float + else: + f = AvroWriter.get_same_value + converters.append(f) + return converters + + @staticmethod + def get_same_value(value): + return value + + @staticmethod + def get_value_as_date(value): + diff = value - date(1970,1,1) + return diff.days + + @staticmethod + def get_value_as_datetime(value): + elapsed = value.timestamp() + return int(elapsed) + + @staticmethod + def get_value_as_timedelta(value): + elapsed = (value.days * 86400000) + (value.seconds * 1000) + value.microseconds + return elapsed + + @staticmethod + def get_value_as_time(value): + elapsed = (value.hour 
* 3600000) + (value.minute * 60000) + (value.second * 1000) + value.microsecond + return elapsed + + @staticmethod + def get_value_as_float(value): + return float(value) + def build_schema(self, props, values): if self.types is not None: @@ -135,7 +196,12 @@ def write(self, file, context, *values, fs): parsed_schema = fastavro.parse_schema(aschema) context.schema = parsed_schema - kv = {k: v for k, v in zip(props, values)} + context.setdefault("converters", None) + if not context.converters: + context.converters = self.build_converters_from_values(values) + + kv = {k: conv(v) for k, v, conv in zip(props, values, context.converters)} + row = [kv] fastavro.writer(fo=file, schema=context.schema, records=row, codec=self.codec) diff --git a/tests/nodes/io/test_avro.py b/tests/nodes/io/test_avro.py index 5ca24965..4be02872 100644 --- a/tests/nodes/io/test_avro.py +++ b/tests/nodes/io/test_avro.py @@ -1,4 +1,5 @@ import pytest +from datetime import datetime, date, timedelta from bonobo import AvroReader, AvroWriter from bonobo.constants import EMPTY @@ -12,10 +13,13 @@ def test_write_records_to_avro_file(tmpdir): fs, filename, services = avro_tester.get_services_for_writer(tmpdir) + john = ("john", 7, date(2012,10,11), datetime(2012,10,11,15,16,17)) + jane = ("jane", 17, date(2002,10,11), datetime(2002,10,13,15,16,17)) + writav = AvroWriter(filename) with NodeExecutionContext(writav, services=services) as context: - context.set_input_fields(["foo", "bar"]) - context.write_sync(("a", "b"), ("c", "d")) + context.set_input_fields(["name", "age", "birthday", "registered"]) + context.write_sync(john, jane) # with fs.open(filename, "rb") as fp: # assert pickle.loads(fp.read()) == {"foo": "bar"} From 8e67213ecd0013419d239b2fee5823f1fd089cb1 Mon Sep 17 00:00:00 2001 From: Juarez Rudsatz Date: Thu, 30 Jan 2020 15:56:33 -0300 Subject: [PATCH 07/13] avro: allow testing with optional fastavro setup --- tests/nodes/io/test_avro.py | 36 +++++++++++++++++++++--------------- 1 file 
changed, 21 insertions(+), 15 deletions(-) diff --git a/tests/nodes/io/test_avro.py b/tests/nodes/io/test_avro.py index 4be02872..00107816 100644 --- a/tests/nodes/io/test_avro.py +++ b/tests/nodes/io/test_avro.py @@ -7,22 +7,29 @@ from bonobo.util.testing import BufferingNodeExecutionContext, FilesystemTester avro_tester = FilesystemTester("avro", mode="wb") -# avro_tester.input_data = pickle.dumps([["a", "b", "c"], ["a foo", "b foo", "c foo"], ["a bar", "b bar", "c bar"]]) -def test_write_records_to_avro_file(tmpdir): - fs, filename, services = avro_tester.get_services_for_writer(tmpdir) +def is_fastavro_missing(): + try: + import fastavro + return False + except ModuleNotFoundError: + return True - john = ("john", 7, date(2012,10,11), datetime(2012,10,11,15,16,17)) - jane = ("jane", 17, date(2002,10,11), datetime(2002,10,13,15,16,17)) +def test_write_records_to_avro_file(tmpdir): + if is_fastavro_missing(): + return + fs, filename, services = avro_tester.get_services_for_writer(tmpdir) writav = AvroWriter(filename) with NodeExecutionContext(writav, services=services) as context: - context.set_input_fields(["name", "age", "birthday", "registered"]) - context.write_sync(john, jane) + john = ("john", 7, date(2012,10,11), datetime(2018,9,14,15,16,17)) + jane = ("jane", 17, date(2002,11,12), datetime(2015,12,13,14,15,16)) + jack = ("jack", 27, date(1992,12,13), datetime(2010,11,12,13,14,15)) + joel = ("joel", 37, date(1982,12,25), datetime(2009,10,11,12,13,14)) - # with fs.open(filename, "rb") as fp: - # assert pickle.loads(fp.read()) == {"foo": "bar"} + context.set_input_fields(["name", "age", "birthday", "registered"]) + context.write_sync(john, jane, jack, joel) def create_avro_example(path): @@ -40,29 +47,28 @@ def create_avro_example(path): ], } parsed_schema = fastavro.parse_schema(schema) - records = [ {u'station': u'cold', u'temp': 0, u'time': 1433269388}, {u'station': u'warm', u'temp': 22, u'time': 1433270389}, {u'station': u'frozen', u'temp': -11, u'time': 
1433273379}, {u'station': u'hot', u'temp': 111, u'time': 1433275478}, ] - with open(path, 'wb') as out: fastavro.writer(out, parsed_schema, records) def test_read_records_from_avro_file(tmpdir): + if is_fastavro_missing(): + return dst = tmpdir.strpath + '/output.avro' create_avro_example(dst) - fs, filename, services = avro_tester.get_services_for_writer(tmpdir) - readav = AvroReader(filename) with BufferingNodeExecutionContext(readav, services=services) as context: context.write_sync(EMPTY) - - output = context.get_buffer() props = context.get_output_fields() assert props == ("station", "time", "temp") + # output = context.get_buffer() # assert output == [("a foo", "b foo", "c foo"), ("a bar", "b bar", "c bar")] + +# end of test file From d62465adef183228583becfca5e5d20c2b849362 Mon Sep 17 00:00:00 2001 From: Juarez Rudsatz Date: Thu, 30 Jan 2020 16:05:07 -0300 Subject: [PATCH 08/13] avro: remove unused variable --- bonobo/nodes/io/avro.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/bonobo/nodes/io/avro.py b/bonobo/nodes/io/avro.py index f044d012..84b26724 100644 --- a/bonobo/nodes/io/avro.py +++ b/bonobo/nodes/io/avro.py @@ -42,12 +42,8 @@ def load_schema(self, context, avro_reader): self.name = aschema['name'] if 'namespace' in aschema: self.namespace = aschema['namespace'] - props = context.get_input_fields() src = aschema['fields'] - dst = [] - for f in src: - fname = f['name'] - dst.append(fname) + dst = [f['name'] for f in src] tfields = tuple(dst) context.set_output_fields(tfields) From adc443173c3076408823d3c08bd381ad7e995e96 Mon Sep 17 00:00:00 2001 From: Juarez Rudsatz Date: Sun, 2 Feb 2020 17:23:26 -0300 Subject: [PATCH 09/13] avro: fix error on github CI skipping avro tests --- tests/nodes/io/test_avro.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/nodes/io/test_avro.py b/tests/nodes/io/test_avro.py index 00107816..056c3b15 100644 --- a/tests/nodes/io/test_avro.py +++ b/tests/nodes/io/test_avro.py 
@@ -13,7 +13,7 @@ def is_fastavro_missing(): try: import fastavro return False - except ModuleNotFoundError: + except ImportError: return True From 27809bbcd8dcd8a521b5b72d63a7471c18bf552e Mon Sep 17 00:00:00 2001 From: Juarez Rudsatz Date: Sat, 7 Mar 2020 01:23:05 -0300 Subject: [PATCH 10/13] avro: add prop schema as dict do AvroWriter --- bonobo/nodes/io/avro.py | 163 +++++++++++++++++++--------------------- 1 file changed, 79 insertions(+), 84 deletions(-) diff --git a/bonobo/nodes/io/avro.py b/bonobo/nodes/io/avro.py index 84b26724..da6b3af3 100644 --- a/bonobo/nodes/io/avro.py +++ b/bonobo/nodes/io/avro.py @@ -4,89 +4,88 @@ from bonobo.config import Option, use_context from bonobo.nodes.io.base import FileHandler from bonobo.nodes.io.file import FileReader, FileWriter -from bonobo.util.collections import coalesce -import fastavro +# import fastavro +try: + import fastavro +except ImportError: + pass class AvroHandler(FileHandler): - """ - - .. attribute:: item_names - - The names of the items in the Avro, if it is not defined in the first item of the Avro. 
- - """ - - fields = Option(tuple, required=False) - types = Option(tuple, required=False) - name = Option(str, required=False) - namespace = Option(str, required=False) - doc = Option(str, required=False) - codec = Option(str, required=False, default="null") + schema = Option(tuple, required=False, + __doc__="A dict specifying the schema fields acording avro spec.\ + ```\ + schema = {\ + 'doc': 'A weather reading.',\ + 'name': 'Weather',\ + 'namespace': 'test',\ + 'type': 'record',\ + 'fields': [\ + {'name': 'station', 'type': 'string'},\ + {'name': 'date', 'type': 'int', 'logicalType': 'date'},\ + {'name': 'time', 'type': 'long', 'logicalType': 'time-micros'},\ + {'name': 'temp', 'type': 'int'},\ + ],\ + }\ + ```\ + See: https://avro.apache.org/docs/current/spec.html#schema_primitive\ + https://avro.apache.org/docs/current/spec.html#Logical+Types") + + codec = Option(str, required=False, default="null", + __doc__="The name of the compression codec used to compress blocks.\ + Compression codec can be ‘null’, ‘deflate’ or ‘snappy’ (if installed)") @use_context class AvroReader(FileReader, AvroHandler): """ - Reads a record from avro file and yields the rows in dicts. + Reads the records from a avro file and yields the values in dicts. 
""" - mode = Option(str, default="rb") def load_schema(self, context, avro_reader): - aschema = avro_reader.schema - if 'doc' in aschema: - self.doc = aschema['doc'] - if 'name' in aschema: - self.name = aschema['name'] - if 'namespace' in aschema: - self.namespace = aschema['namespace'] - src = aschema['fields'] - dst = [f['name'] for f in src] - tfields = tuple(dst) - context.set_output_fields(tfields) + file_schema = avro_reader.writer_schema + self.schema = file_schema + self.codec = avro_reader.codec + schema_fields = file_schema['fields'] + field_names = [col['name'] for col in schema_fields] + col_names = tuple(field_names) + context.set_output_fields(col_names) def read(self, file, context, *, fs): avro_reader = fastavro.reader(file) if not context.output_type: self.load_schema(context, avro_reader) - - for row in avro_reader: - yield tuple(row) + for record in avro_reader: + row = tuple(record.values()) + yield row __call__ = read @use_context class AvroWriter(FileWriter, AvroHandler): + """ + Writes the values as records into a avro file according to the fields defined in schema + + When the schema is not specified, it tries to guess types from the values + of the fields present in the first record. + The type of values written follow the ones of python type system. Take + care when writing data extracted from sql databases as their types are + usually affected by factors like driver issues, type mismatch, incorrect + mapping, precision loss (specially float and decimals), SQLArchitect... + """ + compression_level = Option(int, required=False, + __doc__="Compression level to use when the specified codec supports it") mode = Option(str, default="wb+") def assure_same_len(self, props, values): if len(props) != len(values): - raise ValueError( - "Values length differs from input fields length. Expected: {}. Got: {}. 
Values: {!r}.".format( - len(props), len(values), values - ) - ) - - def build_schema_from_types(self, props): - schema_fields = [] - for p, t in zip(props, self.types): - if isinstance(t, dict): - f = t - f['name'] = p - elif t == "date": - f = {'name': p, 'type': 'int', "logicalType": "date"} - elif t == "time-micros": - f = {'name': p, 'type': 'long', "logicalType": "time-micros"} - elif t == "timestamp-micros": - f = {'name': p, 'type': 'long', "logicalType": "timestamp-micros"} - else: - f = {'name': p, 'type': t} - schema_fields.append(f) - return schema_fields + m = "Values length differs from input fields length. Expected: {}. Got: {}. Values: {!r}." + f = m.format(len(props), len(values), values) + raise ValueError(m) def build_schema_from_values(self, props, values): # https://avro.apache.org/docs/current/spec.html#schema_primitive @@ -100,11 +99,11 @@ def build_schema_from_values(self, props, values): elif isinstance(v, float): f = {'name': p, 'type': 'double'} elif isinstance(v, date): - f = {'name': p, 'type': 'int', "logicalType": "date"} + f = {'name': p, 'type': 'int', 'logicalType': 'date'} elif isinstance(v, timedelta) or isinstance(v, time): - f = {'name': p, 'type': 'long', "logicalType": "time-micros"} + f = {'name': p, 'type': 'long', 'logicalType': 'time-micros'} elif isinstance(v, datetime): - f = {'name': p, 'type': 'long', "logicalType": "timestamp-micros"} + f = {'name': p, 'type': 'long', 'logicalType': 'timestamp-micros'} elif isinstance(v, Decimal): f = {'name': p, 'type': 'double'} elif isinstance(v, bytes): @@ -160,45 +159,41 @@ def get_value_as_time(value): def get_value_as_float(value): return float(value) - def build_schema(self, props, values): - - if self.types is not None: - schema_fields = self.build_schema_from_types(props) - else: - schema_fields = self.build_schema_from_values(props, values) - + def build_schema_from(self, props, values): + if self.schema is not None: + return self.schema + schema_fields = 
self.build_schema_from_values(props, values) schema = { 'type': 'record', - 'name': coalesce(self.name, "output"), - 'namespace': coalesce(self.namespace, "avro"), - 'doc': coalesce(self.doc, "generated by bonobo"), + 'name': 'output', + 'namespace': "avro", + 'doc': "generated by bonobo", 'fields': schema_fields, } return schema - def get_write_fields(self, context): - props = coalesce(self.fields, context.get_input_fields()) - return props - def write(self, file, context, *values, fs): """ - Write a record to the opened file. + Write a record to the opened file using the defined schema """ - props = self.get_write_fields(context) - context.setdefault("schema", None) - if not context.schema: - aschema = self.build_schema(props, values) - parsed_schema = fastavro.parse_schema(aschema) - context.schema = parsed_schema - context.setdefault("converters", None) + + props = context.get_input_fields() + if not context.schema: + detected = self.build_schema_from(props, values) + parsed = fastavro.parse_schema(detected) + context.schema = parsed if not context.converters: context.converters = self.build_converters_from_values(values) - - kv = {k: conv(v) for k, v, conv in zip(props, values, context.converters)} - - row = [kv] - fastavro.writer(fo=file, schema=context.schema, records=row, codec=self.codec) + row = {k: conv(v) for k, v, conv in zip(props, values, context.converters)} + one_record = [row] + fastavro.writer( + fo=file, + schema=context.schema, + records=one_record, + codec=self.codec, + codec_compression_level=self.compression_level + ) __call__ = write From 145de5ad4a69d916402a8b80538d12df7092a62b Mon Sep 17 00:00:00 2001 From: Juarez Rudsatz Date: Sat, 7 Mar 2020 01:24:05 -0300 Subject: [PATCH 11/13] avro: test writing schema --- tests/nodes/io/test_avro.py | 49 ++++++++++++++++++++++++++++++------- 1 file changed, 40 insertions(+), 9 deletions(-) diff --git a/tests/nodes/io/test_avro.py b/tests/nodes/io/test_avro.py index 056c3b15..c0de4f83 100644 --- 
a/tests/nodes/io/test_avro.py +++ b/tests/nodes/io/test_avro.py @@ -21,20 +21,53 @@ def test_write_records_to_avro_file(tmpdir): if is_fastavro_missing(): return fs, filename, services = avro_tester.get_services_for_writer(tmpdir) - writav = AvroWriter(filename) + writav = AvroWriter( + filename, + codec = 'deflate', + compression_level = 6 + ) + john = ("john", 7, date(2012,10,11), datetime(2018,9,14,15,16,17)) + jane = ("jane", 17, date(2002,11,12), datetime(2015,12,13,14,15,16)) + jack = ("jack", 27, date(1992,12,13), datetime(2010,11,12,13,14,15)) + joel = ("joel", 37, date(1982,12,25), datetime(2009,10,11,12,13,14)) with NodeExecutionContext(writav, services=services) as context: - john = ("john", 7, date(2012,10,11), datetime(2018,9,14,15,16,17)) - jane = ("jane", 17, date(2002,11,12), datetime(2015,12,13,14,15,16)) - jack = ("jack", 27, date(1992,12,13), datetime(2010,11,12,13,14,15)) - joel = ("joel", 37, date(1982,12,25), datetime(2009,10,11,12,13,14)) - context.set_input_fields(["name", "age", "birthday", "registered"]) context.write_sync(john, jane, jack, joel) +def test_write_with_schema_to_avro_file(tmpdir): + if is_fastavro_missing(): + return + fs, filename, services = avro_tester.get_services_for_writer(tmpdir) + custom_schema = { + 'doc': 'Some random people.', + 'name': 'Crowd', + 'namespace': 'test', + 'type': 'record', + 'fields': [ + {'name': 'pete', 'type': 'string'}, + {'name': 'age', 'type': 'int'}, + {'name': 'birthday', 'type': 'int', 'logicalType': 'date'}, + {'name': 'registered', 'type': 'long', 'logicalType': 'timestamp-micros'}, + ], + } + writav = AvroWriter( + filename, + schema = custom_schema, + codec = 'deflate', + compression_level = 6 + ) + pete = ("pete", 7, date(2012,10,11), datetime(2018,9,14,15,16,17)) + mike = ("mike", 17, date(2002,11,12), datetime(2015,12,13,14,15,16)) + zack = ("zack", 27, date(1992,12,13), datetime(2010,11,12,13,14,15)) + gene = ("gene", 37, date(1982,12,25), datetime(2009,10,11,12,13,14)) + with 
NodeExecutionContext(writav, services=services) as context: + context.set_input_fields(["name", "age", "birthday", "registered"]) + context.write_sync(pete, mike, zack, gene) + + def create_avro_example(path): import fastavro - schema = { 'doc': 'A weather reading.', 'name': 'Weather', @@ -68,7 +101,5 @@ def test_read_records_from_avro_file(tmpdir): context.write_sync(EMPTY) props = context.get_output_fields() assert props == ("station", "time", "temp") - # output = context.get_buffer() - # assert output == [("a foo", "b foo", "c foo"), ("a bar", "b bar", "c bar")] # end of test file From 165575dca601b6dc7a473555432a76bd13ff0cf2 Mon Sep 17 00:00:00 2001 From: Juarez Rudsatz Date: Sat, 7 Mar 2020 01:56:01 -0300 Subject: [PATCH 12/13] avro: refactor hiding private methods --- bonobo/nodes/io/avro.py | 214 +++++++++++++++++++++------------------- 1 file changed, 110 insertions(+), 104 deletions(-) diff --git a/bonobo/nodes/io/avro.py b/bonobo/nodes/io/avro.py index da6b3af3..48cd0681 100644 --- a/bonobo/nodes/io/avro.py +++ b/bonobo/nodes/io/avro.py @@ -5,12 +5,109 @@ from bonobo.nodes.io.base import FileHandler from bonobo.nodes.io.file import FileReader, FileWriter -# import fastavro +# avro support is optional and should'nt impact when lib is not installed try: import fastavro except ImportError: pass +# Helpers + +def assure_same_len(props, values): + if len(props) != len(values): + m = "Values length differs from input fields length. Expected: {}. Got: {}. Values: {!r}." 
# Helpers

def assure_same_len(props, values):
    """Raise ValueError when a record's length differs from the field count."""
    if len(props) != len(values):
        m = "Values length differs from input fields length. Expected: {}. Got: {}. Values: {!r}."
        f = m.format(len(props), len(values), values)
        raise ValueError(f)


def build_schema_from_values(props, values):
    """Infer one Avro field schema per (name, sample value) pair.

    Primitive/logical type mapping follows
    https://avro.apache.org/docs/current/spec.html#schema_primitive
    """
    assure_same_len(props, values)
    schema_fields = []
    for p, v in zip(props, values):
        # Order matters: bool subclasses int and datetime subclasses date,
        # so the more specific checks must run first, otherwise booleans
        # are typed 'long' and datetimes are typed 'date'.
        if isinstance(v, bool):
            f = {'name': p, 'type': 'boolean'}
        elif isinstance(v, int):
            f = {'name': p, 'type': 'long'}
        elif isinstance(v, float):
            f = {'name': p, 'type': 'double'}
        elif isinstance(v, datetime):
            f = {'name': p, 'type': 'long', 'logicalType': 'timestamp-micros'}
        elif isinstance(v, date):
            f = {'name': p, 'type': 'int', 'logicalType': 'date'}
        elif isinstance(v, (timedelta, time)):
            f = {'name': p, 'type': 'long', 'logicalType': 'time-micros'}
        elif isinstance(v, Decimal):
            # lossy: exact decimals would need the 'decimal' logical type
            f = {'name': p, 'type': 'double'}
        elif isinstance(v, bytes):
            f = {'name': p, 'type': 'bytes'}
        else:
            f = {'name': p, 'type': 'string'}
        schema_fields.append(f)
    return schema_fields


def get_same_value(value):
    """Identity converter for values Avro stores natively."""
    return value


def get_value_as_date(value):
    """Days since the Unix epoch, as required by the 'date' logical type."""
    diff = value - date(1970, 1, 1)
    return diff.days


def get_value_as_datetime(value):
    """Microseconds since the epoch, for 'timestamp-micros' (was seconds).

    NOTE(review): naive datetimes are interpreted in the local timezone by
    ``datetime.timestamp`` -- confirm that is the intended contract.
    """
    return int(value.timestamp() * 1000000)


def get_value_as_timedelta(value):
    """Total microseconds of a timedelta, for 'time-micros' (was ms/us mix)."""
    return (value.days * 86400000000) + (value.seconds * 1000000) + value.microseconds


def get_value_as_time(value):
    """Microseconds since midnight, for 'time-micros' (was ms/us mix)."""
    return (value.hour * 3600000000) + (value.minute * 60000000) + (value.second * 1000000) + value.microsecond


def get_value_as_float(value):
    """Decimal -> float, matching the 'double' schema type chosen above."""
    return float(value)


def build_schema_from(props, values):
    """Build a full Avro record schema from one sample record."""
    schema_fields = build_schema_from_values(props, values)
    schema = {
        'type': 'record',
        'name': 'output',
        'namespace': "avro",
        'doc': "generated by bonobo",
        'fields': schema_fields,
    }
    return schema


def build_converters_from_values(values):
    """Select, per sample value, the converter to its Avro representation.

    Mirrors the type mapping of :func:`build_schema_from_values`; datetime is
    tested before date because datetime subclasses date.
    """
    converters = []
    for v in values:
        if isinstance(v, datetime):
            f = get_value_as_datetime
        elif isinstance(v, time):
            f = get_value_as_time
        elif isinstance(v, timedelta):
            f = get_value_as_timedelta
        elif isinstance(v, date):
            f = get_value_as_date
        elif isinstance(v, Decimal):
            f = get_value_as_float
        else:
            f = get_same_value
        converters.append(f)
    return converters
- f = m.format(len(props), len(values), values) - raise ValueError(m) - - def build_schema_from_values(self, props, values): - # https://avro.apache.org/docs/current/spec.html#schema_primitive - self.assure_same_len(props, values) - schema_fields = [] - for p, v in zip(props, values): - if isinstance(v, int): - f = {'name': p, 'type': 'long'} - elif isinstance(v, bool): - f = {'name': p, 'type': 'boolean'} - elif isinstance(v, float): - f = {'name': p, 'type': 'double'} - elif isinstance(v, date): - f = {'name': p, 'type': 'int', 'logicalType': 'date'} - elif isinstance(v, timedelta) or isinstance(v, time): - f = {'name': p, 'type': 'long', 'logicalType': 'time-micros'} - elif isinstance(v, datetime): - f = {'name': p, 'type': 'long', 'logicalType': 'timestamp-micros'} - elif isinstance(v, Decimal): - f = {'name': p, 'type': 'double'} - elif isinstance(v, bytes): - f = {'name': p, 'type': 'bytes'} - else: - f = {'name': p, 'type': 'string'} - schema_fields.append(f) - return schema_fields - - def build_converters_from_values(self, values): - converters = [] - for v in values: - if isinstance(v, datetime): - f = AvroWriter.get_value_as_datetime - elif isinstance(v, time): - f = AvroWriter.get_value_as_time - elif isinstance(v, timedelta): - f = AvroWriter.get_value_as_timedelta - elif isinstance(v, date): - f = AvroWriter.get_value_as_date - elif isinstance(v, Decimal): - f = AvroWriter.get_value_as_float - else: - f = AvroWriter.get_same_value - converters.append(f) - return converters - - @staticmethod - def get_same_value(value): - return value - - @staticmethod - def get_value_as_date(value): - diff = value - date(1970,1,1) - return diff.days - - @staticmethod - def get_value_as_datetime(value): - elapsed = value.timestamp() - return int(elapsed) - - @staticmethod - def get_value_as_timedelta(value): - elapsed = (value.days * 86400000) + (value.seconds * 1000) + value.microseconds - return elapsed - - @staticmethod - def get_value_as_time(value): - elapsed = 
(value.hour * 3600000) + (value.minute * 60000) + (value.second * 1000) + value.microsecond - return elapsed - - @staticmethod - def get_value_as_float(value): - return float(value) - - def build_schema_from(self, props, values): - if self.schema is not None: - return self.schema - schema_fields = self.build_schema_from_values(props, values) - schema = { - 'type': 'record', - 'name': 'output', - 'namespace': "avro", - 'doc': "generated by bonobo", - 'fields': schema_fields, - } - return schema - def write(self, file, context, *values, fs): """ Write a record to the opened file using the defined schema @@ -181,11 +185,11 @@ def write(self, file, context, *values, fs): props = context.get_input_fields() if not context.schema: - detected = self.build_schema_from(props, values) + detected = build_schema_from(props, values) parsed = fastavro.parse_schema(detected) context.schema = parsed if not context.converters: - context.converters = self.build_converters_from_values(values) + context.converters = build_converters_from_values(values) row = {k: conv(v) for k, v, conv in zip(props, values, context.converters)} one_record = [row] fastavro.writer( @@ -197,3 +201,5 @@ def write(self, file, context, *values, fs): ) __call__ = write + +# end of file # From 3fd0c889aa6899d334de100379be3fe129ee9181 Mon Sep 17 00:00:00 2001 From: Juarez Rudsatz Date: Sat, 7 Mar 2020 02:12:11 -0300 Subject: [PATCH 13/13] avro: misc fixes, coverage --- bonobo/nodes/io/avro.py | 2 +- tests/nodes/io/test_avro.py | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/bonobo/nodes/io/avro.py b/bonobo/nodes/io/avro.py index 48cd0681..a2cb031c 100644 --- a/bonobo/nodes/io/avro.py +++ b/bonobo/nodes/io/avro.py @@ -17,7 +17,7 @@ def assure_same_len(props, values): if len(props) != len(values): m = "Values length differs from input fields length. Expected: {}. Got: {}. Values: {!r}." 
f = m.format(len(props), len(values), values) - raise ValueError(m) + raise ValueError(f) def build_schema_from_values(props, values): diff --git a/tests/nodes/io/test_avro.py b/tests/nodes/io/test_avro.py index c0de4f83..8f721c58 100644 --- a/tests/nodes/io/test_avro.py +++ b/tests/nodes/io/test_avro.py @@ -26,12 +26,12 @@ def test_write_records_to_avro_file(tmpdir): codec = 'deflate', compression_level = 6 ) - john = ("john", 7, date(2012,10,11), datetime(2018,9,14,15,16,17)) - jane = ("jane", 17, date(2002,11,12), datetime(2015,12,13,14,15,16)) - jack = ("jack", 27, date(1992,12,13), datetime(2010,11,12,13,14,15)) - joel = ("joel", 37, date(1982,12,25), datetime(2009,10,11,12,13,14)) + john = ("john", 7, 0.7, False, date(2012,10,11), datetime(2018,9,14,15,16,17)) + jane = ("jane", 17, 1.7, True, date(2002,11,12), datetime(2015,12,13,14,15,16)) + jack = ("jack", 27, 2.7, True, date(1992,12,13), datetime(2010,11,12,13,14,15)) + joel = ("joel", 37, 3.7, True, date(1982,12,25), datetime(2009,10,11,12,13,14)) with NodeExecutionContext(writav, services=services) as context: - context.set_input_fields(["name", "age", "birthday", "registered"]) + context.set_input_fields(["name", "age", "score", "smart", "birthday", "registered"]) context.write_sync(john, jane, jack, joel)