Skip to content

Commit

Permalink
KAFKA-17609:[3/4]Convert system tests to kraft part 3 (#17327)
Browse files Browse the repository at this point in the history
Part 3 of 4 converting streams system tests to KRaft

Reviewers: Matthias Sax <[email protected]>
  • Loading branch information
bbejeck authored Oct 30, 2024
1 parent 3d2edf8 commit 358d877
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 16 deletions.
16 changes: 3 additions & 13 deletions tests/kafkatest/tests/streams/streams_optimized_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from kafkatest.services.streams import StreamsOptimizedUpgradeTestService
from kafkatest.services.streams import StreamsResetter
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.streams.utils import stop_processors

class StreamsOptimizedTest(Test):
Expand All @@ -46,13 +45,8 @@ def __init__(self, test_context):
self.join_topic: {'partitions': 6}
}

self.zookeeper = (
ZookeeperService(self.test_context, 1)
if quorum.for_test(self.test_context) == quorum.zk
else None
)
self.kafka = KafkaService(self.test_context, num_nodes=3,
zk=self.zookeeper, topics=self.topics, controller_num_nodes_override=1)
self.kafka = KafkaService(self.test_context, num_nodes=3, controller_num_nodes_override=1,
zk=None, topics=self.topics)

self.producer = VerifiableProducer(self.test_context,
1,
Expand All @@ -62,10 +56,8 @@ def __init__(self, test_context):
acks=1)

@cluster(num_nodes=9)
@matrix(metadata_quorum=[quorum.isolated_kraft])
@matrix(metadata_quorum=[quorum.combined_kraft])
def test_upgrade_optimized_topology(self, metadata_quorum):
if self.zookeeper:
self.zookeeper.start()
self.kafka.start()

processor1 = StreamsOptimizedUpgradeTestService(self.test_context, self.kafka)
Expand Down Expand Up @@ -111,8 +103,6 @@ def test_upgrade_optimized_topology(self, metadata_quorum):
self.logger.info("teardown")
self.producer.stop()
self.kafka.stop()
if self.zookeeper:
self.zookeeper.stop()

def reset_application(self):
resetter = StreamsResetter(self.test_context, self.kafka, topic = self.input_topic, applicationId = 'StreamsOptimizedTest')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ def __init__(self, test_context):

@cluster(num_nodes=8)
@matrix(crash=[False, True],
processing_guarantee=['exactly_once', 'exactly_once_v2'],
metadata_quorum=[quorum.isolated_kraft])
def test_streams(self, crash, processing_guarantee, metadata_quorum):
metadata_quorum=[quorum.combined_kraft])
def test_streams(self, crash, metadata_quorum):
driver = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "driver", "ignored", "ignored")
processing_guarantee='exactly_once_v2'

LOG_FILE = driver.LOG_FILE # this is the same for all instances of the service, so we can just declare a "constant"

Expand Down

0 comments on commit 358d877

Please sign in to comment.