Skip to content

Commit 1ccb2f0

Browse files
authored
Merge pull request #219 from gojek/support-all-kafka-stream-consumer-configs
support all consumer, producer and stream configurations
2 parents 717a7dc + 86ed386 commit 1ccb2f0

File tree

10 files changed

+268
-350
lines changed

10 files changed

+268
-350
lines changed

CHANGELOG.md

+4-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1-
# Change Log
1+
# Changelog
22

33
All notable changes to this project will be documented in this file. This change log follows the conventions of [keepachangelog.com](http://keepachangelog.com/).
44

5+
## 3.13.0
6+
- Supports all of the official Kafka configurations for [Streams API](https://docs.confluent.io/platform/current/installation/configuration/streams-configs.html), [Consumer API](https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html) and [Producer API](https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html)
7+
58
## 3.12.0
69
- Uses Kafka Streams client 2.7.0
710
- Introduces default.api.timeout.ms for Kafka Consumer API

README.md

+2
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,8 @@ Ziggurat Config | Default Value | Description | Mandatory?
466466

467467
## Configuration
468468

469+
As of Ziggurat version 3.13.0, all the official Kafka configurations for [Streams API](https://docs.confluent.io/platform/current/installation/configuration/streams-configs.html), [Consumer API](https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html) and [Producer API](https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html) are supported.
470+
469471
All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggurat` key.
470472

471473
```clojure

project.clj

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
(cemerick.pomegranate.aether/register-wagon-factory!
33
"http" #(org.apache.maven.wagon.providers.http.HttpWagon.))
44

5-
(defproject tech.gojek/ziggurat "3.12.0"
5+
(defproject tech.gojek/ziggurat "3.13.0"
66
:description "A stream processing framework to build stateless applications on kafka"
77
:url "https://github.com/gojektech/ziggurat"
88
:license {:name "Apache License, Version 2.0"

src/ziggurat/config.clj

+74-1
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
(ns ziggurat.config
22
(:require [clojure.edn :as edn]
33
[clojure.java.io :as io]
4+
[clojure.string :as str]
45
[clonfig.core :as clonfig]
56
[mount.core :refer [defstate]]
67
[ziggurat.util.java-util :as util])
8+
(:import (java.util Properties))
79
(:gen-class
810
:methods [^{:static true} [get [String] Object]
911
^{:static true} [getIn [java.lang.Iterable] Object]]
@@ -81,7 +83,7 @@
8183

8284
(defn statsd-config []
8385
(let [cfg (ziggurat-config)]
84-
(get cfg :statsd (:datadog cfg)))) ;; TODO: remove datadog in the future
86+
(get cfg :statsd (:datadog cfg)))) ;; TODO: remove datadog in the future
8587

8688
(defn get-in-config
8789
([ks]
@@ -108,3 +110,74 @@
108110
(defn -get [^String key]
109111
(let [config-vals (get config (keyword key))]
110112
(java-response config-vals)))
113+
114+
(def consumer-config-mapping-table
115+
{:auto-offset-reset-config :auto-offset-reset
116+
:commit-interval-ms :auto-commit-interval-ms
117+
:consumer-group-id :group-id
118+
:default-api-timeout-ms-config :default-api-timeout-ms
119+
:key-deserializer-class-config :key-deserializer
120+
:session-timeout-ms-config :session-timeout-ms
121+
:value-deserializer-class-config :value-deserializer})
122+
123+
(def producer-config-mapping-table
124+
{:key-serializer-class :key-serializer
125+
:retries-config :retries
126+
:value-serializer-class :value-serializer})
127+
128+
(def streams-config-mapping-table
129+
{:auto-offset-reset-config :auto-offset-reset
130+
:default-api-timeout-ms-config :default-api-timeout-ms
131+
:changelog-topic-replication-factor :replication-factor
132+
:session-timeout-ms-config :session-timeout-ms
133+
:stream-threads-count :num-stream-threads})
134+
135+
(def ^:private non-kafka-config-keys
136+
[:channels
137+
:consumer-type
138+
:input-topics
139+
:join-cfg
140+
:oldest-processed-message-in-s
141+
:origin-topic
142+
:poll-timeout-ms-config
143+
:producer
144+
:thread-count])
145+
146+
(defn- to-list
  "Returns a one-element list holding `s`, or an empty list when `s` is
  empty (for example an empty string or nil)."
  [s]
  (if (seq s)
    (list s)
    (list)))
151+
152+
(defn- to-string-key
  "Translates the config key `k` into a dotted property-name string.

  `k` is first looked up in `mapping-table`; when there is no entry, `k`
  itself is used. The name of the resulting key then has every `-`
  replaced by `.`."
  [mapping-table k]
  (let [mapped-key (get mapping-table k k)]
    (str/replace (name mapped-key) #"-" ".")))
157+
158+
(defn- normalize-value
  "Converts the config value `v` to a trimmed string.

  Keywords are rendered via `name` (no leading colon); every other value
  goes through `str`."
  [v]
  (let [text (if (keyword? v)
               (name v)
               (str v))]
    (str/trim text)))
164+
165+
(defn set-property
  "Sets one entry on the properties object `p` and returns `p`.

  Keys listed in `non-kafka-config-keys` are skipped. Otherwise the key is
  converted with `to-string-key` (using `mapping-table`) and the value with
  `normalize-value`; the property is set only when both resulting strings
  are non-empty. The argument order (after `mapping-table`) matches what
  `reduce-kv` passes to its reducing function."
  [mapping-table p k v]
  (let [excluded? (contains? (set non-kafka-config-keys) k)]
    (when-not excluded?
      ;; Each helper result is wrapped in a zero- or one-element list, so
      ;; the nested doseq body runs at most once and is skipped entirely
      ;; when either string is empty.
      (let [prop-names  (to-list (to-string-key mapping-table k))
            prop-values (to-list (normalize-value v))]
        (doseq [prop-name  prop-names
                prop-value prop-values]
          (.setProperty p prop-name prop-value))))
    p))
174+
175+
(defn build-properties
  "Builds a fresh `java.util.Properties` by folding the map `m` with
  `set-property-fn`.

  `set-property-fn` is called as `(set-property-fn props k v)` for each
  entry of `m` and must return the accumulator to use for the next entry."
  [set-property-fn m]
  (let [props (Properties.)]
    (reduce-kv set-property-fn props m)))
178+
179+
(def build-consumer-config-properties (partial build-properties (partial set-property consumer-config-mapping-table)))
180+
181+
(def build-producer-config-properties (partial build-properties (partial set-property producer-config-mapping-table)))
182+
183+
(def build-streams-config-properties (partial build-properties (partial set-property streams-config-mapping-table)))
+9-33
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,24 @@
11
(ns ziggurat.kafka-consumer.consumer
22
(:require [clojure.tools.logging :as log]
3-
[ziggurat.kafka-consumer.consumer-handler :refer :all]
3+
[ziggurat.config :as cfg]
44
[ziggurat.util.map :as umap])
5-
(:import (java.util Map Properties)
6-
(org.apache.kafka.clients.consumer KafkaConsumer ConsumerConfig)
7-
(java.util.regex Pattern)))
5+
(:import (java.util.regex Pattern)
6+
(org.apache.kafka.clients.consumer KafkaConsumer)))
87

98
(def default-consumer-config
10-
{:commit-interval-ms 15000
11-
:max-poll-records 500
12-
:session-timeout-ms-config 60000
13-
:enable-auto-commit true
14-
:default-api-timeout-ms-config 60000
15-
:key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"
9+
{:commit-interval-ms 15000
10+
:session-timeout-ms-config 60000
11+
:default-api-timeout-ms-config 60000
12+
:key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"
1613
:value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"})
1714

18-
(defn- build-consumer-properties-map
19-
[{:keys [bootstrap-servers
20-
consumer-group-id
21-
max-poll-records
22-
session-timeout-ms-config
23-
commit-interval-ms
24-
enable-auto-commit
25-
key-deserializer-class-config
26-
value-deserializer-class-config
27-
default-api-timeout-ms-config]}]
28-
(doto (Properties.)
29-
(.putAll {ConsumerConfig/BOOTSTRAP_SERVERS_CONFIG bootstrap-servers
30-
ConsumerConfig/GROUP_ID_CONFIG consumer-group-id
31-
ConsumerConfig/MAX_POLL_RECORDS_CONFIG (int max-poll-records)
32-
ConsumerConfig/SESSION_TIMEOUT_MS_CONFIG (int session-timeout-ms-config)
33-
ConsumerConfig/ENABLE_AUTO_COMMIT_CONFIG enable-auto-commit
34-
ConsumerConfig/AUTO_COMMIT_INTERVAL_MS_CONFIG (int commit-interval-ms)
35-
ConsumerConfig/KEY_DESERIALIZER_CLASS_CONFIG key-deserializer-class-config
36-
ConsumerConfig/VALUE_DESERIALIZER_CLASS_CONFIG value-deserializer-class-config
37-
ConsumerConfig/DEFAULT_API_TIMEOUT_MS_CONFIG (int default-api-timeout-ms-config)})))
3815
(defn create-consumer
3916
[topic-entity consumer-group-config]
4017
(try
4118
(let [merged-consumer-group-config (umap/deep-merge consumer-group-config default-consumer-config)
42-
consumer (KafkaConsumer. ^Map (build-consumer-properties-map merged-consumer-group-config))
43-
topic-pattern (Pattern/compile (:origin-topic merged-consumer-group-config))]
19+
consumer (KafkaConsumer. (cfg/build-consumer-config-properties merged-consumer-group-config))
20+
topic-pattern (Pattern/compile (:origin-topic merged-consumer-group-config))]
4421
(.subscribe consumer topic-pattern)
4522
consumer)
4623
(catch Exception e
4724
(log/error e "Exception received while creating Kafka Consumer for: " topic-entity))))
48-

src/ziggurat/producer.clj

+16-87
Original file line numberDiff line numberDiff line change
@@ -37,113 +37,42 @@
3737
- key.serializer
3838
- value.serializer
3939
- max.in.flight.requests.per.connection
40-
- enable.idempotencecd
40+
- enable.idempotence
4141
4242
Please see [producer configs](http://kafka.apache.org/documentation.html#producerconfigs)
4343
for a complete list of all producer configs available in Kafka."
44-
45-
(:require [ziggurat.config :refer [ziggurat-config]]
46-
[clojure.tools.logging :as log]
44+
(:refer-clojure :exclude [send])
45+
(:require [clojure.tools.logging :as log]
4746
[mount.core :refer [defstate]]
48-
[camel-snake-kebab.core :as csk]
47+
[ziggurat.config :refer [build-producer-config-properties ziggurat-config]]
4948
[ziggurat.tracer :refer [tracer]]
50-
[ziggurat.util.java-util :refer [get-key]]
51-
[schema.core :as s])
52-
(:import (org.apache.kafka.clients.producer KafkaProducer ProducerRecord ProducerConfig)
53-
(java.util Properties)
54-
(io.opentracing.contrib.kafka TracingKafkaProducer))
49+
[ziggurat.util.java-util :refer [get-key]])
50+
(:import (io.opentracing.contrib.kafka TracingKafkaProducer)
51+
(org.apache.kafka.clients.producer KafkaProducer ProducerRecord))
5552
(:gen-class
56-
:name tech.gojek.ziggurat.internal.Producer
5753
:methods [^{:static true} [send [String String Object Object] java.util.concurrent.Future]
58-
^{:static true} [send [String String int Object Object] java.util.concurrent.Future]]))
59-
60-
(defn *implements-serializer?* [serializer-class]
61-
(contains? (set (.getInterfaces (Class/forName serializer-class)))
62-
(Class/forName "org.apache.kafka.common.serialization.Serializer")))
63-
64-
(def implements-serializer? (s/pred *implements-serializer?* 'implements-serializer?))
65-
66-
(s/defschema ProducerConfigSchema {(s/required-key :bootstrap-servers) s/Any
67-
(s/optional-key :key-serializer-class) implements-serializer?
68-
(s/optional-key :value-serializer-class) implements-serializer?
69-
(s/optional-key :key-serializer) implements-serializer?
70-
(s/optional-key :value-serializer) implements-serializer?
71-
(s/optional-key :retries-config) s/Any
72-
(s/optional-key :metadata-max-age) s/Any
73-
(s/optional-key :reconnect-backoff-ms) s/Any
74-
(s/optional-key :client-id) s/Any
75-
(s/optional-key :metric-num-samples) s/Any
76-
(s/optional-key :transaction-timeout) s/Any
77-
(s/optional-key :retries) s/Any
78-
(s/optional-key :retry-backoff-ms) s/Any
79-
(s/optional-key :receive-buffer) s/Any
80-
(s/optional-key :partitioner-class) s/Any
81-
(s/optional-key :max-block-ms) s/Any
82-
(s/optional-key :metrics-reporter-classes) s/Any
83-
(s/optional-key :compression-type) s/Any
84-
(s/optional-key :max-request-size) s/Any
85-
(s/optional-key :delivery-timeout-ms) s/Any
86-
(s/optional-key :metrics-sample-window-ms) s/Any
87-
(s/optional-key :request-timeout-ms) s/Any
88-
(s/optional-key :buffer-memory) s/Any
89-
(s/optional-key :interceptor-classes) s/Any
90-
(s/optional-key :linger-ms) s/Any
91-
(s/optional-key :connections-max-idle-ms) s/Any
92-
(s/optional-key :acks) s/Any
93-
(s/optional-key :enable-idempotence) s/Any
94-
(s/optional-key :metrics-recording-level) s/Any
95-
(s/optional-key :transactional-id) s/Any
96-
(s/optional-key :reconnect-backoff-max-ms) s/Any
97-
(s/optional-key :client-dns-lookup) s/Any
98-
(s/optional-key :max-in-flight-requests-per-connection) s/Any
99-
(s/optional-key :send-buffer) s/Any
100-
(s/optional-key :batch-size) s/Any})
101-
102-
(def valid-configs? (partial s/validate ProducerConfigSchema))
103-
104-
(def explain-str (partial s/explain ProducerConfigSchema))
105-
106-
(defn property->fn [field-name]
107-
(let [raw-field-name (condp = field-name
108-
:max-in-flight-requests-per-connection "org.apache.kafka.clients.producer.ProducerConfig/%s"
109-
:retries-config "org.apache.kafka.clients.producer.ProducerConfig/RETRIES_CONFIG"
110-
:key-serializer "org.apache.kafka.clients.producer.ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG"
111-
:value-serializer "org.apache.kafka.clients.producer.ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG"
112-
"org.apache.kafka.clients.producer.ProducerConfig/%s_CONFIG")]
113-
(->> field-name
114-
csk/->SCREAMING_SNAKE_CASE_STRING
115-
(format raw-field-name)
116-
(read-string))))
117-
118-
(defn producer-properties [config-map]
119-
(if (valid-configs? config-map)
120-
(let [props (Properties.)]
121-
(doseq [config-key (keys config-map)]
122-
(.setProperty props
123-
(eval (property->fn config-key))
124-
(str (get config-map config-key))))
125-
props)
126-
(throw (ex-info (explain-str config-map) config-map))))
54+
^{:static true} [send [String String int Object Object] java.util.concurrent.Future]]
55+
:name tech.gojek.ziggurat.internal.Producer))
12756

12857
(defn producer-properties-map []
12958
(reduce (fn [producer-map [stream-config-key stream-config]]
13059
(let [producer-config (:producer stream-config)]
13160
(if (some? producer-config)
132-
(assoc producer-map stream-config-key (producer-properties producer-config))
61+
(assoc producer-map stream-config-key (build-producer-config-properties producer-config))
13362
producer-map)))
13463
{}
13564
(seq (:stream-router (ziggurat-config)))))
13665

66+
(declare kafka-producers)
67+
13768
(defstate kafka-producers
13869
:start (if (not-empty (producer-properties-map))
13970
(do (log/info "Starting Kafka producers ...")
14071
(reduce (fn [producers [stream-config-key properties]]
141-
(do (log/debug "Constructing Kafka producer associated with [" stream-config-key "] ")
142-
(let [_ (println properties)
143-
kp (KafkaProducer. properties)
144-
_ (println kp)
145-
tkp (TracingKafkaProducer. kp tracer)]
146-
(assoc producers stream-config-key tkp))))
72+
(log/debug "Constructing Kafka producer associated with [" stream-config-key "] ")
73+
(let [kp (KafkaProducer. properties)
74+
tkp (TracingKafkaProducer. kp tracer)]
75+
(assoc producers stream-config-key tkp)))
14776
{}
14877
(seq (producer-properties-map))))
14978
(log/info "No producers found. Can not initiate start."))

0 commit comments

Comments
 (0)