diff --git a/tests/kafkatest/services/kafka/quorum.py b/tests/kafkatest/services/kafka/quorum.py index 724d7d0de1278..a98263b8cbac4 100644 --- a/tests/kafkatest/services/kafka/quorum.py +++ b/tests/kafkatest/services/kafka/quorum.py @@ -25,10 +25,8 @@ # How we will parameterize tests that exercise all KRaft quorum styles all_kraft = [isolated_kraft, combined_kraft] # How we will parameterize tests that are unrelated to upgrades: -# [“ZK”] before the KIP-500 bridge release(s) -# [“ZK”, “ISOLATED_KRAFT”] during the KIP-500 bridge release(s) and in preview releases # [“ISOLATED_KRAFT”] after the KIP-500 bridge release(s) -all_non_upgrade = [zk, isolated_kraft] +all_non_upgrade = [isolated_kraft] def for_test(test_context): # A test uses ZooKeeper if it doesn't specify a metadata quorum or if it explicitly specifies ZooKeeper diff --git a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py index 9805d3656ee24..6925414925464 100644 --- a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py +++ b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py @@ -48,10 +48,6 @@ def _verify_roundrobin_assignment(self, consumer): "Mismatched assignment: %s" % assignment @cluster(num_nodes=4) - @matrix( - metadata_quorum=[quorum.zk], - use_new_coordinator=[False] - ) @matrix( metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[True, False] diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py index 6cb82869c4f72..4bd680dd2a00e 100644 --- a/tests/kafkatest/tests/client/consumer_test.py +++ b/tests/kafkatest/tests/client/consumer_test.py @@ -76,7 +76,7 @@ def setup_consumer(self, topic, **kwargs): @cluster(num_nodes=7) @matrix( - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -138,7 +138,7 @@ def test_broker_rolling_bounce(self, metadata_quorum=quorum.zk, use_new_coordina @matrix( clean_shutdown=[True], bounce_mode=["all", "rolling"], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -195,7 +195,7 @@ def test_consumer_bounce(self, clean_shutdown, bounce_mode, metadata_quorum=quor static_membership=[True, False], bounce_mode=["all", "rolling"], num_bounces=[5], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -272,7 +272,7 @@ def test_static_consumer_bounce_with_eager_assignment(self, clean_shutdown, stat @cluster(num_nodes=7) @matrix( bounce_mode=["all", "rolling"], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -314,7 +314,7 @@ def test_static_consumer_persisted_after_rejoin(self, bounce_mode, metadata_quor @matrix( num_conflict_consumers=[1, 2], fencing_stage=["stable", "all"], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -399,7 +399,7 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me @matrix( clean_shutdown=[True], enable_autocommit=[True, False], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -454,12 +454,6 @@ def test_consumer_failure(self, clean_shutdown, enable_autocommit, metadata_quor (consumer.last_commit(partition), consumer.current_position(partition)) @cluster(num_nodes=7) - @matrix( - clean_shutdown=[True, False], - enable_autocommit=[True, False], - metadata_quorum=[quorum.zk], - use_new_coordinator=[False] - ) @matrix( clean_shutdown=[True, False], enable_autocommit=[True, False], @@ -511,7 +505,7 @@ def test_broker_failure(self, clean_shutdown, enable_autocommit, metadata_quorum @cluster(num_nodes=7) @matrix( - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -574,7 +568,7 @@ def __init__(self, test_context): "org.apache.kafka.clients.consumer.RoundRobinAssignor", "org.apache.kafka.clients.consumer.StickyAssignor", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( diff --git a/tests/kafkatest/tests/client/message_format_change_test.py b/tests/kafkatest/tests/client/message_format_change_test.py deleted file mode 100644 index b65ef24704a68..0000000000000 --- a/tests/kafkatest/tests/client/message_format_change_test.py +++ /dev/null @@ -1,106 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ducktape.mark import matrix -from ducktape.utils.util import wait_until -from ducktape.mark.resource import cluster - -from kafkatest.services.console_consumer import ConsoleConsumer -from kafkatest.services.kafka import config_property, KafkaService, quorum -from kafkatest.services.verifiable_producer import VerifiableProducer -from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest -from kafkatest.utils import is_int -from kafkatest.version import LATEST_0_9, LATEST_0_10, LATEST_0_11, V_2_8_0, DEV_BRANCH, KafkaVersion - - -class MessageFormatChangeTest(ProduceConsumeValidateTest): - - def __init__(self, test_context): - super(MessageFormatChangeTest, self).__init__(test_context=test_context) - - def setUp(self): - self.topic = "test_topic" - self.zk = ZookeeperService(self.test_context, num_nodes=1) if quorum.for_test(self.test_context) == quorum.zk else None - - if self.zk: - self.zk.start() - - # Producer and consumer - self.producer_throughput = 10000 - self.num_producers = 1 - self.num_consumers = 1 - self.messages_per_producer = 100 - - def produce_and_consume(self, producer_version, consumer_version, group): - self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, - self.topic, - throughput=self.producer_throughput, - message_validator=is_int, - version=KafkaVersion(producer_version)) - self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, - self.topic, consumer_timeout_ms=30000, - message_validator=is_int, version=KafkaVersion(consumer_version)) - self.consumer.group_id = group - self.run_produce_consume_validate(lambda: wait_until( - lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True, - timeout_sec=120, backoff_sec=1, - err_msg="Producer did not produce all messages in reasonable amount of time")) - - @cluster(num_nodes=12) - @matrix(producer_version=[str(DEV_BRANCH)], consumer_version=[str(DEV_BRANCH)], metadata_quorum=[quorum.zk]) - @matrix(producer_version=[str(LATEST_0_10)], consumer_version=[str(LATEST_0_10)], metadata_quorum=[quorum.zk]) - @matrix(producer_version=[str(LATEST_0_9)], consumer_version=[str(LATEST_0_9)], metadata_quorum=[quorum.zk]) - def test_compatibility(self, producer_version, consumer_version, metadata_quorum=quorum.zk): - """ This tests performs the following checks: - The workload is a mix of 0.9.x, 0.10.x and 0.11.x producers and consumers - that produce to and consume from a DEV_BRANCH cluster - 1. initially the topic is using message format 0.9.0 - 2. change the message format version for topic to 0.10.0 on the fly. - 3. change the message format version for topic to 0.11.0 on the fly. - 4. change the message format version for topic back to 0.10.0 on the fly (only if the client version is 0.11.0 or newer) - - The producers and consumers should not have any issue. - - Note regarding step number 4. Downgrading the message format version is generally unsupported as it breaks - older clients. More concretely, if we downgrade a topic from 0.11.0 to 0.10.0 after it contains messages with - version 0.11.0, we will return the 0.11.0 messages without down conversion due to an optimisation in the - handling of fetch requests. This will break any consumer that doesn't support 0.11.0. So, in practice, step 4 - is similar to step 2 and it didn't seem worth it to increase the cluster size to in order to add a step 5 that - would change the message format version for the topic back to 0.9.0.0. - """ - self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=DEV_BRANCH, topics={self.topic: { - "partitions": 3, - "replication-factor": 3, - 'configs': {"min.insync.replicas": 2}}}, - controller_num_nodes_override=1) - for node in self.kafka.nodes: - node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = str(V_2_8_0) # required for writing old message formats - - self.kafka.start() - self.logger.info("First format change to 0.9.0") - self.kafka.alter_message_format(self.topic, str(LATEST_0_9)) - self.produce_and_consume(producer_version, consumer_version, "group1") - - self.logger.info("Second format change to 0.10.0") - self.kafka.alter_message_format(self.topic, str(LATEST_0_10)) - self.produce_and_consume(producer_version, consumer_version, "group2") - - self.logger.info("Third format change to 0.11.0") - self.kafka.alter_message_format(self.topic, str(LATEST_0_11)) - self.produce_and_consume(producer_version, consumer_version, "group3") - - if producer_version == str(DEV_BRANCH) and consumer_version == str(DEV_BRANCH): - self.logger.info("Fourth format change back to 0.10.0") - self.kafka.alter_message_format(self.topic, str(LATEST_0_10)) - self.produce_and_consume(producer_version, consumer_version, "group4") - - diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py index 6ac24cdb925e3..3066ac16831e6 100644 --- a/tests/kafkatest/tests/connect/connect_distributed_test.py +++ b/tests/kafkatest/tests/connect/connect_distributed_test.py @@ -175,7 +175,7 @@ def task_is_running(self, connector, task_id, node=None): @matrix( exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -210,7 +210,7 @@ def test_restart_failed_connector(self, exactly_once_source, connect_protocol, m @matrix( connector_type=['source', 'exactly-once source', 'sink'], connect_protocol=['sessioned', 'compatible', 'eager'], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -247,7 +247,7 @@ def test_restart_failed_task(self, connector_type, connect_protocol, metadata_qu @cluster(num_nodes=5) @matrix( connect_protocol=['sessioned', 'compatible', 'eager'], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -277,7 +277,7 @@ def test_restart_connector_and_tasks_failed_connector(self, connect_protocol, me @matrix( connector_type=['source', 'sink'], connect_protocol=['sessioned', 'compatible', 'eager'], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -311,12 +311,6 @@ def test_restart_connector_and_tasks_failed_task(self, connector_type, connect_p err_msg="Failed to see task transition to the RUNNING state") @cluster(num_nodes=5) - @matrix( - exactly_once_source=[True, False], - connect_protocol=['sessioned', 'compatible', 'eager'], - metadata_quorum=[quorum.zk], - use_new_coordinator=[False] - ) @matrix( exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], @@ -366,7 +360,7 @@ def test_pause_and_resume_source(self, exactly_once_source, connect_protocol, me @cluster(num_nodes=5) @matrix( connect_protocol=['sessioned', 'compatible', 'eager'], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -422,12 +416,6 @@ def test_pause_and_resume_sink(self, connect_protocol, metadata_quorum, use_new_ err_msg="Failed to consume messages after resuming sink connector") @cluster(num_nodes=5) - @matrix( - exactly_once_source=[True, False], - connect_protocol=['sessioned', 'compatible', 'eager'], - metadata_quorum=[quorum.zk], - use_new_coordinator=[False] - ) @matrix( exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], @@ -643,7 +631,7 @@ def _wait_for_loggers(self, level, request_time, namespace, workers=None): security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL], exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -693,7 +681,7 @@ def test_file_source_and_sink(self, security_protocol, exactly_once_source, conn @matrix( clean=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -823,12 +811,6 @@ def test_bounce(self, clean, connect_protocol, metadata_quorum, use_new_coordina assert success, "Found validation errors:\n" + "\n ".join(errors) @cluster(num_nodes=7) - @matrix( - clean=[True, False], - connect_protocol=['sessioned', 'compatible', 'eager'], - metadata_quorum=[quorum.zk], - use_new_coordinator=[False] - ) @matrix( clean=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], @@ -933,7 +915,7 @@ def test_exactly_once_source(self, clean, connect_protocol, metadata_quorum, use @cluster(num_nodes=6) @matrix( connect_protocol=['sessioned', 'compatible', 'eager'], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py index 312c10a0ebf7c..ad850c568b0e2 100644 --- a/tests/kafkatest/tests/connect/connect_test.py +++ b/tests/kafkatest/tests/connect/connect_test.py @@ -138,10 +138,8 @@ def validate_output(self, value): return False @cluster(num_nodes=5) - @parametrize(error_tolerance=ErrorTolerance.ALL, metadata_quorum=quorum.zk) @parametrize(error_tolerance=ErrorTolerance.NONE, metadata_quorum=quorum.isolated_kraft) @parametrize(error_tolerance=ErrorTolerance.ALL, metadata_quorum=quorum.isolated_kraft) - @parametrize(error_tolerance=ErrorTolerance.NONE, metadata_quorum=quorum.zk) def test_skip_and_log_to_dlq(self, error_tolerance, metadata_quorum): self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk, topics=self.topics) diff --git a/tests/kafkatest/tests/core/consume_bench_test.py b/tests/kafkatest/tests/core/consume_bench_test.py index 0575b5d8f2172..c205604f8f6ce 100644 --- a/tests/kafkatest/tests/core/consume_bench_test.py +++ b/tests/kafkatest/tests/core/consume_bench_test.py @@ -68,14 +68,6 @@ def produce_messages(self, topics, max_messages=10000): self.logger.debug("Produce workload finished") @cluster(num_nodes=10) - @matrix( - topics=[ - ["consume_bench_topic[0-5]"], # topic subscription - ["consume_bench_topic[0-5]:[0-4]"] # manual topic assignment - ], - metadata_quorum=[quorum.zk], - use_new_coordinator=[False] - ) @matrix( topics=[ ["consume_bench_topic[0-5]"], # topic subscription @@ -114,10 +106,6 @@ def test_consume_bench(self, topics, metadata_quorum=quorum.zk, use_new_coordina self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2)) @cluster(num_nodes=10) - @matrix( - metadata_quorum=[quorum.zk], - use_new_coordinator=[False] - ) @matrix( metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] @@ -149,10 +137,6 @@ def test_single_partition(self, metadata_quorum=quorum.zk, use_new_coordinator=F self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2)) @cluster(num_nodes=10) - @matrix( - metadata_quorum=[quorum.zk], - use_new_coordinator=[False] - ) @matrix( metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] @@ -185,10 +169,6 @@ def test_multiple_consumers_random_group_topics(self, metadata_quorum=quorum.zk, self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2)) @cluster(num_nodes=10) - @matrix( - metadata_quorum=[quorum.zk], - use_new_coordinator=[False] - ) @matrix( metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] @@ -222,10 +202,6 @@ def test_two_consumers_specified_group_topics(self, metadata_quorum=quorum.zk, u self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2)) @cluster(num_nodes=10) - @matrix( - metadata_quorum=[quorum.zk], - use_new_coordinator=[False] - ) @matrix( metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] @@ -259,10 +235,6 @@ def test_multiple_consumers_random_group_partitions(self, metadata_quorum=quorum self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2)) @cluster(num_nodes=10) - @matrix( - metadata_quorum=[quorum.zk], - use_new_coordinator=[False] - ) @matrix( metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] diff --git a/tests/kafkatest/tests/core/consumer_group_command_test.py b/tests/kafkatest/tests/core/consumer_group_command_test.py index 7f1d79574d355..2df53e3093a4f 100644 --- a/tests/kafkatest/tests/core/consumer_group_command_test.py +++ b/tests/kafkatest/tests/core/consumer_group_command_test.py @@ -93,7 +93,7 @@ def setup_and_verify(self, security_protocol, group=None, group_protocol=None): @cluster(num_nodes=3) @matrix( security_protocol=['PLAINTEXT', 'SSL'], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -112,7 +112,7 @@ def test_list_consumer_groups(self, security_protocol='PLAINTEXT', metadata_quor @cluster(num_nodes=3) @matrix( security_protocol=['PLAINTEXT', 'SSL'], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( diff --git a/tests/kafkatest/tests/core/controller_mutation_quota_test.py b/tests/kafkatest/tests/core/controller_mutation_quota_test.py index b5578ce857eec..bf8a3b874ed46 100644 --- a/tests/kafkatest/tests/core/controller_mutation_quota_test.py +++ b/tests/kafkatest/tests/core/controller_mutation_quota_test.py @@ -77,7 +77,7 @@ def teardown(self): self.zk.stop() @cluster(num_nodes=2) - @matrix(metadata_quorum=quorum.all) + @matrix(metadata_quorum=quorum.all_kraft) def test_controller_mutation_quota(self, metadata_quorum=quorum.zk): self.partition_count = 10 mutation_rate = 3 * self.partition_count / (self.window_num * self.window_size_seconds) diff --git a/tests/kafkatest/tests/core/fetch_from_follower_test.py b/tests/kafkatest/tests/core/fetch_from_follower_test.py index 6a096d7b92be8..a4c810116ddc7 100644 --- a/tests/kafkatest/tests/core/fetch_from_follower_test.py +++ b/tests/kafkatest/tests/core/fetch_from_follower_test.py @@ -71,7 +71,7 @@ def setUp(self): @cluster(num_nodes=9) @matrix( - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( diff --git a/tests/kafkatest/tests/core/log_dir_failure_test.py b/tests/kafkatest/tests/core/log_dir_failure_test.py index 483ed674c357f..ba8390ceb1c91 100644 --- a/tests/kafkatest/tests/core/log_dir_failure_test.py +++ b/tests/kafkatest/tests/core/log_dir_failure_test.py @@ -91,8 +91,6 @@ def min_cluster_size(self): """Override this since we're adding services outside of the constructor""" return super(LogDirFailureTest, self).min_cluster_size() + self.num_producers * 2 + self.num_consumers * 2 - @cluster(num_nodes=8) - @matrix(bounce_broker=[False, True], broker_type=["leader", "follower"], security_protocol=["PLAINTEXT"], metadata_quorum=[quorum.zk]) @cluster(num_nodes=10) @matrix(bounce_broker=[False, True], broker_type=["leader", "follower"], security_protocol=["PLAINTEXT"], metadata_quorum=[quorum.isolated_kraft]) def test_replication_with_disk_failure(self, bounce_broker, security_protocol, broker_type, metadata_quorum): diff --git a/tests/kafkatest/tests/core/reassign_partitions_test.py b/tests/kafkatest/tests/core/reassign_partitions_test.py index ce8280225e852..697088dfa9c38 100644 --- a/tests/kafkatest/tests/core/reassign_partitions_test.py +++ b/tests/kafkatest/tests/core/reassign_partitions_test.py @@ -147,7 +147,7 @@ def move_start_offset(self): @matrix( bounce_brokers=[True, False], reassign_from_offset_zero=[True, False], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( diff --git a/tests/kafkatest/tests/core/replica_scale_test.py b/tests/kafkatest/tests/core/replica_scale_test.py index 8500c0bf302d7..d3ddf87bbd8f2 100644 --- a/tests/kafkatest/tests/core/replica_scale_test.py +++ b/tests/kafkatest/tests/core/replica_scale_test.py @@ -52,7 +52,7 @@ def teardown(self): topic_count=[50], partition_count=[34], replication_factor=[3], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -119,13 +119,6 @@ def test_produce_consume(self, topic_count, partition_count, replication_factor, trogdor.stop() @cluster(num_nodes=12) - @matrix( - topic_count=[50], - partition_count=[34], - replication_factor=[3], - metadata_quorum=[quorum.zk], - use_new_coordinator=[False] - ) @matrix( topic_count=[50], partition_count=[34], diff --git a/tests/kafkatest/tests/core/replication_replica_failure_test.py b/tests/kafkatest/tests/core/replication_replica_failure_test.py index f17f6a2d26cae..cc048fcfdecdf 100644 --- a/tests/kafkatest/tests/core/replication_replica_failure_test.py +++ b/tests/kafkatest/tests/core/replication_replica_failure_test.py @@ -38,7 +38,7 @@ def __init__(self, test_context): @cluster(num_nodes=7) @matrix( - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( diff --git a/tests/kafkatest/tests/core/security_test.py b/tests/kafkatest/tests/core/security_test.py index 47aba24dbd693..33f3c72d7518e 100644 --- a/tests/kafkatest/tests/core/security_test.py +++ b/tests/kafkatest/tests/core/security_test.py @@ -61,7 +61,7 @@ def producer_consumer_have_expected_error(self, error): @matrix( security_protocol=['PLAINTEXT'], interbroker_security_protocol=['SSL'], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -74,7 +74,7 @@ def producer_consumer_have_expected_error(self, error): @matrix( security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT'], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -157,7 +157,7 @@ def create_and_start_clients(self, log_level, group_protocol): @cluster(num_nodes=2) @matrix( - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py index ac5ae03ddb4d8..11cd1c24ae334 100644 --- a/tests/kafkatest/tests/core/transactions_test.py +++ b/tests/kafkatest/tests/core/transactions_test.py @@ -217,14 +217,6 @@ def setup_topics(self): } @cluster(num_nodes=9) - @matrix( - failure_mode=["hard_bounce", "clean_bounce"], - bounce_target=["brokers", "clients"], - check_order=[True, False], - use_group_metadata=[True, False], - metadata_quorum=[quorum.zk], - use_new_coordinator=[False] - ) @matrix( failure_mode=["hard_bounce", "clean_bounce"], bounce_target=["brokers", "clients"], diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py index 47ce6f231348b..c00b7ac3e1465 100644 --- a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py +++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py @@ -217,11 +217,6 @@ def collect_results(self, sleep_time_secs): num_threads=[1, 3], sleep_time_secs=[120], metadata_quorum=[quorum.isolated_kraft]) - @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], - broker_type=["leader", "controller"], - num_threads=[1, 3], - sleep_time_secs=[120], - metadata_quorum=[quorum.zk]) def test_broker_type_bounce(self, failure_mode, broker_type, sleep_time_secs, num_threads, metadata_quorum): """ Start a smoke test client, then kill one particular broker and ensure data is still received