diff --git a/src/confluent_kafka/schema_registry/avro.py b/src/confluent_kafka/schema_registry/avro.py index 9b5209909..079175884 100644 --- a/src/confluent_kafka/schema_registry/avro.py +++ b/src/confluent_kafka/schema_registry/avro.py @@ -192,15 +192,13 @@ def __init__(self, schema_registry_client, schema_str, to_dict=None, conf=None): else: raise TypeError('You must pass either schema string or schema object') - self._registry = schema_registry_client - self._schema_id = None - self._known_subjects = set() - if to_dict is not None and not callable(to_dict): raise ValueError("to_dict must be callable with the signature " "to_dict(object, SerializationContext)->dict") self._to_dict = to_dict + self._registry = schema_registry_client + self._known_subjects = set() conf_copy = self._default_conf.copy() if conf is not None: @@ -228,27 +226,47 @@ def __init__(self, schema_registry_client, schema_str, to_dict=None, conf=None): raise ValueError("Unrecognized properties: {}" .format(", ".join(conf_copy.keys()))) - schema_dict = loads(schema.schema_str) - self._named_schemas = _resolve_named_schema(schema, schema_registry_client) - parsed_schema = parse_schema(schema_dict, named_schemas=self._named_schemas) + # Set common instance variables + # Called on __init__ and + # on __call__ if use.latest.version is set to True + self._update_schema_info(schema) + + def _update_schema_info(self, schema, schema_id=None): + """ + Function to set the instance variables below upon __init__ (always) but + also on __call__ whenever the config param use.latest.version is set to True + and the subject is not in the local cache (self._known_subjects): + > self._schema_id + > self._schema + > self._named_schemas + > self._parsed_schema - if isinstance(parsed_schema, list): - # if parsed_schema is a list, we have an Avro union and there + Args: + schema (Schema): + schema_id (int, default None) + """ + self._schema = schema + self._schema_id = schema_id + + schema_dict = loads(self._schema.schema_str) + self._named_schemas = _resolve_named_schema(self._schema, + self._registry) + self._parsed_schema = parse_schema(schema_dict, + named_schemas=self._named_schemas) + + if isinstance(self._parsed_schema, list): + # if self._parsed_schema is a list, we have an Avro union and there # is no valid schema name. This is fine because the only use of # schema_name is for supplying the subject name to the registry # and union types should use topic_subject_name_strategy, which # just discards the schema name anyway - schema_name = None + self._schema_name = None else: # The Avro spec states primitives have a name equal to their type # i.e. {"type": "string"} has a name of string. # This function does not comply. # https://github.com/fastavro/fastavro/issues/415 - schema_name = parsed_schema.get("name", schema_dict["type"]) - - self._schema = schema - self._schema_name = schema_name - self._parsed_schema = parsed_schema + self._schema_name = self._parsed_schema.get("name", schema_dict["type"]) def __call__(self, obj, ctx): """ @@ -278,8 +296,11 @@ def __call__(self, obj, ctx): if subject not in self._known_subjects: if self._use_latest_version: latest_schema = self._registry.get_latest_version(subject) - self._schema_id = latest_schema.schema_id - + # Update instance variables with latest schema + self._update_schema_info(latest_schema.schema, + schema_id=latest_schema.schema_id) + # Add to registry cache + self._registry._cache.set(self._schema_id, self._schema, subject) else: # Check to ensure this schema has been registered under subject_name. if self._auto_register: diff --git a/src/confluent_kafka/schema_registry/json_schema.py b/src/confluent_kafka/schema_registry/json_schema.py index 656937c24..a921da818 100644 --- a/src/confluent_kafka/schema_registry/json_schema.py +++ b/src/confluent_kafka/schema_registry/json_schema.py @@ -165,24 +165,21 @@ class JSONSerializer(Serializer): 'subject.name.strategy': topic_subject_name_strategy} def __init__(self, schema_str, schema_registry_client, to_dict=None, conf=None): - self._are_references_provided = False if isinstance(schema_str, str): - self._schema = Schema(schema_str, schema_type="JSON") + schema = Schema(schema_str, schema_type="JSON") elif isinstance(schema_str, Schema): - self._schema = schema_str + schema = schema_str self._are_references_provided = bool(schema_str.references) else: raise TypeError('You must pass either str or Schema') - self._registry = schema_registry_client - self._schema_id = None - self._known_subjects = set() - if to_dict is not None and not callable(to_dict): raise ValueError("to_dict must be callable with the signature " "to_dict(object, SerializationContext)->dict") self._to_dict = to_dict + self._registry = schema_registry_client + self._known_subjects = set() conf_copy = self._default_conf.copy() if conf is not None: @@ -210,13 +207,34 @@ def __init__(self, schema_str, schema_registry_client, to_dict=None, conf=None): raise ValueError("Unrecognized properties: {}" .format(", ".join(conf_copy.keys()))) - schema_dict = json.loads(self._schema.schema_str) - schema_name = schema_dict.get('title', None) - if schema_name is None: - raise ValueError("Missing required JSON schema annotation title") + # Set common instance variables + # Called on __init__ and + # on __call__ if use.latest.version is set to True + self._update_schema_info(schema) - self._schema_name = schema_name - self._parsed_schema = schema_dict + def _update_schema_info(self, schema, schema_id=None): + """ + Function to set the instance variables below upon __init__ (always) but + also on __call__ whenever the config param use.latest.version is set to True + and the subject is not in the local cache (self._known_subjects): + > self._schema_id + > self._schema + > self._schema_name + > self._parsed_schema + > self._are_references_provided + + Args: + schema (Schema): + schema_id (int, default None) + """ + self._schema = schema + self._are_references_provided = bool(schema.references) + self._schema_id = schema_id + + self._parsed_schema = json.loads(self._schema.schema_str) + self._schema_name = self._parsed_schema.get("title", None) + if self._schema_name is None: + raise ValueError("Missing required JSON schema annotation title") def __call__(self, obj, ctx): """ @@ -245,8 +263,11 @@ def __call__(self, obj, ctx): if subject not in self._known_subjects: if self._use_latest_version: latest_schema = self._registry.get_latest_version(subject) - self._schema_id = latest_schema.schema_id - + # Update instance variables with latest schema + self._update_schema_info(latest_schema.schema, + schema_id=latest_schema.schema_id) + # Add to registry cache + self._registry._cache.set(self._schema_id, self._schema, subject) else: # Check to ensure this schema has been registered under subject_name. if self._auto_register: