Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set avro/json schema on use.latest.version=True #1704

Closed
wants to merge 8 commits into from
55 changes: 38 additions & 17 deletions src/confluent_kafka/schema_registry/avro.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,15 +192,13 @@ def __init__(self, schema_registry_client, schema_str, to_dict=None, conf=None):
else:
raise TypeError('You must pass either schema string or schema object')

self._registry = schema_registry_client
self._schema_id = None
self._known_subjects = set()

if to_dict is not None and not callable(to_dict):
raise ValueError("to_dict must be callable with the signature "
"to_dict(object, SerializationContext)->dict")

self._to_dict = to_dict
self._registry = schema_registry_client
self._known_subjects = set()

conf_copy = self._default_conf.copy()
if conf is not None:
Expand Down Expand Up @@ -228,27 +226,47 @@ def __init__(self, schema_registry_client, schema_str, to_dict=None, conf=None):
raise ValueError("Unrecognized properties: {}"
.format(", ".join(conf_copy.keys())))

schema_dict = loads(schema.schema_str)
self._named_schemas = _resolve_named_schema(schema, schema_registry_client)
parsed_schema = parse_schema(schema_dict, named_schemas=self._named_schemas)
# Set common instance variables
# Called on __init__ and
# on __call__ if use.latest.version is set to True
self._update_schema_info(schema)

def _update_schema_info(self, schema, schema_id=None):
"""
Function to set the instance variables below upon __init__ (always) but
also on __call__ whenever the config param use.latest.version is set to True
and the subject is not in the local cache (self._known_subjects):
> self._schema_id
> self._schema
> self._named_schemas
> self._parsed_schema

if isinstance(parsed_schema, list):
# if parsed_schema is a list, we have an Avro union and there
Args:
schema (Schema):
schema_id (int, default None)
"""
self._schema = schema
self._schema_id = schema_id

schema_dict = loads(self._schema.schema_str)
self._named_schemas = _resolve_named_schema(self._schema,
self._registry)
self._parsed_schema = parse_schema(schema_dict,
named_schemas=self._named_schemas)

if isinstance(self._parsed_schema, list):
# if self._parsed_schema is a list, we have an Avro union and there
# is no valid schema name. This is fine because the only use of
# schema_name is for supplying the subject name to the registry
# and union types should use topic_subject_name_strategy, which
# just discards the schema name anyway
schema_name = None
self._schema_name = None
else:
# The Avro spec states primitives have a name equal to their type
# i.e. {"type": "string"} has a name of string.
# This function does not comply.
# https://github.com/fastavro/fastavro/issues/415
schema_name = parsed_schema.get("name", schema_dict["type"])

self._schema = schema
self._schema_name = schema_name
self._parsed_schema = parsed_schema
self._schema_name = self._parsed_schema.get("name", schema_dict["type"])

def __call__(self, obj, ctx):
"""
Expand Down Expand Up @@ -278,8 +296,11 @@ def __call__(self, obj, ctx):
if subject not in self._known_subjects:
if self._use_latest_version:
latest_schema = self._registry.get_latest_version(subject)
self._schema_id = latest_schema.schema_id

# Update instance variables with latest schema
self._update_schema_info(latest_schema.schema,
schema_id=latest_schema.schema_id)
# Add to registry cache
self._registry._cache.set(self._schema_id, self._schema, subject)
else:
# Check to ensure this schema has been registered under subject_name.
if self._auto_register:
Expand Down
51 changes: 36 additions & 15 deletions src/confluent_kafka/schema_registry/json_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,24 +165,21 @@ class JSONSerializer(Serializer):
'subject.name.strategy': topic_subject_name_strategy}

def __init__(self, schema_str, schema_registry_client, to_dict=None, conf=None):
self._are_references_provided = False
if isinstance(schema_str, str):
self._schema = Schema(schema_str, schema_type="JSON")
schema = Schema(schema_str, schema_type="JSON")
elif isinstance(schema_str, Schema):
self._schema = schema_str
schema = schema_str
self._are_references_provided = bool(schema_str.references)
else:
raise TypeError('You must pass either str or Schema')

self._registry = schema_registry_client
self._schema_id = None
self._known_subjects = set()

if to_dict is not None and not callable(to_dict):
raise ValueError("to_dict must be callable with the signature "
"to_dict(object, SerializationContext)->dict")

self._to_dict = to_dict
self._registry = schema_registry_client
self._known_subjects = set()

conf_copy = self._default_conf.copy()
if conf is not None:
Expand Down Expand Up @@ -210,13 +207,34 @@ def __init__(self, schema_str, schema_registry_client, to_dict=None, conf=None):
raise ValueError("Unrecognized properties: {}"
.format(", ".join(conf_copy.keys())))

schema_dict = json.loads(self._schema.schema_str)
schema_name = schema_dict.get('title', None)
if schema_name is None:
raise ValueError("Missing required JSON schema annotation title")
# Set common instance variables
# Called on __init__ and
# on __call__ if use.latest.version is set to True
self._update_schema_info(schema)

self._schema_name = schema_name
self._parsed_schema = schema_dict
def _update_schema_info(self, schema, schema_id=None):
"""
Function to set the instance variables below upon __init__ (always) but
also on __call__ whenever the config param use.latest.version is set to True
and the subject is not in the local cache (self._known_subjects):
> self._schema_id
> self._schema
> self._schema_name
> self._parsed_schema
> self._are_references_provided

Args:
schema (Schema):
schema_id (int, default None)
"""
self._schema = schema
self._are_references_provided = bool(schema.references)
self._schema_id = schema_id

self._parsed_schema = json.loads(self._schema.schema_str)
self._schema_name = self._parsed_schema.get("title", None)
if self._schema_name is None:
raise ValueError("Missing required JSON schema annotation title")

def __call__(self, obj, ctx):
"""
Expand Down Expand Up @@ -245,8 +263,11 @@ def __call__(self, obj, ctx):
if subject not in self._known_subjects:
if self._use_latest_version:
latest_schema = self._registry.get_latest_version(subject)
self._schema_id = latest_schema.schema_id

# Update instance variables with latest schema
self._update_schema_info(latest_schema.schema,
schema_id=latest_schema.schema_id)
# Add to registry cache
self._registry._cache.set(self._schema_id, self._schema, subject)
else:
# Check to ensure this schema has been registered under subject_name.
if self._auto_register:
Expand Down