diff --git a/src/karapace/backup/api.py b/src/karapace/backup/api.py
index d06c99ebe..3da9a2304 100644
--- a/src/karapace/backup/api.py
+++ b/src/karapace/backup/api.py
@@ -373,13 +373,20 @@ def _handle_restore_topic(
     instruction: RestoreTopic,
     config: Config,
     skip_topic_creation: bool = False,
+    override_replication_factor: int | None = None,
 ) -> None:
     if skip_topic_creation:
         return
+    repl_factor = instruction.replication_factor
+    if override_replication_factor is not None:
+        LOG.info(
+            "Overriding replication factor with: %d (was: %d)", override_replication_factor, instruction.replication_factor
+        )
+        repl_factor = override_replication_factor
     if not _maybe_create_topic(
         config=config,
         name=instruction.topic_name,
-        replication_factor=instruction.replication_factor,
+        replication_factor=repl_factor,
         topic_configs=instruction.topic_configs,
     ):
         raise BackupTopicAlreadyExists(f"Topic to restore '{instruction.topic_name}' already exists")
@@ -426,6 +433,7 @@ def restore_backup(
     backup_location: ExistingFile,
     topic_name: TopicName,
     skip_topic_creation: bool = False,
+    override_replication_factor: int | None = None,
 ) -> None:
     """Restores a backup from the specified location into the configured topic.
 
@@ -475,7 +483,7 @@ def _check_producer_exception() -> None:
             _handle_restore_topic_legacy(instruction, config, skip_topic_creation)
             producer = stack.enter_context(_producer(config, instruction.topic_name))
         elif isinstance(instruction, RestoreTopic):
-            _handle_restore_topic(instruction, config, skip_topic_creation)
+            _handle_restore_topic(instruction, config, skip_topic_creation, override_replication_factor)
             producer = stack.enter_context(_producer(config, instruction.topic_name))
         elif isinstance(instruction, ProducerSend):
             if producer is None:
diff --git a/src/karapace/backup/cli.py b/src/karapace/backup/cli.py
index 5e3d72854..7125b1e04 100644
--- a/src/karapace/backup/cli.py
+++ b/src/karapace/backup/cli.py
@@ -76,6 +76,15 @@ def parse_args() -> argparse.Namespace:
         ),
     )
 
+    parser_restore.add_argument(
+        "--override-replication-factor",
+        help=(
+            "Override the replication factor that is saved in the backup. This is needed when restoring a backup from"
+            " a downsized cluster (like scaling down from 6 to 3 nodes). This has an effect only for V3 backups."
+        ),
+        type=int,
+    )
+
     return parser.parse_args()
 
 
@@ -115,6 +124,7 @@ def dispatch(args: argparse.Namespace) -> None:
                 backup_location=api.locate_backup_file(location),
                 topic_name=api.normalize_topic_name(args.topic, config),
                 skip_topic_creation=args.skip_topic_creation,
+                override_replication_factor=args.override_replication_factor,
             )
         except BackupDataRestorationError:
             traceback.print_exc()
diff --git a/src/karapace/protobuf/serialization.py b/src/karapace/protobuf/serialization.py
index 6c3ca61fd..123e80c8f 100644
--- a/src/karapace/protobuf/serialization.py
+++ b/src/karapace/protobuf/serialization.py
@@ -93,17 +93,31 @@ def _deserialize_msg(msgtype: Any) -> MessageElement:
     for nested_enum in msgtype.enum_type:
         nested_types.append(_deserialize_enum(nested_enum))
 
-    one_ofs: list[OneOfElement] = [OneOfElement(oneof.name) for oneof in msgtype.oneof_decl]
+    one_ofs: list[OneOfElement | None] = [OneOfElement(oneof.name) for oneof in msgtype.oneof_decl]
 
     for f in msgtype.field:
         sf = _deserialize_field(f)
-        if f.HasField("oneof_index"):
+        is_oneof = f.HasField("oneof_index")
+        is_proto3_optional = f.HasField("oneof_index") and f.HasField("proto3_optional") and f.proto3_optional
+        if is_proto3_optional:
+            # Every proto3 optional field is placed into a one-field oneof, called a "synthetic" oneof,
+            # as it was not present in the source .proto file.
+            # This makes sure that we don't interpret those optionals as oneofs.
+            one_ofs[f.oneof_index] = None
+            fields.append(sf)
+        elif is_oneof:
             one_ofs[f.oneof_index].fields.append(sf)
         else:
             fields.append(sf)
 
+    one_ofs_filtered: list[OneOfElement] = [oneof for oneof in one_ofs if oneof is not None]
     return MessageElement(
-        DEFAULT_LOCATION, msgtype.name, nested_types=nested_types, reserveds=reserveds, fields=fields, one_ofs=one_ofs
+        DEFAULT_LOCATION,
+        msgtype.name,
+        nested_types=nested_types,
+        reserveds=reserveds,
+        fields=fields,
+        one_ofs=one_ofs_filtered,
     )
 
 
diff --git a/tests/integration/backup/test_v3_backup.py b/tests/integration/backup/test_v3_backup.py
index 744437be6..6f2e5df35 100644
--- a/tests/integration/backup/test_v3_backup.py
+++ b/tests/integration/backup/test_v3_backup.py
@@ -4,7 +4,7 @@
 """
 from __future__ import annotations
 
-from aiokafka.errors import UnknownTopicOrPartitionError
+from aiokafka.errors import InvalidReplicationFactorError, UnknownTopicOrPartitionError
 from collections.abc import Iterator
 from confluent_kafka import Message, TopicPartition
 from confluent_kafka.admin import NewTopic
@@ -119,14 +119,13 @@ def test_roundtrip_from_kafka_state(
     admin_client.update_topic_config(new_topic.topic, {"max.message.bytes": "999"})
 
     # Populate topic.
-    producer.send(
+    first_record_fut = producer.send(
         new_topic.topic,
         key=b"bar",
         value=b"foo",
         partition=0,
-        timestamp=1683474641,
     )
-    producer.send(
+    second_record_fut = producer.send(
         new_topic.topic,
         key=b"foo",
         value=b"bar",
@@ -135,10 +134,12 @@
             ("some-header", b"some header value"),
             ("other-header", b"some other header value"),
         ],
-        timestamp=1683474657,
     )
     producer.flush()
 
+    first_message_timestamp = first_record_fut.result(timeout=5).timestamp()[1]
+    second_message_timestamp = second_record_fut.result(timeout=5).timestamp()[1]
+
     topic_config = get_topic_configurations(admin_client, new_topic.topic, {ConfigSource.DYNAMIC_TOPIC_CONFIG})
 
     # Execute backup creation.
@@ -212,7 +213,7 @@
     # Note: This might be unreliable due to not using idempotent producer, i.e. we have
     # no guarantee against duplicates currently.
     assert first_record.offset() == 0
-    assert first_record.timestamp()[1] == 1683474641
+    assert first_record.timestamp()[1] == first_message_timestamp
     assert first_record.timestamp()[0] == Timestamp.CREATE_TIME
     assert first_record.key() == b"bar"
     assert first_record.value() == b"foo"
@@ -223,7 +224,7 @@
     assert second_record.topic() == new_topic.topic
     assert second_record.partition() == partition
     assert second_record.offset() == 1
-    assert second_record.timestamp()[1] == 1683474657
+    assert second_record.timestamp()[1] == second_message_timestamp
     assert second_record.timestamp()[0] == Timestamp.CREATE_TIME
     assert second_record.key() == b"foo"
     assert second_record.value() == b"bar"
@@ -697,6 +698,56 @@ def __exit__(self, exc_type, exc_value, exc_traceback):
     )
 
 
+def test_backup_restoration_override_replication_factor(
+    admin_client: KafkaAdminClient,
+    kafka_servers: KafkaServers,
+    producer: KafkaProducer,
+    new_topic: NewTopic,
+) -> None:
+    backup_directory = Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_single_partition" / new_topic.topic
+    metadata_path = backup_directory / f"{new_topic.topic}.metadata"
+    config = set_config_defaults(
+        {
+            "bootstrap_uri": kafka_servers.bootstrap_servers,
+        }
+    )
+
+    # Populate the topic and create a backup.
+    for i in range(10):
+        producer.send(
+            new_topic.topic,
+            key=f"message-key-{i}",
+            value=f"message-value-{i}-" + 1000 * "X",
+        )
+    producer.flush()
+    api.create_backup(
+        config=config,
+        backup_location=backup_directory,
+        topic_name=TopicName(new_topic.topic),
+        version=BackupVersion.V3,
+        replication_factor=6,
+    )
+
+    # Make sure the topic doesn't exist beforehand.
+    _delete_topic(admin_client, new_topic.topic)
+
+    # Assert that the restore fails without the replication factor override.
+    with pytest.raises(InvalidReplicationFactorError):
+        api.restore_backup(
+            config=config,
+            backup_location=metadata_path,
+            topic_name=TopicName(new_topic.topic),
+        )
+
+    # Finally, restore the backup with the override.
+    api.restore_backup(
+        config=config,
+        backup_location=metadata_path,
+        topic_name=TopicName(new_topic.topic),
+        override_replication_factor=1,
+    )
+
+
 def no_color_env() -> dict[str, str]:
     env = os.environ.copy()
     try:
diff --git a/tests/schemas/protobuf.py b/tests/schemas/protobuf.py
index 1bd3f05d5..afbb3f890 100644
--- a/tests/schemas/protobuf.py
+++ b/tests/schemas/protobuf.py
@@ -261,3 +261,21 @@
     "lzdGVyLk1ldGFkYXRhEhYKDmNvbXBhbnlfbnVtYmVyGAIgASgJGhYKCE1ldGFkYXRhEgoK"
     "AmlkGAEgASgJYgZwcm90bzM="
 )
+
+schema_protobuf_optionals_bin = (
+    "Cgp0ZXN0LnByb3RvIqYBCgpEaW1lbnNpb25zEhEKBHNpemUYASABKAFIAIgBARISCgV3aWR0aBgCIAEoAUgBiAEBEhMKBmhlaWdodBgDIAEo"
+    "AUgCiAEBEhMKBmxlbmd0aBgEIAEoAUgDiAEBEhMKBndlaWdodBgFIAEoAUgEiAEBQgcKBV9zaXplQggKBl93aWR0aEIJCgdfaGVpZ2h0Qg"
+    "kKB19sZW5ndGhCCQoHX3dlaWdodGIGcHJvdG8z"
+)
+
+schema_protobuf_optionals = """\
+syntax = "proto3";
+
+message Dimensions {
+  optional double size = 1;
+  optional double width = 2;
+  optional double height = 3;
+  optional double length = 4;
+  optional double weight = 5;
+}
+"""
diff --git a/tests/unit/test_protobuf_binary_serialization.py b/tests/unit/test_protobuf_binary_serialization.py
index 6950066d3..99bfe375e 100644
--- a/tests/unit/test_protobuf_binary_serialization.py
+++ b/tests/unit/test_protobuf_binary_serialization.py
@@ -16,6 +16,8 @@
     schema_protobuf_nested_message4_bin_protoc,
     schema_protobuf_oneof,
     schema_protobuf_oneof_bin,
+    schema_protobuf_optionals,
+    schema_protobuf_optionals_bin,
     schema_protobuf_order_after,
     schema_protobuf_order_after_bin,
     schema_protobuf_plain,
@@ -89,6 +91,7 @@
     (schema_protobuf_references, schema_protobuf_references_bin),
     (schema_protobuf_references2, schema_protobuf_references2_bin),
     (schema_protobuf_complex, schema_protobuf_complex_bin),
+    (schema_protobuf_optionals, schema_protobuf_optionals_bin),
 ],
 )
 def test_schema_deserialize(schema_plain, schema_serialized):
@@ -125,6 +128,7 @@ def test_protoc_serialized_schema_deserialize(schema_plain, schema_serialized):
     schema_protobuf_references,
     schema_protobuf_references2,
     schema_protobuf_complex,
+    schema_protobuf_optionals,
 ],
 )
 def test_simple_schema_serialize(schema):
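
Note (not part of the diff): the serialization.py hunk above is easiest to reason about with a concrete descriptor in hand. The following minimal, standalone sketch — hypothetical names, built by hand with google.protobuf's descriptor_pb2 — shows how a proto3 `optional` field arrives wrapped in a single-field "synthetic" oneof, which is exactly the case the new `is_proto3_optional` branch detects and filters out so the field is rendered as a plain optional rather than as a oneof.

# Illustration only; the message and field names here are arbitrary examples.
from google.protobuf import descriptor_pb2

msg = descriptor_pb2.DescriptorProto(name="Dimensions")

# `optional double size = 1;` in proto3 compiles to a field with proto3_optional=True
# plus a synthetic one-field oneof, conventionally named "_size", that wraps it.
msg.oneof_decl.add(name="_size")
field = msg.field.add(
    name="size",
    number=1,
    type=descriptor_pb2.FieldDescriptorProto.TYPE_DOUBLE,
    label=descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL,
    oneof_index=0,
    proto3_optional=True,
)

# A deserializer walking this descriptor must keep `size` as a regular optional field
# and drop the synthetic oneof, instead of emitting `oneof _size { double size = 1; }`.
is_synthetic = field.HasField("oneof_index") and field.proto3_optional
assert is_synthetic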