Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new sliding window by time function #374

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ pom.xml.asc
/examples/*/target
.clj-kondo/
.lsp/
.calva/
.calva/
.portal/
1 change: 0 additions & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
:description "A Clojure library for the Apache Kafka distributed streaming platform."

:license {:name "BSD 3-clause" :url "http://opensource.org/licenses/BSD-3-Clause"}

:scm {:name "git" :url "https://github.com/fundingcircle/jackdaw"}

:url "https://github.com/FundingCircle/jackdaw/"
Expand Down
13 changes: 13 additions & 0 deletions src/jackdaw/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,19 @@
([kgroupedstream window]
(p/windowed-by-session kgroupedstream window)))

(defn sliding-window-by-time
  "Aggregates the records of a KGroupedStream over a sliding time window and
  returns the aggregated results as a KStream.

  The grouped stream is windowed with `window`, aggregated, passed through
  `suppress` with an empty config, and converted with `to-kstream`.

  Arguments:
    kgroupedstream - the grouped stream to window.
    window         - the sliding window definition handed to
                     `p/sliding-window-by-time`.
    topic-config   - config map given to `aggregate`; its `:topic-name` is
                     replaced by `store-name`.
    initializer-fn - zero-argument fn returning the initial aggregate.
    aggregator-fn  - fn of `[aggregate [key value]]` returning the updated
                     aggregate.
    store-name     - `:topic-name` used for the aggregation. Defaults to
                     \"sliding-window-store\". Presumably this becomes the
                     state store name, so pass a distinct value when the
                     function is used more than once in a topology
                     (TODO confirm against `aggregate`).

  When only `kgroupedstream`, `window` and `topic-config` are given, values
  are summed starting from 0."
  ([kgroupedstream window topic-config]
   ;; Default aggregation: running sum of the record values.
   (sliding-window-by-time kgroupedstream window topic-config
                           (fn [] 0)
                           (fn [aggr [_k v]] (+ aggr v))))
  ([kgroupedstream window topic-config initializer-fn aggregator-fn]
   ;; Keeps the historical hard-coded name so existing callers are unaffected.
   (sliding-window-by-time kgroupedstream window topic-config
                           initializer-fn aggregator-fn
                           "sliding-window-store"))
  ([kgroupedstream window topic-config initializer-fn aggregator-fn store-name]
   (-> kgroupedstream
       (p/sliding-window-by-time window)
       (aggregate initializer-fn
                  aggregator-fn
                  (assoc topic-config :topic-name store-name))
       (suppress {})
       (to-kstream))))

(defn kgroupedstream*
"Returns the underlying KGroupedStream object."
([kgroupedstream]
Expand Down
9 changes: 7 additions & 2 deletions src/jackdaw/streams/interop.clj
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
KeyValueMapper Materialized Merger Predicate Printed Produced
Reducer SessionWindowedKStream SessionWindows
Suppressed Suppressed$BufferConfig TimeWindowedKStream ValueJoiner
ValueMapper ValueTransformerSupplier Windows ForeachAction TransformerSupplier]
ValueMapper ValueTransformerSupplier Windows ForeachAction TransformerSupplier SlidingWindows]
[org.apache.kafka.streams.processor.api
ProcessorSupplier]
[org.apache.kafka.streams.state Stores]))
Expand Down Expand Up @@ -135,7 +135,7 @@
key-serde
value-serde))
builder)

;; Returns `streams-builder` as-is; the receiver argument is ignored.
(streams-builder*
[_]
streams-builder))
Expand Down Expand Up @@ -603,6 +603,11 @@
(clj-session-windowed-kstream
(.windowedBy ^KGroupedStream kgroupedstream ^SessionWindows windows)))

(sliding-window-by-time
  [_ windows]
  ;; Window the underlying Java grouped stream with the given SlidingWindows,
  ;; then wrap the result via `clj-time-windowed-kstream`.
  (let [windowed (.windowedBy ^KGroupedStream kgroupedstream
                              ^SlidingWindows windows)]
    (clj-time-windowed-kstream windowed)))

;; Returns `kgroupedstream` as-is; the receiver argument is ignored.
(kgroupedstream*
[_]
kgroupedstream))
Expand Down
5 changes: 4 additions & 1 deletion src/jackdaw/streams/protocols.clj
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
[topology-builder store-config]
"Adds a persistent state store to the topology with the configured name
and serdes.")

;; Accessor signature: takes only the receiver. The interop implementation
;; shown in this change returns its `streams-builder` value unchanged.
(streams-builder*
[streams-builder]
"Returns the underlying KStreamBuilder."))
Expand Down Expand Up @@ -254,6 +253,10 @@

;; Windows the grouped stream by session. The interop implementation calls
;; `.windowedBy` with a `SessionWindows` and wraps the result with
;; `clj-session-windowed-kstream`.
(windowed-by-session [kgroupedstream window])

;; The interop implementation calls `.windowedBy` with `window` type-hinted as
;; `SlidingWindows` and wraps the result with `clj-time-windowed-kstream`.
(sliding-window-by-time
[kgroupedstream window]
"Windows the KGroupedStream using a sliding time window.")

;; Accessor signature: takes only the receiver. The interop implementation
;; shown in this change returns its `kgroupedstream` value unchanged.
(kgroupedstream*
[kgroupedstream]
"Returns the underlying KGroupedStream object."))
Expand Down
54 changes: 53 additions & 1 deletion test/jackdaw/streams_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
(:import [java.time Duration]
[org.apache.kafka.streams.kstream
JoinWindows SessionWindows TimeWindows Transformer
ValueTransformer]
ValueTransformer SlidingWindows]
org.apache.kafka.streams.StreamsBuilder
[org.apache.kafka.common.serialization Serdes]))

Expand Down Expand Up @@ -1095,6 +1095,58 @@
(is (= [0 3] (second keyvals)))
(is (= [0 4] (nth keyvals 2))))))

(testing "sliding-window-by-time"
  ;; Buckets integer keys by tens, runs the default (sum) sliding-window
  ;; aggregation with a 1000 ms window, and checks every emitted window total.
  (let [topic-a (mock/topic "topic-a")
        topic-b (mock/topic "topic-b")
        window-size (Duration/ofMillis 1000)
        driver (mock/build-driver
                (fn [builder]
                  (-> builder
                      (k/kstream topic-a)
                      ;; Integer division groups keys 0-9 under 0, 10-19 under 1, ...
                      (k/group-by (fn [[k _v]] (quot k 10))
                                  topic-a)
                      (k/sliding-window-by-time
                       (SlidingWindows/ofTimeDifferenceWithNoGrace window-size)
                       topic-a)
                      ;; `k` is the windowed key here; `.key` recovers the
                      ;; grouping key so results can be compared as plain pairs.
                      (k/map (fn [[k v]] [(.key k) v]))
                      (k/to topic-b))))
        ;; NOTE(review): arguments look like (timestamp-ms key value) -- confirm
        ;; against `mock/publish`.
        publish (partial mock/publish driver topic-a)]

    (publish 1000 1 1)
    (publish 1500 1 2)
    (publish 1900 1 3)
    (publish 2100 1 4)
    (publish 2500 1 5)
    (publish 3000 1 6)
    (publish 3500 1 7)

    (let [keyvals (mock/get-keyvals driver topic-b)]
      (is (= 11 (count keyvals)))
      (is (= [[0 1]   ; Window 0-1000
              [0 3]   ; Window 500-1500
              [0 6]   ; Window 900-1900
              [0 5]   ; Window 1001-2001
              [0 9]   ; Window 1100-2100
              [0 14]  ; Window 1500-2500
              [0 12]  ; Window 1501-2501
              [0 9]   ; Window 1901-2901
              [0 15]  ; Window 2000-3000
              [0 11]  ; Window 2101-3101
              [0 18]] ; Window 2500-3500
             keyvals)))))

(testing "windowed-by-time with string keys"
(let [topic-a (assoc (mock/topic "topic-a") :key-serde (Serdes/String))
topic-b (assoc (mock/topic "topic-b") :key-serde (Serdes/String))
Expand Down