diff --git a/src/spavro/fast_binary.pyx b/src/spavro/fast_binary.pyx index f16f6e0..dc859a6 100644 --- a/src/spavro/fast_binary.pyx +++ b/src/spavro/fast_binary.pyx @@ -5,6 +5,9 @@ The main edge this code has is that it parses the schema only once and creates a reader/writer call tree from the schema shape. All reads and writes then no longer consult the schema saving lookups.''' +cimport cpython.array as array +from libc.string cimport memcpy + import six INT_MIN_VALUE = -(1 << 31) INT_MAX_VALUE = (1 << 31) - 1 @@ -321,20 +324,136 @@ cdef void write_double(outbuf, double datum): outbuf.write((&datum)[:sizeof(double)]) -cdef void write_null(outbuf, datum): - pass +cdef write_boolean(outbuf, char datum): + """A boolean is written as a single byte whose value is either 0 (false) or + 1 (true).""" + cdef char x = 1 if datum else 0 + outbuf.write((&x)[:sizeof(char)]) + + +cdef void write_int_to_array(array.array outbuf, long long signed_datum): + """int and long values are written using variable-length, zig-zag coding. + """ + cdef: + unsigned long long datum + char temp_datum + size_t size = len(outbuf) + datum = (signed_datum << 1) ^ (signed_datum >> 63) + while datum > 127: + temp_datum = (datum & 0x7f) | 0x80 + array.resize_smart(outbuf, size + 1) + outbuf.data.as_uchars[size] = temp_datum + size += 1 + datum >>= 7 + array.resize_smart(outbuf, size + 1) + outbuf.data.as_uchars[size] = datum + +cdef void write_long_to_array(array.array outbuf, long long signed_datum): + """int and long values are written using variable-length, zig-zag coding. + """ + cdef: + unsigned long long datum + char temp_datum + size_t size = len(outbuf) + datum = (signed_datum << 1) ^ (signed_datum >> 63) + while datum > 127: + temp_datum = (datum & 0x7f) | 0x80 + array.resize_smart(outbuf, size + 1) + outbuf.data.as_uchars[size] = temp_datum + size += 1 + datum >>= 7 + array.resize_smart(outbuf, size + 1) + outbuf.data.as_uchars[size] = datum + + +cdef void write_bytes_to_array(array.array outbuf, datum): + """ + Bytes are encoded as a long followed by that many bytes of data. + """ + cdef: + size_t datum_size = len(datum) + size_t size = 0 + write_long_to_array(outbuf, datum_size) + size = len(outbuf) + array.resize_smart(outbuf, size + datum_size) + memcpy(outbuf.data.as_chars + size, datum, datum_size) -cdef void write_fixed(outbuf, datum): +cdef void write_utf8_to_array(array.array outbuf, datum): + """ + Unicode are encoded as write_bytes_to_array of the utf-8 encoded data. + """ + write_bytes_to_array(outbuf, datum.encode("utf-8")) + + +cdef void write_float_to_array(array.array outbuf, float datum): + """ + A float is written as 4 bytes. + The float is converted into a 32-bit integer using a method equivalent to + Java's floatToIntBits and then encoded in little-endian format. + """ + cdef: + size_t datum_size = sizeof(float) + size_t size = len(outbuf) + array.resize_smart(outbuf, size + datum_size) + memcpy(outbuf.data.as_chars + size, &datum, datum_size) + +cdef void write_double_to_array(array.array outbuf, double datum): + """ + A double is written as 8 bytes. + The double is converted into a 64-bit integer using a method equivalent to + Java's doubleToLongBits and then encoded in little-endian format. + """ + cdef: + size_t datum_size = sizeof(double) + size_t size = len(outbuf) + array.resize_smart(outbuf, size + datum_size) + memcpy(outbuf.data.as_chars + size, &datum, datum_size) + + +cdef void write_fixed_to_array(array.array outbuf, datum): """A fixed writer writes out exactly the bytes up to a count""" - outbuf.write(datum) + cdef: + size_t datum_size = len(datum) + size_t size = len(outbuf) + array.resize_smart(outbuf, size + datum_size) + memcpy(outbuf.data.as_chars + size, datum, datum_size) -cdef write_boolean(outbuf, char datum): +cdef void write_boolean_to_array(array.array outbuf, char datum): """A boolean is written as a single byte whose value is either 0 (false) or 1 (true).""" - cdef char x = 1 if datum else 0 - outbuf.write((&x)[:sizeof(char)]) + cdef: + char x = 1 if datum else 0 + size_t size = len(outbuf) + array.resize_smart(outbuf, size + 1) + outbuf.data.as_uchars[size] = x + + +cdef void write_enum_to_array(array.array outbuf, char datum, list symbols): + cdef int enum_index = symbols.index(datum) + write_int_to_array(outbuf, enum_index) + + +cdef void write_array_to_array(array.array outbuf, list datum, list item_writer): + cdef: + size_t item_count = len(datum) + if item_count > 0: + write_long_to_array(outbuf, item_count) + for item in datum: + execute(outbuf, item, item_writer) + write_long_to_array(outbuf, 0) + + +cdef void write_map_to_array(array.array outbuf, dict datum, list map_value_writer): + cdef: + size_t item_count = len(datum) + if item_count > 0: + write_long_to_array(outbuf, item_count) + for key, val in datum.iteritems(): + write_utf8_to_array(outbuf, key) + execute(outbuf, val, map_value_writer) + write_long_to_array(outbuf, 0) avro_to_py = { @@ -551,133 +670,60 @@ def make_union_writer(union_schema): writer_lookup = complex_writer_lookup - def write_union(outbuf, datum): - idx, data_writer = writer_lookup(datum) - write_long(outbuf, idx) - data_writer(outbuf, datum) - write_union.__reduce__ = lambda: (make_union_writer, (union_schema,)) - return write_union - -def make_enum_writer(schema): - cdef list symbols = schema['symbols'] - - # the datum can be str or unicode? - def write_enum(outbuf, basestring datum): - cdef int enum_index = symbols.index(datum) - write_int(outbuf, enum_index) - write_enum.__reduce__ = lambda: (make_enum_writer, (schema,)) - return write_enum - + return [0, writer_lookup] def make_record_writer(schema): cdef list fields = [WriteField(field['name'], get_writer(field['type'])) for field in schema['fields']] + return [1, fields] - def write_record(outbuf, datum): - for field in fields: - try: - field.writer(outbuf, datum.get(field.name)) - except TypeError as e: - raise TypeError("Error writing record schema at fieldname: '{}', datum: '{}'".format(field.name, repr(datum.get(field.name)))) - write_record.__reduce__ = lambda: (make_record_writer, (schema,)) - return write_record - - -def make_array_writer(schema): - item_writer = get_writer(schema['items']) - - def write_array(outbuf, list datum): - cdef long item_count = len(datum) - if item_count > 0: - write_long(outbuf, item_count) - for item in datum: - item_writer(outbuf, item) - write_long(outbuf, 0) - write_array.__reduce__ = lambda: (make_array_writer, (schema,)) - return write_array - - -def make_map_writer(schema): - map_value_writer = get_writer(schema['values']) - - def write_map(outbuf, datum): - cdef long item_count = len(datum) - if item_count > 0: - write_long(outbuf, item_count) - for key, val in datum.iteritems(): - write_utf8(outbuf, key) - map_value_writer(outbuf, val) - write_long(outbuf, 0) - write_map.__reduce__ = lambda: (make_map_writer, (schema,)) - return write_map +def make_null_writer(schema): + return [2] +def make_string_writer(schema): + return [3] def make_boolean_writer(schema): '''Create a boolean writer, adds a validation step before the actual write function''' - def checked_boolean_writer(outbuf, datum): - if not isinstance(datum, bool): - raise TypeError("{} - Not a boolean value. Schema: {}".format(repr(datum), schema)) - write_boolean(outbuf, datum) - return checked_boolean_writer - + return [4] -def make_fixed_writer(schema): - '''A writer that must write X bytes defined by the schema''' - cdef long size = schema['size'] - # note: not a char* because those are null terminated and fixed - # has no such limitation - def checked_write_fixed(outbuf, datum): - if len(datum) != size: - raise TypeError("{} - Size Mismatch ({}) for Fixed data. Schema: {}".format(repr(datum), len(datum), schema)) - write_fixed(outbuf, datum) - return checked_write_fixed - - -def make_int_writer(schema): - '''Create a int writer, adds a validation step before the actual - write function to make sure the int value doesn't overflow''' - def checked_int_write(outbuf, datum): - if not isinstance(datum, six.integer_types): - raise TypeError("Schema violation, {} is not an example of schema {}".format(datum, schema)) - if not INT_MIN_VALUE <= datum <= INT_MAX_VALUE: - raise TypeError("Schema violation, value overflow. {} can't be stored in schema: {}".format(datum, schema)) - write_long(outbuf, datum) - return checked_int_write +def make_double_writer(schema): + return [5] +def make_float_writer(schema): + return [6] def make_long_writer(schema): '''Create a long writer, adds a validation step before the actual write function to make sure the long value doesn't overflow''' - def checked_long_write(outbuf, datum): - if not (isinstance(datum, six.integer_types) - and LONG_MIN_VALUE <= datum <= LONG_MAX_VALUE): - raise TypeError("{} - Non integer value or overflow. Schema: {}".format(repr(datum), schema)) - write_long(outbuf, datum) - return checked_long_write - - -def make_string_writer(schema): - def checked_string_writer(outbuf, datum): - if not isinstance(datum, six.string_types): - raise TypeError("{} - is not a string value. Schema: {}".format(repr(datum), schema)) - write_utf8(outbuf, datum) - return checked_string_writer - + return [7] def make_byte_writer(schema): - return write_bytes + return [8] +def make_int_writer(schema): + '''Create a int writer, adds a validation step before the actual + write function to make sure the int value doesn't overflow''' + return [9] -def make_float_writer(schema): - return write_float - +def make_fixed_writer(schema): + '''A writer that must write X bytes defined by the schema''' + cdef long size = schema['size'] + # note: not a char* because those are null terminated and fixed + # has no such limitation + return [10, size] -def make_double_writer(schema): - return write_double +def make_enum_writer(schema): + cdef list symbols = schema['symbols'] + return [11, symbols] +def make_array_writer(schema): + cdef list item_writer = get_writer(schema['items']) + return [12, item_writer] -def make_null_writer(schema): - return write_null +def make_map_writer(schema): + cdef list map_value_writer = get_writer(schema['values']) + return [13, map_value_writer] # writer @@ -704,9 +750,6 @@ class WriterPlaceholder(object): def __init__(self): self.writer = None - def __call__(self, fo, val): - return self.writer(fo, val) - def get_writer(schema): cdef unicode schema_type = get_type(schema) @@ -733,7 +776,7 @@ def get_writer(schema): except KeyError: # lookup the schema by unique previously defined name, # i.e. a custom type - writer = schema_cache[schema_type] + writer = schema_cache[schema_type].writer return writer @@ -851,3 +894,79 @@ class FastBinaryDecoder(object): def skip(self, n): self.reader.seek(self.reader.tell() + n) + + +cdef void write_union_to_array(array.array outbuf, datum, writer_lookup): + idx, data_writer = writer_lookup(datum) # TODO: cdef int, list = f()? + write_long_to_array(outbuf, idx) + execute(outbuf, datum, data_writer) + + +cdef void write_record_to_array(array.array outbuf, dict datum, list fields): + for field in fields: + try: + execute(outbuf, datum.get(field.name), field.writer) + except TypeError as e: + raise TypeError("Error writing record schema at fieldname: '{}', datum: '{}'".format(field.name, repr(datum.get(field.name)))) + + +cdef void execute(array.array outbuf, datum, list writer): + cdef unsigned int writer_f = writer[0] + + if writer_f == 0: # make_union_writer + write_union_to_array(outbuf, datum, writer[1]) + + elif writer_f == 1: # make_record_writer + write_record_to_array(outbuf, datum, writer[1]) + + # skip make_null_writer + + elif writer_f == 3: # make_string_writer + if not isinstance(datum, six.string_types): + raise TypeError("{} - is not a string value.".format(repr(datum))) + write_utf8_to_array(outbuf, datum) + + elif writer_f == 4: # make_boolean_writer + if not isinstance(datum, bool): + raise TypeError("{} - Not a boolean value.".format(repr(datum))) + write_boolean_to_array(outbuf, datum) + + elif writer_f == 5: # make_double_writer + write_double_to_array(outbuf, datum) + + elif writer_f == 6: # make_float_writer + write_float_to_array(outbuf, datum) + + elif writer_f == 7: # make_long_writer + if not (isinstance(datum, six.integer_types) + and LONG_MIN_VALUE <= datum <= LONG_MAX_VALUE): + raise TypeError("{} - Non integer value or overflow.".format(repr(datum))) + write_long_to_array(outbuf, datum) + + elif writer_f == 8: # make_byte_writer + write_bytes_to_array(outbuf, datum) + + elif writer_f == 9: # make_int_writer + if not isinstance(datum, six.integer_types): + raise TypeError("Schema violation, {} is not an example of integer".format(datum)) + if not INT_MIN_VALUE <= datum <= INT_MAX_VALUE: + raise TypeError("Schema violation, value overflow. {} can't be stored in schema".format(datum)) + write_long_to_array(outbuf, datum) + + elif writer_f == 10: # make_fixed_writer + write_fixed_to_array(outbuf, datum) + + elif writer_f == 11: # make_enum_writer + write_enum_to_array(outbuf, datum, writer[1]) + + elif writer_f == 12: # make_union_writer + write_array_to_array(outbuf, datum, writer[1]) + + elif writer_f == 13: # make_union_writer + write_map_to_array(outbuf, datum, writer[1]) + + +def write(iobuffer, datum, writer): + cdef array.array outbuf = array.array('B', []) + execute(outbuf, datum, writer) + iobuffer.write(outbuf.data.as_chars[:len(outbuf)]) diff --git a/src/spavro/io.py b/src/spavro/io.py index 466da21..7d1477d 100755 --- a/src/spavro/io.py +++ b/src/spavro/io.py @@ -143,7 +143,7 @@ def validate(expected_schema, datum): log = logging.getLogger(__name__) use_fast = False try: - from spavro.fast_binary import get_reader, get_writer + from spavro.fast_binary import get_reader, get_writer, write from spavro.fast_binary import FastBinaryEncoder, FastBinaryDecoder use_fast = True except ImportError: @@ -822,7 +822,7 @@ def writers_schema(self, parsed_writer_schema): def write(self, datum, encoder): # validate datum try: - self.write_datum(encoder.writer, datum) + write(encoder.writer, datum, self.write_datum) except TypeError as ex: log.error(self.write_datum) log.exception("type error")