diff --git a/CHANGELOG.md b/CHANGELOG.md
index 045dd47..e04104b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,10 @@
 
 No changes yet.
 
+## 0.10.1.1 - 12 January, 2017
+
+- Update to Kafka 0.10.1.1
+
 ## 0.10.1.0 - 27 October, 2016
 
 - Update to Kafka 0.10.1.0 ([xrl], #25)
diff --git a/Dockerfile b/Dockerfile
index eb4acd3..e957fb6 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,23 +1,35 @@
-# Builds an image for Apache Kafka 0.8.1.1 from binary distribution.
-#
-# The netflixoss/java base image runs Oracle Java 7 installed atop the
+# The image runs Oracle Java 8 installed atop the
 # ubuntu:trusty (14.04) official image. Docker's official java images are
 # OpenJDK-only currently, and the Kafka project, Confluent, and most other
 # major Java projects test and recommend Oracle Java for production for optimal
 # performance.
-FROM netflixoss/java:8
+FROM ubuntu:trusty
 MAINTAINER Ches Martin
 
-# The Scala 2.11 build is currently recommended by the project.
-ENV KAFKA_VERSION=0.10.1.0 KAFKA_SCALA_VERSION=2.11 JMX_PORT=7203
-ENV KAFKA_RELEASE_ARCHIVE kafka_${KAFKA_SCALA_VERSION}-${KAFKA_VERSION}.tgz
+# Install Java.
+# https://github.com/dockerfile/java/blob/master/oracle-java8/Dockerfile
+RUN \
+  apt-get update && apt-get install -y software-properties-common && \
+  echo oracle-java8-installer shared/accepted-oracle-license-v1-1 select true | debconf-set-selections && \
+  add-apt-repository -y ppa:webupd8team/java && \
+  apt-get update && \
+  apt-get install -y oracle-java8-installer && \
+  rm -rf /var/lib/apt/lists/* && \
+  rm -rf /var/cache/oracle-jdk8-installer
+
+# Kafka publishes builds for several Scala versions; we use the Scala 2.12 build.
+ENV KAFKA_VERSION=0.10.2.1 \
+    KAFKA_SCALA_VERSION=2.12 \
+    KAFKA_JMX_PORT=7203 \
+    JOLOKIA_AGENT_PORT=8778
+ENV KAFKA_RELEASE_ARCHIVE="kafka_${KAFKA_SCALA_VERSION}-${KAFKA_VERSION}.tgz"
 
 RUN mkdir /kafka /data /logs
 
 RUN apt-get update && \
     DEBIAN_FRONTEND=noninteractive apt-get install -y \
-      ca-certificates
+      ca-certificates curl
 
 # Download Kafka binary distribution
 ADD http://www.us.apache.org/dist/kafka/${KAFKA_VERSION}/${KAFKA_RELEASE_ARCHIVE} /tmp/
@@ -37,6 +49,11 @@ RUN tar -zx -C /kafka --strip-components=1 -f ${KAFKA_RELEASE_ARCHIVE} && \
 ADD config /kafka/config
 ADD start.sh /start.sh
 
+# Download Jolokia Agent
+ENV JOLOKIA_AGENT_VERSION=1.3.6
+ADD http://search.maven.org/remotecontent?filepath=org/jolokia/jolokia-jvm/$JOLOKIA_AGENT_VERSION/jolokia-jvm-$JOLOKIA_AGENT_VERSION-agent.jar /jolokia/jolokia-jvm-$JOLOKIA_AGENT_VERSION-agent.jar
+RUN chmod a+r /jolokia/jolokia-jvm-$JOLOKIA_AGENT_VERSION-agent.jar
+
 # Set up a user to run Kafka
 RUN groupadd kafka && \
     useradd -d /kafka -g kafka -s /bin/false kafka && \
@@ -45,8 +62,25 @@ USER kafka
 ENV PATH /kafka/bin:$PATH
 WORKDIR /kafka
 
+ENV GROUP_MAX_SESSION_TIMEOUT_MS="300000" \
+    JAVA_RMI_SERVER_HOSTNAME="" \
+    KAFKA_BROKER_ID="" \
+    KAFKA_DEFAULT_REPLICATION_FACTOR="1" \
+    KAFKA_DELETE_TOPIC_ENABLE="false" \
+    # KAFKA_ENABLE_JOLOKIA_AGENT="" \
+    KAFKA_LOG4J_OPTS="" \
+    KAFKA_LOG_DIRS="/data/data" \
+    KAFKA_LOG_FLUSH_SCHEDULER_INTERVAL_MS="9223372036854775807" \
+    KAFKA_LOG_RETENTION_HOURS="168" \
+    KAFKA_NUM_PARTITIONS="1" \
+    KAFKA_RECOVERY_THREADS_PER_DATA_DIR="1" \
+    ZOOKEEPER_IP="localhost" \
+    ZOOKEEPER_PORT="2181" \
+    ZOOKEEPER_CONNECTION_TIMEOUT_MS="10000" \
+    ZOOKEEPER_SESSION_TIMEOUT_MS="10000"
+
 # broker, jmx
-EXPOSE 9092 ${JMX_PORT}
+EXPOSE 9092 ${KAFKA_JMX_PORT}
 
 VOLUME [ "/data", "/logs" ]
 
 CMD ["/start.sh"]
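A quick way to sanity-check the rebuilt image (the tag here is illustrative, not the project's published one):

    docker build -t example/kafka:0.10.2.1 .
    # Overrides the default CMD; verifies the Oracle JDK 8 from the
    # webupd8team PPA is installed and on the PATH
    docker run --rm example/kafka:0.10.2.1 java -version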
diff --git a/config/log4j.properties b/config/log4j.properties
index a768a95..65950df 100644
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-kafka.logs.dir=/logs
+kafka.logs.dir=/logs/logs
 
 log4j.rootLogger=INFO, stdout
 
diff --git a/config/server.properties.template b/config/server.properties.template
index 81667f2..959c0c1 100644
--- a/config/server.properties.template
+++ b/config/server.properties.template
@@ -12,65 +12,120 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 # see kafka.server.KafkaConfig for additional details and defaults
 
 ############################# Server Basics #############################
 
 # The id of the broker. This must be set to a unique integer for each broker.
 broker.id={{KAFKA_BROKER_ID}}
-auto.leader.rebalance.enable=true
-
-# Replication
-auto.create.topics.enable=true
-default.replication.factor=1
-
-# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
-# from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost
-# may not be what you want.
-advertised.host.name={{KAFKA_ADVERTISED_HOST_NAME}}
 
-# Enable topic deletion
+# Switch to enable topic deletion; the default value is false
 delete.topic.enable={{KAFKA_DELETE_TOPIC_ENABLE}}
 
 ############################# Socket Server Settings #############################
 
-# The port the socket server listens on
-port={{KAFKA_PORT}}
-advertised.port={{KAFKA_ADVERTISED_PORT}}
+# The address the socket server listens on. It will get the value returned from
+# java.net.InetAddress.getCanonicalHostName() if not configured.
+# FORMAT:
+#   listeners = security_protocol://host_name:port
+# EXAMPLE:
+#   listeners = PLAINTEXT://your.host.name:9092
+listeners=PLAINTEXT://{{KAFKA_LISTENER_HOST_NAME}}:9092
+# TODO: expose the listener port through docker ENV as well?
+
+# Hostname and port the broker will advertise to producers and consumers. If not set,
+# it uses the value for "listeners" if configured. Otherwise, it will use the value
+# returned from java.net.InetAddress.getCanonicalHostName().
+advertised.listeners=PLAINTEXT://{{KAFKA_ADVERTISED_HOST_NAME}}:9092
+# TODO: expose the advertised port through docker ENV as well?
+
+# The number of threads handling network requests
+num.network.threads=3
+
+# The number of threads doing disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=102400
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=102400
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
 ############################# Log Basics #############################
 
-# The directory under which to store log files
-log.dir=/data
-log.dirs=/data
+# A comma separated list of directories under which to store log files
+log.dirs={{KAFKA_LOG_DIRS}}
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions={{KAFKA_NUM_PARTITIONS}}
 
-# The number of logical partitions per topic per server. More partitions allow greater parallelism
-# for consumption, but also mean more files.
-num.partitions=1
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs located in a RAID array.
+num.recovery.threads.per.data.dir={{KAFKA_RECOVERY_THREADS_PER_DATA_DIR}}
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data may be lost if you are not using replication.
+#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+# The frequency in ms with which the log flusher checks whether any log needs to be flushed to disk;
+# it also serves as the default for log.flush.interval.ms (the maximum time in ms that a message in any topic is kept in memory before being flushed to disk)
+log.flush.scheduler.interval.ms={{KAFKA_LOG_FLUSH_SCHEDULER_INTERVAL_MS}}
 
 ############################# Log Retention Policy #############################
 
+# Default number of replicas for new topics
+default.replication.factor={{KAFKA_DEFAULT_REPLICATION_FACTOR}}
+
 # The following configurations control the disposal of log segments. The policy can
 # be set to delete segments after a period of time, or after a given size has accumulated.
 # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
 # from the end of the log.
 
 # The minimum age of a log file to be eligible for deletion
-log.retention.hours=168
+log.retention.hours={{KAFKA_LOG_RETENTION_HOURS}}
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=300000
 
 ############################# Zookeeper #############################
 
-# Zk connection string (see zk docs for details).
+# Zookeeper connection string (see zookeeper docs for details).
 # This is a comma separated host:port pairs, each corresponding to a zk
 # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
 # You can also append an optional chroot string to the urls to specify the
 # root directory for all kafka znodes.
-zookeeper.connect={{ZOOKEEPER_CONNECTION_STRING}}{{ZOOKEEPER_CHROOT}}
+zookeeper.connect={{ZOOKEEPER_CONNECTION_STRING}}
 
 zookeeper.connection.timeout.ms={{ZOOKEEPER_CONNECTION_TIMEOUT_MS}}
 zookeeper.session.timeout.ms={{ZOOKEEPER_SESSION_TIMEOUT_MS}}
 
 ############################# Additional Broker Settings #######################
 
-controlled.shutdown.enable=true
-group.max.session.timeout.ms={{GROUP_MAX_SESSION_TIMEOUT_MS}}
-
-# vim:set filetype=jproperties
+group.max.session.timeout.ms={{GROUP_MAX_SESSION_TIMEOUT_MS}}
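The {{PLACEHOLDER}} tokens in this template are filled in by start.sh (see its hunk below) from environment variables with defaults. A minimal sketch of the mechanism, with illustrative values:

    export KAFKA_BROKER_ID=1 KAFKA_NUM_PARTITIONS=3
    sed -e "s|{{KAFKA_BROKER_ID}}|${KAFKA_BROKER_ID:-0}|g" \
        -e "s|{{KAFKA_NUM_PARTITIONS}}|${KAFKA_NUM_PARTITIONS:-1}|g" \
        config/server.properties.template
    # => the output contains broker.id=1 and num.partitions=3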
diff --git a/kube-statefulset.yml b/kube-statefulset.yml
new file mode 100644
index 0000000..57c2fba
--- /dev/null
+++ b/kube-statefulset.yml
@@ -0,0 +1,96 @@
+apiVersion: v1
+kind: Service
+metadata:
+  name: kafka-headless
+  labels:
+    app: kafka-headless
+spec:
+  ports:
+  - port: 9092
+    name: server
+  - port: 7203
+    name: jmx
+  clusterIP: None
+  selector:
+    app: kafka
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: kafka-config
+data:
+  # modeled on the zk ensemble ConfigMap
+  jvm.heap: "-Xmx4G -Xms4G"
+---
+apiVersion: apps/v1beta1
+kind: StatefulSet
+metadata:
+  name: kafka
+spec:
+  serviceName: kafka-headless
+  replicas: 5
+  template:
+    metadata:
+      labels:
+        app: kafka
+    spec:
+      containers:
+      - name: kafka
+        imagePullPolicy: Always
+        image: registry/kafka:0.10.1.1
+        resources:
+          requests:
+            memory: "4Gi"
+            cpu: "4"
+        ports:
+        - containerPort: 9092
+          name: server
+        - containerPort: 7203
+          name: jmx
+        env:
+        - name: ZOOKEEPER_CONNECTION_STRING  # TODO replace with zk-config map
+          # The chroot suffix may only appear once, at the end of the string
+          value: "zk-0.zk-headless.default.svc.cluster.local:2181,zk-1.zk-headless.default.svc.cluster.local:2181,zk-2.zk-headless.default.svc.cluster.local:2181/kafka-prod"
+        # - name: KAFKA_BROKER_ID  # Derived from the pod hostname ordinal by /start.sh
+        #   value: "0"
+        - name: KAFKA_HEAP_OPTS
+          valueFrom:
+            configMapKeyRef:
+              name: kafka-config
+              key: jvm.heap
+        - name: KAFKA_DELETE_TOPIC_ENABLE
+          value: "true"
+        - name: KAFKA_LOG_RETENTION_HOURS
+          # 5 years
+          value: "43800"
+        # command:
+        # - sh
+        # - -c
+        # - zkGenConfig.sh && zkServer.sh start-foreground
+        # readinessProbe:
+        #   exec:
+        #     command:
+        #     - "zkOk.sh"
+        #   initialDelaySeconds: 15
+        #   timeoutSeconds: 5
+        # livenessProbe:
+        #   exec:
+        #     command:
+        #     - "zkOk.sh"
+        #   initialDelaySeconds: 15
+        #   timeoutSeconds: 5
+        # volumeMounts:
+        # - name: datadir
+        #   mountPath: /var/lib/zookeeper
+      # securityContext:
+      #   runAsUser: 1000
+      #   fsGroup: 1000
+  # volumeClaimTemplates:
+  # - metadata:
+  #     name: datadir
+  #     annotations:
+  #       volume.alpha.kubernetes.io/storage-class: ceph
+  #   spec:
+  #     accessModes: [ "ReadWriteOnce" ]
+  #     resources:
+  #       requests:
+  #         storage: 20Gi
\ No newline at end of file
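Assuming a cluster that already runs the zk-headless ZooKeeper ensemble referenced above and can pull the image, deployment would go roughly like this:

    kubectl apply -f kube-statefulset.yml
    # Pod hostnames get ordinal suffixes (kafka-0 ... kafka-4), which
    # start.sh below turns into each broker's broker.id
    kubectl get pods -l app=kafka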
diff --git a/start.sh b/start.sh
index c632a47..f5f0b41 100755
--- a/start.sh
+++ b/start.sh
@@ -5,23 +5,35 @@
 [ -n "$ZOOKEEPER_PORT_2181_TCP_ADDR" ] && ZOOKEEPER_IP=$ZOOKEEPER_PORT_2181_TCP_ADDR
 [ -n "$ZOOKEEPER_PORT_2181_TCP_PORT" ] && ZOOKEEPER_PORT=$ZOOKEEPER_PORT_2181_TCP_PORT
 
-IP=$(grep "\s${HOSTNAME}$" /etc/hosts | head -n 1 | awk '{print $1}')
+IP=$(hostname -i)
 
 # Concatenate the IP:PORT for ZooKeeper to allow setting a full connection
 # string with multiple ZooKeeper hosts
 [ -z "$ZOOKEEPER_CONNECTION_STRING" ] && ZOOKEEPER_CONNECTION_STRING="${ZOOKEEPER_IP}:${ZOOKEEPER_PORT:-2181}"
 
+# Let's see if we can extract the ID from the hostname, making us Kubernetes
+# StatefulSet compatible; expects $HOSTNAME in the format *-DIGIT
+[ -z "$KAFKA_BROKER_ID" ] && KAFKA_BROKER_ID=$(echo "$HOSTNAME" | sed 's/.*-\([0-9]\+\)$/\1/')
+
+# I don't like to use --net=host for all my containers, so this is some bending-over-backwards work
+[ -z "$KAFKA_ADVERTISED_HOST_NAME" ] && KAFKA_ADVERTISED_HOST_NAME=$(hostname -f)
+[ -z "$KAFKA_LISTENER_HOST_NAME" ] && KAFKA_LISTENER_HOST_NAME=$(hostname -i)
+
 cat /kafka/config/server.properties.template | sed \
-  -e "s|{{ZOOKEEPER_CONNECTION_STRING}}|${ZOOKEEPER_CONNECTION_STRING}|g" \
-  -e "s|{{ZOOKEEPER_CHROOT}}|${ZOOKEEPER_CHROOT:-}|g" \
+  -e "s|{{GROUP_MAX_SESSION_TIMEOUT_MS}}|${GROUP_MAX_SESSION_TIMEOUT_MS:-300000}|g" \
+  -e "s|{{KAFKA_ADVERTISED_HOST_NAME}}|${KAFKA_ADVERTISED_HOST_NAME}|g" \
   -e "s|{{KAFKA_BROKER_ID}}|${KAFKA_BROKER_ID:-0}|g" \
-  -e "s|{{KAFKA_ADVERTISED_HOST_NAME}}|${KAFKA_ADVERTISED_HOST_NAME:-$IP}|g" \
-  -e "s|{{KAFKA_PORT}}|${KAFKA_PORT:-9092}|g" \
-  -e "s|{{KAFKA_ADVERTISED_PORT}}|${KAFKA_ADVERTISED_PORT:-9092}|g" \
+  -e "s|{{KAFKA_DEFAULT_REPLICATION_FACTOR}}|${KAFKA_DEFAULT_REPLICATION_FACTOR:-1}|g" \
   -e "s|{{KAFKA_DELETE_TOPIC_ENABLE}}|${KAFKA_DELETE_TOPIC_ENABLE:-false}|g" \
+  -e "s|{{KAFKA_LISTENER_HOST_NAME}}|${KAFKA_LISTENER_HOST_NAME}|g" \
+  -e "s|{{KAFKA_LOG_DIRS}}|${KAFKA_LOG_DIRS:-/data/data}|g" \
+  -e "s|{{KAFKA_LOG_FLUSH_SCHEDULER_INTERVAL_MS}}|${KAFKA_LOG_FLUSH_SCHEDULER_INTERVAL_MS:-9223372036854775807}|g" \
+  -e "s|{{KAFKA_LOG_RETENTION_HOURS}}|${KAFKA_LOG_RETENTION_HOURS:-168}|g" \
+  -e "s|{{KAFKA_NUM_PARTITIONS}}|${KAFKA_NUM_PARTITIONS:-1}|g" \
+  -e "s|{{KAFKA_RECOVERY_THREADS_PER_DATA_DIR}}|${KAFKA_RECOVERY_THREADS_PER_DATA_DIR:-1}|g" \
+  -e "s|{{ZOOKEEPER_CONNECTION_STRING}}|${ZOOKEEPER_CONNECTION_STRING}|g" \
   -e "s|{{ZOOKEEPER_CONNECTION_TIMEOUT_MS}}|${ZOOKEEPER_CONNECTION_TIMEOUT_MS:-10000}|g" \
   -e "s|{{ZOOKEEPER_SESSION_TIMEOUT_MS}}|${ZOOKEEPER_SESSION_TIMEOUT_MS:-10000}|g" \
-  -e "s|{{GROUP_MAX_SESSION_TIMEOUT_MS}}|${GROUP_MAX_SESSION_TIMEOUT_MS:-300000}|g" \
   > /kafka/config/server.properties
 
 # Kafka's built-in start scripts set the first three system properties here, but
@@ -38,13 +50,27 @@ cat /kafka/config/server.properties.template | sed \
 #
 # https://issues.apache.org/jira/browse/CASSANDRA-7087
 if [ -z $KAFKA_JMX_OPTS ]; then
+  LONG_HOSTNAME=$(hostname -f)
   KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true"
   KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.authenticate=false"
   KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.ssl=false"
-  KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.rmi.port=$JMX_PORT"
-  KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Djava.rmi.server.hostname=${JAVA_RMI_SERVER_HOSTNAME:-$KAFKA_ADVERTISED_HOST_NAME}"
+  KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.rmi.port=${KAFKA_JMX_PORT:-7203}"
+  KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Djava.net.preferIPv4Stack=true"
+  KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Djava.rmi.server.hostname=${JAVA_RMI_SERVER_HOSTNAME:-$LONG_HOSTNAME}"
   export KAFKA_JMX_OPTS
 fi
 
+if [ -z "$JMX_PORT" ]; then
+  export JMX_PORT="${KAFKA_JMX_PORT:-7203}"
+fi
+
+if [ -n "$KAFKA_ENABLE_JOLOKIA_AGENT" ]; then
+  KAFKA_JOLOKIA_LISTEN_ADDR=$(hostname -f)
+  export KAFKA_OPTS="$KAFKA_OPTS -javaagent:/jolokia/jolokia-jvm-${JOLOKIA_AGENT_VERSION}-agent.jar=host=$KAFKA_JOLOKIA_LISTEN_ADDR,port=${JOLOKIA_AGENT_PORT:-8778}"
+fi
+
+# Hack for mounted filesystems: /data and /logs may be volume-mounted, so create the subdirectories Kafka actually writes to
+mkdir -p /data/data /logs/logs
+
 echo "Starting kafka"
-exec /kafka/bin/kafka-server-start.sh /kafka/config/server.properties
+exec /kafka/bin/kafka-server-start.sh /kafka/config/server.properties "$@"
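Outside Kubernetes, a minimal single-broker run might look like the following; the image tag and ZooKeeper address are illustrative, and KAFKA_ENABLE_JOLOKIA_AGENT only needs to be non-empty:

    docker run -d --name kafka \
      -p 9092:9092 -p 7203:7203 -p 8778:8778 \
      -e ZOOKEEPER_CONNECTION_STRING=zk.example.com:2181 \
      -e KAFKA_ENABLE_JOLOKIA_AGENT=true \
      example/kafka:0.10.2.1

    # With the agent enabled, Jolokia answers HTTP on its agent port
    curl http://localhost:8778/jolokia/version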