From 25ef7ea280f2494a095484309f4c8d090b3458d2 Mon Sep 17 00:00:00 2001 From: kevin-wu24 <66326898+kevin-wu24@users.noreply.github.com> Date: Wed, 30 Oct 2024 11:29:37 -0700 Subject: [PATCH 1/4] removing explicit ZK test parameterizations --- tests/kafkatest/services/kafka/quorum.py | 4 +- .../client/consumer_rolling_upgrade_test.py | 4 - tests/kafkatest/tests/client/consumer_test.py | 22 ++-- .../client/message_format_change_test.py | 106 ------------------ .../tests/connect/connect_distributed_test.py | 34 ++---- tests/kafkatest/tests/connect/connect_test.py | 2 - .../compatibility_test_new_broker_test.py | 2 - .../tests/core/consume_bench_test.py | 28 ----- .../tests/core/consumer_group_command_test.py | 4 +- .../core/controller_mutation_quota_test.py | 2 +- .../tests/core/fetch_from_follower_test.py | 2 +- .../tests/core/log_dir_failure_test.py | 2 - .../tests/core/reassign_partitions_test.py | 2 +- .../tests/core/replica_scale_test.py | 9 +- .../core/replication_replica_failure_test.py | 2 +- tests/kafkatest/tests/core/security_test.py | 6 +- .../kafkatest/tests/core/transactions_test.py | 8 -- .../streams/streams_broker_bounce_test.py | 5 - 18 files changed, 27 insertions(+), 217 deletions(-) delete mode 100644 tests/kafkatest/tests/client/message_format_change_test.py diff --git a/tests/kafkatest/services/kafka/quorum.py b/tests/kafkatest/services/kafka/quorum.py index 724d7d0de127..a98263b8cbac 100644 --- a/tests/kafkatest/services/kafka/quorum.py +++ b/tests/kafkatest/services/kafka/quorum.py @@ -25,10 +25,8 @@ # How we will parameterize tests that exercise all KRaft quorum styles all_kraft = [isolated_kraft, combined_kraft] # How we will parameterize tests that are unrelated to upgrades: -# [“ZK”] before the KIP-500 bridge release(s) -# [“ZK”, “ISOLATED_KRAFT”] during the KIP-500 bridge release(s) and in preview releases # [“ISOLATED_KRAFT”] after the KIP-500 bridge release(s) -all_non_upgrade = [zk, isolated_kraft] +all_non_upgrade = [isolated_kraft] def for_test(test_context): # A test uses ZooKeeper if it doesn't specify a metadata quorum or if it explicitly specifies ZooKeeper diff --git a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py index 9805d3656ee2..692541492546 100644 --- a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py +++ b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py @@ -48,10 +48,6 @@ def _verify_roundrobin_assignment(self, consumer): "Mismatched assignment: %s" % assignment @cluster(num_nodes=4) - @matrix( - metadata_quorum=[quorum.zk], - use_new_coordinator=[False] - ) @matrix( metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[True, False] diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py index 6cb82869c4f7..4bd680dd2a00 100644 --- a/tests/kafkatest/tests/client/consumer_test.py +++ b/tests/kafkatest/tests/client/consumer_test.py @@ -76,7 +76,7 @@ def setup_consumer(self, topic, **kwargs): @cluster(num_nodes=7) @matrix( - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -138,7 +138,7 @@ def test_broker_rolling_bounce(self, metadata_quorum=quorum.zk, use_new_coordina @matrix( clean_shutdown=[True], bounce_mode=["all", "rolling"], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -195,7 +195,7 @@ def test_consumer_bounce(self, clean_shutdown, bounce_mode, metadata_quorum=quor static_membership=[True, False], bounce_mode=["all", "rolling"], num_bounces=[5], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -272,7 +272,7 @@ def test_static_consumer_bounce_with_eager_assignment(self, clean_shutdown, stat @cluster(num_nodes=7) @matrix( bounce_mode=["all", "rolling"], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -314,7 +314,7 @@ def test_static_consumer_persisted_after_rejoin(self, bounce_mode, metadata_quor @matrix( num_conflict_consumers=[1, 2], fencing_stage=["stable", "all"], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -399,7 +399,7 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me @matrix( clean_shutdown=[True], enable_autocommit=[True, False], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -454,12 +454,6 @@ def test_consumer_failure(self, clean_shutdown, enable_autocommit, metadata_quor (consumer.last_commit(partition), consumer.current_position(partition)) @cluster(num_nodes=7) - @matrix( - clean_shutdown=[True, False], - enable_autocommit=[True, False], - metadata_quorum=[quorum.zk], - use_new_coordinator=[False] - ) @matrix( clean_shutdown=[True, False], enable_autocommit=[True, False], @@ -511,7 +505,7 @@ def test_broker_failure(self, clean_shutdown, enable_autocommit, metadata_quorum @cluster(num_nodes=7) @matrix( - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -574,7 +568,7 @@ def __init__(self, test_context): "org.apache.kafka.clients.consumer.RoundRobinAssignor", "org.apache.kafka.clients.consumer.StickyAssignor", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( diff --git a/tests/kafkatest/tests/client/message_format_change_test.py b/tests/kafkatest/tests/client/message_format_change_test.py deleted file mode 100644 index b65ef24704a6..000000000000 --- a/tests/kafkatest/tests/client/message_format_change_test.py +++ /dev/null @@ -1,106 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ducktape.mark import matrix -from ducktape.utils.util import wait_until -from ducktape.mark.resource import cluster - -from kafkatest.services.console_consumer import ConsoleConsumer -from kafkatest.services.kafka import config_property, KafkaService, quorum -from kafkatest.services.verifiable_producer import VerifiableProducer -from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest -from kafkatest.utils import is_int -from kafkatest.version import LATEST_0_9, LATEST_0_10, LATEST_0_11, V_2_8_0, DEV_BRANCH, KafkaVersion - - -class MessageFormatChangeTest(ProduceConsumeValidateTest): - - def __init__(self, test_context): - super(MessageFormatChangeTest, self).__init__(test_context=test_context) - - def setUp(self): - self.topic = "test_topic" - self.zk = ZookeeperService(self.test_context, num_nodes=1) if quorum.for_test(self.test_context) == quorum.zk else None - - if self.zk: - self.zk.start() - - # Producer and consumer - self.producer_throughput = 10000 - self.num_producers = 1 - self.num_consumers = 1 - self.messages_per_producer = 100 - - def produce_and_consume(self, producer_version, consumer_version, group): - self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, - self.topic, - throughput=self.producer_throughput, - message_validator=is_int, - version=KafkaVersion(producer_version)) - self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, - self.topic, consumer_timeout_ms=30000, - message_validator=is_int, version=KafkaVersion(consumer_version)) - self.consumer.group_id = group - self.run_produce_consume_validate(lambda: wait_until( - lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True, - timeout_sec=120, backoff_sec=1, - err_msg="Producer did not produce all messages in reasonable amount of time")) - - @cluster(num_nodes=12) - @matrix(producer_version=[str(DEV_BRANCH)], consumer_version=[str(DEV_BRANCH)], metadata_quorum=[quorum.zk]) - @matrix(producer_version=[str(LATEST_0_10)], consumer_version=[str(LATEST_0_10)], metadata_quorum=[quorum.zk]) - @matrix(producer_version=[str(LATEST_0_9)], consumer_version=[str(LATEST_0_9)], metadata_quorum=[quorum.zk]) - def test_compatibility(self, producer_version, consumer_version, metadata_quorum=quorum.zk): - """ This tests performs the following checks: - The workload is a mix of 0.9.x, 0.10.x and 0.11.x producers and consumers - that produce to and consume from a DEV_BRANCH cluster - 1. initially the topic is using message format 0.9.0 - 2. change the message format version for topic to 0.10.0 on the fly. - 3. change the message format version for topic to 0.11.0 on the fly. - 4. change the message format version for topic back to 0.10.0 on the fly (only if the client version is 0.11.0 or newer) - - The producers and consumers should not have any issue. - - Note regarding step number 4. Downgrading the message format version is generally unsupported as it breaks - older clients. More concretely, if we downgrade a topic from 0.11.0 to 0.10.0 after it contains messages with - version 0.11.0, we will return the 0.11.0 messages without down conversion due to an optimisation in the - handling of fetch requests. This will break any consumer that doesn't support 0.11.0. So, in practice, step 4 - is similar to step 2 and it didn't seem worth it to increase the cluster size to in order to add a step 5 that - would change the message format version for the topic back to 0.9.0.0. - """ - self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=DEV_BRANCH, topics={self.topic: { - "partitions": 3, - "replication-factor": 3, - 'configs': {"min.insync.replicas": 2}}}, - controller_num_nodes_override=1) - for node in self.kafka.nodes: - node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = str(V_2_8_0) # required for writing old message formats - - self.kafka.start() - self.logger.info("First format change to 0.9.0") - self.kafka.alter_message_format(self.topic, str(LATEST_0_9)) - self.produce_and_consume(producer_version, consumer_version, "group1") - - self.logger.info("Second format change to 0.10.0") - self.kafka.alter_message_format(self.topic, str(LATEST_0_10)) - self.produce_and_consume(producer_version, consumer_version, "group2") - - self.logger.info("Third format change to 0.11.0") - self.kafka.alter_message_format(self.topic, str(LATEST_0_11)) - self.produce_and_consume(producer_version, consumer_version, "group3") - - if producer_version == str(DEV_BRANCH) and consumer_version == str(DEV_BRANCH): - self.logger.info("Fourth format change back to 0.10.0") - self.kafka.alter_message_format(self.topic, str(LATEST_0_10)) - self.produce_and_consume(producer_version, consumer_version, "group4") - - diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py index 6ac24cdb925e..3066ac16831e 100644 --- a/tests/kafkatest/tests/connect/connect_distributed_test.py +++ b/tests/kafkatest/tests/connect/connect_distributed_test.py @@ -175,7 +175,7 @@ def task_is_running(self, connector, task_id, node=None): @matrix( exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -210,7 +210,7 @@ def test_restart_failed_connector(self, exactly_once_source, connect_protocol, m @matrix( connector_type=['source', 'exactly-once source', 'sink'], connect_protocol=['sessioned', 'compatible', 'eager'], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -247,7 +247,7 @@ def test_restart_failed_task(self, connector_type, connect_protocol, metadata_qu @cluster(num_nodes=5) @matrix( connect_protocol=['sessioned', 'compatible', 'eager'], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -277,7 +277,7 @@ def test_restart_connector_and_tasks_failed_connector(self, connect_protocol, me @matrix( connector_type=['source', 'sink'], connect_protocol=['sessioned', 'compatible', 'eager'], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -311,12 +311,6 @@ def test_restart_connector_and_tasks_failed_task(self, connector_type, connect_p err_msg="Failed to see task transition to the RUNNING state") @cluster(num_nodes=5) - @matrix( - exactly_once_source=[True, False], - connect_protocol=['sessioned', 'compatible', 'eager'], - metadata_quorum=[quorum.zk], - use_new_coordinator=[False] - ) @matrix( exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], @@ -366,7 +360,7 @@ def test_pause_and_resume_source(self, exactly_once_source, connect_protocol, me @cluster(num_nodes=5) @matrix( connect_protocol=['sessioned', 'compatible', 'eager'], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -422,12 +416,6 @@ def test_pause_and_resume_sink(self, connect_protocol, metadata_quorum, use_new_ err_msg="Failed to consume messages after resuming sink connector") @cluster(num_nodes=5) - @matrix( - exactly_once_source=[True, False], - connect_protocol=['sessioned', 'compatible', 'eager'], - metadata_quorum=[quorum.zk], - use_new_coordinator=[False] - ) @matrix( exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], @@ -643,7 +631,7 @@ def _wait_for_loggers(self, level, request_time, namespace, workers=None): security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL], exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -693,7 +681,7 @@ def test_file_source_and_sink(self, security_protocol, exactly_once_source, conn @matrix( clean=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -823,12 +811,6 @@ def test_bounce(self, clean, connect_protocol, metadata_quorum, use_new_coordina assert success, "Found validation errors:\n" + "\n ".join(errors) @cluster(num_nodes=7) - @matrix( - clean=[True, False], - connect_protocol=['sessioned', 'compatible', 'eager'], - metadata_quorum=[quorum.zk], - use_new_coordinator=[False] - ) @matrix( clean=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], @@ -933,7 +915,7 @@ def test_exactly_once_source(self, clean, connect_protocol, metadata_quorum, use @cluster(num_nodes=6) @matrix( connect_protocol=['sessioned', 'compatible', 'eager'], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py index 312c10a0ebf7..ad850c568b0e 100644 --- a/tests/kafkatest/tests/connect/connect_test.py +++ b/tests/kafkatest/tests/connect/connect_test.py @@ -138,10 +138,8 @@ def validate_output(self, value): return False @cluster(num_nodes=5) - @parametrize(error_tolerance=ErrorTolerance.ALL, metadata_quorum=quorum.zk) @parametrize(error_tolerance=ErrorTolerance.NONE, metadata_quorum=quorum.isolated_kraft) @parametrize(error_tolerance=ErrorTolerance.ALL, metadata_quorum=quorum.isolated_kraft) - @parametrize(error_tolerance=ErrorTolerance.NONE, metadata_quorum=quorum.zk) def test_skip_and_log_to_dlq(self, error_tolerance, metadata_quorum): self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk, topics=self.topics) diff --git a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py index 35edaffc64dc..0bcfcaf0ee29 100644 --- a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py +++ b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py @@ -48,7 +48,6 @@ def setUp(self): @cluster(num_nodes=6) @matrix(producer_version=[str(DEV_BRANCH)], consumer_version=[str(DEV_BRANCH)], compression_types=[["snappy"]], timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(DEV_BRANCH)], consumer_version=[str(DEV_BRANCH)], compression_types=[["none"]], timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade) - @parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None) @matrix(producer_version=[str(DEV_BRANCH)], consumer_version=[str(LATEST_0_9)], compression_types=[["snappy"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(LATEST_2_2)], consumer_version=[str(LATEST_2_2)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(LATEST_2_3)], consumer_version=[str(LATEST_2_3)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) @@ -77,7 +76,6 @@ def setUp(self): @matrix(producer_version=[str(LATEST_0_9)], consumer_version=[str(DEV_BRANCH)], compression_types=[["none"]], timestamp_type=[None], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(LATEST_0_9)], consumer_version=[str(DEV_BRANCH)], compression_types=[["snappy"]], timestamp_type=[None], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(LATEST_0_9)], consumer_version=[str(LATEST_0_9)], compression_types=[["snappy"]], timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade) - @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], new_consumer=False, timestamp_type=None) def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=True, timestamp_type=None, metadata_quorum=quorum.zk): if not new_consumer and metadata_quorum != quorum.zk: raise Exception("ZooKeeper-based consumers are not supported when using a KRaft metadata quorum") diff --git a/tests/kafkatest/tests/core/consume_bench_test.py b/tests/kafkatest/tests/core/consume_bench_test.py index 0575b5d8f217..c205604f8f6c 100644 --- a/tests/kafkatest/tests/core/consume_bench_test.py +++ b/tests/kafkatest/tests/core/consume_bench_test.py @@ -68,14 +68,6 @@ def produce_messages(self, topics, max_messages=10000): self.logger.debug("Produce workload finished") @cluster(num_nodes=10) - @matrix( - topics=[ - ["consume_bench_topic[0-5]"], # topic subscription - ["consume_bench_topic[0-5]:[0-4]"] # manual topic assignment - ], - metadata_quorum=[quorum.zk], - use_new_coordinator=[False] - ) @matrix( topics=[ ["consume_bench_topic[0-5]"], # topic subscription @@ -114,10 +106,6 @@ def test_consume_bench(self, topics, metadata_quorum=quorum.zk, use_new_coordina self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2)) @cluster(num_nodes=10) - @matrix( - metadata_quorum=[quorum.zk], - use_new_coordinator=[False] - ) @matrix( metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] @@ -149,10 +137,6 @@ def test_single_partition(self, metadata_quorum=quorum.zk, use_new_coordinator=F self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2)) @cluster(num_nodes=10) - @matrix( - metadata_quorum=[quorum.zk], - use_new_coordinator=[False] - ) @matrix( metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] @@ -185,10 +169,6 @@ def test_multiple_consumers_random_group_topics(self, metadata_quorum=quorum.zk, self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2)) @cluster(num_nodes=10) - @matrix( - metadata_quorum=[quorum.zk], - use_new_coordinator=[False] - ) @matrix( metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] @@ -222,10 +202,6 @@ def test_two_consumers_specified_group_topics(self, metadata_quorum=quorum.zk, u self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2)) @cluster(num_nodes=10) - @matrix( - metadata_quorum=[quorum.zk], - use_new_coordinator=[False] - ) @matrix( metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] @@ -259,10 +235,6 @@ def test_multiple_consumers_random_group_partitions(self, metadata_quorum=quorum self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2)) @cluster(num_nodes=10) - @matrix( - metadata_quorum=[quorum.zk], - use_new_coordinator=[False] - ) @matrix( metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] diff --git a/tests/kafkatest/tests/core/consumer_group_command_test.py b/tests/kafkatest/tests/core/consumer_group_command_test.py index 7f1d79574d35..2df53e3093a4 100644 --- a/tests/kafkatest/tests/core/consumer_group_command_test.py +++ b/tests/kafkatest/tests/core/consumer_group_command_test.py @@ -93,7 +93,7 @@ def setup_and_verify(self, security_protocol, group=None, group_protocol=None): @cluster(num_nodes=3) @matrix( security_protocol=['PLAINTEXT', 'SSL'], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -112,7 +112,7 @@ def test_list_consumer_groups(self, security_protocol='PLAINTEXT', metadata_quor @cluster(num_nodes=3) @matrix( security_protocol=['PLAINTEXT', 'SSL'], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( diff --git a/tests/kafkatest/tests/core/controller_mutation_quota_test.py b/tests/kafkatest/tests/core/controller_mutation_quota_test.py index b5578ce857ee..bf8a3b874ed4 100644 --- a/tests/kafkatest/tests/core/controller_mutation_quota_test.py +++ b/tests/kafkatest/tests/core/controller_mutation_quota_test.py @@ -77,7 +77,7 @@ def teardown(self): self.zk.stop() @cluster(num_nodes=2) - @matrix(metadata_quorum=quorum.all) + @matrix(metadata_quorum=quorum.all_kraft) def test_controller_mutation_quota(self, metadata_quorum=quorum.zk): self.partition_count = 10 mutation_rate = 3 * self.partition_count / (self.window_num * self.window_size_seconds) diff --git a/tests/kafkatest/tests/core/fetch_from_follower_test.py b/tests/kafkatest/tests/core/fetch_from_follower_test.py index 6a096d7b92be..a4c810116ddc 100644 --- a/tests/kafkatest/tests/core/fetch_from_follower_test.py +++ b/tests/kafkatest/tests/core/fetch_from_follower_test.py @@ -71,7 +71,7 @@ def setUp(self): @cluster(num_nodes=9) @matrix( - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( diff --git a/tests/kafkatest/tests/core/log_dir_failure_test.py b/tests/kafkatest/tests/core/log_dir_failure_test.py index 483ed674c357..ba8390ceb1c9 100644 --- a/tests/kafkatest/tests/core/log_dir_failure_test.py +++ b/tests/kafkatest/tests/core/log_dir_failure_test.py @@ -91,8 +91,6 @@ def min_cluster_size(self): """Override this since we're adding services outside of the constructor""" return super(LogDirFailureTest, self).min_cluster_size() + self.num_producers * 2 + self.num_consumers * 2 - @cluster(num_nodes=8) - @matrix(bounce_broker=[False, True], broker_type=["leader", "follower"], security_protocol=["PLAINTEXT"], metadata_quorum=[quorum.zk]) @cluster(num_nodes=10) @matrix(bounce_broker=[False, True], broker_type=["leader", "follower"], security_protocol=["PLAINTEXT"], metadata_quorum=[quorum.isolated_kraft]) def test_replication_with_disk_failure(self, bounce_broker, security_protocol, broker_type, metadata_quorum): diff --git a/tests/kafkatest/tests/core/reassign_partitions_test.py b/tests/kafkatest/tests/core/reassign_partitions_test.py index ce8280225e85..697088dfa9c3 100644 --- a/tests/kafkatest/tests/core/reassign_partitions_test.py +++ b/tests/kafkatest/tests/core/reassign_partitions_test.py @@ -147,7 +147,7 @@ def move_start_offset(self): @matrix( bounce_brokers=[True, False], reassign_from_offset_zero=[True, False], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( diff --git a/tests/kafkatest/tests/core/replica_scale_test.py b/tests/kafkatest/tests/core/replica_scale_test.py index 8500c0bf302d..d3ddf87bbd8f 100644 --- a/tests/kafkatest/tests/core/replica_scale_test.py +++ b/tests/kafkatest/tests/core/replica_scale_test.py @@ -52,7 +52,7 @@ def teardown(self): topic_count=[50], partition_count=[34], replication_factor=[3], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -119,13 +119,6 @@ def test_produce_consume(self, topic_count, partition_count, replication_factor, trogdor.stop() @cluster(num_nodes=12) - @matrix( - topic_count=[50], - partition_count=[34], - replication_factor=[3], - metadata_quorum=[quorum.zk], - use_new_coordinator=[False] - ) @matrix( topic_count=[50], partition_count=[34], diff --git a/tests/kafkatest/tests/core/replication_replica_failure_test.py b/tests/kafkatest/tests/core/replication_replica_failure_test.py index f17f6a2d26ca..cc048fcfdecd 100644 --- a/tests/kafkatest/tests/core/replication_replica_failure_test.py +++ b/tests/kafkatest/tests/core/replication_replica_failure_test.py @@ -38,7 +38,7 @@ def __init__(self, test_context): @cluster(num_nodes=7) @matrix( - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( diff --git a/tests/kafkatest/tests/core/security_test.py b/tests/kafkatest/tests/core/security_test.py index 47aba24dbd69..33f3c72d7518 100644 --- a/tests/kafkatest/tests/core/security_test.py +++ b/tests/kafkatest/tests/core/security_test.py @@ -61,7 +61,7 @@ def producer_consumer_have_expected_error(self, error): @matrix( security_protocol=['PLAINTEXT'], interbroker_security_protocol=['SSL'], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -74,7 +74,7 @@ def producer_consumer_have_expected_error(self, error): @matrix( security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT'], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -157,7 +157,7 @@ def create_and_start_clients(self, log_level, group_protocol): @cluster(num_nodes=2) @matrix( - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py index ac5ae03ddb4d..11cd1c24ae33 100644 --- a/tests/kafkatest/tests/core/transactions_test.py +++ b/tests/kafkatest/tests/core/transactions_test.py @@ -217,14 +217,6 @@ def setup_topics(self): } @cluster(num_nodes=9) - @matrix( - failure_mode=["hard_bounce", "clean_bounce"], - bounce_target=["brokers", "clients"], - check_order=[True, False], - use_group_metadata=[True, False], - metadata_quorum=[quorum.zk], - use_new_coordinator=[False] - ) @matrix( failure_mode=["hard_bounce", "clean_bounce"], bounce_target=["brokers", "clients"], diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py index 47ce6f231348..c00b7ac3e146 100644 --- a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py +++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py @@ -217,11 +217,6 @@ def collect_results(self, sleep_time_secs): num_threads=[1, 3], sleep_time_secs=[120], metadata_quorum=[quorum.isolated_kraft]) - @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], - broker_type=["leader", "controller"], - num_threads=[1, 3], - sleep_time_secs=[120], - metadata_quorum=[quorum.zk]) def test_broker_type_bounce(self, failure_mode, broker_type, sleep_time_secs, num_threads, metadata_quorum): """ Start a smoke test client, then kill one particular broker and ensure data is still received From e5c3d06f6eccf3e0e37bce3af9b2e38fe3dd8302 Mon Sep 17 00:00:00 2001 From: kevin-wu24 <66326898+kevin-wu24@users.noreply.github.com> Date: Wed, 30 Oct 2024 11:40:28 -0700 Subject: [PATCH 2/4] cleanup --- .../kafkatest/tests/core/compatibility_test_new_broker_test.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py index 0bcfcaf0ee29..35edaffc64dc 100644 --- a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py +++ b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py @@ -48,6 +48,7 @@ def setUp(self): @cluster(num_nodes=6) @matrix(producer_version=[str(DEV_BRANCH)], consumer_version=[str(DEV_BRANCH)], compression_types=[["snappy"]], timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(DEV_BRANCH)], consumer_version=[str(DEV_BRANCH)], compression_types=[["none"]], timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade) + @parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None) @matrix(producer_version=[str(DEV_BRANCH)], consumer_version=[str(LATEST_0_9)], compression_types=[["snappy"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(LATEST_2_2)], consumer_version=[str(LATEST_2_2)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(LATEST_2_3)], consumer_version=[str(LATEST_2_3)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) @@ -76,6 +77,7 @@ def setUp(self): @matrix(producer_version=[str(LATEST_0_9)], consumer_version=[str(DEV_BRANCH)], compression_types=[["none"]], timestamp_type=[None], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(LATEST_0_9)], consumer_version=[str(DEV_BRANCH)], compression_types=[["snappy"]], timestamp_type=[None], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(LATEST_0_9)], consumer_version=[str(LATEST_0_9)], compression_types=[["snappy"]], timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade) + @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], new_consumer=False, timestamp_type=None) def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=True, timestamp_type=None, metadata_quorum=quorum.zk): if not new_consumer and metadata_quorum != quorum.zk: raise Exception("ZooKeeper-based consumers are not supported when using a KRaft metadata quorum") From 296031f65a0ddb608dd7fc82a6eb96f5778feae3 Mon Sep 17 00:00:00 2001 From: kevin-wu24 <66326898+kevin-wu24@users.noreply.github.com> Date: Sun, 3 Nov 2024 11:52:13 -0600 Subject: [PATCH 3/4] changing vagrant to jdk11 for test run --- Vagrantfile | 4 ++-- vagrant/base.sh | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Vagrantfile b/Vagrantfile index a053be28d01d..3f64a4a96596 100644 --- a/Vagrantfile +++ b/Vagrantfile @@ -55,8 +55,8 @@ ec2_iam_instance_profile_name = nil ebs_volume_type = 'gp3' -jdk_major = '8' -jdk_full = '8u202-linux-x64' +jdk_major = '11' +jdk_full = '11.0.2-linux-x64' local_config_file = File.join(File.dirname(__FILE__), "Vagrantfile.local") if File.exists?(local_config_file) then diff --git a/vagrant/base.sh b/vagrant/base.sh index 958b9c7d7a8c..8ad2c0890db2 100755 --- a/vagrant/base.sh +++ b/vagrant/base.sh @@ -36,8 +36,8 @@ fetch_jdk_tgz() { fi } -JDK_MAJOR="${JDK_MAJOR:-8}" -JDK_FULL="${JDK_FULL:-8u202-linux-x64}" +JDK_MAJOR="${JDK_MAJOR:-11}" +JDK_FULL="${JDK_FULL:-11.0.2-linux-x64}" if [ -z `which javac` ]; then apt-get -y update From 3ff0ae2d2e4bf6287cd81c360dd4ce284a71ee1e Mon Sep 17 00:00:00 2001 From: kevin-wu24 <66326898+kevin-wu24@users.noreply.github.com> Date: Mon, 4 Nov 2024 09:34:11 -0600 Subject: [PATCH 4/4] Revert "changing vagrant to jdk11 for test run" This reverts commit 296031f65a0ddb608dd7fc82a6eb96f5778feae3. --- Vagrantfile | 4 ++-- vagrant/base.sh | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Vagrantfile b/Vagrantfile index 3f64a4a96596..a053be28d01d 100644 --- a/Vagrantfile +++ b/Vagrantfile @@ -55,8 +55,8 @@ ec2_iam_instance_profile_name = nil ebs_volume_type = 'gp3' -jdk_major = '11' -jdk_full = '11.0.2-linux-x64' +jdk_major = '8' +jdk_full = '8u202-linux-x64' local_config_file = File.join(File.dirname(__FILE__), "Vagrantfile.local") if File.exists?(local_config_file) then diff --git a/vagrant/base.sh b/vagrant/base.sh index 8ad2c0890db2..958b9c7d7a8c 100755 --- a/vagrant/base.sh +++ b/vagrant/base.sh @@ -36,8 +36,8 @@ fetch_jdk_tgz() { fi } -JDK_MAJOR="${JDK_MAJOR:-11}" -JDK_FULL="${JDK_FULL:-11.0.2-linux-x64}" +JDK_MAJOR="${JDK_MAJOR:-8}" +JDK_FULL="${JDK_FULL:-8u202-linux-x64}" if [ -z `which javac` ]; then apt-get -y update