Skip to content

Commit

Permalink
Fix references in Avro unions
Browse files Browse the repository at this point in the history
  • Loading branch information
rayokota committed Nov 20, 2024
1 parent 43fef36 commit bce2db8
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 2 deletions.
4 changes: 2 additions & 2 deletions src/confluent_kafka/schema_registry/avro.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ def _get_parsed_schema(self, schema: Schema) -> AvroSchema:
named_schemas = _resolve_named_schema(schema, self._registry)
prepared_schema = _schema_loads(schema.schema_str)
parsed_schema = parse_schema(
loads(prepared_schema.schema_str), named_schemas=named_schemas)
loads(prepared_schema.schema_str), named_schemas=named_schemas, expand=True)

self._parsed_schemas.set(schema, parsed_schema)
return parsed_schema
Expand Down Expand Up @@ -604,7 +604,7 @@ def _get_parsed_schema(self, schema: Schema) -> AvroSchema:
named_schemas = _resolve_named_schema(schema, self._registry)
prepared_schema = _schema_loads(schema.schema_str)
parsed_schema = parse_schema(
loads(prepared_schema.schema_str), named_schemas=named_schemas)
loads(prepared_schema.schema_str), named_schemas=named_schemas, expand=True)

self._parsed_schemas.set(schema, parsed_schema)
return parsed_schema
Expand Down
45 changes: 45 additions & 0 deletions tests/schema_registry/test_avro_serdes.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,51 @@ def test_avro_serialize_references():
assert obj == obj2


def test_avro_serialize_union_with_references():
conf = {'url': _BASE_URL}
client = SchemaRegistryClient.new_client(conf)
ser_conf = {'auto.register.schemas': False, 'use.latest.version': True}

obj = {
'intField': 123,
'doubleField': 45.67,
'stringField': 'hi',
'booleanField': True,
'bytesField': b'foobar',
}
ref_schema = {
'type': 'record',
'name': 'ref',
'fields': [
{'name': 'intField', 'type': 'int'},
{'name': 'doubleField', 'type': 'double'},
{'name': 'stringField', 'type': 'string'},
{'name': 'booleanField', 'type': 'boolean'},
{'name': 'bytesField', 'type': 'bytes'},
]
}
client.register_schema('ref', Schema(json.dumps(ref_schema)))
ref2_schema = {
'type': 'record',
'name': 'ref2',
'fields': [
{'name': 'otherField', 'type': 'string'}
]
}
client.register_schema('ref2', Schema(json.dumps(ref2_schema)))
schema = [ 'ref', 'ref2' ]
refs = [SchemaReference('ref', 'ref', 1), SchemaReference('ref2', 'ref2', 1)]
client.register_schema(_SUBJECT, Schema(json.dumps(schema), 'AVRO', refs))

ser = AvroSerializer(client, schema_str=None, conf=ser_conf)
ser_ctx = SerializationContext(_TOPIC, MessageField.VALUE)
bytes = ser(obj, ser_ctx)

deser = AvroDeserializer(client)
obj2 = deser(bytes, ser_ctx)
assert obj == obj2


def test_avro_schema_evolution():
conf = {'url': _BASE_URL}
client = SchemaRegistryClient.new_client(conf)
Expand Down

0 comments on commit bce2db8

Please sign in to comment.