From 5ab6ba00fa8b0d6dd0e96e24d0a6d7ef86d90d5d Mon Sep 17 00:00:00 2001 From: libretto Date: Thu, 26 Sep 2024 22:34:47 +0300 Subject: [PATCH 1/3] Implementation of AVRO References support --- karapace/compatibility/__init__.py | 1 + karapace/schema_models.py | 70 ++++- karapace/schema_reader.py | 14 +- karapace/schema_registry_apis.py | 2 +- .../test_schema_avro_references.py | 295 ++++++++++++++++++ tests/unit/test_avro_merge.py | 76 +++++ 6 files changed, 452 insertions(+), 6 deletions(-) create mode 100644 tests/integration/test_schema_avro_references.py create mode 100644 tests/unit/test_avro_merge.py diff --git a/karapace/compatibility/__init__.py b/karapace/compatibility/__init__.py index e5f61e710..2e9e4a0ab 100644 --- a/karapace/compatibility/__init__.py +++ b/karapace/compatibility/__init__.py @@ -88,6 +88,7 @@ def check_compatibility( if old_schema.schema_type is SchemaType.AVRO: assert isinstance(old_schema.schema, AvroSchema) assert isinstance(new_schema.schema, AvroSchema) + if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}: result = check_avro_compatibility( reader_schema=new_schema.schema, diff --git a/karapace/schema_models.py b/karapace/schema_models.py index eab1a5c9f..064f3e163 100644 --- a/karapace/schema_models.py +++ b/karapace/schema_models.py @@ -28,8 +28,10 @@ from karapace.utils import assert_never, json_decode, json_encode, JSONDecodeError from typing import Any, cast, Collection, Dict, Final, final, Mapping, Sequence +import avro.schema import hashlib import logging +import re LOG = logging.getLogger(__name__) @@ -152,6 +154,7 @@ def normalize_schema_str( except JSONDecodeError as e: LOG.info("Schema is not valid JSON") raise e + elif schema_type == SchemaType.PROTOBUF: if schema: schema_str = str(schema) @@ -194,6 +197,45 @@ def schema(self) -> Draft7Validator | AvroSchema | ProtobufSchema: return parsed_typed_schema.schema +class AvroMerge: + def __init__(self, schema_str: str, dependencies: Mapping[str, Dependency] | None = None): + self.schema_str = json_encode(json_decode(schema_str), compact=True, sort_keys=True) + self.dependencies = dependencies + self.unique_id = 0 + self.regex = re.compile(r"^\s*\[") + + def union_safe_schema_str(self, schema_str: str) -> str: + # in case we meet union - we use it as is + + base_schema = ( + f'{{"name": "___RESERVED_KARAPACE_WRAPPER_NAME_{self.unique_id}___",' + f'"type": "record", "fields": [{{"name": "name", "type":' + ) + if self.regex.match(schema_str): + return f"{base_schema} {schema_str}}}]}}" + return f"{base_schema} [{schema_str}]}}]}}" + + def builder(self, schema_str: str, dependencies: Mapping[str, Dependency] | None = None) -> str: + """To support references in AVRO we iteratively merge all referenced schemas with current schema""" + stack: list[tuple[str, Mapping[str, Dependency] | None]] = [(schema_str, dependencies)] + merged_schemas = [] + + while stack: + current_schema_str, current_dependencies = stack.pop() + if current_dependencies: + stack.append((current_schema_str, None)) + for dependency in reversed(current_dependencies.values()): + stack.append((dependency.schema.schema_str, dependency.schema.dependencies)) + else: + self.unique_id += 1 + merged_schemas.append(self.union_safe_schema_str(current_schema_str)) + + return ",\n".join(merged_schemas) + + def wrap(self) -> str: + return "[\n" + self.builder(self.schema_str, self.dependencies) + "\n]" + + def parse( schema_type: SchemaType, schema_str: str, @@ -206,18 +248,37 @@ def parse( ) -> ParsedTypedSchema: if schema_type not in [SchemaType.AVRO, SchemaType.JSONSCHEMA, SchemaType.PROTOBUF]: raise InvalidSchema(f"Unknown parser {schema_type} for {schema_str}") - + parsed_schema_result: Draft7Validator | AvroSchema | ProtobufSchema parsed_schema: Draft7Validator | AvroSchema | ProtobufSchema if schema_type is SchemaType.AVRO: try: + if dependencies: + wrapped_schema_str = AvroMerge(schema_str, dependencies).wrap() + else: + wrapped_schema_str = schema_str parsed_schema = parse_avro_schema_definition( - schema_str, + wrapped_schema_str, validate_enum_symbols=validate_avro_enum_symbols, validate_names=validate_avro_names, ) + if dependencies: + if isinstance(parsed_schema, avro.schema.UnionSchema): + parsed_schema_result = parsed_schema.schemas[-1].fields[0].type.schemas[-1] + + else: + raise InvalidSchema + else: + parsed_schema_result = parsed_schema + return ParsedTypedSchema( + schema_type=schema_type, + schema_str=schema_str, + schema=parsed_schema_result, + references=references, + dependencies=dependencies, + schema_wrapped=parsed_schema, + ) except (SchemaParseException, JSONDecodeError, TypeError) as e: raise InvalidSchema from e - elif schema_type is SchemaType.JSONSCHEMA: try: parsed_schema = parse_jsonschema_definition(schema_str) @@ -284,9 +345,10 @@ def __init__( schema: Draft7Validator | AvroSchema | ProtobufSchema, references: Sequence[Reference] | None = None, dependencies: Mapping[str, Dependency] | None = None, + schema_wrapped: Draft7Validator | AvroSchema | ProtobufSchema | None = None, ) -> None: self._schema_cached: Draft7Validator | AvroSchema | ProtobufSchema | None = schema - + self.schema_wrapped = schema_wrapped super().__init__( schema_type=schema_type, schema_str=schema_str, diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 01f07f379..e69a55b30 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -549,7 +549,19 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None: parsed_schema: Draft7Validator | AvroSchema | ProtobufSchema | None = None resolved_dependencies: dict[str, Dependency] | None = None - if schema_type_parsed in [SchemaType.AVRO, SchemaType.JSONSCHEMA]: + if schema_type_parsed == SchemaType.AVRO: + try: + if schema_references: + candidate_references = [reference_from_mapping(reference_data) for reference_data in schema_references] + resolved_references, resolved_dependencies = self.resolve_references(candidate_references) + schema_str = json.dumps(json.loads(schema_str), sort_keys=True) + except json.JSONDecodeError as e: + LOG.warning("Schema is not valid JSON") + raise e + except InvalidReferences as e: + LOG.exception("Invalid AVRO references") + raise e + elif schema_type_parsed == SchemaType.JSONSCHEMA: try: schema_str = json.dumps(json.loads(schema_str), sort_keys=True) except json.JSONDecodeError as exc: diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index 5a9196087..2bd9d634f 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -1056,7 +1056,7 @@ def _validate_references( content_type=content_type, status=HTTPStatus.BAD_REQUEST, ) - if references and schema_type != SchemaType.PROTOBUF: + if references and schema_type != SchemaType.PROTOBUF and schema_type != SchemaType.AVRO: self.r( body={ "error_code": SchemaErrorCodes.REFERENCES_SUPPORT_NOT_IMPLEMENTED.value, diff --git a/tests/integration/test_schema_avro_references.py b/tests/integration/test_schema_avro_references.py new file mode 100644 index 000000000..b1308e0e8 --- /dev/null +++ b/tests/integration/test_schema_avro_references.py @@ -0,0 +1,295 @@ +""" +karapace - schema tests + +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" + +from karapace.client import Client, Result +from tests.utils import create_subject_name_factory + +import json + +baseurl = "http://localhost:8081" + +# country.avsc +SCHEMA_COUNTRY = { + "type": "record", + "name": "Country", + "namespace": "com.netapp", + "fields": [{"name": "name", "type": "string"}, {"name": "code", "type": "string"}], +} + +# address.avsc +SCHEMA_ADDRESS = { + "type": "record", + "name": "Address", + "namespace": "com.netapp", + "fields": [ + {"name": "street", "type": "string"}, + {"name": "city", "type": "string"}, + {"name": "postalCode", "type": "string"}, + {"name": "country", "type": "Country"}, + ], +} + +# job.avsc +SCHEMA_JOB = { + "type": "record", + "name": "Job", + "namespace": "com.netapp", + "fields": [{"name": "title", "type": "string"}, {"name": "salary", "type": "double"}], +} + +# person.avsc +SCHEMA_PERSON = { + "type": "record", + "name": "Person", + "namespace": "com.netapp", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "age", "type": "int"}, + {"name": "address", "type": "Address"}, + {"name": "job", "type": "Job"}, + ], +} + +SCHEMA_PERSON_AGE_INT_LONG = { + "type": "record", + "name": "Person", + "namespace": "com.netapp", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "age", "type": "long"}, + {"name": "address", "type": "Address"}, + {"name": "job", "type": "Job"}, + ], +} + +SCHEMA_PERSON_AGE_LONG_STRING = { + "type": "record", + "name": "Person", + "namespace": "com.netapp", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "age", "type": "string"}, + {"name": "address", "type": "Address"}, + {"name": "job", "type": "Job"}, + ], +} + +SCHEMA_UNION_REFERENCES = { + "type": "record", + "namespace": "com.netapp", + "name": "Person2", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "age", "type": "int"}, + {"name": "address", "type": "Address"}, + {"name": "job", "type": "Job"}, + { + "name": "children", + "type": [ + "null", + { + "type": "record", + "name": "child", + "fields": [{"name": "name", "type": "string"}, {"name": "age", "type": "int"}], + }, + ], + }, + ], +} + +SCHEMA_UNION_REFERENCES2 = [ + { + "type": "record", + "name": "Person", + "namespace": "com.netapp", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "age", "type": "int"}, + {"name": "address", "type": "Address"}, + {"name": "job", "type": "Job"}, + ], + }, + { + "type": "record", + "name": "UnemployedPerson", + "namespace": "com.netapp", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "age", "type": "int"}, + {"name": "address", "type": "Address"}, + ], + }, +] + +SCHEMA_ADDRESS_INCOMPATIBLE = { + "type": "record", + "name": "ChangedAddress", + "namespace": "com.netapp", + "fields": [ + {"name": "street", "type": "string"}, + {"name": "city", "type": "string"}, + {"name": "postalCode", "type": "string"}, + {"name": "country", "type": "Country"}, + ], +} + + +def address_references(subject_prefix: str) -> list: + return [{"name": "country.avsc", "subject": f"{subject_prefix}country", "version": 1}] + + +def person_references(subject_prefix: str) -> list: + return [ + {"name": "address.avsc", "subject": f"{subject_prefix}address", "version": 1}, + {"name": "job.avsc", "subject": f"{subject_prefix}job", "version": 1}, + ] + + +def stored_person_subject(subject_prefix: str, subject_id: int) -> dict: + return { + "id": subject_id, + "references": [ + {"name": "address.avsc", "subject": f"{subject_prefix}address", "version": 1}, + {"name": "job.avsc", "subject": f"{subject_prefix}job", "version": 1}, + ], + "schema": json.dumps( + { + "fields": [ + {"name": "name", "type": "string"}, + {"name": "age", "type": "int"}, + {"name": "address", "type": "Address"}, + {"name": "job", "type": "Job"}, + ], + "name": "Person", + "namespace": "com.netapp", + "type": "record", + }, + separators=(",", ":"), + ), + "subject": f"{subject_prefix}person", + "version": 1, + } + + +async def basic_avro_references_fill_test(registry_async_client: Client, subject_prefix: str) -> Result: + res = await registry_async_client.post( + f"subjects/{subject_prefix}country/versions", json={"schema": json.dumps(SCHEMA_COUNTRY)} + ) + assert res.status_code == 200 + assert "id" in res.json() + + res = await registry_async_client.post( + f"subjects/{subject_prefix}address/versions", + json={"schemaType": "AVRO", "schema": json.dumps(SCHEMA_ADDRESS), "references": address_references(subject_prefix)}, + ) + assert res.status_code == 200 + assert "id" in res.json() + address_id = res.json()["id"] + + # Check if the schema has now been registered under the subject + + res = await registry_async_client.post( + f"subjects/{subject_prefix}address", + json={"schemaType": "AVRO", "schema": json.dumps(SCHEMA_ADDRESS), "references": address_references(subject_prefix)}, + ) + assert res.status_code == 200 + assert "subject" in res.json() + assert "id" in res.json() + assert address_id == res.json()["id"] + assert "version" in res.json() + assert "schema" in res.json() + + res = await registry_async_client.post(f"subjects/{subject_prefix}job/versions", json={"schema": json.dumps(SCHEMA_JOB)}) + assert res.status_code == 200 + assert "id" in res.json() + res = await registry_async_client.post( + f"subjects/{subject_prefix}person/versions", + json={"schemaType": "AVRO", "schema": json.dumps(SCHEMA_PERSON), "references": person_references(subject_prefix)}, + ) + assert res.status_code == 200 + assert "id" in res.json() + return res + + +async def test_basic_avro_references(registry_async_client: Client) -> None: + subject_prefix = create_subject_name_factory("basic-avro-references-")() + res = await basic_avro_references_fill_test(registry_async_client, subject_prefix) + person_id = res.json()["id"] + res = await registry_async_client.get(f"subjects/{subject_prefix}person/versions/latest") + assert res.status_code == 200 + assert res.json() == stored_person_subject(subject_prefix, person_id) + + +async def test_avro_references_compatibility(registry_async_client: Client) -> None: + subject_prefix = create_subject_name_factory("avro-references-compatibility-")() + await basic_avro_references_fill_test(registry_async_client, subject_prefix) + + res = await registry_async_client.post( + f"compatibility/subjects/{subject_prefix}person/versions/latest", + json={ + "schemaType": "AVRO", + "schema": json.dumps(SCHEMA_PERSON_AGE_INT_LONG), + "references": person_references(subject_prefix), + }, + ) + assert res.status_code == 200 + assert res.json() == {"is_compatible": True} + res = await registry_async_client.post( + f"compatibility/subjects/{subject_prefix}person/versions/latest", + json={ + "schemaType": "AVRO", + "schema": json.dumps(SCHEMA_PERSON_AGE_LONG_STRING), + "references": person_references(subject_prefix), + }, + ) + assert res.status_code == 200 + assert res.json() == {"is_compatible": False} + + +async def test_avro_union_references(registry_async_client: Client) -> None: + subject_prefix = create_subject_name_factory("avro-references-union-one-")() + await basic_avro_references_fill_test(registry_async_client, subject_prefix) + res = await registry_async_client.post( + f"subjects/{subject_prefix}person2/versions", + json={ + "schemaType": "AVRO", + "schema": json.dumps(SCHEMA_UNION_REFERENCES), + "references": person_references(subject_prefix), + }, + ) + assert res.status_code == 200 + assert "id" in res.json() + + +async def test_avro_union_references2(registry_async_client: Client) -> None: + subject_prefix = create_subject_name_factory("avro-references-union-two-")() + await basic_avro_references_fill_test(registry_async_client, subject_prefix) + res = await registry_async_client.post( + f"subjects/{subject_prefix}person2/versions", + json={ + "schemaType": "AVRO", + "schema": json.dumps(SCHEMA_UNION_REFERENCES2), + "references": person_references(subject_prefix), + }, + ) + assert res.status_code == 200 and "id" in res.json() + + +async def test_avro_incompatible_name_references(registry_async_client: Client) -> None: + subject_prefix = create_subject_name_factory("avro-references-incompatible-name-")() + await basic_avro_references_fill_test(registry_async_client, subject_prefix) + res = await registry_async_client.post( + f"subjects/{subject_prefix}address/versions", + json={ + "schemaType": "AVRO", + "schema": json.dumps(SCHEMA_ADDRESS_INCOMPATIBLE), + "references": address_references(subject_prefix), + }, + ) + assert res.status_code == 409 + msg = "Incompatible schema, compatibility_mode=BACKWARD expected: com.netapp.Address" + assert res.json()["message"] == msg diff --git a/tests/unit/test_avro_merge.py b/tests/unit/test_avro_merge.py new file mode 100644 index 000000000..553afdd0f --- /dev/null +++ b/tests/unit/test_avro_merge.py @@ -0,0 +1,76 @@ +""" +karapace - Unit Test of AvroMerge class + +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" + + +from karapace.schema_models import AvroMerge +from karapace.utils import json_decode, json_encode +from unittest.mock import MagicMock + +import pytest + + +class TestAvroMerge: + @pytest.fixture + def avro_merge(self): + schema_str = '{"type": "record", "name": "Test", "fields": [{"name": "field1", "type": "string"}]}' + dependencies = {"dependency1": MagicMock(schema=MagicMock(schema_str='{"type": "string"}', dependencies=None))} + return AvroMerge(schema_str, dependencies) + + def test_init(self, avro_merge): + assert avro_merge.schema_str == json_encode(json_decode(avro_merge.schema_str), compact=True, sort_keys=True) + assert avro_merge.unique_id == 0 + + def test_union_safe_schema_str_no_union(self, avro_merge): + result = avro_merge.union_safe_schema_str('{"type": "string"}') + expected = ( + '{"name": "___RESERVED_KARAPACE_WRAPPER_NAME_0___","type": "record", "fields": [{"name": "name", ' + '"type": [{"type": "string"}]}]}' + ) + assert result == expected + + def test_union_safe_schema_str_with_union(self, avro_merge): + result = avro_merge.union_safe_schema_str('["null", "string"]') + expected = ( + '{"name": "___RESERVED_KARAPACE_WRAPPER_NAME_0___","type": "record", "fields": [{"name": "name", ' + '"type": ["null", "string"]}]}' + ) + assert result == expected + + def test_builder_no_dependencies(self, avro_merge): + avro_merge.dependencies = None + result = avro_merge.builder(avro_merge.schema_str) + expected = ( + '{"name": "___RESERVED_KARAPACE_WRAPPER_NAME_1___","type": "record", "fields": [{"name": "name", ' + '"type": [{"fields":[{"name":"field1","type":"string"}],"name":"Test","type":"record"}]}]}' + ) + assert result == expected + + def test_builder_with_dependencies(self, avro_merge): + result = avro_merge.builder(avro_merge.schema_str, avro_merge.dependencies) + expected = ( + '{"name": "___RESERVED_KARAPACE_WRAPPER_NAME_1___","type": "record", "fields": [{"name": "name", "type": [{' + '"type": "string"}]}]},' + "\n" + '{"name": "___RESERVED_KARAPACE_WRAPPER_NAME_2___","type": "record", "fields": [{"name": "name", "type": [{' + '"fields":[{"name":"field1","type":"string"}],"name":"Test","type":"record"}]}]}' + ) + assert result == expected + + def test_wrap(self, avro_merge): + result = avro_merge.wrap() + expected = ( + "[\n" + + ( + '{"name": "___RESERVED_KARAPACE_WRAPPER_NAME_1___","type": "record", "fields": [{"name": "name", "type": [{' + '"type": "string"}]}]},' + "\n" + '{"name": "___RESERVED_KARAPACE_WRAPPER_NAME_2___","type": "record", "fields": [{"name": "name", "type": [{' + '"fields":[{"name":"field1","type":"string"}],"name":"Test","type":"record"}]}]}' + ) + + "\n]" + ) + assert result == expected From 445519ecd552158f55ce553ce65740bd48e52b14 Mon Sep 17 00:00:00 2001 From: libretto Date: Fri, 25 Oct 2024 20:09:25 +0300 Subject: [PATCH 2/3] fixup after merge --- tests/integration/test_schema_avro_references.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_schema_avro_references.py b/tests/integration/test_schema_avro_references.py index b1308e0e8..7beea1f54 100644 --- a/tests/integration/test_schema_avro_references.py +++ b/tests/integration/test_schema_avro_references.py @@ -247,7 +247,7 @@ async def test_avro_references_compatibility(registry_async_client: Client) -> N }, ) assert res.status_code == 200 - assert res.json() == {"is_compatible": False} + assert res.json() == {"is_compatible": False, "messages": ["reader type: string not compatible with writer type: int"]} async def test_avro_union_references(registry_async_client: Client) -> None: @@ -291,5 +291,5 @@ async def test_avro_incompatible_name_references(registry_async_client: Client) }, ) assert res.status_code == 409 - msg = "Incompatible schema, compatibility_mode=BACKWARD expected: com.netapp.Address" + msg = "Incompatible schema, compatibility_mode=BACKWARD. Incompatibilities: expected: com.netapp.Address" assert res.json()["message"] == msg From a1f659f96ae5ac6f4a40c5fa8b8107bcd5459789 Mon Sep 17 00:00:00 2001 From: libretto Date: Sat, 9 Nov 2024 20:22:41 +0200 Subject: [PATCH 3/3] replace unions way with avro library --- src/karapace/schema_models.py | 59 +++--- tests/test_avro_references.py | 356 ++++++++++++++++++++++++++++++++++ tests/unit/test_avro_merge.py | 76 -------- 3 files changed, 379 insertions(+), 112 deletions(-) create mode 100644 tests/test_avro_references.py delete mode 100644 tests/unit/test_avro_merge.py diff --git a/src/karapace/schema_models.py b/src/karapace/schema_models.py index b56fcc123..7bfd7c4d8 100644 --- a/src/karapace/schema_models.py +++ b/src/karapace/schema_models.py @@ -5,7 +5,8 @@ from __future__ import annotations from avro.errors import SchemaParseException -from avro.schema import parse as avro_parse, Schema as AvroSchema +from avro.name import Names as AvroNames +from avro.schema import make_avsc_object, parse as avro_parse, Schema as AvroSchema from collections.abc import Collection, Mapping, Sequence from dataclasses import dataclass from jsonschema import Draft7Validator @@ -29,8 +30,8 @@ from karapace.utils import assert_never, json_decode, json_encode, JSONDecodeError from typing import Any, cast, Final, final -import avro.schema import hashlib +import json import logging import re @@ -198,28 +199,17 @@ def schema(self) -> Draft7Validator | AvroSchema | ProtobufSchema: return parsed_typed_schema.schema -class AvroMerge: +class AvroResolver: def __init__(self, schema_str: str, dependencies: Mapping[str, Dependency] | None = None): self.schema_str = json_encode(json_decode(schema_str), compact=True, sort_keys=True) self.dependencies = dependencies self.unique_id = 0 self.regex = re.compile(r"^\s*\[") - def union_safe_schema_str(self, schema_str: str) -> str: - # in case we meet union - we use it as is - - base_schema = ( - f'{{"name": "___RESERVED_KARAPACE_WRAPPER_NAME_{self.unique_id}___",' - f'"type": "record", "fields": [{{"name": "name", "type":' - ) - if self.regex.match(schema_str): - return f"{base_schema} {schema_str}}}]}}" - return f"{base_schema} [{schema_str}]}}]}}" - - def builder(self, schema_str: str, dependencies: Mapping[str, Dependency] | None = None) -> str: + def builder(self, schema_str: str, dependencies: Mapping[str, Dependency] | None = None) -> list: """To support references in AVRO we iteratively merge all referenced schemas with current schema""" stack: list[tuple[str, Mapping[str, Dependency] | None]] = [(schema_str, dependencies)] - merged_schemas = [] + merge: list = [] while stack: current_schema_str, current_dependencies = stack.pop() @@ -229,12 +219,15 @@ def builder(self, schema_str: str, dependencies: Mapping[str, Dependency] | None stack.append((dependency.schema.schema_str, dependency.schema.dependencies)) else: self.unique_id += 1 - merged_schemas.append(self.union_safe_schema_str(current_schema_str)) + merge.append(current_schema_str) - return ",\n".join(merged_schemas) + return merge - def wrap(self) -> str: - return "[\n" + self.builder(self.schema_str, self.dependencies) + "\n]" + def resolve(self) -> list: + """Resolve the given ``schema_str`` with ``dependencies`` to a list of schemas + sorted in an order where all referenced schemas are located prior to their referrers. + """ + return self.builder(self.schema_str, self.dependencies) def parse( @@ -249,34 +242,30 @@ def parse( ) -> ParsedTypedSchema: if schema_type not in [SchemaType.AVRO, SchemaType.JSONSCHEMA, SchemaType.PROTOBUF]: raise InvalidSchema(f"Unknown parser {schema_type} for {schema_str}") - parsed_schema_result: Draft7Validator | AvroSchema | ProtobufSchema parsed_schema: Draft7Validator | AvroSchema | ProtobufSchema if schema_type is SchemaType.AVRO: try: if dependencies: - wrapped_schema_str = AvroMerge(schema_str, dependencies).wrap() + schemas_list = AvroResolver(schema_str, dependencies).resolve() + names = AvroNames(validate_names=validate_avro_names) + merged_schema = None + for schema in schemas_list: + # Merge dep with all previously merged ones + merged_schema = make_avsc_object(json.loads(schema), names) + merged_schema_str = str(merged_schema) else: - wrapped_schema_str = schema_str + merged_schema_str = schema_str parsed_schema = parse_avro_schema_definition( - wrapped_schema_str, + merged_schema_str, validate_enum_symbols=validate_avro_enum_symbols, validate_names=validate_avro_names, ) - if dependencies: - if isinstance(parsed_schema, avro.schema.UnionSchema): - parsed_schema_result = parsed_schema.schemas[-1].fields[0].type.schemas[-1] - - else: - raise InvalidSchema - else: - parsed_schema_result = parsed_schema return ParsedTypedSchema( schema_type=schema_type, schema_str=schema_str, - schema=parsed_schema_result, + schema=parsed_schema, references=references, dependencies=dependencies, - schema_wrapped=parsed_schema, ) except (SchemaParseException, JSONDecodeError, TypeError) as e: raise InvalidSchema from e @@ -346,10 +335,8 @@ def __init__( schema: Draft7Validator | AvroSchema | ProtobufSchema, references: Sequence[Reference] | None = None, dependencies: Mapping[str, Dependency] | None = None, - schema_wrapped: Draft7Validator | AvroSchema | ProtobufSchema | None = None, ) -> None: self._schema_cached: Draft7Validator | AvroSchema | ProtobufSchema | None = schema - self.schema_wrapped = schema_wrapped super().__init__( schema_type=schema_type, schema_str=schema_str, diff --git a/tests/test_avro_references.py b/tests/test_avro_references.py new file mode 100644 index 000000000..af3aea0a5 --- /dev/null +++ b/tests/test_avro_references.py @@ -0,0 +1,356 @@ +""" +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" +from karapace.dependency import Dependency +from karapace.schema_models import AvroResolver, SchemaType, ValidatedTypedSchema +from karapace.schema_references import Reference +from karapace.typing import Subject, Version + +import pytest +import textwrap + + +def create_validated_schema(schema_str: str, dependencies=None) -> ValidatedTypedSchema: + """Helper function to create a validated typed schema.""" + return ValidatedTypedSchema(schema_str=schema_str, schema_type="AVRO", dependencies=dependencies or {}) + + +@pytest.fixture(name="base_schema") +def fixture_base_schema(): + return '{"type": "record", "name": "BaseRecord", "fields": [{"name": "field1", "type": "string"}]}' + + +@pytest.fixture(name="dependency_schema") +def fixture_dependency_schema(): + return '{"type": "record", "name": "DependencyRecord", "fields": [{"name": "depField", "type": "int"}]}' + + +@pytest.fixture(name="another_dependency_schema") +def fixture_another_dependency_schema(): + return '{"type": "record", "name": "AnotherDependency", "fields": [{"name": "anotherField", "type": "boolean"}]}' + + +def test_resolver_without_dependencies(base_schema): + resolver = AvroResolver(schema_str=base_schema) + resolved_schemas = resolver.resolve() + assert resolved_schemas == [base_schema], "Expected single schema in resolved list without dependencies" + + +def test_resolver_with_single_dependency(base_schema, dependency_schema): + dependency = Dependency( + name="Dependency1", + subject=Subject("TestSubject"), + version=Version(1), + target_schema=create_validated_schema(dependency_schema), + ) + dependencies = {"Dependency1": dependency} + resolver = AvroResolver(schema_str=base_schema, dependencies=dependencies) + resolved_schemas = resolver.resolve() + assert resolved_schemas == [dependency_schema, base_schema], "Expected dependency to be resolved before base schema" + + +def test_resolver_with_multiple_dependencies(base_schema, dependency_schema, another_dependency_schema): + dependency1 = Dependency( + name="Dependency1", + subject=Subject("TestSubject1"), + version=Version(1), + target_schema=create_validated_schema(dependency_schema), + ) + dependency2 = Dependency( + name="Dependency2", + subject=Subject("TestSubject2"), + version=Version(1), + target_schema=create_validated_schema(another_dependency_schema), + ) + dependencies = {"Dependency1": dependency1, "Dependency2": dependency2} + resolver = AvroResolver(schema_str=base_schema, dependencies=dependencies) + resolved_schemas = resolver.resolve() + + # Validate both dependencies appear before the base schema, without assuming their specific order + assert dependency_schema in resolved_schemas + assert another_dependency_schema in resolved_schemas + assert resolved_schemas[-1] == base_schema, "Base schema should be the last in the resolved list" + + +def test_builder_unique_id_increment(base_schema, dependency_schema): + dependency = Dependency( + name="Dependency1", + subject=Subject("TestSubject"), + version=Version(1), + target_schema=create_validated_schema(dependency_schema), + ) + dependencies = {"Dependency1": dependency} + resolver = AvroResolver(schema_str=base_schema, dependencies=dependencies) + resolver.builder(base_schema, dependencies) + assert resolver.unique_id == 2, "Unique ID should be incremented for each processed schema" + + +def test_resolver_with_nested_dependencies(base_schema, dependency_schema, another_dependency_schema): + # Create nested dependency structure + nested_dependency = Dependency( + name="NestedDependency", + subject=Subject("NestedSubject"), + version=Version(1), + target_schema=create_validated_schema(another_dependency_schema), + ) + dependency_with_nested = Dependency( + name="Dependency1", + subject=Subject("TestSubject"), + version=Version(1), + target_schema=create_validated_schema(dependency_schema, dependencies={"NestedDependency": nested_dependency}), + ) + dependencies = {"Dependency1": dependency_with_nested} + resolver = AvroResolver(schema_str=base_schema, dependencies=dependencies) + resolved_schemas = resolver.resolve() + + # Ensure all schemas are resolved in the correct order + assert another_dependency_schema in resolved_schemas + assert dependency_schema in resolved_schemas + assert resolved_schemas[-1] == base_schema, "Base schema should be the last in the resolved list" + assert resolved_schemas.index(another_dependency_schema) < resolved_schemas.index( + dependency_schema + ), "Nested dependency should be resolved before its parent" + + +def test_avro_reference() -> None: + country_schema = ValidatedTypedSchema.parse( + schema_type=SchemaType.AVRO, + schema_str=textwrap.dedent( + """\ + { + "type": "record", + "name": "Country", + "namespace": "com.netapp", + "fields": [{"name": "name", "type": "string"}, {"name": "code", "type": "string"}] + } + """ + ), + ) + address_schema = ValidatedTypedSchema.parse( + schema_type=SchemaType.AVRO, + schema_str=textwrap.dedent( + """\ + { + "type": "record", + "name": "Address", + "namespace": "com.netapp", + "fields": [ + {"name": "street", "type": "string"}, + {"name": "city", "type": "string"}, + {"name": "postalCode", "type": "string"}, + {"name": "country", "type": "Country"} + ] + } + """ + ), + references=[Reference(name="country.avsc", subject=Subject("country"), version=Version(1))], + dependencies={ + "country": Dependency( + name="country", + subject=Subject("country"), + version=Version(1), + target_schema=country_schema, + ), + }, + ) + + # Check that the reference schema (Country) has been inlined + assert address_schema.schema == textwrap.dedent( + """\ + { + "type": "record", + "name": "Address", + "namespace": "com.netapp", + "fields": [ + { + "type": "string", + "name": "street" + }, + { + "type": "string", + "name": "city" + }, + { + "type": "string", + "name": "postalCode" + }, + { + "type": { + "type": "record", + "name": "Country", + "namespace": "com.netapp", + "fields": [ + { + "type": "string", + "name": "name" + }, + { + "type": "string", + "name": "code" + } + ] + }, + "name": "country" + } + ] + } + """ + ) + + +def test_avro_reference2() -> None: + # country.avsc + country_schema = ValidatedTypedSchema.parse( + schema_type=SchemaType.AVRO, + schema_str=textwrap.dedent( + """\ + { + "type": "record", + "name": "Country", + "namespace": "com.netapp", + "fields": [{"name": "name", "type": "string"}, {"name": "code", "type": "string"}] + } + """ + ), + ) + + # address.avsc + address_schema = ValidatedTypedSchema.parse( + schema_type=SchemaType.AVRO, + schema_str=textwrap.dedent( + """\ + { + "type": "record", + "name": "Address", + "namespace": "com.netapp", + "fields": [ + {"name": "street", "type": "string"}, + {"name": "city", "type": "string"}, + {"name": "postalCode", "type": "string"}, + {"name": "country", "type": "Country"} + ] + } + """ + ), + references=[Reference(name="country.avsc", subject=Subject("country"), version=Version(1))], + dependencies={ + "country": Dependency( + name="country", + subject=Subject("country"), + version=Version(1), + target_schema=country_schema, + ), + }, + ) + + # job.avsc + job_schema = ValidatedTypedSchema.parse( + schema_type=SchemaType.AVRO, + schema_str=textwrap.dedent( + """\ + { + "type": "record", + "name": "Job", + "namespace": "com.netapp", + "fields": [ + {"name": "title", "type": "string"}, + {"name": "salary", "type": "double"} + ] + } + """ + ), + ) + + # person.avsc + person_schema = ValidatedTypedSchema.parse( + schema_type=SchemaType.AVRO, + schema_str=textwrap.dedent( + """\ + { + "type": "record", + "name": "Person", + "namespace": "com.netapp", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "age", "type": "int"}, + {"name": "address", "type": "Address"}, + {"name": "job", "type": "Job"} + ] + } + """ + ), + references=[ + Reference(name="address.avsc", subject=Subject("address"), version=Version(1)), + Reference(name="job.avsc", subject=Subject("job"), version=Version(1)), + ], + dependencies={ + "address": Dependency( + name="address", + subject=Subject("address"), + version=Version(1), + target_schema=address_schema, + ), + "job": Dependency( + name="job", + subject=Subject("job"), + version=Version(1), + target_schema=job_schema, + ), + }, + ) + + # Check that the Address and Job schemas are correctly inlined within the Person schema + expected_schema = textwrap.dedent( + """\ + { + "type": "record", + "name": "Person", + "namespace": "com.netapp", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "age", "type": "int"}, + { + "name": "address", + "type": { + "type": "record", + "name": "Address", + "namespace": "com.netapp", + "fields": [ + {"name": "street", "type": "string"}, + {"name": "city", "type": "string"}, + {"name": "postalCode", "type": "string"}, + { + "name": "country", + "type": { + "type": "record", + "name": "Country", + "namespace": "com.netapp", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "code", "type": "string"} + ] + } + } + ] + } + }, + { + "name": "job", + "type": { + "type": "record", + "name": "Job", + "namespace": "com.netapp", + "fields": [ + {"name": "title", "type": "string"}, + {"name": "salary", "type": "double"} + ] + } + } + ] + } + """ + ) + + # Check that the reference schemas (Address and Job, including nested Country) have been correctly inlined + assert person_schema.schema == expected_schema diff --git a/tests/unit/test_avro_merge.py b/tests/unit/test_avro_merge.py deleted file mode 100644 index 553afdd0f..000000000 --- a/tests/unit/test_avro_merge.py +++ /dev/null @@ -1,76 +0,0 @@ -""" -karapace - Unit Test of AvroMerge class - -Copyright (c) 2023 Aiven Ltd -See LICENSE for details -""" - - -from karapace.schema_models import AvroMerge -from karapace.utils import json_decode, json_encode -from unittest.mock import MagicMock - -import pytest - - -class TestAvroMerge: - @pytest.fixture - def avro_merge(self): - schema_str = '{"type": "record", "name": "Test", "fields": [{"name": "field1", "type": "string"}]}' - dependencies = {"dependency1": MagicMock(schema=MagicMock(schema_str='{"type": "string"}', dependencies=None))} - return AvroMerge(schema_str, dependencies) - - def test_init(self, avro_merge): - assert avro_merge.schema_str == json_encode(json_decode(avro_merge.schema_str), compact=True, sort_keys=True) - assert avro_merge.unique_id == 0 - - def test_union_safe_schema_str_no_union(self, avro_merge): - result = avro_merge.union_safe_schema_str('{"type": "string"}') - expected = ( - '{"name": "___RESERVED_KARAPACE_WRAPPER_NAME_0___","type": "record", "fields": [{"name": "name", ' - '"type": [{"type": "string"}]}]}' - ) - assert result == expected - - def test_union_safe_schema_str_with_union(self, avro_merge): - result = avro_merge.union_safe_schema_str('["null", "string"]') - expected = ( - '{"name": "___RESERVED_KARAPACE_WRAPPER_NAME_0___","type": "record", "fields": [{"name": "name", ' - '"type": ["null", "string"]}]}' - ) - assert result == expected - - def test_builder_no_dependencies(self, avro_merge): - avro_merge.dependencies = None - result = avro_merge.builder(avro_merge.schema_str) - expected = ( - '{"name": "___RESERVED_KARAPACE_WRAPPER_NAME_1___","type": "record", "fields": [{"name": "name", ' - '"type": [{"fields":[{"name":"field1","type":"string"}],"name":"Test","type":"record"}]}]}' - ) - assert result == expected - - def test_builder_with_dependencies(self, avro_merge): - result = avro_merge.builder(avro_merge.schema_str, avro_merge.dependencies) - expected = ( - '{"name": "___RESERVED_KARAPACE_WRAPPER_NAME_1___","type": "record", "fields": [{"name": "name", "type": [{' - '"type": "string"}]}]},' - "\n" - '{"name": "___RESERVED_KARAPACE_WRAPPER_NAME_2___","type": "record", "fields": [{"name": "name", "type": [{' - '"fields":[{"name":"field1","type":"string"}],"name":"Test","type":"record"}]}]}' - ) - assert result == expected - - def test_wrap(self, avro_merge): - result = avro_merge.wrap() - expected = ( - "[\n" - + ( - '{"name": "___RESERVED_KARAPACE_WRAPPER_NAME_1___","type": "record", "fields": [{"name": "name", "type": [{' - '"type": "string"}]}]},' - "\n" - '{"name": "___RESERVED_KARAPACE_WRAPPER_NAME_2___","type": "record", "fields": [{"name": "name", "type": [{' - '"fields":[{"name":"field1","type":"string"}],"name":"Test","type":"record"}]}]}' - ) - + "\n]" - ) - assert result == expected