Skip to content

Commit

Permalink
MINOR: Backport two system test changes for Connect to give more star…
Browse files Browse the repository at this point in the history
…tup control (apache#6638)

This merge consists of two commits previously merged into later branches.
Author: Cyrus Vafadari <[email protected]>
Reviewers: Randall Hauch <[email protected]>

Commit #1:
MINOR: Add async and different sync startup modes in connect service test class

Allow Connect Service in system tests to start asynchronously.

Specifically, allow for three startup conditions:
1. No condition - start async and return immediately.
2. Semi-async - start returns as soon as plugins have been discovered and loaded successfully.
3. Sync - start returns after the worker has completed startup. This is the current mode, but its condition is improved by checking that the port of Connect's REST interface is open, rather than that a log line has appeared in the logs.

Author: Konstantine Karantasis <[email protected]>
Reviewers: Randall Hauch <[email protected]>, Ewen Cheslack-Postava <[email protected]>
Closes apache#4423 from kkonstantine/MINOR-Add-async-and-different-sync-startup-modes-in-ConnectService-test-class

Commit #2:
MINOR: Modify Connect service's startup timeout to be passed via the init (apache#5882)

Currently, the startup timeout is hardcoded to be 60 seconds in Connect's test service. This change makes it configurable via the constructor (`__init__`).

Author: Magesh Nandakumar <[email protected]>
Reviewers: Randall Hauch <[email protected]>, Jason Gustafson <[email protected]>
  • Loading branch information
cyrusv authored and rhauch committed Apr 26, 2019
1 parent 2ffb62f commit f763aa7
Showing 1 changed file with 68 additions and 18 deletions.
86 changes: 68 additions & 18 deletions tests/kafkatest/services/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import time

import requests
from ducktape.cluster.remoteaccount import RemoteCommandError
from ducktape.errors import DucktapeError
from ducktape.services.service import Service
from ducktape.utils.util import wait_until
Expand All @@ -39,6 +40,15 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
STDERR_FILE = os.path.join(PERSISTENT_ROOT, "connect.stderr")
LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "connect-log4j.properties")
PID_FILE = os.path.join(PERSISTENT_ROOT, "connect.pid")
# Port of the Connect worker's REST interface; probed by listening() and used by _base_url().
CONNECT_REST_PORT = 8083

# Startup modes accepted by start(); each one selects how long startup blocks before returning:
STARTUP_MODE_INSTANT = 'INSTANT'
"""STARTUP_MODE_INSTANT: Start Connect worker and return immediately"""
STARTUP_MODE_LOAD = 'LOAD'
"""STARTUP_MODE_LOAD: Start Connect worker and return after discovering and loading plugins"""
STARTUP_MODE_LISTEN = 'LISTEN'
"""STARTUP_MODE_LISTEN: Start Connect worker and return after opening the REST port."""

logs = {
"connect_log": {
Expand All @@ -52,11 +62,13 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
"collect_default": True},
}

def __init__(self, context, num_nodes, kafka, files):
def __init__(self, context, num_nodes, kafka, files, startup_timeout_sec=60):
    """Set up the shared state of a Connect test service.

    :param context: passed through to the parent service constructor.
    :param num_nodes: passed through to the parent service constructor.
    :param kafka: Kafka service object; its ``security_config.client_config()``
        result is stored as this service's security configuration.
    :param files: stored as ``self.files``.
    :param startup_timeout_sec: timeout, in seconds, stored for the startup
        (and clean shutdown) waits performed by this service.
    """
    super(ConnectServiceBase, self).__init__(context, num_nodes)

    # Collaborators and inputs handed in by the caller.
    self.kafka = kafka
    self.security_config = kafka.security_config.client_config()
    self.files = files

    # Mutable per-service settings; the startup mode is overwritten by start().
    self.environment = {}
    self.startup_timeout_sec = startup_timeout_sec
    self.startup_mode = self.STARTUP_MODE_LISTEN

def pids(self, node):
Expand All @@ -76,6 +88,38 @@ def set_configs(self, config_template_func, connector_config_templates=None):
self.config_template_func = config_template_func
self.connector_config_templates = connector_config_templates

def listening(self, node):
    """Report whether the Connect REST port on ``node`` accepts connections.

    Runs ``nc -z <hostname> <port>`` on the node via ``ssh_output`` with
    ``allow_fail=False``.

    :param node: node whose ``account`` is used to run the probe.
    :return: True if the probe command completed without raising; False if it
        raised ``RemoteCommandError`` or ``ValueError``.
    """
    host = node.account.hostname
    probe_cmd = "nc -z %s %s" % (host, self.CONNECT_REST_PORT)
    try:
        # Keep the try body limited to the one call expected to fail while
        # the port is still closed.
        node.account.ssh_output(probe_cmd, allow_fail=False)
    except (RemoteCommandError, ValueError):
        return False
    else:
        self.logger.debug("Connect worker started accepting connections at: '%s:%s'", host,
                          self.CONNECT_REST_PORT)
        return True

def start(self, mode=STARTUP_MODE_LISTEN):
    """Start the service using the given startup mode.

    The mode is recorded on the instance before delegating to the parent
    ``start()``, so that it is already set when the per-node startup code
    reads ``self.startup_mode``.

    :param mode: one of the ``STARTUP_MODE_*`` constants; defaults to
        ``STARTUP_MODE_LISTEN``.
    """
    self.startup_mode = mode
    super(ConnectServiceBase, self).start()

def start_and_return_immediately(self, node, worker_type, remote_connector_configs):
    """Launch the Connect worker on ``node`` without waiting on any startup condition.

    :param node: node on which the start command is run over SSH.
    :param worker_type: label used only in the debug log line
        (callers pass 'standalone' or 'distributed').
    :param remote_connector_configs: forwarded to ``start_cmd``.
    """
    launch_cmd = self.start_cmd(node, remote_connector_configs)
    self.logger.debug("Connect %s command: %s", worker_type, launch_cmd)
    account = node.account
    account.ssh(launch_cmd)

def start_and_wait_to_load_plugins(self, node, worker_type, remote_connector_configs):
    """Launch the worker and block until 'Kafka version' shows up in its log.

    The log monitor is opened before the worker is launched and the wait runs
    inside the monitor's context. The wait is bounded by
    ``self.startup_timeout_sec``.

    :param node: node on which the worker is started and whose log is monitored.
    :param worker_type: forwarded to ``start_and_return_immediately``.
    :param remote_connector_configs: forwarded to ``start_and_return_immediately``.
    """
    log_monitor_ctx = node.account.monitor_log(self.LOG_FILE)
    with log_monitor_ctx as monitor:
        self.start_and_return_immediately(node, worker_type, remote_connector_configs)
        failure_msg = (
            "Never saw message indicating Kafka Connect finished startup on node: "
            "%s in condition mode: %s"
        ) % (str(node.account), self.startup_mode)
        monitor.wait_until('Kafka version', timeout_sec=self.startup_timeout_sec, err_msg=failure_msg)

def start_and_wait_to_start_listening(self, node, worker_type, remote_connector_configs):
    """Launch the worker and block until ``listening(node)`` returns True.

    The wait is bounded by ``self.startup_timeout_sec``.

    :param node: node on which the worker is started and then probed.
    :param worker_type: forwarded to ``start_and_return_immediately``.
    :param remote_connector_configs: forwarded to ``start_and_return_immediately``.
    """
    self.start_and_return_immediately(node, worker_type, remote_connector_configs)

    def rest_port_open():
        # Re-evaluated by wait_until on each poll.
        return self.listening(node)

    failure_msg = "Kafka Connect failed to start on node: %s in condition mode: %s" % (
        str(node.account), self.startup_mode)
    wait_until(rest_port_open, timeout_sec=self.startup_timeout_sec, err_msg=failure_msg)

def stop_node(self, node, clean_shutdown=True):
self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping Kafka Connect on " + str(node.account))
pids = self.pids(node)
Expand All @@ -85,7 +129,8 @@ def stop_node(self, node, clean_shutdown=True):
node.account.signal(pid, sig, allow_fail=True)
if clean_shutdown:
for pid in pids:
wait_until(lambda: not node.account.alive(pid), timeout_sec=60, err_msg="Kafka Connect process on " + str(node.account) + " took too long to exit")
wait_until(lambda: not node.account.alive(pid), timeout_sec=self.startup_timeout_sec, err_msg="Kafka Connect process on " + str(
node.account) + " took too long to exit")

node.account.ssh("rm -f " + self.PID_FILE, allow_fail=False)

Expand Down Expand Up @@ -192,14 +237,14 @@ def _rest_with_retry(self, path, body=None, node=None, method="GET", retries=40,
raise exception_to_throw

def _base_url(self, node):
return 'http://' + node.account.externally_routable_ip + ':' + '8083'
return 'http://' + node.account.externally_routable_ip + ':' + str(self.CONNECT_REST_PORT)


class ConnectStandaloneService(ConnectServiceBase):
"""Runs Kafka Connect in standalone mode."""

def __init__(self, context, kafka, files):
super(ConnectStandaloneService, self).__init__(context, 1, kafka, files)
def __init__(self, context, kafka, files, startup_timeout_sec=60):
    """Create a standalone Connect service, which always runs on exactly one node.

    :param context: forwarded to ``ConnectServiceBase.__init__``.
    :param kafka: forwarded to ``ConnectServiceBase.__init__``.
    :param files: forwarded to ``ConnectServiceBase.__init__``.
    :param startup_timeout_sec: forwarded to ``ConnectServiceBase.__init__``.
    """
    single_node = 1
    super(ConnectStandaloneService, self).__init__(context, single_node, kafka, files, startup_timeout_sec)

# For convenience since this service only makes sense with a single node
@property
Expand Down Expand Up @@ -229,11 +274,13 @@ def start_node(self, node):
remote_connector_configs.append(target_file)

self.logger.info("Starting Kafka Connect standalone process on " + str(node.account))
with node.account.monitor_log(self.LOG_FILE) as monitor:
cmd = self.start_cmd(node, remote_connector_configs)
self.logger.debug("Connect standalone command: %s", cmd)
node.account.ssh(cmd)
monitor.wait_until('Kafka Connect started', timeout_sec=60, err_msg="Never saw message indicating Kafka Connect finished startup on " + str(node.account))
if self.startup_mode == self.STARTUP_MODE_LOAD:
self.start_and_wait_to_load_plugins(node, 'standalone', remote_connector_configs)
elif self.startup_mode == self.STARTUP_MODE_INSTANT:
self.start_and_return_immediately(node, 'standalone', remote_connector_configs)
else:
# The default mode is to wait until the complete startup of the worker
self.start_and_wait_to_start_listening(node, 'standalone', remote_connector_configs)

if len(self.pids(node)) == 0:
raise RuntimeError("No process ids recorded")
Expand All @@ -243,13 +290,14 @@ class ConnectDistributedService(ConnectServiceBase):
"""Runs Kafka Connect in distributed mode."""

def __init__(self, context, num_nodes, kafka, files, offsets_topic="connect-offsets",
configs_topic="connect-configs", status_topic="connect-status"):
super(ConnectDistributedService, self).__init__(context, num_nodes, kafka, files)
configs_topic="connect-configs", status_topic="connect-status", startup_timeout_sec = 60):
super(ConnectDistributedService, self).__init__(context, num_nodes, kafka, files, startup_timeout_sec)
self.offsets_topic = offsets_topic
self.configs_topic = configs_topic
self.status_topic = status_topic

def start_cmd(self, node):
# connector_configs argument is intentionally ignored in distributed service.
def start_cmd(self, node, connector_configs):
cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
for envvar in self.environment:
Expand All @@ -268,11 +316,13 @@ def start_node(self, node):
raise DucktapeError("Config files are not valid in distributed mode, submit connectors via the REST API")

self.logger.info("Starting Kafka Connect distributed process on " + str(node.account))
with node.account.monitor_log(self.LOG_FILE) as monitor:
cmd = self.start_cmd(node)
self.logger.debug("Connect distributed command: %s", cmd)
node.account.ssh(cmd)
monitor.wait_until('Kafka Connect started', timeout_sec=60, err_msg="Never saw message indicating Kafka Connect finished startup on " + str(node.account))
if self.startup_mode == self.STARTUP_MODE_LOAD:
self.start_and_wait_to_load_plugins(node, 'distributed', '')
elif self.startup_mode == self.STARTUP_MODE_INSTANT:
self.start_and_return_immediately(node, 'distributed', '')
else:
# The default mode is to wait until the complete startup of the worker
self.start_and_wait_to_start_listening(node, 'distributed', '')

if len(self.pids(node)) == 0:
raise RuntimeError("No process ids recorded")
Expand Down

0 comments on commit f763aa7

Please sign in to comment.