Skip to content

Commit

Permalink
resolves code review
Browse files Browse the repository at this point in the history
  • Loading branch information
lancelot1989 committed Dec 10, 2024
1 parent 9866cdd commit 1f10558
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 42 deletions.
75 changes: 38 additions & 37 deletions addons/kafka/scripts/kafka-27-server-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -169,44 +169,40 @@ parse_advertised_svc_if_exist() {

set_cfg_metadata() {
# set advertised.listeners for broker
if [[ "broker" = "$KAFKA_CFG_PROCESS_ROLES" ]]; then

current_pod_fqdn=$(get_target_pod_fqdn_from_pod_fqdn_vars "$POD_FQDN_LIST" "$MY_POD_NAME")
if is_empty "$current_pod_fqdn"; then
echo "Error: Failed to get current pod: $MY_POD_NAME fqdn from pod fqdn list: $POD_FQDN_LIST. Exiting." >&2
return 1
fi

if ! parse_advertised_svc_if_exist ; then
echo "Error: Failed to parse advertised svc from BROKER_ADVERTISED_PORT: $BROKER_ADVERTISED_PORT. Exiting." >&2
return 1
fi

# Todo: currently only nodeport and clusterIp network modes are supported. LoadBalance is not supported yet and needs future support.
if [ -n "$advertised_svc_host_value" ] && [ -n "$advertised_svc_port_value" ] && [ "$advertised_svc_port_value" != "9092" ]; then
# enable NodePort, use node ip + mapped port as client connection
nodeport_domain="${advertised_svc_host_value}:${advertised_svc_port_value}"
export KAFKA_CFG_ADVERTISED_LISTENERS="INTERNAL://${current_pod_fqdn}:9094,CLIENT://${nodeport_domain}"
echo "[cfg]KAFKA_CFG_ADVERTISED_LISTENERS=$KAFKA_CFG_ADVERTISED_LISTENERS"
elif [ "${KB_BROKER_DIRECT_POD_ACCESS}" == "true" ]; then
export KAFKA_CFG_ADVERTISED_LISTENERS="INTERNAL://${current_pod_fqdn}:9094,CLIENT://${MY_POD_IP}:9092"
echo "[cfg]KAFKA_CFG_ADVERTISED_LISTENERS=$KAFKA_CFG_ADVERTISED_LISTENERS"
else
# default, use headless service url as client connection
export KAFKA_CFG_ADVERTISED_LISTENERS="INTERNAL://${current_pod_fqdn}:9094,CLIENT://${current_pod_fqdn}:9092"
echo "[cfg]KAFKA_CFG_ADVERTISED_LISTENERS=$KAFKA_CFG_ADVERTISED_LISTENERS"
fi
current_pod_fqdn=$(get_target_pod_fqdn_from_pod_fqdn_vars "$POD_FQDN_LIST" "$MY_POD_NAME")
if is_empty "$current_pod_fqdn"; then
echo "Error: Failed to get current pod: $MY_POD_NAME fqdn from pod fqdn list: $POD_FQDN_LIST. Exiting." >&2
return 1
fi

# override node.id setting
# increments based on a specified base to avoid conflicts with controller settings
INDEX=$(echo $MY_POD_NAME | grep -o "\-[0-9]\+\$")
INDEX=${INDEX#-}
BROKER_NODE_ID=$(( $INDEX + $BROKER_MIN_NODE_ID ))
export KAFKA_CFG_NODE_ID="$BROKER_NODE_ID"
export KAFKA_CFG_BROKER_ID="$BROKER_NODE_ID"
echo "[cfg]KAFKA_CFG_NODE_ID=$KAFKA_CFG_NODE_ID"
if ! parse_advertised_svc_if_exist ; then
echo "Error: Failed to parse advertised svc from BROKER_ADVERTISED_PORT: $BROKER_ADVERTISED_PORT. Exiting." >&2
return 1
fi

# Todo: currently only nodeport and clusterIp network modes are supported. LoadBalance is not supported yet and needs future support.
if [ -n "$advertised_svc_host_value" ] && [ -n "$advertised_svc_port_value" ] && [ "$advertised_svc_port_value" != "9092" ]; then
# enable NodePort, use node ip + mapped port as client connection
nodeport_domain="${advertised_svc_host_value}:${advertised_svc_port_value}"
export KAFKA_CFG_ADVERTISED_LISTENERS="INTERNAL://${current_pod_fqdn}:9094,CLIENT://${nodeport_domain}"
echo "[cfg]KAFKA_CFG_ADVERTISED_LISTENERS=$KAFKA_CFG_ADVERTISED_LISTENERS"
elif [ "${KB_BROKER_DIRECT_POD_ACCESS}" == "true" ]; then
export KAFKA_CFG_ADVERTISED_LISTENERS="INTERNAL://${current_pod_fqdn}:9094,CLIENT://${MY_POD_IP}:9092"
echo "[cfg]KAFKA_CFG_ADVERTISED_LISTENERS=$KAFKA_CFG_ADVERTISED_LISTENERS"
else
# default, use headless service url as client connection
export KAFKA_CFG_ADVERTISED_LISTENERS="INTERNAL://${current_pod_fqdn}:9094,CLIENT://${current_pod_fqdn}:9092"
echo "[cfg]KAFKA_CFG_ADVERTISED_LISTENERS=$KAFKA_CFG_ADVERTISED_LISTENERS"
fi

# override node.id setting
# increments based on a specified base to avoid conflicts with controller settings
INDEX=$(echo $MY_POD_NAME | grep -o "\-[0-9]\+\$")
INDEX=${INDEX#-}
BROKER_NODE_ID=$(( $INDEX + $BROKER_MIN_NODE_ID ))
export KAFKA_CFG_NODE_ID="$BROKER_NODE_ID"
export KAFKA_CFG_BROKER_ID="$BROKER_NODE_ID"
echo "[cfg]KAFKA_CFG_NODE_ID=$KAFKA_CFG_NODE_ID"
}

set_zookeeper_connect() {
Expand All @@ -216,8 +212,13 @@ set_zookeeper_connect() {
return 1
fi

# Set KAFKA_CFG_ZOOKEEPER_CONNECT to the value of KB_KAFKA_ZOOKEEPER_CONN
export KAFKA_CFG_ZOOKEEPER_CONNECT="$KB_KAFKA_ZOOKEEPER_CONN"
if [ -n "$KB_KAFKA_ZK_SUB_PATH" ]; then
# Set KAFKA_CFG_ZOOKEEPER_CONNECT to the concat of KB_KAFKA_ZOOKEEPER_CONN and KB_KAFKA_ZK_SUB_PATH
export KAFKA_CFG_ZOOKEEPER_CONNECT="$KB_KAFKA_ZOOKEEPER_CONN/$KB_KAFKA_ZK_SUB_PATH"
else
# Set KAFKA_CFG_ZOOKEEPER_CONNECT to the value of KB_KAFKA_ZOOKEEPER_CONN
export KAFKA_CFG_ZOOKEEPER_CONNECT="$KB_KAFKA_ZOOKEEPER_CONN"
fi

# Optionally, print the value to verify
echo "[cfg]export KAFKA_CFG_ZOOKEEPER_CONNECT=$KAFKA_CFG_ZOOKEEPER_CONNECT,for kafka-server."
Expand Down
6 changes: 2 additions & 4 deletions addons/kafka/templates/cmpd-broker-27.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,6 @@ spec:
runAsNonRoot: true
runAsUser: 1001
command:
{{/* - sleep*/}}
{{/* - infinity*/}}
- /scripts/kafka-server-setup.sh
env:
- name: BITNAMI_DEBUG
Expand All @@ -143,10 +141,10 @@ spec:
fieldRef:
apiVersion: v1
fieldPath: status.hostIP
- name: KAFKA_CFG_PROCESS_ROLES
value: "broker"
- name: KAFKA_CFG_INTER_BROKER_LISTENER_NAME
value: "INTERNAL"
- name: KB_KAFKA_ZK_SUB_PATH
value: $(KB_CLUSTER_NAME)
- name: KAFKA_CFG_LISTENERS # required for KRaft
value: "INTERNAL://:9094,CLIENT://:9092"
- name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
Expand Down
4 changes: 3 additions & 1 deletion examples/kafka/cluster-2x-ext-zk-svc-descriptor.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ spec:
value: 'true'
- name: KAFKA_HEAP_OPTS
value: "-Xmx500m -Xms500m"
- name: KB_KAFKA_ZK_SUB_PATH
value: "test" # set sub path to use for sharing zk, default value is $(KB_CLUSTER_NAME), set empty string for root
resources:
limits:
cpu: '0.5'
Expand Down Expand Up @@ -80,7 +82,7 @@ spec:
# Represents the endpoint of the service connection credential.
endpoint:
# your external zk endpoints here
value: "zookeeper-cluster-zookeeper-0.zookeeper-cluster-zookeeper-headless.default:2181,zookeeper-cluster-zookeeper-1.zookeeper-cluster-zookeeper-headless.default:2181,zookeeper-cluster-zookeeper-2.zookeeper-cluster-zookeeper-headless.default:2181/test"
value: "zookeeper-cluster-zookeeper-0.zookeeper-cluster-zookeeper-headless.default:2181,zookeeper-cluster-zookeeper-1.zookeeper-cluster-zookeeper-headless.default:2181,zookeeper-cluster-zookeeper-2.zookeeper-cluster-zookeeper-headless.default:2181"
# Represents the port of the service connection credential.
port:
value: "2181"

0 comments on commit 1f10558

Please sign in to comment.