Skip to content

Commit 27a1409

Browse files
committed
Merge branch 'master' of https://github.com/gojek/ziggurat
2 parents 8151590 + 2770614 commit 27a1409

File tree

4 files changed

+170
-8
lines changed

4 files changed

+170
-8
lines changed

resources/config.test.ci.edn

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
:stream-threads-count [1 :int]
3939
:origin-topic "topic"
4040
:upgrade-from "1.1"
41+
:changelog-topic-replication-factor [1 :int]
4142
:channels {:channel-1 {:worker-count [10 :int]
4243
:retry {:type [:linear :keyword]
4344
:count [5 :int]

resources/config.test.edn

+1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
:origin-topic "topic"
4343
:upgrade-from "1.1"
4444
:consumer-type :default
45+
:changelog-topic-replication-factor [1 :int]
4546
:channels {:channel-1 {:worker-count [10 :int]
4647
:retry {:type [:linear :keyword]
4748
:count [5 :int]

src/ziggurat/streams.clj

+54-7
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
[org.apache.kafka.clients.consumer ConsumerConfig]
1717
[org.apache.kafka.common.serialization Serdes]
1818
[org.apache.kafka.common.utils SystemTime]
19-
[org.apache.kafka.streams KafkaStreams StreamsConfig StreamsBuilder Topology]
19+
[org.apache.kafka.streams KafkaStreams KafkaStreams$State StreamsConfig StreamsBuilder Topology]
2020
[org.apache.kafka.streams.kstream JoinWindows ValueMapper TransformerSupplier ValueJoiner ValueTransformerSupplier]
2121
[org.apache.kafka.streams.state.internals KeyValueStoreBuilder RocksDbKeyValueBytesStoreSupplier]
2222
[ziggurat.timestamp_transformer IngestionTimeExtractor]
@@ -157,10 +157,29 @@
157157

158158
(declare stream)
159159

160+
(defn close-stream
161+
[topic-entity stream]
162+
(let [stream-state (.state stream)]
163+
(if (= stream-state KafkaStreams$State/NOT_RUNNING)
164+
(log/error
165+
(str
166+
"Kafka stream cannot be stopped at the moment, current state is "
167+
stream-state))
168+
(do
169+
(.close stream)
170+
(log/info
171+
(str "Stopping Kafka stream with topic-entity " topic-entity))))))
172+
173+
(defn stop-stream [topic-entity]
174+
(let [stream (get stream topic-entity)]
175+
(if stream
176+
(close-stream topic-entity stream)
177+
(log/error (str "No Kafka stream with provided topic-entity: " topic-entity " exists.")))))
178+
160179
(defn stop-streams [streams]
161180
(log/debug "Stopping Kafka streams")
162-
(doseq [stream streams]
163-
(.close stream)))
181+
(doseq [[topic-entity stream] streams]
182+
(close-stream topic-entity stream)))
164183

165184
(defn- traced-handler-fn [handler-fn channels message topic-entity]
166185
(let [parent-ctx (TracingKafkaUtils/extractSpanContext (:headers message) tracer)
@@ -273,12 +292,40 @@
273292
stream (start-stream* topic-handler-fn stream-config topic-entity channels)]
274293
(when-not (nil? stream)
275294
(.start stream)
276-
(conj streams stream))))
277-
[]
295+
(assoc streams topic-entity stream))))
296+
{}
278297
stream-routes)))
279298

299+
(defn- stream-object-evaluator
300+
[new-stream-map stream-object topic-entity]
301+
(let [stream-topic-entity (first stream-object)
302+
stream (second stream-object)]
303+
(if (and (= (.state stream) KafkaStreams$State/NOT_RUNNING)
304+
(= stream-topic-entity topic-entity))
305+
(let [new-stream-object-map (start-streams
306+
(hash-map stream-topic-entity
307+
(get (:stream-routes (mount/args))
308+
stream-topic-entity)))]
309+
(assoc new-stream-map
310+
stream-topic-entity
311+
(get new-stream-object-map stream-topic-entity)))
312+
(assoc new-stream-map stream-topic-entity stream))))
313+
314+
(defn start-stream
315+
[topic-entity]
316+
(if (seq (select-keys ziggurat.streams/stream [topic-entity]))
317+
(mount/start-with-states {#'ziggurat.streams/stream
318+
{:start (fn []
319+
(reduce (fn
320+
[new-stream-map stream-object]
321+
(stream-object-evaluator new-stream-map stream-object topic-entity))
322+
{}
323+
stream))
324+
:stop (fn [] (stop-streams stream))}})
325+
(log/error (str "No Kafka stream with provided topic-entity: " topic-entity " exists."))))
326+
280327
(defstate stream
281-
:start (do (log/info "Starting Kafka stream")
328+
:start (do (log/info "Starting Kafka streams")
282329
(start-streams (:stream-routes (mount/args)) (ziggurat-config)))
283-
:stop (do (log/info "Stopping Kafka stream")
330+
:stop (do (log/info "Stopping Kafka streams")
284331
(stop-streams stream)))

test/ziggurat/streams_test.clj

+114-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
(ns ziggurat.streams-test
22
(:require [clojure.test :refer [deftest is join-fixtures testing use-fixtures]]
33
[protobuf.core :as proto]
4-
[ziggurat.streams :refer [start-streams stop-streams]]
4+
[mount.core :as mount]
5+
[ziggurat.streams :refer [start-streams stop-streams stop-stream start-stream]]
56
[ziggurat.fixtures :as fix]
67
[ziggurat.config :refer [ziggurat-config]]
78
[ziggurat.middleware.default :as default-middleware]
@@ -15,6 +16,7 @@
1516
[kafka.utils MockTime]
1617
[org.apache.kafka.clients.producer ProducerConfig]
1718
[org.apache.kafka.streams KeyValue]
19+
[org.apache.kafka.streams KafkaStreams$State]
1820
[org.apache.kafka.streams.integration.utils IntegrationTestUtils]
1921
[io.opentracing.tag Tags]))
2022

@@ -59,6 +61,16 @@
5961
(swap! message-received-count inc))
6062
:success)))
6163

64+
(defn- poll-to-check-if-running
65+
([stream]
66+
(poll-to-check-if-running stream :default))
67+
([stream topic-entity]
68+
(let [threshold 0]
69+
(while (and (not= (.state (get stream topic-entity)) KafkaStreams$State/RUNNING)
70+
(> threshold 30))
71+
(Thread/sleep 2000)
72+
(inc threshold)))))
73+
6274
(defn- rand-application-id []
6375
(str "test" "-" (rand-int 999999999)))
6476

@@ -102,6 +114,107 @@
102114
(stop-streams streams)
103115
(is (= times @message-received-count))))
104116

117+
(deftest stop-stream-test
118+
(let [message-received-count (atom 0)
119+
mapped-fn (get-mapped-fn message-received-count)
120+
handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default)
121+
_ (mount/start)]
122+
(mount/start-with-states {#'ziggurat.streams/stream {:start (fn [] (start-streams {:default {:handler-fn handler-fn}}
123+
(ziggurat-config)))
124+
:stop (fn [] (stop-streams #'ziggurat.streams/stream))}}))
125+
(poll-to-check-if-running ziggurat.streams/stream)
126+
(stop-stream :default)
127+
(is (not= (.state (get ziggurat.streams/stream :default)) KafkaStreams$State/RUNNING)))
128+
129+
(deftest start-stopped-stream-test
130+
(let [message-received-count (atom 0)
131+
mapped-fn (get-mapped-fn message-received-count)
132+
handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default)
133+
_ (mount/start)]
134+
(mount/start-with-states {#'ziggurat.streams/stream {:start (fn [] (start-streams {:default {:handler-fn handler-fn}}
135+
(ziggurat-config)))
136+
:stop (fn [] (stop-streams ziggurat.streams/stream))}}))
137+
(poll-to-check-if-running ziggurat.streams/stream)
138+
(stop-stream :default)
139+
(is (not= (.state (get ziggurat.streams/stream :default)) KafkaStreams$State/RUNNING))
140+
(start-stream :default)
141+
(poll-to-check-if-running ziggurat.streams/stream)
142+
(is (not= (.state (get ziggurat.streams/stream :default)) KafkaStreams$State/NOT_RUNNING)))
143+
144+
(deftest stop-restarted-stream-test
145+
(let [message-received-count (atom 0)
146+
mapped-fn (get-mapped-fn message-received-count)
147+
handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default)
148+
_ (mount/start)]
149+
(mount/start-with-states {#'ziggurat.streams/stream {:start (fn [] (start-streams {:default {:handler-fn handler-fn}}
150+
(ziggurat-config)))
151+
:stop (fn [] (stop-streams ziggurat.streams/stream))}}))
152+
(poll-to-check-if-running ziggurat.streams/stream)
153+
(stop-stream :default)
154+
(is (not= (.state (get ziggurat.streams/stream :default)) KafkaStreams$State/RUNNING))
155+
(start-stream :default)
156+
(poll-to-check-if-running ziggurat.streams/stream :default)
157+
(is (not= (.state (get ziggurat.streams/stream :default)) KafkaStreams$State/NOT_RUNNING))
158+
(stop-stream :default)
159+
(is (not= (.state (get ziggurat.streams/stream :default)) KafkaStreams$State/RUNNING)))
160+
161+
(deftest stop-desired-stream-only-test
162+
(let [message-received-count (atom 0)
163+
mapped-fn (get-mapped-fn message-received-count)
164+
handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default)
165+
_ (mount/start)]
166+
(mount/start-with-states {#'ziggurat.streams/stream {:start (fn [] (start-streams {:default {:handler-fn handler-fn}
167+
:using-string-serde {:handler-fn handler-fn}}
168+
(-> (ziggurat-config)
169+
(assoc-in [:stream-router :default :application-id] (rand-application-id))
170+
(assoc-in [:stream-router :default :changelog-topic-replication-factor] changelog-topic-replication-factor)
171+
(assoc-in [:stream-router :using-string-serde :application-id] (rand-application-id))
172+
(assoc-in [:stream-router :using-string-serde :changelog-topic-replication-factor] changelog-topic-replication-factor))))
173+
:stop (fn [] (stop-streams ziggurat.streams/stream))}}))
174+
(poll-to-check-if-running ziggurat.streams/stream)
175+
(stop-stream :default)
176+
(is (not= (.state (get ziggurat.streams/stream :default)) KafkaStreams$State/RUNNING))
177+
(is (not= (.state (get ziggurat.streams/stream :using-string-serde)) KafkaStreams$State/NOT_RUNNING)))
178+
179+
(deftest stop-duplicate-stream-test
180+
(let [message-received-count (atom 0)
181+
mapped-fn (get-mapped-fn message-received-count)
182+
handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default)
183+
_ (mount/start)]
184+
(mount/start-with-states {#'ziggurat.streams/stream {:start (fn [] (start-streams {:default {:handler-fn handler-fn}}
185+
(ziggurat-config)))
186+
:stop (fn [] (stop-streams ziggurat.streams/stream))}}))
187+
(poll-to-check-if-running ziggurat.streams/stream)
188+
(stop-stream :default)
189+
(is (not= (.state (get ziggurat.streams/stream :default)) KafkaStreams$State/RUNNING))
190+
(stop-stream :default))
191+
192+
(deftest stop-invalid-stream-test
193+
(let [is-close-called (atom 0)
194+
mapped-fn (get-mapped-fn is-close-called)
195+
handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default)
196+
_ (mount/start)]
197+
(mount/start-with-states {#'ziggurat.streams/stream {:start (fn [] (start-streams {:default {:handler-fn handler-fn}}
198+
(ziggurat-config)))
199+
:stop (fn [] (stop-streams ziggurat.streams/stream))}})
200+
(poll-to-check-if-running ziggurat.streams/stream)
201+
(with-redefs [ziggurat.streams/close-stream (fn [] (swap! is-close-called inc))]
202+
(stop-stream :invalid-topic-entity)
203+
(is (= @is-close-called 0)))))
204+
205+
(deftest start-invalid-stream-test
206+
(let [is-close-called (atom 0)
207+
mapped-fn (get-mapped-fn is-close-called)
208+
handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default)
209+
_ (mount/start)]
210+
(mount/start-with-states {#'ziggurat.streams/stream {:start (fn [] (start-streams {:default {:handler-fn handler-fn}}
211+
(ziggurat-config)))
212+
:stop (fn [] (stop-streams ziggurat.streams/stream))}}))
213+
(let [is-close-called (atom 0)]
214+
(with-redefs [mount/start-with-states (fn [] (swap! is-close-called inc))]
215+
(start-stream :invalid-topic-entity)
216+
(is (= @is-close-called 0)))))
217+
105218
(deftest start-stream-joins-test
106219
(testing "stream joins using inner join"
107220
(let [orig-config (ziggurat-config)]

0 commit comments

Comments
 (0)