-
Notifications
You must be signed in to change notification settings - Fork 79
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adds helper functions for working with transducers and an example.
- Loading branch information
Charles Reese
committed
Sep 23, 2019
1 parent
9b5bbd0
commit 354823e
Showing
8 changed files
with
390 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
log/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
# XF Word Count | ||
|
||
This is the classic 'word count' example done using transducers. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
{:paths | ||
["src" "resources"] | ||
|
||
:deps | ||
{fundingcircle/jackdaw {:mvn/version "0.6.9-transducers-SNAPSHOT" | ||
:exclusions [org.apache.zookeeper/zookeeper]} | ||
org.clojure/clojure {:mvn/version "1.10.1"} | ||
org.clojure/tools.logging {:mvn/version "0.4.1"} | ||
org.apache.kafka/kafka-streams {:mvn/version "2.3.0"} | ||
org.apache.kafka/kafka-streams-test-utils {:mvn/version "2.3.0"} | ||
ch.qos.logback/logback-classic {:mvn/version "1.2.3"} | ||
integrant {:mvn/version "0.7.0"}} | ||
|
||
:aliases | ||
{:dev | ||
{:extra-paths ["dev" "../../dev"] | ||
:extra-deps {integrant/repl {:mvn/version "0.3.1"} | ||
danlentz/clj-uuid {:mvn/version "0.1.7" | ||
:exclusions [primitive-math]}}} | ||
|
||
:test | ||
{:extra-paths ["test"] | ||
:extra-deps {com.cognitect/test-runner {:git/url "https://github.com/cognitect-labs/test-runner.git" | ||
:sha "028a6d41ac9ac5d5c405dfc38e4da6b4cc1255d5"}} | ||
:main-opts ["-m" "cognitect.test-runner"]}} | ||
|
||
:mvn/repos | ||
{"confluent" {:url "https://packages.confluent.io/maven/"}}} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
(ns user | ||
"Use this namespace for interactive development. | ||
This namespace requires libs needed to reset the app and helpers | ||
from `jackdaw.repl`. WARNING: Do no use `clj-refactor` (or | ||
equivalent) to clean this namespace since these tools cannot tell | ||
which libs are actually required." | ||
(:gen-class) | ||
(:require [clojure.string :as str] | ||
[integrant.core :as ig] | ||
[integrant.repl :refer [clear go halt prep init reset reset-all]] | ||
[jackdaw.admin :as ja] | ||
[jackdaw.serdes :as js] | ||
[jackdaw.repl :refer :all] | ||
[jackdaw.streams :as j] | ||
[jackdaw.streams.xform :as jxf] | ||
[xf-word-count :as xfwc]) | ||
(:import org.apache.kafka.streams.kstream.ValueTransformer | ||
[org.apache.kafka.streams.state KeyValueStore Stores] | ||
org.apache.kafka.streams.StreamsBuilder)) | ||
|
||
|
||
(def repl-config | ||
"The development config. | ||
When the 'dev' alias is active, this config will be used." | ||
{:topology {:topology-builder xfwc/topology-builder | ||
:xform xfwc/xf | ||
:swap-fn jxf/kv-store-swap-fn} | ||
|
||
:topics {:streams-config xfwc/streams-config | ||
:client-config (select-keys xfwc/streams-config | ||
["bootstrap.servers"]) | ||
:topology (ig/ref :topology)} | ||
|
||
:app {:streams-config xfwc/streams-config | ||
:topology (ig/ref :topology) | ||
:topics (ig/ref :topics)}}) | ||
|
||
|
||
(defmethod ig/init-key :topology [_ {:keys [topology-builder xform swap-fn]}] | ||
(let [streams-builder (j/streams-builder)] | ||
((topology-builder topic-metadata #(xform % swap-fn)) streams-builder))) | ||
|
||
(defmethod ig/init-key :topics [_ {:keys [streams-config client-config topology] | ||
:as opts}] | ||
(let [topic-metadata (topology->topic-metadata topology streams-config)] | ||
(with-open [client (ja/->AdminClient client-config)] | ||
(ja/create-topics! client (vals topic-metadata))) | ||
(assoc opts :topic-metadata topic-metadata))) | ||
|
||
(defmethod ig/init-key :app [_ {:keys [streams-config topology] :as opts}] | ||
(let [streams-app (j/kafka-streams topology streams-config)] | ||
(j/start streams-app) | ||
(assoc opts :streams-app streams-app))) | ||
|
||
(defmethod ig/halt-key! :topics [_ {:keys [client-config topic-metadata]}] | ||
(let [re (re-pattern (str "(" (->> topic-metadata | ||
keys | ||
(map name) | ||
(str/join "|")) | ||
")"))] | ||
(re-delete-topics client-config re))) | ||
|
||
(defmethod ig/halt-key! :app [_ {:keys [streams-config topics streams-app]}] | ||
(j/close streams-app) | ||
(destroy-state-stores streams-config) | ||
(let [re (re-pattern (str "(" (get streams-config "application.id") ")"))] | ||
(re-delete-topics (:client-config topics) re))) | ||
|
||
|
||
(integrant.repl/set-prep! (constantly repl-config)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
../../resources/logback.xml |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,147 @@ | ||
(ns xf-word-count | ||
"This is the classic 'word count' example done using transducers." | ||
(:gen-class) | ||
(:require [clojure.string :as str] | ||
[clojure.tools.logging :refer [info]] | ||
[jackdaw.serdes :as js] | ||
[jackdaw.streams :as j] | ||
[jackdaw.streams.xform :as jxf]) | ||
(:import org.apache.kafka.streams.kstream.ValueTransformer | ||
[org.apache.kafka.streams.state KeyValueStore Stores] | ||
org.apache.kafka.streams.StreamsBuilder)) | ||
|
||
|
||
(defn xf-running-total | ||
[state swap-fn] | ||
(fn [rf] | ||
(fn | ||
([] (rf)) | ||
([result] (rf result)) | ||
([result input] | ||
(let [next (as-> input % | ||
(swap-fn state #(merge-with (fnil + 0) %1 %2) %) | ||
(select-keys % (keys input)) | ||
(map vec %))] | ||
(rf result next)))))) | ||
|
||
(defn xf | ||
[state swap-fn] | ||
(comp | ||
(map (fn [x] (str/split x #" "))) | ||
(map frequencies) | ||
(xf-running-total state swap-fn))) | ||
|
||
|
||
(comment | ||
;; Use this comment block to explore Word Count using Clojure | ||
;; transducers. | ||
|
||
;; Launch a Clojure REPL: | ||
;; ``` | ||
;; cd <path-to-jackdaw>/examples/xf-word-count | ||
;; clj -A:dev | ||
;; ``` | ||
|
||
;; Emacs users: Open a project file, e.g. this one, and enter | ||
;; `M-x cider-jack-in`. | ||
|
||
;; Evaluate the form: | ||
(def coll | ||
["inside every large program" | ||
"is a small program" | ||
"struggling to get out"]) | ||
|
||
;; Let's counts the words. | ||
|
||
;; Evaluate the form: | ||
(transduce (xf (atom {}) swap!) concat coll) | ||
|
||
;; You should see output like the following: | ||
;; (["inside" 1] | ||
;; ["every" 1] | ||
;; ["large" 1] | ||
;; ["program" 1] | ||
;; ["is" 1] | ||
;; ["a" 1] | ||
;; ["small" 1] | ||
;; ["program" 2] | ||
;; ["struggling" 1] | ||
;; ["to" 1] | ||
;; ["get" 1] | ||
;; ["out" 1]) | ||
|
||
;; This time, let's count the words using | ||
;; `jackdaw.streams.xform/fake-kv-store` which implements the | ||
;; KeyValueStore interface with overrides for get and put." | ||
|
||
;; Evaluate the form: | ||
(transduce (xf (jxf/fake-kv-store {}) jxf/kv-store-swap-fn) concat coll) | ||
|
||
;; You should see the same output. | ||
) | ||
|
||
|
||
(def streams-config | ||
{"application.id" "xf-word-count" | ||
"bootstrap.servers" (or (System/getenv "BOOTSTRAP_SERVERS") "localhost:9092") | ||
"cache.max.bytes.buffering" "0"}) | ||
|
||
|
||
(defn topology-builder | ||
[{:keys [input output] :as topics} xf] | ||
(fn [builder] | ||
(jxf/add-state-store! builder) | ||
(-> (j/kstream builder input) | ||
(jxf/transduce-kstream xf) | ||
(j/to output)) | ||
builder)) | ||
|
||
|
||
(comment | ||
;; Use this comment block to explore Word Count as a stream | ||
;; processing applicaton. | ||
|
||
;; For more detail, see the comment block in | ||
;; <path-to-jackdaw>/examples/word-count/src/word_count.clj | ||
|
||
;; Start ZooKeeper and Kafka: | ||
;; ``` | ||
;; <path-to-directory>/bin/confluent local start kafka | ||
;; ``` | ||
|
||
;; Launch a Clojure REPL: | ||
;; ``` | ||
;; cd <path-to-jackdaw>/examples/xf-word-count | ||
;; clj -A:dev | ||
;; ``` | ||
|
||
;; Emacs users: Open a project file, e.g. this one, and enter | ||
;; `M-x cider-jack-in`. | ||
|
||
;; Evaluate the form: | ||
(reset) | ||
|
||
;; Evaluate the form: | ||
(let [coll ["inside every large program" | ||
"is a small program" | ||
"struggling to get out"]] | ||
(doseq [x coll] | ||
(publish (:input topic-metadata) nil x))) | ||
|
||
;; Evaluate the form: | ||
(get-keyvals (:output topic-metadata)) | ||
|
||
;; You should see output like the following: | ||
;; (["inside" 1] | ||
;; ["every" 1] | ||
;; ["large" 1] | ||
;; ["program" 1] | ||
;; ["is" 1] | ||
;; ["a" 1] | ||
;; ["small" 1] | ||
;; ["program" 2] | ||
;; ["struggling" 1] | ||
;; ["to" 1] | ||
;; ["get" 1] | ||
;; ["out" 1]) | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
(ns xf-word-count-test | ||
(:gen-class) | ||
(:require [clojure.test :refer [deftest is]] | ||
[jackdaw.serdes :as js] | ||
[jackdaw.streams :as j] | ||
[jackdaw.streams.protocols :as jsp] | ||
[jackdaw.test :as jt] | ||
[jackdaw.test.fixtures :as jtf] | ||
[xf-word-count :as xfwc]) | ||
(:import java.util.Properties | ||
org.apache.kafka.streams.TopologyTestDriver)) | ||
|
||
(def test-config | ||
{:broker-config {"bootstrap.servers" "localhost:9092"} | ||
:topic-metadata {:input | ||
{:topic-name "input" | ||
:partition-count 1 | ||
:replication-factor 1 | ||
:key-serde (js/edn-serde) | ||
:value-serde (js/edn-serde)} | ||
|
||
:output | ||
{:topic-name "output" | ||
:partition-count 1 | ||
:replication-factor 1 | ||
:key-serde (js/edn-serde) | ||
:value-serde (js/edn-serde)}} | ||
:app-config xfwc/streams-config | ||
:enable? (System/getenv "BOOTSTRAP_SERVERS")}) | ||
|
||
(defn topology-builder | ||
[topic-metadata] | ||
(xfwc/topology-builder topic-metadata #(xfwc/xf % xfwc/kv-store-swap-fn))) | ||
|
||
(defn props-for | ||
[x] | ||
(doto (Properties.) | ||
(.putAll (reduce-kv (fn [m k v] | ||
(assoc m (str k) (str v))) | ||
{} | ||
x)))) | ||
|
||
(def mock-transport-config | ||
{:driver (let [streams-builder (j/streams-builder) | ||
topology ((topology-builder (:topic-metadata test-config)) streams-builder)] | ||
(TopologyTestDriver. (.build (j/streams-builder* topology)) | ||
(props-for (:app-config test-config))))}) | ||
|
||
(def test-transport | ||
(jt/mock-transport mock-transport-config (:topic-metadata test-config))) | ||
|
||
(defn done? | ||
[journal] | ||
(= 12 (count (get-in journal [:topics :output])))) | ||
|
||
(def commands | ||
[[:write! :input "inside every large program" {:key-fn (constantly "")}] | ||
[:write! :input "is a small program" {:key-fn (constantly "")}] | ||
[:write! :input "struggling to get out" {:key-fn (constantly "")}] | ||
[:watch done? {:timeout 2000}]]) | ||
|
||
(defn word-count | ||
[journal word] | ||
(->> (get-in journal [:topics :output]) | ||
(filter (fn [x] (= word (:key x)))) | ||
last | ||
:value)) | ||
|
||
(deftest test-xf-word-count | ||
(jtf/with-fixtures [(jtf/integration-fixture topology-builder test-config)] | ||
(jackdaw.test/with-test-machine test-transport | ||
(fn [machine] | ||
(let [{:keys [results journal]} (jackdaw.test/run-test machine commands)] | ||
|
||
(is (= 1 (word-count journal "large"))) | ||
(is (= 2 (word-count journal "program")))))))) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
(ns jackdaw.streams.xform | ||
"Helper functions for working with transducers." | ||
(:gen-class) | ||
(:require [jackdaw.serdes :as js] | ||
[jackdaw.streams :as j]) | ||
(:import org.apache.kafka.streams.kstream.ValueTransformer | ||
[org.apache.kafka.streams.state KeyValueStore Stores] | ||
org.apache.kafka.streams.StreamsBuilder)) | ||
|
||
(defn fake-kv-store | ||
"Creates an instance of org.apache.kafka.streams.state.KeyValueStore | ||
with overrides for get and put." | ||
[init] | ||
(let [store (volatile! init)] | ||
(reify KeyValueStore | ||
(get [_ k] | ||
(clojure.core/get @store k)) | ||
|
||
(put [_ k v] | ||
(vswap! store assoc k v))))) | ||
|
||
(defn kv-store-swap-fn | ||
"Takes an instance of KeyValueStore, a function f, and a map m, and | ||
updates the store in a manner similar to `clojure.core/swap!`." | ||
[^KeyValueStore store f m] | ||
(let [prev (reduce (fn [m k] | ||
(assoc m k (.get store k))) | ||
{} | ||
(keys m)) | ||
next (f prev m)] | ||
(doall (map (fn [[k v]] (.put store k v)) next)) | ||
next)) | ||
|
||
(defn value-transformer | ||
"Creates an instance of org.apache.kafka.streams.kstream.ValueTransformer | ||
with overrides for init, transform, and close." | ||
[xf] | ||
(let [ctx (atom nil)] | ||
(reify | ||
ValueTransformer | ||
(init [_ context] | ||
(reset! ctx context)) | ||
(transform [_ v] | ||
(let [^KeyValueStore store (.getStateStore @ctx "transducer")] | ||
(first (into [] (xf store) [v])))) | ||
(close [_])))) | ||
|
||
(defn transduce-kstream | ||
[kstream xf] | ||
"Takes kstream and xf and transduces the stream." | ||
(-> kstream | ||
(j/transform-values (fn [] (value-transformer xf)) ["transducer"]) | ||
(j/flat-map (fn [[_ v]] v)))) | ||
|
||
(defn add-state-store! | ||
[builder] | ||
"Takes a builder and adds a state store." | ||
(doto ^StreamsBuilder (j/streams-builder* builder) | ||
(.addStateStore (Stores/keyValueStoreBuilder | ||
(Stores/persistentKeyValueStore "transducer") | ||
(js/edn-serde) | ||
(js/edn-serde)))) | ||
builder) |