diff --git a/tests/kafkatest/tests/core/authorizer_test.py b/tests/kafkatest/tests/core/authorizer_test.py index 1e7178f5f812b..60c0612f356b1 100644 --- a/tests/kafkatest/tests/core/authorizer_test.py +++ b/tests/kafkatest/tests/core/authorizer_test.py @@ -19,7 +19,6 @@ from ducktape.tests.test import Test from kafkatest.services.kafka import KafkaService, quorum -from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.security.kafka_acls import ACLs class AuthorizerTest(Test): @@ -47,15 +46,8 @@ def setUp(self): def test_authorizer(self, metadata_quorum, authorizer_class): topics = {"test_topic": {"partitions": 1, "replication-factor": 1}} - if (authorizer_class == KafkaService.KRAFT_ACL_AUTHORIZER): - self.zk = None - else: - self.zk = ZookeeperService(self.test_context, num_nodes=1) - self.zk.start() - - self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, - topics=topics, controller_num_nodes_override=1, - allow_zk_with_kraft=True) + self.kafka = KafkaService(self.test_context, num_nodes=1, zk=None, + topics=topics, controller_num_nodes_override=1) broker_security_protocol = "SSL" broker_principal = "User:CN=systemtest" diff --git a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py index ec4a878f03257..7a9d87c0968ae 100644 --- a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py +++ b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py @@ -18,7 +18,6 @@ from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.kafka import config_property from kafkatest.services.verifiable_producer import VerifiableProducer -from kafkatest.services.zookeeper import ZookeeperService from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest from kafkatest.utils import is_int from kafkatest.version import LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, \ @@ -33,10 +32,6 @@ def __init__(self, test_context): def setUp(self): self.topic = "test_topic" - self.zk = ZookeeperService(self.test_context, num_nodes=1) if quorum.for_test(self.test_context) == quorum.zk else None - - if self.zk: - self.zk.start() # Producer and consumer self.producer_throughput = 10000 @@ -67,7 +62,7 @@ def setUp(self): @matrix(producer_version=[str(LATEST_3_9)], consumer_version=[str(LATEST_3_9)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(LATEST_2_1)], consumer_version=[str(LATEST_2_1)], compression_types=[["zstd"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) def test_compatibility(self, producer_version, consumer_version, compression_types, timestamp_type=None, metadata_quorum=quorum.zk): - self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=DEV_BRANCH, topics={self.topic: { + self.kafka = KafkaService(self.test_context, num_nodes=3, zk=None, version=DEV_BRANCH, topics={self.topic: { "partitions": 3, "replication-factor": 3, 'configs': {"min.insync.replicas": 2}}}, diff --git a/tests/kafkatest/tests/core/consume_bench_test.py b/tests/kafkatest/tests/core/consume_bench_test.py index c205604f8f6ce..c84f0dda59e5f 100644 --- a/tests/kafkatest/tests/core/consume_bench_test.py +++ b/tests/kafkatest/tests/core/consume_bench_test.py @@ -22,15 +22,13 @@ from kafkatest.services.trogdor.consume_bench_workload import ConsumeBenchWorkloadService, ConsumeBenchWorkloadSpec from 
kafkatest.services.trogdor.task_spec import TaskSpec from kafkatest.services.trogdor.trogdor import TrogdorService -from kafkatest.services.zookeeper import ZookeeperService class ConsumeBenchTest(Test): def __init__(self, test_context): """:type test_context: ducktape.tests.test.TestContext""" super(ConsumeBenchTest, self).__init__(test_context) - self.zk = ZookeeperService(test_context, num_nodes=3) if quorum.for_test(test_context) == quorum.zk else None - self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk) + self.kafka = KafkaService(test_context, num_nodes=3, zk=None) self.producer_workload_service = ProduceBenchWorkloadService(test_context, self.kafka) self.consumer_workload_service = ConsumeBenchWorkloadService(test_context, self.kafka) self.consumer_workload_service_2 = ConsumeBenchWorkloadService(test_context, self.kafka) @@ -42,15 +40,11 @@ def __init__(self, test_context): def setUp(self): self.trogdor.start() - if self.zk: - self.zk.start() self.kafka.start() def teardown(self): self.trogdor.stop() self.kafka.stop() - if self.zk: - self.zk.stop() def produce_messages(self, topics, max_messages=10000): produce_spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS, @@ -85,7 +79,7 @@ def produce_messages(self, topics, max_messages=10000): use_new_coordinator=[True], group_protocol=consumer_group.all_group_protocols ) - def test_consume_bench(self, topics, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): + def test_consume_bench(self, topics, metadata_quorum, use_new_coordinator=False, group_protocol=None): """ Runs a ConsumeBench workload to consume messages """ @@ -115,7 +109,7 @@ def test_consume_bench(self, topics, metadata_quorum=quorum.zk, use_new_coordina use_new_coordinator=[True], group_protocol=consumer_group.all_group_protocols ) - def test_single_partition(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): + def test_single_partition(self, metadata_quorum, use_new_coordinator=False, group_protocol=None): """ Run a ConsumeBench against a single partition """ @@ -146,7 +140,7 @@ def test_single_partition(self, metadata_quorum=quorum.zk, use_new_coordinator=F use_new_coordinator=[True], group_protocol=consumer_group.all_group_protocols ) - def test_multiple_consumers_random_group_topics(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): + def test_multiple_consumers_random_group_topics(self, metadata_quorum, use_new_coordinator=False, group_protocol=None): """ Runs multiple consumers group to read messages from topics. Since a consumerGroup isn't specified, each consumer should read from all topics independently @@ -178,7 +172,7 @@ def test_multiple_consumers_random_group_topics(self, metadata_quorum=quorum.zk, use_new_coordinator=[True], group_protocol=consumer_group.all_group_protocols ) - def test_two_consumers_specified_group_topics(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): + def test_two_consumers_specified_group_topics(self, metadata_quorum, use_new_coordinator=False, group_protocol=None): """ Runs two consumers in the same consumer group to read messages from topics. 
Since a consumerGroup is specified, each consumer should dynamically get assigned a partition from group @@ -211,7 +205,7 @@ def test_two_consumers_specified_group_topics(self, metadata_quorum=quorum.zk, u use_new_coordinator=[True], group_protocol=consumer_group.all_group_protocols ) - def test_multiple_consumers_random_group_partitions(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): + def test_multiple_consumers_random_group_partitions(self, metadata_quorum, use_new_coordinator=False, group_protocol=None): """ Runs multiple consumers in to read messages from specific partitions. Since a consumerGroup isn't specified, each consumer will get assigned a random group @@ -244,7 +238,7 @@ def test_multiple_consumers_random_group_partitions(self, metadata_quorum=quorum use_new_coordinator=[True], group_protocol=consumer_group.all_group_protocols ) - def test_multiple_consumers_specified_group_partitions_should_raise(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): + def test_multiple_consumers_specified_group_partitions_should_raise(self, metadata_quorum, use_new_coordinator=False, group_protocol=None): """ Runs multiple consumers in the same group to read messages from specific partitions. It is an invalid configuration to provide a consumer group and specific partitions. diff --git a/tests/kafkatest/tests/core/consumer_group_command_test.py b/tests/kafkatest/tests/core/consumer_group_command_test.py index 2df53e3093a4f..57af320574b78 100644 --- a/tests/kafkatest/tests/core/consumer_group_command_test.py +++ b/tests/kafkatest/tests/core/consumer_group_command_test.py @@ -19,7 +19,6 @@ from ducktape.mark import matrix from ducktape.mark.resource import cluster -from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService, quorum, consumer_group from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.security.security_config import SecurityConfig @@ -40,23 +39,17 @@ class ConsumerGroupCommandTest(Test): def __init__(self, test_context): super(ConsumerGroupCommandTest, self).__init__(test_context) - self.num_zk = 1 self.num_brokers = 1 self.topics = { TOPIC: {'partitions': 1, 'replication-factor': 1} } - self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None - - def setUp(self): - if self.zk: - self.zk.start() def start_kafka(self, security_protocol, interbroker_security_protocol): self.kafka = KafkaService( self.test_context, self.num_brokers, - self.zk, security_protocol=security_protocol, + None, security_protocol=security_protocol, interbroker_security_protocol=interbroker_security_protocol, topics=self.topics, - controller_num_nodes_override=self.num_zk) + controller_num_nodes_override=self.num_brokers) self.kafka.start() def start_consumer(self, group_protocol=None): @@ -102,7 +95,7 @@ def setup_and_verify(self, security_protocol, group=None, group_protocol=None): use_new_coordinator=[True], group_protocol=consumer_group.all_group_protocols ) - def test_list_consumer_groups(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): + def test_list_consumer_groups(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None): """ Tests if ConsumerGroupCommand is listing correct consumer groups :return: None @@ -121,7 +114,7 @@ def test_list_consumer_groups(self, 
security_protocol='PLAINTEXT', metadata_quor use_new_coordinator=[True], group_protocol=consumer_group.all_group_protocols ) - def test_describe_consumer_group(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): + def test_describe_consumer_group(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None): """ Tests if ConsumerGroupCommand is describing a consumer group correctly :return: None diff --git a/tests/kafkatest/tests/core/controller_mutation_quota_test.py b/tests/kafkatest/tests/core/controller_mutation_quota_test.py index bf8a3b874ed46..98f33deab1f17 100644 --- a/tests/kafkatest/tests/core/controller_mutation_quota_test.py +++ b/tests/kafkatest/tests/core/controller_mutation_quota_test.py @@ -17,15 +17,7 @@ from ducktape.mark import matrix from ducktape.tests.test import Test -from kafkatest.services.trogdor.produce_bench_workload import ProduceBenchWorkloadService, ProduceBenchWorkloadSpec -from kafkatest.services.trogdor.consume_bench_workload import ConsumeBenchWorkloadService, ConsumeBenchWorkloadSpec -from kafkatest.services.trogdor.task_spec import TaskSpec from kafkatest.services.kafka import KafkaService, quorum -from kafkatest.services.trogdor.trogdor import TrogdorService -from kafkatest.services.zookeeper import ZookeeperService - -import time - class ControllerMutationQuotaTest(Test): """Tests throttled partition changes via the kafka-topics CLI as follows: @@ -54,11 +46,10 @@ class ControllerMutationQuotaTest(Test): def __init__(self, test_context): super(ControllerMutationQuotaTest, self).__init__(test_context=test_context) self.test_context = test_context - self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None self.window_num = 10 self.window_size_seconds = 200 # must be long enough such that all CLI commands fit into it - self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, + self.kafka = KafkaService(self.test_context, num_nodes=1, zk=None, server_prop_overrides=[ ["quota.window.num", "%s" % self.window_num], ["controller.quota.window.size.seconds", "%s" % self.window_size_seconds] @@ -66,19 +57,15 @@ def __init__(self, test_context): controller_num_nodes_override=1) def setUp(self): - if self.zk: - self.zk.start() self.kafka.start() def teardown(self): # Need to increase the timeout due to partition count self.kafka.stop() - if self.zk: - self.zk.stop() @cluster(num_nodes=2) @matrix(metadata_quorum=quorum.all_kraft) - def test_controller_mutation_quota(self, metadata_quorum=quorum.zk): + def test_controller_mutation_quota(self, metadata_quorum): self.partition_count = 10 mutation_rate = 3 * self.partition_count / (self.window_num * self.window_size_seconds) diff --git a/tests/kafkatest/tests/core/delegation_token_test.py b/tests/kafkatest/tests/core/delegation_token_test.py index 4855572309891..4a1b1e83a9aaf 100644 --- a/tests/kafkatest/tests/core/delegation_token_test.py +++ b/tests/kafkatest/tests/core/delegation_token_test.py @@ -18,7 +18,6 @@ from ducktape.tests.test import Test from ducktape.utils.util import wait_until from kafkatest.services.kafka import config_property, KafkaService, quorum -from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.delegation_tokens import DelegationTokens from kafkatest.services.verifiable_producer import VerifiableProducer @@ -35,8 
+34,7 @@ def __init__(self, test_context): self.test_context = test_context self.topic = "topic" - self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None - self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, zk_chroot="/kafka", + self.kafka = KafkaService(self.test_context, num_nodes=1, zk=None, topics={self.topic: {"partitions": 1, "replication-factor": 1}}, server_prop_overrides=[ [config_property.DELEGATION_TOKEN_MAX_LIFETIME_MS, "604800000"], @@ -66,11 +64,6 @@ def __init__(self, test_context): self.kafka.client_sasl_mechanism = 'GSSAPI,SCRAM-SHA-256' self.kafka.interbroker_sasl_mechanism = 'GSSAPI' - - def setUp(self): - if self.zk: - self.zk.start() - def tearDown(self): self.producer.nodes[0].account.remove(self.jaas_deleg_conf_path) self.consumer.nodes[0].account.remove(self.jaas_deleg_conf_path) @@ -114,7 +107,7 @@ def renew_delegation_token(self): @cluster(num_nodes=5) @matrix(metadata_quorum=quorum.all_non_upgrade) - def test_delegation_token_lifecycle(self, metadata_quorum=quorum.zk): + def test_delegation_token_lifecycle(self, metadata_quorum): self.kafka.start() self.delegation_tokens = DelegationTokens(self.kafka, self.test_context) diff --git a/tests/kafkatest/tests/core/fetch_from_follower_test.py b/tests/kafkatest/tests/core/fetch_from_follower_test.py index a4c810116ddc7..8db6e6d31105e 100644 --- a/tests/kafkatest/tests/core/fetch_from_follower_test.py +++ b/tests/kafkatest/tests/core/fetch_from_follower_test.py @@ -23,7 +23,6 @@ from kafkatest.services.kafka import KafkaService, quorum, consumer_group from kafkatest.services.monitor.jmx import JmxTool from kafkatest.services.verifiable_producer import VerifiableProducer -from kafkatest.services.zookeeper import ZookeeperService from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest from kafkatest.utils import is_int @@ -37,10 +36,9 @@ def __init__(self, test_context): super(FetchFromFollowerTest, self).__init__(test_context=test_context) self.jmx_tool = JmxTool(test_context, jmx_poll_ms=100) self.topic = "test_topic" - self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None self.kafka = KafkaService(test_context, num_nodes=3, - zk=self.zk, + zk=None, topics={ self.topic: { "partitions": 1, @@ -65,8 +63,6 @@ def min_cluster_size(self): return super(FetchFromFollowerTest, self).min_cluster_size() + self.num_producers * 2 + self.num_consumers * 2 def setUp(self): - if self.zk: - self.zk.start() self.kafka.start() @cluster(num_nodes=9) @@ -79,7 +75,7 @@ def setUp(self): use_new_coordinator=[True], group_protocol=consumer_group.all_group_protocols ) - def test_consumer_preferred_read_replica(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): + def test_consumer_preferred_read_replica(self, metadata_quorum, use_new_coordinator=False, group_protocol=None): """ This test starts up brokers with "broker.rack" and "replica.selector.class" configurations set. The replica selector is set to the rack-aware implementation. One of the brokers has a different rack than the other two. 
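Most of the files touched above and below follow the same mechanical conversion: the ZookeeperService import and field are deleted, KafkaService is constructed with zk=None (the KRaft controller quorum is provisioned by KafkaService itself), the conditional ZooKeeper start/stop disappears from setUp/teardown, and metadata_quorum becomes a required parameter supplied by @matrix instead of defaulting to quorum.zk. A minimal sketch of the resulting test skeleton, using the same service and decorator APIs as the diff (class name, topic, and node counts are illustrative only):

```python
from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test

from kafkatest.services.kafka import KafkaService, quorum


class ExampleKRaftTest(Test):
    def __init__(self, test_context):
        super(ExampleKRaftTest, self).__init__(test_context)
        # zk=None: KafkaService provisions the KRaft controller quorum itself,
        # so there is no separate ZookeeperService to construct, start, or stop.
        self.kafka = KafkaService(test_context, num_nodes=3, zk=None,
                                  topics={"example_topic": {"partitions": 1, "replication-factor": 3}},
                                  controller_num_nodes_override=1)

    def setUp(self):
        # No `if self.zk: self.zk.start()` guard is needed any more.
        self.kafka.start()

    @cluster(num_nodes=6)
    @matrix(metadata_quorum=quorum.all_non_upgrade)
    def test_example(self, metadata_quorum):
        # metadata_quorum is always injected by @matrix, so the old
        # `metadata_quorum=quorum.zk` default is dropped from the signature.
        pass
```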
diff --git a/tests/kafkatest/tests/core/get_offset_shell_test.py b/tests/kafkatest/tests/core/get_offset_shell_test.py index b48185d15d20f..c54dc565ff6df 100644 --- a/tests/kafkatest/tests/core/get_offset_shell_test.py +++ b/tests/kafkatest/tests/core/get_offset_shell_test.py @@ -20,7 +20,6 @@ from ducktape.mark.resource import cluster from kafkatest.services.verifiable_producer import VerifiableProducer -from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.console_consumer import ConsoleConsumer @@ -51,7 +50,6 @@ class GetOffsetShellTest(Test): """ def __init__(self, test_context): super(GetOffsetShellTest, self).__init__(test_context) - self.num_zk = 1 self.num_brokers = 1 self.messages_received_count = 0 self.topics = { @@ -64,16 +62,10 @@ def __init__(self, test_context): TOPIC_TEST_TOPIC_PARTITIONS2: {'partitions': 2, 'replication-factor': REPLICATION_FACTOR} } - self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None - - def setUp(self): - if self.zk: - self.zk.start() - def start_kafka(self, security_protocol, interbroker_security_protocol): self.kafka = KafkaService( self.test_context, self.num_brokers, - self.zk, security_protocol=security_protocol, + None, security_protocol=security_protocol, interbroker_security_protocol=interbroker_security_protocol, topics=self.topics) self.kafka.start() diff --git a/tests/kafkatest/tests/core/group_mode_transactions_test.py b/tests/kafkatest/tests/core/group_mode_transactions_test.py index 1ffab0413c9c9..2db9c62b46bd1 100644 --- a/tests/kafkatest/tests/core/group_mode_transactions_test.py +++ b/tests/kafkatest/tests/core/group_mode_transactions_test.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.verifiable_producer import VerifiableProducer @@ -61,14 +60,9 @@ def __init__(self, test_context): self.progress_timeout_sec = 60 self.consumer_group = "grouped-transactions-test-consumer-group" - self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None self.kafka = KafkaService(test_context, num_nodes=self.num_brokers, - zk=self.zk, controller_num_nodes_override=1) - - def setUp(self): - if self.zk: - self.zk.start() + zk=None, controller_num_nodes_override=1) def seed_messages(self, topic, num_seed_messages): seed_timeout_sec = 10000 @@ -98,16 +92,11 @@ def bounce_brokers(self, clean_shutdown): else: self.kafka.stop_node(node, clean_shutdown = False) gracePeriodSecs = 5 - if self.zk: - wait_until(lambda: not self.kafka.pids(node) and not self.kafka.is_registered(node), - timeout_sec=self.kafka.zk_session_timeout + gracePeriodSecs, - err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(node.account)) - else: - brokerSessionTimeoutSecs = 18 - wait_until(lambda: not self.kafka.pids(node), - timeout_sec=brokerSessionTimeoutSecs + gracePeriodSecs, - err_msg="Failed to see timely disappearance of process for hard-killed broker %s" % str(node.account)) - time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs) + brokerSessionTimeoutSecs = 18 + wait_until(lambda: not self.kafka.pids(node), + timeout_sec=brokerSessionTimeoutSecs + gracePeriodSecs, + err_msg="Failed to see timely disappearance of process for hard-killed broker %s" % str(node.account)) + time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs) self.kafka.start_node(node) self.kafka.await_no_under_replicated_partitions() @@ -271,8 +260,9 @@ def setup_topics(self): @cluster(num_nodes=10) @matrix(failure_mode=["hard_bounce", "clean_bounce"], - bounce_target=["brokers", "clients"]) - def test_transactions(self, failure_mode, bounce_target, metadata_quorum=quorum.zk): + bounce_target=["brokers", "clients"], + metadata_quorum=quorum.all_non_upgrade) + def test_transactions(self, failure_mode, bounce_target, metadata_quorum): security_protocol = 'PLAINTEXT' self.kafka.security_protocol = security_protocol self.kafka.interbroker_security_protocol = security_protocol diff --git a/tests/kafkatest/tests/core/kraft_upgrade_test.py b/tests/kafkatest/tests/core/kraft_upgrade_test.py index dc0dc261d1c62..604ffa01803b8 100644 --- a/tests/kafkatest/tests/core/kraft_upgrade_test.py +++ b/tests/kafkatest/tests/core/kraft_upgrade_test.py @@ -53,7 +53,7 @@ def wait_until_rejoin(self): wait_until(lambda: len(self.kafka.isr_idx_list(self.topic, partition)) == self.replication_factor, timeout_sec=60, backoff_sec=1, err_msg="Replicas did not rejoin the ISR in a reasonable amount of time") - def perform_version_change(self, from_kafka_version): + def upgrade_to_dev_version(self, from_kafka_version, update_metadata_version): self.logger.info("Performing rolling upgrade.") for node in self.kafka.controller_quorum.nodes: self.logger.info("Stopping controller node %s" % node.account.hostname) @@ -71,8 +71,28 @@ def perform_version_change(self, from_kafka_version): self.kafka.start_node(node) self.wait_until_rejoin() self.logger.info("Successfully restarted broker node %s" % node.account.hostname) - self.logger.info("Changing metadata.version to %s" % LATEST_STABLE_METADATA_VERSION) - 
self.kafka.upgrade_metadata_version(LATEST_STABLE_METADATA_VERSION) + if update_metadata_version: + self.logger.info("Changing metadata.version to %s" % LATEST_STABLE_METADATA_VERSION) + self.kafka.upgrade_metadata_version(LATEST_STABLE_METADATA_VERSION) + + def downgrade_to_version(self, to_kafka_version): + self.logger.info("Performing rolling downgrade.") + for node in self.kafka.controller_quorum.nodes: + self.logger.info("Stopping controller node %s" % node.account.hostname) + self.kafka.controller_quorum.stop_node(node) + node.version = KafkaVersion(to_kafka_version) + self.logger.info("Restarting controller node %s" % node.account.hostname) + self.kafka.controller_quorum.start_node(node) + self.wait_until_rejoin() + self.logger.info("Successfully restarted controller node %s" % node.account.hostname) + for node in self.kafka.nodes: + self.logger.info("Stopping broker node %s" % node.account.hostname) + self.kafka.stop_node(node) + node.version = KafkaVersion(to_kafka_version) + self.logger.info("Restarting broker node %s" % node.account.hostname) + self.kafka.start_node(node) + self.wait_until_rejoin() + self.logger.info("Successfully restarted broker node %s" % node.account.hostname) def run_upgrade(self, from_kafka_version): """Test upgrade of Kafka broker cluster from various versions to the current version @@ -102,7 +122,42 @@ def run_upgrade(self, from_kafka_version): self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=30000, message_validator=is_int, version=KafkaVersion(from_kafka_version)) - self.run_produce_consume_validate(core_test_action=lambda: self.perform_version_change(from_kafka_version)) + + self.run_produce_consume_validate(core_test_action=lambda: self.upgrade_to_dev_version(from_kafka_version, True)) + cluster_id = self.kafka.cluster_id() + assert cluster_id is not None + assert len(cluster_id) == 22 + assert self.kafka.check_protocol_errors(self) + + def run_upgrade_downgrade(self, starting_kafka_version): + """Test upgrade and downgrade of Kafka broker cluster from various versions to current version and back + + - Start 3 node broker cluster on version 'starting_kafka_version'. + - Perform rolling upgrade but do not update metadata.version. + - Start producer and consumer in the background. + - Perform rolling downgrade. + - Finally, validate that every message acked by the producer was consumed by the consumer. 
+ """ + fromKafkaVersion = KafkaVersion(starting_kafka_version) + self.kafka = KafkaService(self.test_context, + num_nodes=3, + zk=None, + version=fromKafkaVersion, + topics={self.topic: {"partitions": self.partitions, + "replication-factor": self.replication_factor, + 'configs': {"min.insync.replicas": 2}}}) + self.kafka.start() + self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, + self.topic, throughput=self.producer_throughput, + message_validator=is_int, + compression_types=["none"], + version=KafkaVersion(starting_kafka_version)) + self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, + self.topic, consumer_timeout_ms=30000, + message_validator=is_int, version=KafkaVersion(starting_kafka_version)) + self.upgrade_to_dev_version(starting_kafka_version, False) + + self.run_produce_consume_validate(core_test_action=lambda: self.downgrade_to_version(starting_kafka_version)) cluster_id = self.kafka.cluster_id() assert cluster_id is not None assert len(cluster_id) == 22 @@ -120,3 +175,14 @@ def test_combined_mode_upgrade(self, from_kafka_version, metadata_quorum, use_ne def test_isolated_mode_upgrade(self, from_kafka_version, metadata_quorum, use_new_coordinator=False): self.run_upgrade(from_kafka_version) + @cluster(num_nodes=5) + @matrix(from_kafka_version=[str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(LATEST_3_6), str(LATEST_3_7), str(LATEST_3_8), str(LATEST_3_9), str(DEV_BRANCH)], + metadata_quorum=[combined_kraft]) + def test_combined_mode_upgrade_downgrade(self, from_kafka_version, metadata_quorum, use_new_coordinator=False): + self.run_upgrade_downgrade(from_kafka_version) + + @cluster(num_nodes=8) + @matrix(from_kafka_version=[str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(LATEST_3_6), str(LATEST_3_7), str(LATEST_3_8), str(LATEST_3_9), str(DEV_BRANCH)], + metadata_quorum=[isolated_kraft]) + def test_isolated_mode_upgrade_downgrade(self, from_kafka_version, metadata_quorum, use_new_coordinator=False): + self.run_upgrade_downgrade(from_kafka_version) \ No newline at end of file diff --git a/tests/kafkatest/tests/core/log_dir_failure_test.py b/tests/kafkatest/tests/core/log_dir_failure_test.py index ba8390ceb1c91..4bc453c517bff 100644 --- a/tests/kafkatest/tests/core/log_dir_failure_test.py +++ b/tests/kafkatest/tests/core/log_dir_failure_test.py @@ -17,7 +17,6 @@ from ducktape.mark import matrix from ducktape.mark.resource import cluster from kafkatest.services.kafka import config_property -from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.console_consumer import ConsoleConsumer @@ -62,10 +61,9 @@ def __init__(self, test_context): self.topic1 = "test_topic_1" self.topic2 = "test_topic_2" - self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None self.kafka = KafkaService(test_context, num_nodes=3, - zk=self.zk, + zk=None, topics={ self.topic1: {"partitions": 1, "replication-factor": 3, "configs": {"min.insync.replicas": 1}}, self.topic2: {"partitions": 1, "replication-factor": 3, "configs": {"min.insync.replicas": 2}} @@ -83,10 +81,6 @@ def __init__(self, test_context): self.num_producers = 1 self.num_consumers = 1 - def setUp(self): - if self.zk: - self.zk.start() - def min_cluster_size(self): """Override this since we're adding services outside of the constructor""" return 
super(LogDirFailureTest, self).min_cluster_size() + self.num_producers * 2 + self.num_consumers * 2 diff --git a/tests/kafkatest/tests/core/produce_bench_test.py b/tests/kafkatest/tests/core/produce_bench_test.py index 5ac08d7b3f307..23a0304193cf2 100644 --- a/tests/kafkatest/tests/core/produce_bench_test.py +++ b/tests/kafkatest/tests/core/produce_bench_test.py @@ -21,14 +21,12 @@ from kafkatest.services.trogdor.produce_bench_workload import ProduceBenchWorkloadService, ProduceBenchWorkloadSpec from kafkatest.services.trogdor.task_spec import TaskSpec from kafkatest.services.trogdor.trogdor import TrogdorService -from kafkatest.services.zookeeper import ZookeeperService class ProduceBenchTest(Test): def __init__(self, test_context): """:type test_context: ducktape.tests.test.TestContext""" super(ProduceBenchTest, self).__init__(test_context) - self.zk = ZookeeperService(test_context, num_nodes=3) if quorum.for_test(test_context) == quorum.zk else None - self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk) + self.kafka = KafkaService(test_context, num_nodes=3, zk=None) self.workload_service = ProduceBenchWorkloadService(test_context, self.kafka) self.trogdor = TrogdorService(context=self.test_context, client_services=[self.kafka, self.workload_service]) @@ -37,19 +35,15 @@ def __init__(self, test_context): def setUp(self): self.trogdor.start() - if self.zk: - self.zk.start() self.kafka.start() def teardown(self): self.trogdor.stop() self.kafka.stop() - if self.zk: - self.zk.stop() @cluster(num_nodes=8) @matrix(metadata_quorum=quorum.all_non_upgrade) - def test_produce_bench(self, metadata_quorum=quorum.zk): + def test_produce_bench(self, metadata_quorum): spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS, self.workload_service.producer_node, self.workload_service.bootstrap_servers, @@ -66,7 +60,8 @@ def test_produce_bench(self, metadata_quorum=quorum.zk): self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2)) @cluster(num_nodes=8) - def test_produce_bench_transactions(self, metadata_quorum=quorum.zk): + @matrix(metadata_quorum=quorum.all_non_upgrade) + def test_produce_bench_transactions(self, metadata_quorum): spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS, self.workload_service.producer_node, self.workload_service.bootstrap_servers, diff --git a/tests/kafkatest/tests/core/reassign_partitions_test.py b/tests/kafkatest/tests/core/reassign_partitions_test.py index 697088dfa9c38..dc7907630bec3 100644 --- a/tests/kafkatest/tests/core/reassign_partitions_test.py +++ b/tests/kafkatest/tests/core/reassign_partitions_test.py @@ -18,7 +18,6 @@ from ducktape.utils.util import wait_until from kafkatest.services.kafka import config_property -from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService, quorum, consumer_group, TopicPartition from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.console_consumer import ConsoleConsumer @@ -35,19 +34,17 @@ class ReassignPartitionsTest(ProduceConsumeValidateTest): """ def __init__(self, test_context): - self.num_zk = 1 """:type test_context: ducktape.tests.test.TestContext""" super(ReassignPartitionsTest, self).__init__(test_context=test_context) self.topic = "test_topic" self.num_partitions = 20 - self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None # We set the min.insync.replicas to match the replication factor because # it makes the test more stringent. 
If min.isr = 2 and # replication.factor=3, then the test would tolerate the failure of # reassignment for upto one replica per partition, which is not # desirable for this test in particular. - self.kafka = KafkaService(test_context, num_nodes=4, zk=self.zk, + self.kafka = KafkaService(test_context, num_nodes=4, zk=None, server_prop_overrides=[ [config_property.LOG_ROLL_TIME_MS, "5000"], [config_property.LOG_RETENTION_CHECK_INTERVAL_MS, "5000"] @@ -59,16 +56,12 @@ def __init__(self, test_context): "min.insync.replicas": 3, }} }, - controller_num_nodes_override=self.num_zk) + controller_num_nodes_override=1) self.timeout_sec = 60 self.producer_throughput = 1000 self.num_producers = 1 self.num_consumers = 1 - def setUp(self): - if self.zk: - self.zk.start() - def min_cluster_size(self): # Override this since we're adding services outside of the constructor return super(ReassignPartitionsTest, self).min_cluster_size() + self.num_producers + self.num_consumers @@ -159,7 +152,7 @@ def move_start_offset(self): ) def test_reassign_partitions(self, bounce_brokers, reassign_from_offset_zero, metadata_quorum, use_new_coordinator=False, group_protocol=None): """Reassign partitions tests. - Setup: 1 zk, 4 kafka nodes, 1 topic with partitions=20, replication-factor=3, + Setup: 1 controller, 4 kafka nodes, 1 topic with partitions=20, replication-factor=3, and min.insync.replicas=3 - Produce messages in the background diff --git a/tests/kafkatest/tests/core/replica_scale_test.py b/tests/kafkatest/tests/core/replica_scale_test.py index d3ddf87bbd8f2..cd7646a53d75a 100644 --- a/tests/kafkatest/tests/core/replica_scale_test.py +++ b/tests/kafkatest/tests/core/replica_scale_test.py @@ -22,7 +22,6 @@ from kafkatest.services.trogdor.task_spec import TaskSpec from kafkatest.services.kafka import KafkaService, quorum, consumer_group from kafkatest.services.trogdor.trogdor import TrogdorService -from kafkatest.services.zookeeper import ZookeeperService import time @@ -31,12 +30,9 @@ class ReplicaScaleTest(Test): def __init__(self, test_context): super(ReplicaScaleTest, self).__init__(test_context=test_context) self.test_context = test_context - self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None - self.kafka = KafkaService(self.test_context, num_nodes=8, zk=self.zk, controller_num_nodes_override=1) + self.kafka = KafkaService(self.test_context, num_nodes=8, zk=None, controller_num_nodes_override=1) def setUp(self): - if self.zk: - self.zk.start() self.kafka.start() def teardown(self): @@ -44,8 +40,6 @@ def teardown(self): for node in self.kafka.nodes: self.kafka.stop_node(node, clean_shutdown=False, timeout_sec=60) self.kafka.stop() - if self.zk: - self.zk.stop() @cluster(num_nodes=12) @matrix( @@ -64,7 +58,7 @@ def teardown(self): group_protocol=consumer_group.all_group_protocols ) def test_produce_consume(self, topic_count, partition_count, replication_factor, - metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): + metadata_quorum, use_new_coordinator=False, group_protocol=None): topics_create_start_time = time.time() for i in range(topic_count): topic = "replicas_produce_consume_%d" % i @@ -127,7 +121,7 @@ def test_produce_consume(self, topic_count, partition_count, replication_factor, use_new_coordinator=[True, False] ) def test_clean_bounce(self, topic_count, partition_count, replication_factor, - metadata_quorum=quorum.zk, use_new_coordinator=False): + metadata_quorum, use_new_coordinator=False): topics_create_start_time = 
time.time() for i in range(topic_count): topic = "topic-%04d" % i diff --git a/tests/kafkatest/tests/core/replication_replica_failure_test.py b/tests/kafkatest/tests/core/replication_replica_failure_test.py index cc048fcfdecdf..6e644ce5a22bd 100644 --- a/tests/kafkatest/tests/core/replication_replica_failure_test.py +++ b/tests/kafkatest/tests/core/replication_replica_failure_test.py @@ -46,13 +46,13 @@ def __init__(self, test_context): use_new_coordinator=[True], group_protocol=consumer_group.all_group_protocols ) - def test_replication_with_replica_failure(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): + def test_replication_with_replica_failure(self, metadata_quorum, use_new_coordinator=False, group_protocol=None): """ This test verifies that replication shrinks the ISR when a replica is not fetching anymore. It also verifies that replication provides simple durability guarantees by checking that data acked by brokers is still available for consumption. - Setup: 1 zk/KRaft controller, 3 kafka nodes, 1 topic with partitions=1, replication-factor=3, and min.insync.replicas=2 + Setup: 1 KRaft controller, 3 kafka nodes, 1 topic with partitions=1, replication-factor=3, and min.insync.replicas=2 - Produce messages in the background - Consume messages in the background - Partition a follower @@ -60,10 +60,6 @@ def test_replication_with_replica_failure(self, metadata_quorum=quorum.zk, use_n - Stop producing and finish consuming - Validate that every acked message was consumed """ - self.create_zookeeper_if_necessary() - if self.zk: - self.zk.start() - self.create_kafka(num_nodes=3, server_prop_overrides=[["replica.lag.time.max.ms", "10000"]], controller_num_nodes_override=1) @@ -73,13 +69,7 @@ def test_replication_with_replica_failure(self, metadata_quorum=quorum.zk, use_n client_services=[self.kafka]) self.trogdor.start() - # If ZK is used, the partition leader is put on the controller node - # to avoid partitioning the controller later on in the test. 
- if self.zk: - controller = self.kafka.controller() - assignment = [self.kafka.idx(controller)] + [self.kafka.idx(node) for node in self.kafka.nodes if node != controller] - else: - assignment = [self.kafka.idx(node) for node in self.kafka.nodes] + assignment = [self.kafka.idx(node) for node in self.kafka.nodes] self.topic = "test_topic" self.kafka.create_topic({"topic": self.topic, diff --git a/tests/kafkatest/tests/core/replication_test.py b/tests/kafkatest/tests/core/replication_test.py index f219744aa9305..c1e2e6df3ee07 100644 --- a/tests/kafkatest/tests/core/replication_test.py +++ b/tests/kafkatest/tests/core/replication_test.py @@ -122,21 +122,16 @@ def min_cluster_size(self): @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], broker_type=["leader"], security_protocol=["PLAINTEXT"], - enable_idempotence=[True]) + enable_idempotence=[True], + metadata_quorum=quorum.all_non_upgrade) @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], broker_type=["leader"], security_protocol=["PLAINTEXT", "SASL_SSL"], metadata_quorum=quorum.all_non_upgrade) - @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], - broker_type=["controller"], - security_protocol=["PLAINTEXT", "SASL_SSL"]) @matrix(failure_mode=["hard_bounce"], broker_type=["leader"], security_protocol=["SASL_SSL"], client_sasl_mechanism=["PLAIN"], interbroker_sasl_mechanism=["PLAIN", "GSSAPI"], metadata_quorum=quorum.all_non_upgrade) - @parametrize(failure_mode="hard_bounce", - broker_type="leader", - security_protocol="SASL_SSL", client_sasl_mechanism="SCRAM-SHA-256", interbroker_sasl_mechanism="SCRAM-SHA-512") @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], security_protocol=["PLAINTEXT"], broker_type=["leader"], compression_type=["gzip"], tls_version=["TLSv1.2", "TLSv1.3"], metadata_quorum=quorum.all_non_upgrade) @@ -148,7 +143,7 @@ def test_replication_with_broker_failure(self, failure_mode, security_protocol, These tests verify that replication provides simple durability guarantees by checking that data acked by brokers is still available for consumption in the face of various failure scenarios. 
- Setup: 1 zk/KRaft controller, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2 + Setup: 1 KRaft controller, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2 - Produce messages in the background - Consume messages in the background @@ -159,9 +154,6 @@ def test_replication_with_broker_failure(self, failure_mode, security_protocol, if failure_mode == "controller" and metadata_quorum != quorum.zk: raise Exception("There is no controller broker when using KRaft metadata quorum") - self.create_zookeeper_if_necessary() - if self.zk: - self.zk.start() self.create_kafka(num_nodes=3, security_protocol=security_protocol, diff --git a/tests/kafkatest/tests/core/round_trip_fault_test.py b/tests/kafkatest/tests/core/round_trip_fault_test.py index 212c705240048..24dd29f7f31a7 100644 --- a/tests/kafkatest/tests/core/round_trip_fault_test.py +++ b/tests/kafkatest/tests/core/round_trip_fault_test.py @@ -33,12 +33,9 @@ class RoundTripFaultTest(Test): def __init__(self, test_context): """:type test_context: ducktape.tests.test.TestContext""" super(RoundTripFaultTest, self).__init__(test_context) - self.zk = ZookeeperService(test_context, num_nodes=3) if quorum.for_test(test_context) == quorum.zk else None - self.kafka = KafkaService(test_context, num_nodes=4, zk=self.zk) + self.kafka = KafkaService(test_context, num_nodes=4, zk=None) self.workload_service = RoundTripWorkloadService(test_context, self.kafka) - if quorum.for_test(test_context) == quorum.zk: - trogdor_client_services = [self.zk, self.kafka, self.workload_service] - elif quorum.for_test(test_context) == quorum.isolated_kraft: + if quorum.for_test(test_context) == quorum.isolated_kraft: trogdor_client_services = [self.kafka.controller_quorum, self.kafka, self.workload_service] else: #co-located case, which we currently don't test but handle here for completeness in case we do test it trogdor_client_services = [self.kafka, self.workload_service] @@ -56,34 +53,28 @@ def __init__(self, test_context): active_topics=active_topics) def setUp(self): - if self.zk: - self.zk.start() self.kafka.start() self.trogdor.start() def teardown(self): self.trogdor.stop() self.kafka.stop() - if self.zk: - self.zk.stop() def remote_quorum_nodes(self): - if quorum.for_test(self.test_context) == quorum.zk: - return self.zk.nodes - elif quorum.for_test(self.test_context) == quorum.isolated_kraft: + if quorum.for_test(self.test_context) == quorum.isolated_kraft: return self.kafka.controller_quorum.nodes else: # co-located case, which we currently don't test but handle here for completeness in case we do test it return [] @cluster(num_nodes=9) @matrix(metadata_quorum=quorum.all_non_upgrade) - def test_round_trip_workload(self, metadata_quorum=quorum.zk): + def test_round_trip_workload(self, metadata_quorum): workload1 = self.trogdor.create_task("workload1", self.round_trip_spec) workload1.wait_for_done(timeout_sec=600) @cluster(num_nodes=9) @matrix(metadata_quorum=quorum.all_non_upgrade) - def test_round_trip_workload_with_broker_partition(self, metadata_quorum=quorum.zk): + def test_round_trip_workload_with_broker_partition(self, metadata_quorum): workload1 = self.trogdor.create_task("workload1", self.round_trip_spec) time.sleep(2) part1 = [self.kafka.nodes[0]] @@ -97,7 +88,7 @@ def test_round_trip_workload_with_broker_partition(self, metadata_quorum=quorum. 
@cluster(num_nodes=9) @matrix(metadata_quorum=quorum.all_non_upgrade) - def test_produce_consume_with_broker_pause(self, metadata_quorum=quorum.zk): + def test_produce_consume_with_broker_pause(self, metadata_quorum): workload1 = self.trogdor.create_task("workload1", self.round_trip_spec) time.sleep(2) stop1_spec = ProcessStopFaultSpec(0, TaskSpec.MAX_DURATION_MS, [self.kafka.nodes[0]], @@ -110,7 +101,7 @@ def test_produce_consume_with_broker_pause(self, metadata_quorum=quorum.zk): @cluster(num_nodes=9) @matrix(metadata_quorum=quorum.all_non_upgrade) - def test_produce_consume_with_client_partition(self, metadata_quorum=quorum.zk): + def test_produce_consume_with_client_partition(self, metadata_quorum): workload1 = self.trogdor.create_task("workload1", self.round_trip_spec) time.sleep(2) part1 = [self.workload_service.nodes[0]] @@ -123,7 +114,7 @@ def test_produce_consume_with_client_partition(self, metadata_quorum=quorum.zk): @cluster(num_nodes=9) @matrix(metadata_quorum=quorum.all_non_upgrade) - def test_produce_consume_with_latency(self, metadata_quorum=quorum.zk): + def test_produce_consume_with_latency(self, metadata_quorum): workload1 = self.trogdor.create_task("workload1", self.round_trip_spec) time.sleep(2) spec = DegradedNetworkFaultSpec(0, 60000) diff --git a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py index a6e7ca53cb956..96b2ba55194bf 100644 --- a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py +++ b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py @@ -13,13 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. from kafkatest.services.security.security_config import SecurityConfig -from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.utils import is_int from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest from ducktape.mark import matrix +from ducktape.mark import parametrize from ducktape.mark.resource import cluster from kafkatest.services.security.kafka_acls import ACLs import time @@ -39,12 +39,11 @@ def setUp(self): self.producer_throughput = 100 self.num_producers = 1 self.num_consumers = 1 - self.zk = ZookeeperService(self.test_context, num_nodes=1) - self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, topics={self.topic: { + self.kafka = KafkaService(self.test_context, num_nodes=3, zk=None, topics={self.topic: { "partitions": 3, "replication-factor": 3, - 'configs': {"min.insync.replicas": 2}}}) - self.zk.start() + 'configs': {"min.insync.replicas": 2}}}, + controller_num_nodes_override=1) def create_producer_and_consumer(self): self.producer = VerifiableProducer( @@ -69,7 +68,6 @@ def open_secured_port(self, client_protocol): def add_sasl_mechanism(self, new_client_sasl_mechanism): self.kafka.client_sasl_mechanism = new_client_sasl_mechanism - self.kafka.start_minikdc_if_necessary() self.bounce() def add_separate_broker_listener(self, broker_security_protocol, broker_sasl_mechanism): @@ -99,10 +97,10 @@ def remove_separate_broker_listener(self, client_security_protocol): self.bounce() @cluster(num_nodes=8) - @matrix(client_protocol=[SecurityConfig.SSL]) + @matrix(client_protocol=[SecurityConfig.SSL], 
metadata_quorum=[quorum.isolated_kraft]) @cluster(num_nodes=9) - @matrix(client_protocol=[SecurityConfig.SASL_PLAINTEXT, SecurityConfig.SASL_SSL]) - def test_rolling_upgrade_phase_one(self, client_protocol): + @matrix(client_protocol=[SecurityConfig.SASL_PLAINTEXT, SecurityConfig.SASL_SSL], metadata_quorum=[quorum.isolated_kraft]) + def test_rolling_upgrade_phase_one(self, client_protocol, metadata_quorum): """ Start with a PLAINTEXT cluster, open a SECURED port, via a rolling upgrade, ensuring we could produce and consume throughout over PLAINTEXT. Finally check we can produce and consume the new secured port. @@ -123,8 +121,8 @@ def test_rolling_upgrade_phase_one(self, client_protocol): self.run_produce_consume_validate(lambda: time.sleep(1)) @cluster(num_nodes=9) - @matrix(new_client_sasl_mechanism=[SecurityConfig.SASL_MECHANISM_PLAIN]) - def test_rolling_upgrade_sasl_mechanism_phase_one(self, new_client_sasl_mechanism): + @matrix(new_client_sasl_mechanism=[SecurityConfig.SASL_MECHANISM_PLAIN], metadata_quorum=[quorum.isolated_kraft]) + def test_rolling_upgrade_sasl_mechanism_phase_one(self, new_client_sasl_mechanism, metadata_quorum): """ Start with a SASL/GSSAPI cluster, add new SASL mechanism, via a rolling upgrade, ensuring we could produce and consume throughout over SASL/GSSAPI. Finally check we can produce and consume using new mechanism. @@ -147,7 +145,8 @@ def test_rolling_upgrade_sasl_mechanism_phase_one(self, new_client_sasl_mechanis self.run_produce_consume_validate(lambda: time.sleep(1)) @cluster(num_nodes=9) - def test_enable_separate_interbroker_listener(self): + @parametrize(metadata_quorum=quorum.isolated_kraft) + def test_enable_separate_interbroker_listener(self, metadata_quorum): """ Start with a cluster that has a single PLAINTEXT listener. Start producing/consuming on PLAINTEXT port. @@ -164,7 +163,8 @@ def test_enable_separate_interbroker_listener(self): SecurityConfig.SASL_MECHANISM_PLAIN) @cluster(num_nodes=9) - def test_disable_separate_interbroker_listener(self): + @parametrize(metadata_quorum=quorum.isolated_kraft) + def test_disable_separate_interbroker_listener(self, metadata_quorum): """ Start with a cluster that has two listeners, one on SSL (clients), another on SASL_SSL (broker-to-broker). Start producer and consumer on SSL listener. diff --git a/tests/kafkatest/tests/core/security_test.py b/tests/kafkatest/tests/core/security_test.py index 33f3c72d7518e..a92b1ad6b3e3c 100644 --- a/tests/kafkatest/tests/core/security_test.py +++ b/tests/kafkatest/tests/core/security_test.py @@ -84,7 +84,7 @@ def producer_consumer_have_expected_error(self, error): use_new_coordinator=[True], group_protocol=consumer_group.all_group_protocols ) - def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): + def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol, metadata_quorum, use_new_coordinator=False, group_protocol=None): """ Test that invalid hostname in certificate results in connection failures. When security_protocol=SSL, client SSL handshakes are expected to fail due to hostname verification failure. 
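Dropping the `metadata_quorum=quorum.zk` defaults in the signatures above is safe because ducktape's decorators always pass the parameter explicitly: @matrix expands into one test case per combination of its keyword lists, and @parametrize pins a single combination. A hedged illustration of the two decorator styles this patch standardizes on (method names and the plain Test base class are used here only for brevity):

```python
from ducktape.mark import matrix, parametrize
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test

from kafkatest.services.kafka import quorum
from kafkatest.services.security.security_config import SecurityConfig


class ExampleSecurityUpgradeTest(Test):
    @cluster(num_nodes=9)
    @matrix(client_protocol=[SecurityConfig.SASL_PLAINTEXT, SecurityConfig.SASL_SSL],
            metadata_quorum=[quorum.isolated_kraft])
    def test_matrix_style(self, client_protocol, metadata_quorum):
        # @matrix generates one test case per (client_protocol, metadata_quorum)
        # combination, and every case receives metadata_quorum explicitly.
        pass

    @cluster(num_nodes=9)
    @parametrize(metadata_quorum=quorum.isolated_kraft)
    def test_parametrize_style(self, metadata_quorum):
        # @parametrize pins a single argument combination rather than a cross-product.
        pass
```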
@@ -99,10 +99,6 @@ def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbr SecurityConfig.ssl_stores = TestSslStores(self.test_context.local_scratch_dir, valid_hostname=True) - self.create_zookeeper_if_necessary() - if self.zk: - self.zk.start() - self.create_kafka(security_protocol=security_protocol, interbroker_security_protocol=interbroker_security_protocol) if self.kafka.quorum_info.using_kraft and interbroker_security_protocol == 'SSL': @@ -165,26 +161,17 @@ def create_and_start_clients(self, log_level, group_protocol): use_new_coordinator=[True], group_protocol=consumer_group.all_group_protocols ) - def test_quorum_ssl_endpoint_validation_failure(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): + def test_quorum_ssl_endpoint_validation_failure(self, metadata_quorum, use_new_coordinator=False, group_protocol=None): """ Test that invalid hostname in ZooKeeper or KRaft Controller results in broker inability to start. """ - # Start ZooKeeper/KRaft Controller with valid hostnames in the certs' SANs + # Start KRaft Controller with valid hostnames in the certs' SANs # so that we can start Kafka SecurityConfig.ssl_stores = TestSslStores(self.test_context.local_scratch_dir, valid_hostname=True) - self.create_zookeeper_if_necessary(num_nodes=1, - zk_client_port = False, - zk_client_secure_port = True, - zk_tls_encrypt_only = True, - ) - if self.zk: - self.zk.start() - self.create_kafka(num_nodes=1, interbroker_security_protocol='SSL', # also sets the broker-to-kraft-controller security protocol for the KRaft case - zk_client_secure=True, # ignored if we aren't using ZooKeeper ) self.kafka.start() @@ -194,10 +181,7 @@ def test_quorum_ssl_endpoint_validation_failure(self, metadata_quorum=quorum.zk, self.kafka.stop_node(self.kafka.nodes[0]) SecurityConfig.ssl_stores.valid_hostname = False - if quorum.for_test(self.test_context) == quorum.zk: - self.kafka.zk.restart_cluster() - else: - self.kafka.isolated_controller_quorum.restart_cluster() + self.kafka.isolated_controller_quorum.restart_cluster() try: self.kafka.start_node(self.kafka.nodes[0], timeout_sec=30) diff --git a/tests/kafkatest/tests/core/throttling_test.py b/tests/kafkatest/tests/core/throttling_test.py index 1a0c1c6ec0940..e53dd46dbb65e 100644 --- a/tests/kafkatest/tests/core/throttling_test.py +++ b/tests/kafkatest/tests/core/throttling_test.py @@ -20,8 +20,7 @@ from ducktape.utils.util import wait_until from kafkatest.services.performance import ProducerPerformanceService -from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest from kafkatest.services.verifiable_producer import VerifiableProducer @@ -46,7 +45,6 @@ def __init__(self, test_context): super(ThrottlingTest, self).__init__(test_context=test_context) self.topic = "test_topic" - self.zk = ZookeeperService(test_context, num_nodes=1) # Because we are starting the producer/consumer/validate cycle _after_ # seeding the cluster with big data (to test throttling), we need to # Start the consumer from the end of the stream. 
further, we need to @@ -58,7 +56,7 @@ def __init__(self, test_context): self.num_partitions = 3 self.kafka = KafkaService(test_context, num_nodes=self.num_brokers, - zk=self.zk, + zk=None, topics={ self.topic: { "partitions": self.num_partitions, @@ -67,7 +65,8 @@ def __init__(self, test_context): "segment.bytes": 64 * 1024 * 1024 } } - }) + }, + controller_num_nodes_override=1) self.producer_throughput = 1000 self.timeout_sec = 400 self.num_records = 2000 @@ -78,9 +77,6 @@ def __init__(self, test_context): self.num_consumers = 1 self.throttle = 4 * 1024 * 1024 # 4 MB/s - def setUp(self): - self.zk.start() - def min_cluster_size(self): # Override this since we're adding services outside of the constructor return super(ThrottlingTest, self).min_cluster_size() +\ @@ -139,9 +135,9 @@ def reassign_partitions(self, bounce_brokers, throttle): time_taken)) @cluster(num_nodes=10) - @parametrize(bounce_brokers=True) - @parametrize(bounce_brokers=False) - def test_throttled_reassignment(self, bounce_brokers): + @parametrize(bounce_brokers=True, metadata_quorum=quorum.isolated_kraft) + @parametrize(bounce_brokers=False, metadata_quorum=quorum.isolated_kraft) + def test_throttled_reassignment(self, bounce_brokers, metadata_quorum): security_protocol = 'PLAINTEXT' self.kafka.security_protocol = security_protocol self.kafka.interbroker_security_protocol = security_protocol diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py index 11cd1c24ae334..5fbd987a97c66 100644 --- a/tests/kafkatest/tests/core/transactions_test.py +++ b/tests/kafkatest/tests/core/transactions_test.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService, quorum, consumer_group from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.verifiable_producer import VerifiableProducer @@ -59,16 +58,11 @@ def __init__(self, test_context): self.progress_timeout_sec = 60 self.consumer_group = "transactions-test-consumer-group" - self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None self.kafka = KafkaService(test_context, num_nodes=self.num_brokers, - zk=self.zk, + zk=None, controller_num_nodes_override=1) - def setUp(self): - if self.zk: - self.zk.start() - def seed_messages(self, topic, num_seed_messages): seed_timeout_sec = 10000 seed_producer = VerifiableProducer(context=self.test_context, @@ -96,16 +90,11 @@ def bounce_brokers(self, clean_shutdown): else: self.kafka.stop_node(node, clean_shutdown = False) gracePeriodSecs = 5 - if self.zk: - wait_until(lambda: not self.kafka.pids(node) and not self.kafka.is_registered(node), - timeout_sec=self.kafka.zk_session_timeout + gracePeriodSecs, - err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(node.account)) - else: - brokerSessionTimeoutSecs = 18 - wait_until(lambda: not self.kafka.pids(node), - timeout_sec=brokerSessionTimeoutSecs + gracePeriodSecs, - err_msg="Failed to see timely disappearance of process for hard-killed broker %s" % str(node.account)) - time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs) + brokerSessionTimeoutSecs = 18 + wait_until(lambda: not self.kafka.pids(node), + timeout_sec=brokerSessionTimeoutSecs + gracePeriodSecs, + err_msg="Failed to see timely disappearance of process for hard-killed broker %s" % str(node.account)) + 
time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs) self.kafka.start_node(node) self.kafka.await_no_under_replicated_partitions() @@ -234,7 +223,7 @@ def setup_topics(self): use_new_coordinator=[True], group_protocol=consumer_group.all_group_protocols ) - def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): + def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata, metadata_quorum, use_new_coordinator=False, group_protocol=None): security_protocol = 'PLAINTEXT' self.kafka.security_protocol = security_protocol self.kafka.interbroker_security_protocol = security_protocol diff --git a/tests/kafkatest/tests/end_to_end.py b/tests/kafkatest/tests/end_to_end.py index 3b9cf9f8c3c6d..ed2ecca87fdd3 100644 --- a/tests/kafkatest/tests/end_to_end.py +++ b/tests/kafkatest/tests/end_to_end.py @@ -36,6 +36,7 @@ class EndToEndTest(Test): def __init__(self, test_context, topic="test_topic", topic_config=DEFAULT_TOPIC_CONFIG): super(EndToEndTest, self).__init__(test_context=test_context) + self.zk = None self.topic = topic self.topic_config = topic_config self.records_consumed = []
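The bounce_brokers rewrite in group_mode_transactions_test.py and transactions_test.py above replaces the old ZooKeeper-deregistration wait with a plain process-exit wait plus a sleep that covers the broker session timeout. A condensed sketch of that pattern as a free-standing helper (the helper name is hypothetical; the constants and service calls mirror the patch):

```python
import time

from ducktape.utils.util import wait_until


def hard_bounce_broker(kafka, node):
    # Hard-kill the broker, then wait for the process to disappear rather than
    # for a ZooKeeper session to expire, since there is no ZooKeeper registration.
    kafka.stop_node(node, clean_shutdown=False)
    broker_session_timeout_secs = 18
    grace_period_secs = 5
    wait_until(lambda: not kafka.pids(node),
               timeout_sec=broker_session_timeout_secs + grace_period_secs,
               err_msg="Failed to see timely disappearance of process for hard-killed broker %s"
                       % str(node.account))
    # Sleep past the broker session timeout so the controller has fenced the
    # replica before the broker is restarted.
    time.sleep(broker_session_timeout_secs + grace_period_secs)
    kafka.start_node(node)
    kafka.await_no_under_replicated_partitions()
```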