From 2db48b7cbe377a4ed1addf3edf463e38f5997240 Mon Sep 17 00:00:00 2001 From: Jordan Moore <1930631+OneCricketeer@users.noreply.github.com> Date: Mon, 1 May 2023 15:07:31 -0500 Subject: [PATCH] remove zookeeper (#28) * remove zookeeper * fix: duplicate const in test case * prefer 127.0.0.1 for explicit host connections * remove cluster id extrapolation --- README.md | 33 +- bitnami-kraft-libkafka.sh | 986 ++++++++++++++++++ docker-compose.cluster.yml | 96 +- docker-compose.yml | 49 +- .../cli/ConnectDistributedWrapperTest.java | 6 +- 5 files changed, 1095 insertions(+), 75 deletions(-) create mode 100755 bitnami-kraft-libkafka.sh diff --git a/README.md b/README.md index e14c020..f08f78d 100644 --- a/README.md +++ b/README.md @@ -79,18 +79,19 @@ $ DOCKER_REGISTRY= DOCKER_USER=$(whoami) \ ## Tutorial -The following tutorial for using Jib to package `ConnectDistributed` for Kafka Connect will require installation of `docker-compose`, and uses the [Bitnami](https://github.com/bitnami/bitnami-docker-kafka) Kafka+Zookeeper images, however any other Kafka or ZooKeeper Docker images should work. +The following tutorial uses Jib to package `ConnectDistributed` class for running Kafka Connect Distributed mode workers. The following instructions use the [Bitnami](https://github.com/bitnami/bitnami-docker-kafka) Kafka images, however any other Kafka Docker images should work. This tutorial will roughly follow the same steps as the [tutorial for Connect on Kafka's site](https://kafka.apache.org/documentation/#quickstart_kafkaconnect), except using the Distributed Connect server instead. ### Without Docker -If not using Docker, Kafka and ZooKeeper can be started locally using their respective start scripts. If this is done, though, the the variables for the bootstrap servers will need to be adjusted accordingly. +If not using Docker, Kafka (and ZooKeeper, if not using Kraft) can be started locally using their respective start scripts. If this is done, though, the variables for the bootstrap servers will need to be adjusted accordingly. The following steps can be used to run this application locally outside of Docker. ```bash -export CONNECT_BOOTSTRAP_SERVERS=localhost:9092 # Assumes Kafka default port +# Assumes Kafka default port +export CONNECT_BOOTSTRAP_SERVERS=127.0.0.1:9092 export CONNECT_GROUP_ID=cg_connect-idea export CONNECT_CONFIG_STORAGE_TOPIC=connect-jib_config @@ -105,22 +106,22 @@ export CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 export CONNECT_KEY_CONVERTER=org.apache.kafka.connect.converters.ByteArrayConverter export CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.converters.ByteArrayConverter -# Runs ConnectDistrbuted via Maven +# Runs ConnectDistributed via Maven ./mvnw clean exec:java ``` ### Start Kafka Cluster in Docker -> ***Note***: Sometimes the Kafka container kills itself in below steps, and the consumer commands therefore may need to be re-executed. The Streams Application should reconnect on its own. +> ***Note***: Sometimes the Kafka container kills itself in below steps, and the consumer commands therefore may need to be re-executed. The Connect worker should reconnect on its own. -For this exercise, we will be using three separate termainal windows, so go ahead and open those. +For this exercise, we will be using three separate terminal windows, so go ahead and open those. -First, we start with getting our cluster running in the foreground. This starts Kafka listening on `9092` on the host, and `29092` within the Docker network. Zookeeper is available on `2181`. 
+First, we start by getting our cluster running in the foreground. This starts Kafka listening on `9092` on the host, and `29092` within the Docker network. > *Terminal 1* ```bash -docker compose up zookeeper kafka +docker compose up kafka ``` ### Create Kafka Topics @@ -141,6 +142,8 @@ docker compose exec kafka \ bash -c "kafka-topics.sh --list --bootstrap-server kafka:29092" ``` +The `input` topic should appear in the list. + ### Produce Lorem Ipsum into input topic ```bash @@ -159,15 +162,15 @@ Should see last line `Processed a total of 9 messages`. ### Start Kafka Connect -Now, we can start Kafka Connect to read from the beginning of the input topic that had data sent into it, and begin processing it. Here, we build the Alpine variant of the container, as it renders a smaller container. +Now, we can build the Kafka Connect image and start it. ```bash -./mvnw clean install -Palpine +./mvnw clean install docker compose up connect-jib-1 ``` -Wait for log-line `Kafka Connect Started`, then post the FileSink Connector, which when not provided a `file`, will output the stdout of the container (Terminal 1). +Wait for the log line `Kafka Connect Started`, then post the FileSink Connector. When not provided a `file`, the connector tasks will write data to the stdout of the container (Terminal 1). > *Terminal 3* @@ -188,6 +191,8 @@ curl -XPUT http://localhost:8083/connectors/console-sink/config -H 'Content-Type }' ``` +This will read from the beginning of the `input` topic that had data sent into it, and begin processing it. + In the output of _Terminal 2_, you should see something similar to the following. ```text @@ -204,7 +209,7 @@ To repeat that process, we delete the connector and reset the consumer group. curl -XDELETE http://localhost:8083/connectors/console-sink docker compose exec kafka \ - bash -c "kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group connect-console-sink --reset-offsets --all-topics --to-earliest --execute" + bash -c "kafka-consumer-groups.sh --bootstrap-server kafka:29092 --group connect-console-sink --reset-offsets --all-topics --to-earliest --execute" ``` Re-run above console-producer and `curl -XPUT ...` command, but this time, there will be more than 9 total messages printed. @@ -213,7 +218,7 @@ Re-run above console-producer and `curl -XPUT ...` command, but this time, there ### Scaling up -Redo the tutorial with more input data and partitions and increase `max.tasks` of the connector. Notice that the `partition` field in the output may change (you may need to produce data multiple times to randomize the record batches). +Redo the tutorial with a new topic having more than one partition. Produce more input data to it, then increase `tasks.max` of the connector. Notice that the `partition` field in the output may change (you may need to produce data multiple times to randomize the record batches). ### Scaling out @@ -230,7 +235,7 @@ connect-jib-2: CONNECT_REST_ADVERTISED_HOST_NAME: connect-jib-2 ``` -See [`docker-compose.cluster.yml`](./docker-compose.cluster.yml). It can be ran via `docker compose -f docker-compose.cluster.yml up`. +A reverse proxy should be added in front of all instances. See an example using Træfik in [`docker-compose.cluster.yml`](./docker-compose.cluster.yml). It can be run via `docker compose -f docker-compose.cluster.yml up` and tested with `curl -H Host:connect-jib.docker.localhost http://127.0.0.1/`. 
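One rough way to check the proxied setup, assuming the clustered compose file is running and both workers have joined the `cg_connect-jib` group, is to call the Connect REST API through Træfik a few times; either worker may answer, but both should report the same cluster state.

```bash
# Start the KRaft controller, broker, both Connect workers, and the Traefik proxy
docker compose -f docker-compose.cluster.yml up -d

# Traefik listens on port 80; the Host header matches the routers declared
# in the connect-jib-1 / connect-jib-2 labels
curl -s -H 'Host: connect-jib.docker.localhost' http://127.0.0.1/

# Repeated calls may be served by either worker, but the connector list is
# cluster-wide, so the output should be identical
curl -s -H 'Host: connect-jib.docker.localhost' http://127.0.0.1/connectors
```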
## Extending with new Connectors diff --git a/bitnami-kraft-libkafka.sh b/bitnami-kraft-libkafka.sh new file mode 100755 index 0000000..f44bbbf --- /dev/null +++ b/bitnami-kraft-libkafka.sh @@ -0,0 +1,986 @@ +#!/bin/bash +# +# Bitnami Kafka library + +# shellcheck disable=SC1090,SC1091 + +# Load Generic Libraries +. /opt/bitnami/scripts/libfile.sh +. /opt/bitnami/scripts/libfs.sh +. /opt/bitnami/scripts/liblog.sh +. /opt/bitnami/scripts/libos.sh +. /opt/bitnami/scripts/libvalidations.sh +. /opt/bitnami/scripts/libservice.sh + +# Functions + +######################## +# Set a configuration setting value to a file +# Globals: +# None +# Arguments: +# $1 - file +# $2 - key +# $3 - values (array) +# Returns: +# None +######################### +kafka_common_conf_set() { + local file="${1:?missing file}" + local key="${2:?missing key}" + shift + shift + local values=("$@") + + if [[ "${#values[@]}" -eq 0 ]]; then + stderr_print "missing value" + return 1 + elif [[ "${#values[@]}" -ne 1 ]]; then + for i in "${!values[@]}"; do + kafka_common_conf_set "$file" "${key[$i]}" "${values[$i]}" + done + else + value="${values[0]}" + # Check if the value was set before + if grep -q "^[#\\s]*$key\s*=.*" "$file"; then + # Update the existing key + replace_in_file "$file" "^[#\\s]*${key}\s*=.*" "${key}=${value}" false + else + # Add a new key + printf '\n%s=%s' "$key" "$value" >>"$file" + fi + fi +} + +######################## +# Backwards compatibility measure to configure the TLS truststore locations +# Globals: +# KAFKA_CONF_FILE +# KAFKA_ZK_CONF_FILE +# Arguments: +# None +# Returns: +# None +######################### +kafka_configure_default_truststore_locations() { + # Backwards compatibility measure to allow custom truststore locations but at the same time not disrupt + # the UX that the previous version of the containers and the helm chart have. + # Context: The chart and containers by default assumed that the truststore location was KAFKA_CERTS_DIR/kafka.truststore.jks or KAFKA_MOUNTED_CONF_DIR/certs/kafka.truststore.jks. + # Because of this, we could not use custom certificates in different locations (use case: A custom base image that already has a truststore). Changing the logic to allow custom + # locations implied major changes in the current user experience (which only required to mount certificates at the assumed location). 
In order to maintain this compatibility we need + # use this logic that sets the KAFKA_TLS_*_FILE variables to the previously assumed locations in case it is not set + + # Kafka truststore + if { [[ "${KAFKA_CFG_LISTENERS:-}" =~ SSL ]] || [[ "${KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP:-}" =~ SSL ]]; } && is_empty_value "$KAFKA_TLS_TRUSTSTORE_FILE"; then + local kafka_truststore_filename="kafka.truststore.jks" + [[ "$KAFKA_TLS_TYPE" = "PEM" ]] && kafka_truststore_filename="kafka.truststore.pem" + if [[ -f "${KAFKA_CERTS_DIR}/${kafka_truststore_filename}" ]]; then + # Mounted in /opt/bitnami/kafka/conf/certs + export KAFKA_TLS_TRUSTSTORE_FILE="${KAFKA_CERTS_DIR}/${kafka_truststore_filename}" + else + # Mounted in /bitnami/kafka/conf/certs + export KAFKA_TLS_TRUSTSTORE_FILE="${KAFKA_MOUNTED_CONF_DIR}/certs/${kafka_truststore_filename}" + fi + fi + # Zookeeper truststore + if [[ "${KAFKA_ZOOKEEPER_PROTOCOL:-}" =~ SSL ]] && is_empty_value "$KAFKA_ZOOKEEPER_TLS_TRUSTSTORE_FILE"; then + local zk_truststore_filename="zookeeper.truststore.jks" + [[ "$KAFKA_ZOOKEEPER_TLS_TYPE" = "PEM" ]] && zk_truststore_filename="zookeeper.truststore.pem" + if [[ -f "${KAFKA_CERTS_DIR}/${zk_truststore_filename}" ]]; then + # Mounted in /opt/bitnami/kafka/conf/certs + export KAFKA_ZOOKEEPER_TLS_TRUSTSTORE_FILE="${KAFKA_CERTS_DIR}/${zk_truststore_filename}" + else + # Mounted in /bitnami/kafka/conf/certs + export KAFKA_ZOOKEEPER_TLS_TRUSTSTORE_FILE="${KAFKA_MOUNTED_CONF_DIR}/certs/${zk_truststore_filename}" + fi + fi +} + +######################## +# Set a configuration setting value to server.properties +# Globals: +# KAFKA_CONF_FILE +# KAFKA_ZK_CONF_FILE +# Arguments: +# $1 - key +# $2 - values (array) +# Returns: +# None +######################### +kafka_server_conf_set() { + kafka_common_conf_set "$(kafka_get_conf_file)" "$@" +} + +######################## +# Set a configuration setting value to producer.properties and consumer.properties +# Globals: +# KAFKA_CONF_DIR +# Arguments: +# $1 - key +# $2 - values (array) +# Returns: +# None +######################### +kafka_producer_consumer_conf_set() { + kafka_common_conf_set "$KAFKA_CONF_DIR/producer.properties" "$@" + kafka_common_conf_set "$KAFKA_CONF_DIR/consumer.properties" "$@" +} + +######################## +# Create alias for environment variable, so both can be used +# Globals: +# None +# Arguments: +# $1 - Alias environment variable name +# $2 - Original environment variable name +# Returns: +# None +######################### +kafka_declare_alias_env() { + local -r alias="${1:?missing environment variable alias}" + local -r original="${2:?missing original environment variable}" + if printenv "${original}" >/dev/null; then + export "$alias"="${!original:-}" + fi +} + +######################## +# Map Kafka legacy environment variables to the new names +# Globals: +# KAFKA_* +# Arguments: +# None +# Returns: +# None +######################### +kafka_create_alias_environment_variables() { + suffixes=( + "ADVERTISED_LISTENERS" + "BROKER_ID" + "DEFAULT_REPLICATION_FACTOR" + "DELETE_TOPIC_ENABLE" + "INTER_BROKER_LISTENER_NAME" + "LISTENERS" + "LISTENER_SECURITY_PROTOCOL_MAP" + "LOG_DIRS" + "LOG_FLUSH_INTERVAL_MESSAGES" + "LOG_FLUSH_INTERVAL_MS" + "LOG_MESSAGE_FORMAT_VERSION" + "LOG_RETENTION_BYTES" + "LOG_RETENTION_CHECK_INTERVALS_MS" + "LOG_RETENTION_HOURS" + "LOG_SEGMENT_BYTES" + "MESSAGE_MAX_BYTES" + "NUM_IO_THREADS" + "NUM_NETWORK_THREADS" + "NUM_PARTITIONS" + "NUM_RECOVERY_THREADS_PER_DATA_DIR" + "OFFSETS_TOPIC_REPLICATION_FACTOR" + 
"SOCKET_RECEIVE_BUFFER_BYTES" + "SOCKET_REQUEST_MAX_BYTES" + "SOCKET_SEND_BUFFER_BYTES" + "SSL_ENDPOINT_IDENTIFICATION_ALGORITHM" + "TRANSACTION_STATE_LOG_MIN_ISR" + "TRANSACTION_STATE_LOG_REPLICATION_FACTOR" + "ZOOKEEPER_CONNECT" + "ZOOKEEPER_CONNECTION_TIMEOUT_MS" + ) + kafka_declare_alias_env "KAFKA_CFG_LOG_DIRS" "KAFKA_LOGS_DIRS" + kafka_declare_alias_env "KAFKA_CFG_LOG_SEGMENT_BYTES" "KAFKA_SEGMENT_BYTES" + kafka_declare_alias_env "KAFKA_CFG_MESSAGE_MAX_BYTES" "KAFKA_MAX_MESSAGE_BYTES" + kafka_declare_alias_env "KAFKA_CFG_ZOOKEEPER_CONNECTION_TIMEOUT_MS" "KAFKA_ZOOKEEPER_CONNECT_TIMEOUT_MS" + kafka_declare_alias_env "KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE" "KAFKA_AUTO_CREATE_TOPICS_ENABLE" + kafka_declare_alias_env "KAFKA_CLIENT_USERS" "KAFKA_BROKER_USER" + kafka_declare_alias_env "KAFKA_CLIENT_PASSWORDS" "KAFKA_BROKER_PASSWORD" + for s in "${suffixes[@]}"; do + kafka_declare_alias_env "KAFKA_CFG_${s}" "KAFKA_${s}" + done +} + +######################## +# Validate settings in KAFKA_* env vars +# Globals: +# KAFKA_* +# Arguments: +# None +# Returns: +# None +######################### +kafka_validate() { + debug "Validating settings in KAFKA_* env vars..." + local error_code=0 + local internal_port + local client_port + + # Auxiliary functions + print_validation_error() { + error "$1" + error_code=1 + } + check_allowed_listener_port() { + local -r total="$#" + for i in $(seq 1 "$((total - 1))"); do + for j in $(seq "$((i + 1))" "$total"); do + if (("${!i}" == "${!j}")); then + print_validation_error "There are listeners bound to the same port" + fi + done + done + } + check_conflicting_listener_ports() { + local validate_port_args=() + ! am_i_root && validate_port_args+=("-unprivileged") + if ! err=$(validate_port "${validate_port_args[@]}" "$1"); then + print_validation_error "An invalid port was specified in the environment variable KAFKA_CFG_LISTENERS: $err" + fi + } + check_multi_value() { + if [[ " ${2} " != *" ${!1} "* ]]; then + print_validation_error "The allowed values for ${1} are: ${2}" + fi + } + + if is_boolean_yes "$KAFKA_ENABLE_KRAFT"; then + if [[ -n "${KAFKA_CFG_PROCESS_ROLES:-}" ]]; then + old_IFS=$IFS + IFS=',' + read -r -a roles <<< "$KAFKA_CFG_PROCESS_ROLES" + IFS=${old_IFS} + controller_role_exists=false + for val in "${roles[@]}"; + do + if [[ "$val" == "controller" ]]; then + controller_role_exists=true + break + fi + done + if [[ "$controller_role_exists" == false ]] && [[ "${KAFKA_CFG_PROCESS_ROLES:-}" != "broker" ]]; then + warn "KAFKA_CFG_PROCESS_ROLES must include 'controller' for KRaft if not 'broker'" + fi + fi + + if [[ -n "${KAFKA_CFG_NODE_ID:-}" ]] || [[ -n "${KAFKA_CFG_CONTROLLER_QUORUM_VOTERS:-}" ]]; then + if [[ -z "${KAFKA_CFG_NODE_ID:-}" ]] && [[ "$conroller_role_exists" == true ]]; then + print_validation_error "KRaft requires KAFKA_CFG_NODE_ID to be set for the quorum controller" + fi + if [[ -z "$KAFKA_CFG_CONTROLLER_QUORUM_VOTERS" ]]; then + print_validation_error "KRaft requires KAFKA_CFG_CONTROLLER_QUORUM_VOTERS to be set" + fi + + old_IFS=$IFS + IFS=',' + read -r -a voters <<< "$KAFKA_CFG_CONTROLLER_QUORUM_VOTERS" + IFS=${old_IFS} + node_id_matched=false + for voter in "${voters[@]}"; do + if [[ "$voter" == *"$KAFKA_CFG_NODE_ID"* ]]; then + node_id_matched=true + break + fi + done + + if [[ "$node_id_matched" == false ]]; then + if [[ "$controller_role_exists" == true ]]; then + print_validation_error "KAFKA_CFG_NODE_ID must match what is set in KAFKA_CFG_CONTROLLER_QUORUM_VOTERS" + fi + if [[ "${KAFKA_CFG_PROCESS_ROLES:-}" == "broker" ]]; then 
+ warn "KAFKA_CFG_CONTROLLER_QUORUM_VOTERS references an external controller for this broker" + fi + fi + fi + + if [[ -n "${KAFKA_CFG_LISTENERS:-}" ]]; then + old_IFS=$IFS + IFS=',' + read -r -a listener <<< "$KAFKA_CFG_LISTENERS" + IFS=${old_IFS} + controller_listener_exists=false + for val in "${listener[@]}"; + do + if [[ $val == *"CONTROLLER"* ]]; then + controller_listener_exists=true + break + fi + done + if [[ "$controller_role_exists" == true ]] && [[ "$controller_listener_exists" == false ]]; then + warn "KAFKA_CFG_LISTENERS must include a listener for CONTROLLER when KAFKA_CFG_PROCESS_ROLES includes 'controller'" + fi + fi + fi + + if [[ ${KAFKA_CFG_LISTENERS:-} =~ INTERNAL://:([0-9]*) ]]; then + internal_port="${BASH_REMATCH[1]}" + check_allowed_listener_port "$internal_port" + fi + if [[ ${KAFKA_CFG_LISTENERS:-} =~ CLIENT://:([0-9]*) ]]; then + client_port="${BASH_REMATCH[1]}" + check_allowed_listener_port "$client_port" + fi + [[ -n ${internal_port:-} && -n ${client_port:-} ]] && check_conflicting_listener_ports "$internal_port" "$client_port" + if [[ -n "${KAFKA_PORT_NUMBER:-}" ]] || [[ -n "${KAFKA_CFG_PORT:-}" ]]; then + warn "The environment variables KAFKA_PORT_NUMBER and KAFKA_CFG_PORT are deprecated, you can specify the port number to use for each listener using the KAFKA_CFG_LISTENERS environment variable instead." + fi + + read -r -a users <<<"$(tr ',;' ' ' <<<"${KAFKA_CLIENT_USERS}")" + read -r -a passwords <<<"$(tr ',;' ' ' <<<"${KAFKA_CLIENT_PASSWORDS}")" + if [[ "${#users[@]}" -ne "${#passwords[@]}" ]]; then + print_validation_error "Specify the same number of passwords on KAFKA_CLIENT_PASSWORDS as the number of users on KAFKA_CLIENT_USERS!" + fi + + if is_boolean_yes "$ALLOW_PLAINTEXT_LISTENER"; then + warn "You set the environment variable ALLOW_PLAINTEXT_LISTENER=$ALLOW_PLAINTEXT_LISTENER. For safety reasons, do not use this flag in a production environment." + fi + if [[ "${KAFKA_CFG_LISTENERS:-}" =~ SSL ]] || [[ "${KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP:-}" =~ SSL ]]; then + if [[ "$KAFKA_TLS_TYPE" = "JKS" ]] && + { [[ ! -f "${KAFKA_CERTS_DIR}/kafka.keystore.jks" ]] || [[ ! -f "$KAFKA_TLS_TRUSTSTORE_FILE" ]]; } && + { [[ ! -f "${KAFKA_MOUNTED_CONF_DIR}/certs/kafka.keystore.jks" ]] || [[ ! -f "$KAFKA_TLS_TRUSTSTORE_FILE" ]]; }; then + print_validation_error "In order to configure the TLS encryption for Kafka with JKS certs you must mount your kafka.keystore.jks and kafka.truststore.jks certs to the ${KAFKA_MOUNTED_CONF_DIR}/certs directory." + elif [[ "$KAFKA_TLS_TYPE" = "PEM" ]] && + { [[ ! -f "${KAFKA_CERTS_DIR}/kafka.keystore.pem" ]] || [[ ! -f "${KAFKA_CERTS_DIR}/kafka.keystore.key" ]] || [[ ! -f "$KAFKA_TLS_TRUSTSTORE_FILE" ]]; } && + { [[ ! -f "${KAFKA_MOUNTED_CONF_DIR}/certs/kafka.keystore.pem" ]] || [[ ! -f "${KAFKA_MOUNTED_CONF_DIR}/certs/kafka.keystore.key" ]] || [[ ! -f "$KAFKA_TLS_TRUSTSTORE_FILE" ]]; }; then + print_validation_error "In order to configure the TLS encryption for Kafka with PEM certs you must mount your kafka.keystore.pem, kafka.keystore.key and kafka.truststore.pem certs to the ${KAFKA_MOUNTED_CONF_DIR}/certs directory." + fi + elif [[ "${KAFKA_CFG_LISTENERS:-}" =~ SASL ]] || [[ "${KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP:-}" =~ SASL ]]; then + if [[ -z "$KAFKA_CLIENT_PASSWORDS" && -z "$KAFKA_INTER_BROKER_PASSWORD" ]]; then + print_validation_error "In order to configure SASL authentication for Kafka, you must provide the SASL credentials. 
Set the environment variables KAFKA_CLIENT_USERS and KAFKA_CLIENT_PASSWORDS, to configure the credentials for SASL authentication with clients, or set the environment variables KAFKA_INTER_BROKER_USER and KAFKA_INTER_BROKER_PASSWORD, to configure the credentials for SASL authentication between brokers." + fi + elif ! is_boolean_yes "$ALLOW_PLAINTEXT_LISTENER"; then + print_validation_error "The KAFKA_CFG_LISTENERS environment variable does not configure a secure listener. Set the environment variable ALLOW_PLAINTEXT_LISTENER=yes to allow the container to be started with a plaintext listener. This is only recommended for development." + fi + if ! is_boolean_yes "$KAFKA_ENABLE_KRAFT"; then + if [[ "${KAFKA_ZOOKEEPER_PROTOCOL}" =~ SSL ]]; then + if [[ "$KAFKA_ZOOKEEPER_TLS_TYPE" = "JKS" ]] && + [[ ! -f "$KAFKA_ZOOKEEPER_TLS_TRUSTSTORE_FILE" ]]; then + print_validation_error "In order to configure the TLS encryption for Zookeeper with JKS certs you must mount your zookeeper.truststore.jks cert to the ${KAFKA_MOUNTED_CONF_DIR}/certs directory." + elif [[ "$KAFKA_ZOOKEEPER_TLS_TYPE" = "PEM" ]] && + [[ ! -f "$KAFKA_ZOOKEEPER_TLS_TRUSTSTORE_FILE" ]]; then + print_validation_error "In order to configure the TLS encryption for Zookeeper with PEM certs you must mount your zookeeper.truststore.pem cert to the ${KAFKA_MOUNTED_CONF_DIR}/certs directory." + fi + if [[ "$KAFKA_ZOOKEEPER_TLS_TYPE" = "JKS" ]] && + [[ ! -f "${KAFKA_CERTS_DIR}/zookeeper.keystore.jks" ]] && [[ ! -f "${KAFKA_MOUNTED_CONF_DIR}/certs/zookeeper.keystore.jks" ]]; then + warn "In order to configure the mTLS for Zookeeper with JKS certs you must mount your zookeeper.keystore.jks cert to the ${KAFKA_MOUNTED_CONF_DIR}/certs directory." + elif [[ "$KAFKA_ZOOKEEPER_TLS_TYPE" = "PEM" ]] && + { [[ ! -f "${KAFKA_CERTS_DIR}/zookeeper.keystore.pem" ]] || [[ ! -f "${KAFKA_CERTS_DIR}/zookeeper.keystore.key" ]]; } && + { [[ ! -f "${KAFKA_MOUNTED_CONF_DIR}/certs/zookeeper.keystore.pem" ]] || [[ ! -f "${KAFKA_MOUNTED_CONF_DIR}/certs/zookeeper.keystore.key" ]]; }; then + warn "In order to configure the mTLS for Zookeeper with PEM certs you must mount your zookeeper.keystore.pem cert and zookeeper.keystore.key key to the ${KAFKA_MOUNTED_CONF_DIR}/certs directory." + fi + elif [[ "${KAFKA_ZOOKEEPER_PROTOCOL}" =~ SASL ]]; then + if [[ -z "$KAFKA_ZOOKEEPER_USER" ]] || [[ -z "$KAFKA_ZOOKEEPER_PASSWORD" ]]; then + print_validation_error "In order to configure SASL authentication for Kafka, you must provide the SASL credentials. Set the environment variables KAFKA_ZOOKEEPER_USER and KAFKA_ZOOKEEPER_PASSWORD, to configure the credentials for SASL authentication with Zookeeper." + fi + elif ! is_boolean_yes "$ALLOW_PLAINTEXT_LISTENER"; then + print_validation_error "The KAFKA_ZOOKEEPER_PROTOCOL environment variable does not configure a secure protocol. Set the environment variable ALLOW_PLAINTEXT_LISTENER=yes to allow the container to be started with a plaintext listener. This is only recommended for development." 
+ fi + else + # advertised.listeners config must be empty when process.roles=controller + if [[ "$controller_role_exists" == true ]] && [[ "$KAFKA_CFG_PROCESS_ROLES" == "controller" ]]; then + unset KAFKA_ADVERTISED_LISTENERS + unset KAFKA_CFG_ADVERTISED_LISTENERS + remove_in_file "$(kafka_get_conf_file)" "advertised.listeners" false + # sed -i "/^$prop=/d" /opt/bitnami/kafka/config/kraft/server.properties + fi + fi + check_multi_value "KAFKA_TLS_TYPE" "JKS PEM" + check_multi_value "KAFKA_ZOOKEEPER_TLS_TYPE" "JKS PEM" + check_multi_value "KAFKA_TLS_CLIENT_AUTH" "none requested required" + [[ "$error_code" -eq 0 ]] || return "$error_code" +} +######################## +# Generate JAAS authentication file +# Globals: +# KAFKA_* +# Arguments: +# $1 - Authentication protocol to use for the internal listener +# $2 - Authentication protocol to use for the client listener +# Returns: +# None +######################### +kafka_generate_jaas_authentication_file() { + local -r internal_protocol="${1:-}" + local -r client_protocol="${2:-}" + + if [[ ! -f "${KAFKA_CONF_DIR}/kafka_jaas.conf" ]]; then + info "Generating JAAS authentication file" + + read -r -a users <<<"$(tr ',;' ' ' <<<"${KAFKA_CLIENT_USERS:-}")" + read -r -a passwords <<<"$(tr ',;' ' ' <<<"${KAFKA_CLIENT_PASSWORDS:-}")" + + if [[ "${client_protocol:-}" =~ SASL ]]; then + if [[ "${KAFKA_CFG_SASL_ENABLED_MECHANISMS:-}" =~ PLAIN ]]; then + cat >>"${KAFKA_CONF_DIR}/kafka_jaas.conf" <>"${KAFKA_CONF_DIR}/kafka_jaas.conf" <>"${KAFKA_CONF_DIR}/kafka_jaas.conf" <>"${KAFKA_CONF_DIR}/kafka_jaas.conf" <>"${KAFKA_CONF_DIR}/kafka_jaas.conf" <>"${KAFKA_CONF_DIR}/kafka_jaas.conf" <>"${KAFKA_CONF_DIR}/kafka_jaas.conf" <>"${KAFKA_CONF_DIR}/kafka_jaas.conf" <>"${KAFKA_CONF_DIR}/kafka_jaas.conf" <>"${KAFKA_CONF_DIR}/kafka_jaas.conf" <>"${KAFKA_CONF_DIR}/kafka_jaas.conf" <>"${KAFKA_CONF_DIR}/kafka_jaas.conf" <>"${KAFKA_CONF_DIR}/kafka_jaas.conf" <>"${KAFKA_CONF_DIR}/kafka_jaas.conf" <>"${KAFKA_CONF_DIR}/kafka_jaas.conf" < 1{print line"\\n\\"}{line=$0;}END{print $0" "}' <"${1:?missing file}" + } + remove_previous_cert_value() { + local key="${1:?missing key}" + files=( + "$(kafka_get_conf_file)" + "${KAFKA_CONF_DIR}/producer.properties" + "${KAFKA_CONF_DIR}/consumer.properties" + ) + for file in "${files[@]}"; do + if grep -q "^[#\\s]*$key\s*=.*" "$file"; then + # Delete all lines from the certificate beginning to its end + sed -i "/^[#\\s]*$key\s*=.*-----BEGIN/,/-----END/d" "$file" + fi + done + } + # We need to remove the previous cert value + # kafka_common_conf_set uses replace_in_file, which can't match multiple lines + remove_previous_cert_value ssl.keystore.key + remove_previous_cert_value ssl.keystore.certificate.chain + remove_previous_cert_value ssl.truststore.certificates + configure_both ssl.keystore.key "$(file_to_multiline_property "${KAFKA_CERTS_DIR}/kafka.keystore.key")" + configure_both ssl.keystore.certificate.chain "$(file_to_multiline_property "${KAFKA_CERTS_DIR}/kafka.keystore.pem")" + configure_both ssl.truststore.certificates "$(file_to_multiline_property "${kafka_truststore_location}")" + elif [[ "$KAFKA_TLS_TYPE" = "JKS" ]]; then + configure_both ssl.keystore.location "$KAFKA_CERTS_DIR"/kafka.keystore.jks + configure_both ssl.truststore.location "$kafka_truststore_location" + ! is_empty_value "$KAFKA_CERTIFICATE_PASSWORD" && configure_both ssl.keystore.password "$KAFKA_CERTIFICATE_PASSWORD" + ! 
is_empty_value "$KAFKA_CERTIFICATE_PASSWORD" && configure_both ssl.truststore.password "$KAFKA_CERTIFICATE_PASSWORD" + fi + SSL_CONFIGURED=true # prevents configuring SSL more than once +} + +######################## +# Configure Kafka for inter-broker communications +# Globals: +# None +# Arguments: +# $1 - Authentication protocol to use for the internal listener +# Returns: +# None +######################### +kafka_configure_internal_communications() { + local -r protocol="${1:?missing environment variable protocol}" + local -r allowed_protocols=("PLAINTEXT" "SASL_PLAINTEXT" "SASL_SSL" "SSL") + info "Configuring Kafka for inter-broker communications with ${protocol} authentication." + + if [[ "${allowed_protocols[*]}" =~ $protocol ]]; then + kafka_server_conf_set security.inter.broker.protocol "$protocol" + if [[ "$protocol" = "PLAINTEXT" ]]; then + warn "Inter-broker communications are configured as PLAINTEXT. This is not safe for production environments." + fi + if [[ "$protocol" = "SASL_PLAINTEXT" ]] || [[ "$protocol" = "SASL_SSL" ]]; then + # IMPORTANT: Do not confuse SASL/PLAIN with PLAINTEXT + # For more information, see: https://docs.confluent.io/current/kafka/authentication_sasl/authentication_sasl_plain.html#sasl-plain-overview) + if [[ -n "$KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL" ]]; then + kafka_server_conf_set sasl.mechanism.inter.broker.protocol "$KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL" + else + error "When using SASL for inter broker comunication the mechanism should be provided at KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL" + exit 1 + fi + fi + if [[ "$protocol" = "SASL_SSL" ]] || [[ "$protocol" = "SSL" ]]; then + kafka_configure_ssl + # We need to enable 2 way authentication on SASL_SSL so brokers authenticate each other. + # It won't affect client communications unless the SSL protocol is for them. + kafka_server_conf_set ssl.client.auth "$KAFKA_TLS_CLIENT_AUTH" + fi + else + error "Authentication protocol ${protocol} is not supported!" + exit 1 + fi +} + +######################## +# Configure Kafka for client communications +# Globals: +# None +# Arguments: +# $1 - Authentication protocol to use for the client listener +# Returns: +# None +######################### +kafka_configure_client_communications() { + local -r protocol="${1:?missing environment variable protocol}" + local -r allowed_protocols=("PLAINTEXT" "SASL_PLAINTEXT" "SASL_SSL" "SSL") + info "Configuring Kafka for client communications with ${protocol} authentication." + + if [[ "${allowed_protocols[*]}" =~ ${protocol} ]]; then + kafka_server_conf_set security.inter.broker.protocol "$protocol" + if [[ "$protocol" = "PLAINTEXT" ]]; then + warn "Client communications are configured using PLAINTEXT listeners. For safety reasons, do not use this in a production environment." + fi + if [[ "$protocol" = "SASL_PLAINTEXT" ]] || [[ "$protocol" = "SASL_SSL" ]]; then + # The below lines would need to be updated to support other SASL implementations (i.e. 
GSSAPI) + # IMPORTANT: Do not confuse SASL/PLAIN with PLAINTEXT + # For more information, see: https://docs.confluent.io/current/kafka/authentication_sasl/authentication_sasl_plain.html#sasl-plain-overview) + kafka_server_conf_set sasl.mechanism.inter.broker.protocol "$KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL" + fi + if [[ "$protocol" = "SASL_SSL" ]] || [[ "$protocol" = "SSL" ]]; then + kafka_configure_ssl + fi + if [[ "$protocol" = "SSL" ]]; then + kafka_server_conf_set ssl.client.auth "$KAFKA_TLS_CLIENT_AUTH" + fi + else + error "Authentication protocol ${protocol} is not supported!" + exit 1 + fi +} + +######################## +# Configure Kafka for external-client communications +# Globals: +# None +# Arguments: +# $1 - Authentication protocol to use for the external-client listener +# Returns: +# None +######################### +kafka_configure_external_client_communications() { + local -r protocol="${1:?missing environment variable protocol}" + local -r allowed_protocols=("PLAINTEXT" "SASL_PLAINTEXT" "SASL_SSL" "SSL") + info "Configuring Kafka for external client communications with ${protocol} authentication." + + if [[ "${allowed_protocols[*]}" =~ ${protocol} ]]; then + if [[ "$protocol" = "PLAINTEXT" ]]; then + warn "External client communications are configured using PLAINTEXT listeners. For safety reasons, do not use this in a production environment." + fi + if [[ "$protocol" = "SASL_SSL" ]] || [[ "$protocol" = "SSL" ]]; then + kafka_configure_ssl + fi + if [[ "$protocol" = "SSL" ]]; then + kafka_server_conf_set ssl.client.auth "$KAFKA_TLS_CLIENT_AUTH" + fi + else + error "Authentication protocol ${protocol} is not supported!" + exit 1 + fi +} + +######################## +# Get Zookeeper TLS settings +# Globals: +# KAFKA_ZOOKEEPER_TLS_* +# Arguments: +# None +# Returns: +# String +######################### +zookeeper_get_tls_config() { + # Note that ZooKeeper does not support a key password different from the keystore password, + # so be sure to set the key password in the keystore to be identical to the keystore password; + # otherwise the connection attempt to Zookeeper will fail. 
+ local keystore_location="" + local -r kafka_zk_truststore_location="${KAFKA_CERTS_DIR}/$(basename "${KAFKA_ZOOKEEPER_TLS_TRUSTSTORE_FILE}")" + + if [[ "$KAFKA_ZOOKEEPER_TLS_TYPE" = "JKS" ]] && [[ -f "$KAFKA_CERTS_DIR"/zookeeper.keystore.jks ]]; then + keystore_location="${KAFKA_CERTS_DIR}/zookeeper.keystore.jks" + elif [[ "$KAFKA_ZOOKEEPER_TLS_TYPE" = "PEM" ]] && [[ -f "$KAFKA_CERTS_DIR"/zookeeper.keystore.pem ]] && [[ -f "$KAFKA_CERTS_DIR"/zookeeper.keystore.key ]]; then + # Concatenating private key into public certificate file + # This is needed to load keystore from location using PEM + cat "$KAFKA_CERTS_DIR"/zookeeper.keystore.key >>"$KAFKA_CERTS_DIR"/zookeeper.keystore.pem + keystore_location="${KAFKA_CERTS_DIR}/zookeeper.keystore.pem" + fi + + echo "-Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty \ + -Dzookeeper.client.secure=true \ + -Dzookeeper.ssl.keyStore.location=${keystore_location} \ + -Dzookeeper.ssl.keyStore.password=${KAFKA_ZOOKEEPER_TLS_KEYSTORE_PASSWORD} \ + -Dzookeeper.ssl.trustStore.location=${kafka_zk_truststore_location} \ + -Dzookeeper.ssl.trustStore.password=${KAFKA_ZOOKEEPER_TLS_TRUSTSTORE_PASSWORD} \ + -Dzookeeper.ssl.hostnameVerification=${KAFKA_ZOOKEEPER_TLS_VERIFY_HOSTNAME}" +} + +######################## +# Configure Kafka configuration files from environment variables +# Globals: +# KAFKA_* +# Arguments: +# None +# Returns: +# None +######################### +kafka_configure_from_environment_variables() { + # List of special cases to apply to the variables + local -r exception_regexps=( + "s/sasl.ssl/sasl_ssl/g" + "s/sasl.plaintext/sasl_plaintext/g" + ) + # Map environment variables to config properties + for var in "${!KAFKA_CFG_@}"; do + key="$(echo "$var" | sed -e 's/^KAFKA_CFG_//g' -e 's/_/\./g' | tr '[:upper:]' '[:lower:]')" + + # Exception for the camel case in this environment variable + [[ "$var" == "KAFKA_CFG_ZOOKEEPER_CLIENTCNXNSOCKET" ]] && key="zookeeper.clientCnxnSocket" + + # Apply exception regexps + for regex in "${exception_regexps[@]}"; do + key="$(echo "$key" | sed "$regex")" + done + + value="${!var}" + if [[ -n "$value" ]]; then + kafka_server_conf_set "$key" "$value" + fi + done +} + +######################## +# Configure Kafka configuration files to set up message sizes +# Globals: +# KAFKA_* +# Arguments: +# None +# Returns: +# None +######################### +kafka_configure_producer_consumer_message_sizes() { + if [[ -n "$KAFKA_CFG_MAX_REQUEST_SIZE" ]]; then + kafka_common_conf_set "$KAFKA_CONF_DIR/producer.properties" max.request.size "$KAFKA_CFG_MAX_REQUEST_SIZE" + fi + if [[ -n "$KAFKA_CFG_MAX_PARTITION_FETCH_BYTES" ]]; then + kafka_common_conf_set "$KAFKA_CONF_DIR/consumer.properties" max.partition.fetch.bytes "$KAFKA_CFG_MAX_PARTITION_FETCH_BYTES" + fi +} + +######################## +# Initialize KRaft +# Globals: +# KAFKA_* +# Arguments: +# None +# Returns: +# None +######################### +kraft_initialize() { + info "Initializing KRaft..." + + if [[ -z "$KAFKA_KRAFT_CLUSTER_ID" ]]; then + warn "KAFKA_KRAFT_CLUSTER_ID not set - If using multiple nodes then you must use the same Cluster ID for each one" + KAFKA_KRAFT_CLUSTER_ID="$("${KAFKA_HOME}/bin/kafka-storage.sh" random-uuid)" + info "Generated Kafka cluster ID '${KAFKA_KRAFT_CLUSTER_ID}'" + fi + + info "Formatting storage directories to add metadata..." 
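# kafka-storage.sh format writes a meta.properties file (cluster ID plus node ID) into every
# configured log directory; a KRaft controller or broker will not start until its storage
# directories have been formatted, which is what replaces the old ZooKeeper-based bootstrap.
# The --ignore-formatted flag below makes the call a no-op on restarts where the directories
# are already formatted.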
+ debug_execute "$KAFKA_HOME/bin/kafka-storage.sh" format --config "$(kafka_get_conf_file)" --cluster-id "$KAFKA_KRAFT_CLUSTER_ID" --ignore-formatted +} + +######################## +# Initialize Kafka +# Globals: +# KAFKA_* +# Arguments: +# None +# Returns: +# None +######################### +kafka_initialize() { + info "Initializing Kafka..." + # DEPRECATED. Copy files in old conf directory to maintain compatibility with Helm chart. + if ! is_dir_empty "$KAFKA_BASE_DIR"/conf; then + warn "Detected files mounted to $KAFKA_BASE_DIR/conf. This is deprecated and files should be mounted to $KAFKA_MOUNTED_CONF_DIR." + cp -Lr "$KAFKA_BASE_DIR"/conf/* "$KAFKA_CONF_DIR" + fi + # Check for mounted configuration files + if ! is_dir_empty "$KAFKA_MOUNTED_CONF_DIR"; then + cp -Lr "$KAFKA_MOUNTED_CONF_DIR"/* "$KAFKA_CONF_DIR" + fi + # Copy truststore to cert directory + for cert_var in KAFKA_TLS_TRUSTSTORE_FILE KAFKA_ZOOKEEPER_TLS_TRUSTSTORE_FILE; do + # Only copy if the file exists and it is in a different location than KAFKA_CERTS_DIR (to avoid copying to the same location) + if [[ -f "${!cert_var}" ]] && ! [[ "${!cert_var}" =~ $KAFKA_CERTS_DIR ]]; then + info "Copying truststore ${!cert_var} to ${KAFKA_CERTS_DIR}" + cp -L "${!cert_var}" "$KAFKA_CERTS_DIR" + fi + done + + # DEPRECATED. Check for server.properties file in old conf directory to maintain compatibility with Helm chart. + if [[ ! -f "$KAFKA_BASE_DIR"/conf/server.properties ]] && [[ ! -f "$KAFKA_MOUNTED_CONF_DIR"/server.properties ]]; then + info "No injected configuration files found, creating default config files" + kafka_server_conf_set log.dirs "$KAFKA_DATA_DIR" + kafka_configure_from_environment_variables + # When setting up a Kafka cluster with N brokers, we have several listeners: + # - INTERNAL: used for inter-broker communications + # - CLIENT: used for communications with consumers/producers within the same network + # - (optional) EXTERNAL: used for communications with consumers/producers on different networks + local internal_protocol + local client_protocol + local external_client_protocol + if [[ ${KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP:-} =~ INTERNAL:([a-zA-Z_]*) ]]; then + internal_protocol="${BASH_REMATCH[1]}" + kafka_configure_internal_communications "$internal_protocol" + fi + if [[ ${KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP:-} =~ CLIENT:([a-zA-Z_]*) ]]; then + client_protocol="${BASH_REMATCH[1]}" + kafka_configure_client_communications "$client_protocol" + fi + if [[ ${KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP:-} =~ EXTERNAL:([a-zA-Z_]*) ]]; then + external_client_protocol="${BASH_REMATCH[1]}" + kafka_configure_external_client_communications "$external_client_protocol" + fi + + if [[ "${internal_protocol:-}" =~ "SASL" || "${client_protocol:-}" =~ "SASL" || "${external_client_protocol:-}" =~ "SASL" ]] || [[ "${KAFKA_ZOOKEEPER_PROTOCOL}" =~ SASL ]]; then + if [[ -n "$KAFKA_CFG_SASL_ENABLED_MECHANISMS" ]]; then + kafka_server_conf_set sasl.enabled.mechanisms "$KAFKA_CFG_SASL_ENABLED_MECHANISMS" + kafka_generate_jaas_authentication_file "${internal_protocol:-}" "${client_protocol:-}" + [[ "$KAFKA_CFG_SASL_ENABLED_MECHANISMS" =~ "SCRAM" ]] && kafka_create_sasl_scram_zookeeper_users + else + print_validation_error "Specified SASL protocol but no SASL mechanisms provided in KAFKA_CFG_SASL_ENABLED_MECHANISMS" + fi + fi + # Remove security.inter.broker.protocol if KAFKA_CFG_INTER_BROKER_LISTENER_NAME is configured + if [[ -n "${KAFKA_CFG_INTER_BROKER_LISTENER_NAME:-}" ]]; then + remove_in_file "$(kafka_get_conf_file)" 
"security.inter.broker.protocol" false + fi + kafka_configure_producer_consumer_message_sizes + fi + true +} + +######################## +# Run custom initialization scripts +# Globals: +# KAFKA_* +# Arguments: +# None +# Returns: +# None +######################### +kafka_custom_init_scripts() { + if [[ -n $(find "${KAFKA_INITSCRIPTS_DIR}/" -type f -regex ".*\.\(sh\)") ]] && [[ ! -f "${KAFKA_VOLUME_DIR}/.user_scripts_initialized" ]]; then + info "Loading user's custom files from $KAFKA_INITSCRIPTS_DIR" + for f in /docker-entrypoint-initdb.d/*; do + debug "Executing $f" + case "$f" in + *.sh) + if [[ -x "$f" ]]; then + if ! "$f"; then + error "Failed executing $f" + return 1 + fi + else + warn "Sourcing $f as it is not executable by the current user, any error may cause initialization to fail" + . "$f" + fi + ;; + *) + warn "Skipping $f, supported formats are: .sh" + ;; + esac + done + touch "$KAFKA_VOLUME_DIR"/.user_scripts_initialized + fi +} + +######################## +# Check if Kafka is running +# Globals: +# KAFKA_PID_FILE +# Arguments: +# None +# Returns: +# Whether Kafka is running +######################## +is_kafka_running() { + local pid + pid="$(get_pid_from_file "$KAFKA_PID_FILE")" + if [[ -n "$pid" ]]; then + is_service_running "$pid" + else + false + fi +} + +######################## +# Check if Kafka is running +# Globals: +# KAFKA_PID_FILE +# Arguments: +# None +# Returns: +# Whether Kafka is not running +######################## +is_kafka_not_running() { + ! is_kafka_running +} + +######################## +# Stop Kafka +# Globals: +# KAFKA_PID_FILE +# Arguments: +# None +# Returns: +# None +######################### +kafka_stop() { + ! is_kafka_running && return + stop_service_using_pid "$KAFKA_PID_FILE" TERM +} + +######################## +# Get configuration file to use +# Globals: +# KAFKA_ENABLE_KRAFT +# Arguments: +# None +# Returns: +# Path to the conf file to use +######################### +kafka_get_conf_file() { + if is_boolean_yes "$KAFKA_ENABLE_KRAFT"; then + echo "$KAFKA_CONF_FILE" + else + echo "$KAFKA_ZK_CONF_FILE" + fi +} diff --git a/docker-compose.cluster.yml b/docker-compose.cluster.yml index 9e50773..26149d0 100644 --- a/docker-compose.cluster.yml +++ b/docker-compose.cluster.yml @@ -1,12 +1,11 @@ -# Originated from https://github.com/bitnami/bitnami-docker-kafka/blob/0b1b18843b8a5c754a4c6e52a49ac5cf992fa5ed/docker-compose.yml version: '3' -x-connect-image: &connect-image cricketeerone/apache-kafka-connect:3.3.2-alpine +x-connect-image: &connect-image cricketeerone/apache-kafka-connect:3.3.2 x-connect: &connect-vars CONNECT_BOOTSTRAP_SERVERS: kafka:29092 - CONNECT_GROUP_ID: cg_connect + CONNECT_GROUP_ID: cg_connect-jib CONNECT_CONFIG_STORAGE_TOPIC: connect-jib_config CONNECT_OFFSET_STORAGE_TOPIC: connect-jib_offsets CONNECT_STATUS_STORAGE_TOPIC: connect-jib_status @@ -14,56 +13,74 @@ x-connect: &connect-vars CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 - + # Defaults for all connectors CONNECT_KEY_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter - + # Where Jib places classes CONNECT_PLUGIN_PATH: /app/libs # Connect client overrides CONNECT_TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS: 30000 CONNECT_OFFSET_FLUSH_INTERVAL_MS: 900000 - # CONNECT_CONSUMER_MAX_POLL_RECORDS: 500 default==500 + # Connect consumer overrides + CONNECT_CONSUMER_MAX_POLL_RECORDS: 500 + +x-kraft: &kraft-vars 
+ KAFKA_KRAFT_CLUSTER_ID: WNfE3WMTRRGBs35BikbfRg # Run 'kafka-storage random-uuid' + BITNAMI_DEBUG: yes + ALLOW_PLAINTEXT_LISTENER: yes + KAFKA_ENABLE_KRAFT: yes + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true' + KAFKA_CFG_LOG_RETENTION_HOURS: 48 # 2 days of retention for demo purposes services: - zookeeper: - image: bitnami/zookeeper:3.8-debian-11 - ports: - - '2181:2181' - environment: - - ALLOW_ANONYMOUS_LOGIN=yes + kafka-controller: + image: &kafka-image bitnami/kafka:3.3.2 + restart: unless-stopped volumes: - - 'zookeeper_data:/bitnami/zookeeper' + - 'kafka_controller_data:/bitnami/kafka' + - $PWD/bitnami-kraft-libkafka.sh:/opt/bitnami/scripts/libkafka.sh:ro # HOTFIX + environment: + <<: *kraft-vars + KAFKA_CFG_PROCESS_ROLES: controller + KAFKA_CFG_NODE_ID: 1 + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: &kraft-quorum 1@kafka-controller:9093 + KAFKA_CFG_LISTENERS: CONTROLLER://:9093 + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT + kafka: - image: bitnami/kafka:3.3.2 - depends_on: - - zookeeper + image: *kafka-image restart: unless-stopped - ports: - - '9092:9092' + depends_on: + - kafka-controller volumes: - 'kafka_data:/bitnami/kafka' + - $PWD/bitnami-kraft-libkafka.sh:/opt/bitnami/scripts/libkafka.sh:ro # HOTFIX - $PWD/lipsum.txt:/data/lipsum.txt:ro # Some data to produce + ports: + - 9092:9092 environment: - - BITNAMI_DEBUG=yes - - ALLOW_PLAINTEXT_LISTENER=yes - - KAFKA_ENABLE_KRAFT=false - - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 - - KAFKA_CFG_DELETE_TOPIC_ENABLE=true - - KAFKA_CFG_LOG_RETENTION_HOURS=48 # 2 days of retention for demo purposes + <<: *kraft-vars + KAFKA_CFG_PROCESS_ROLES: broker + KAFKA_CFG_NODE_ID: 10 # cannot conflict with controllers + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: *kraft-quorum # https://rmoff.net/2018/08/02/kafka-listeners-explained/ - - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT - - KAFKA_CFG_LISTENERS=PLAINTEXT://:29092,PLAINTEXT_HOST://0.0.0.0:9092 - - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_CFG_LISTENERS: INTERNAL://:29092,EXTERNAL://0.0.0.0:9092 + KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL + KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://127.0.0.1:9092 + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT # Jib app connect-jib-1: image: *connect-image hostname: connect-jib-1 + labels: + - traefik.enable=true + - "traefik.http.routers.connect-jib-1.rule=Host(`connect-jib.docker.localhost`)" + - traefik.http.services.connect-jib-1.loadbalancer.server.port=8083 depends_on: - kafka - ports: - - '8083:8083' # full cluster info accessible from one instance environment: <<: *connect-vars CONNECT_REST_ADVERTISED_HOST_NAME: connect-jib-1 @@ -71,14 +88,31 @@ services: connect-jib-2: image: *connect-image hostname: connect-jib-2 + labels: + - traefik.enable=true + - "traefik.http.routers.connect-jib-2.rule=Host(`connect-jib.docker.localhost`)" + - traefik.http.services.connect-jib-2.loadbalancer.server.port=8083 depends_on: - kafka environment: <<: *connect-vars CONNECT_REST_ADVERTISED_HOST_NAME: connect-jib-2 + reverse-proxy: + image: traefik:v2.10 + # Enables the web UI and tells Traefik to listen to docker + command: --api.insecure=true --providers.docker --providers.docker.exposedByDefault=false + ports: + # The HTTP port + - "80:80" + # The Web UI (enabled by --api.insecure=true) + - "8080:8080" + volumes: + # So 
that Traefik can listen to the Docker events + - /var/run/docker.sock:/var/run/docker.sock + volumes: - zookeeper_data: + kafka_controller_data: driver: local kafka_data: - driver: local + driver: local \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 7a98221..bbc249a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,12 +1,11 @@ -# Originated from https://github.com/bitnami/bitnami-docker-kafka/blob/0b1b18843b8a5c754a4c6e52a49ac5cf992fa5ed/docker-compose.yml version: '3' -x-connect-image: &connect-image cricketeerone/apache-kafka-connect:3.3.2-alpine +x-connect-image: &connect-image cricketeerone/apache-kafka-connect:3.3.2 x-connect: &connect-vars CONNECT_BOOTSTRAP_SERVERS: kafka:29092 - CONNECT_GROUP_ID: cg_connect + CONNECT_GROUP_ID: cg_connect-jib CONNECT_CONFIG_STORAGE_TOPIC: connect-jib_config CONNECT_OFFSET_STORAGE_TOPIC: connect-jib_offsets CONNECT_STATUS_STORAGE_TOPIC: connect-jib_status @@ -14,47 +13,45 @@ x-connect: &connect-vars CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 - + # Defaults for all connectors CONNECT_KEY_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter - + # Where Jib places classes CONNECT_PLUGIN_PATH: /app/libs # Connect client overrides CONNECT_TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS: 30000 CONNECT_OFFSET_FLUSH_INTERVAL_MS: 900000 - # CONNECT_CONSUMER_MAX_POLL_RECORDS: 500 default==500 + # Connect consumer overrides + CONNECT_CONSUMER_MAX_POLL_RECORDS: 500 services: - zookeeper: - image: bitnami/zookeeper:3.8-debian-11 - ports: - - '2181:2181' - environment: - - ALLOW_ANONYMOUS_LOGIN=yes - volumes: - - 'zookeeper_data:/bitnami/zookeeper' kafka: image: bitnami/kafka:3.3.2 - depends_on: - - zookeeper restart: unless-stopped ports: - '9092:9092' volumes: - 'kafka_data:/bitnami/kafka' + - $PWD/bitnami-kraft-libkafka.sh:/opt/bitnami/scripts/libkafka.sh:ro # HOTFIX - $PWD/lipsum.txt:/data/lipsum.txt:ro # Some data to produce environment: - - BITNAMI_DEBUG=yes - - ALLOW_PLAINTEXT_LISTENER=yes - - KAFKA_ENABLE_KRAFT=false - - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 - - KAFKA_CFG_DELETE_TOPIC_ENABLE=true - - KAFKA_CFG_LOG_RETENTION_HOURS=48 # 2 days of retention for demo purposes + BITNAMI_DEBUG: yes + ALLOW_PLAINTEXT_LISTENER: yes + # BEGIN: Kraft + KAFKA_ENABLE_KRAFT: yes + KAFKA_CFG_PROCESS_ROLES: controller,broker + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CFG_NODE_ID: 1 + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093 + # END: Kraft + KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true' + KAFKA_CFG_LOG_RETENTION_HOURS: 48 # 2 days of retention for demo purposes # https://rmoff.net/2018/08/02/kafka-listeners-explained/ - - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT - - KAFKA_CFG_LISTENERS=PLAINTEXT://:29092,PLAINTEXT_HOST://0.0.0.0:9092 - - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_CFG_LISTENERS: INTERNAL://:29092,CONTROLLER://:9093,EXTERNAL://0.0.0.0:9092 + KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL + KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://127.0.0.1:9092 + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT # Jib app connect-jib-1: @@ -69,7 +66,5 @@ services: CONNECT_REST_ADVERTISED_HOST_NAME: connect-jib-1 volumes: - zookeeper_data: - driver: local kafka_data: 
driver: local diff --git a/src/test/java/org/apache/kafka/connect/cli/ConnectDistributedWrapperTest.java b/src/test/java/org/apache/kafka/connect/cli/ConnectDistributedWrapperTest.java index 9ecf8a0..c038866 100644 --- a/src/test/java/org/apache/kafka/connect/cli/ConnectDistributedWrapperTest.java +++ b/src/test/java/org/apache/kafka/connect/cli/ConnectDistributedWrapperTest.java @@ -80,11 +80,11 @@ static Stream workerConfigProvider() { WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, // Internal topics DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, + DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, + DistributedConfig.CONFIG_TOPIC_CONFIG, DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, - DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG, - DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, - DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG + DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG ); }
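Since both compose files now run Kafka without ZooKeeper, a quick sanity check is to ask the broker about its KRaft metadata quorum. This is a minimal sketch, assuming the default `docker-compose.yml` from this patch is up.

```bash
# Start the combined KRaft controller+broker defined in docker-compose.yml
docker compose up -d kafka

# kafka-metadata-quorum.sh ships with Kafka 3.3+ and reports the quorum leader,
# voters, and observers straight from the broker -- no ZooKeeper involved
docker compose exec kafka \
  bash -c "kafka-metadata-quorum.sh --bootstrap-server kafka:29092 describe --status"

# Topic administration also talks only to the broker listener
docker compose exec kafka \
  bash -c "kafka-topics.sh --list --bootstrap-server kafka:29092"
```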