From 900d01078bc9857007bcfcfafdd9d5990d7a19c8 Mon Sep 17 00:00:00 2001 From: Indrajith Indraprastham Date: Fri, 20 Oct 2023 09:15:36 +0530 Subject: [PATCH] [Feature/EV-38] Remove opentracing code (#275) * [EV-38][Indra] Remove modules tracer and tracer-test * [EV-38][Indra] Remove opentracing tests from consumer-connection test * [EV-38][Indra] Remove opentracing from connection_helper * [EV-38][Indra] Remove opentracing from ziggurat.producer * [EV-38][Indra] Remove opentracing tests from ziggurat.messaging.producer-test * [EV-38][Indra] Remove opentracing init-test * [EV-38][Indra] Remove tracer-enabled arg to connection helper from producer connection helper test * [EV-38][Indra] Remove tracer related test from consumer_test * [EV-38][Indra] Remove opentracing related dependencies from project.clj and fixtures * [EV-38][Indra] Remove opentracing content from Readme * [EV-38][Indra] Remove opentracing from streams * [EV-38][Indra] Fix linting * [EV-38][Indra] Remove ziggurat.tracer references * [EV-38][Indra] Bump the version to v4.9.4 --- README.md | 32 ------- project.clj | 8 +- resources/config.test.edn | 2 +- src/ziggurat/init.clj | 7 +- src/ziggurat/messaging/connection_helper.clj | 11 +-- src/ziggurat/producer.clj | 11 +-- src/ziggurat/streams.clj | 36 +++----- src/ziggurat/tracer.clj | 92 ------------------- test/ziggurat/fixtures.clj | 15 +-- test/ziggurat/init_test.clj | 31 ++----- .../messaging/consumer_connection_test.clj | 79 +--------------- test/ziggurat/messaging/consumer_test.clj | 37 -------- .../messaging/producer_connection_test.clj | 12 +-- test/ziggurat/messaging/producer_test.clj | 33 ------- test/ziggurat/producer_test.clj | 26 +----- test/ziggurat/streams_test.clj | 32 +------ test/ziggurat/tracer_test.clj | 77 ---------------- test/ziggurat/util/rabbitmq.clj | 1 - 18 files changed, 46 insertions(+), 496 deletions(-) delete mode 100644 src/ziggurat/tracer.clj delete mode 100644 test/ziggurat/tracer_test.clj diff --git a/README.md b/README.md index 3a5dfb8d..ef8a3b21 100644 --- a/README.md +++ b/README.md @@ -425,38 +425,6 @@ and different timeout values. :enable [true :bool]}}}}} ``` -## Tracing - -[Open Tracing](https://opentracing.io/docs/overview/) enables to identify the amount of time spent in various stages of the work flow. - -Currently, the execution of the handler function is traced. If the message consumed has the corresponding tracing headers, then the E2E life time of the message from the time of production till the time of consumption can be traced. - -Tracing has been added to the following flows: - -1. Normal basic consume -2. Retry via rabbitmq -3. Produce to rabbitmq channel -4. Produce to another kafka topic - -By default, tracing is done via [Jaeger](https://www.jaegertracing.io/) based on the env configs. Please refer [Jaeger Configuration](https://github.com/jaegertracing/jaeger-client-java/tree/master/jaeger-core#configuration-via-environment) -and [Jaeger Architecture](https://www.jaegertracing.io/docs/1.13/architecture/) to set the respective env variables. -To enable custom tracer, a custom tracer provider function name can be set in `:custom-provider`. The corresponding function will be executed in runtime to create a tracer. In the event of any errors while executing the custom tracer provider, a Noop tracer will be created. - -To enable tracing, the following config needs to be added to the `config.edn` under `:ziggurat` key. - -```clojure -:tracer {:enabled [true :bool] - :custom-provider ""} -``` - -Example Jaeger Env Config: - -``` -JAEGER_SERVICE_NAME: "service-name" -JAEGER_AGENT_HOST: "localhost" -JAEGER_AGENT_PORT: 6831 -``` - ## Deprecation Notice * Sentry has been deprecated. diff --git a/project.clj b/project.clj index 7d847eb3..2d031e70 100644 --- a/project.clj +++ b/project.clj @@ -2,7 +2,7 @@ (cemerick.pomegranate.aether/register-wagon-factory! "http" #(org.apache.maven.wagon.providers.http.HttpWagon.)) -(defproject tech.gojek/ziggurat "4.9.3" +(defproject tech.gojek/ziggurat "4.9.4" :description "A stream processing framework to build stateless applications on kafka" :url "https://github.com/gojektech/ziggurat" :license {:name "Apache License, Version 2.0" @@ -22,12 +22,6 @@ [mount "0.1.16"] [io.jaegertracing/jaeger-core "1.6.0"] [io.jaegertracing/jaeger-client "1.6.0"] - [io.opentracing/opentracing-api "0.33.0"] - [io.opentracing/opentracing-mock "0.33.0"] - [io.opentracing/opentracing-noop "0.33.0"] - [io.opentracing.contrib/opentracing-kafka-streams "0.1.15" :exclusions [org.lz4/lz4-java com.github.luben/zstd-jni org.apache.kafka/kafka-streams org.slf4j/slf4j-api org.xerial.snappy/snappy-java]] - [io.opentracing.contrib/opentracing-kafka-client "0.1.15" :exclusions [org.lz4/lz4-java com.github.luben/zstd-jni org.slf4j/slf4j-api org.xerial.snappy/snappy-java]] - [io.opentracing.contrib/opentracing-rabbitmq-client "0.1.11" :exclusions [com.rabbitmq/amqp-client]] [org.apache.httpcomponents/fluent-hc "4.5.13"] [org.apache.kafka/kafka-clients "2.8.2" :exclusions [org.slf4j/slf4j-log4j12 log4j]] [org.apache.kafka/kafka-streams "2.8.2" :exclusions [org.slf4j/slf4j-log4j12 log4j]] diff --git a/resources/config.test.edn b/resources/config.test.edn index 00c9180c..aa079f80 100644 --- a/resources/config.test.edn +++ b/resources/config.test.edn @@ -91,7 +91,7 @@ :default-api-timeout-ms-config [60000 :int] :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer" :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}} - :tracer {:enabled [true :bool] + :tracer {:enabled [false :bool] :custom-provider ""} :new-relic {:report-errors false} :log-format "text"}} diff --git a/src/ziggurat/init.clj b/src/ziggurat/init.clj index 6c496e00..439b5e5b 100644 --- a/src/ziggurat/init.clj +++ b/src/ziggurat/init.clj @@ -21,7 +21,6 @@ [ziggurat.sentry :refer [sentry-reporter]] [ziggurat.server :as server] [ziggurat.streams :as streams] - [ziggurat.tracer :as tracer] [ziggurat.util.java-util :as util]) (:gen-class :methods [^{:static true} [init [java.util.Map] void]] @@ -150,15 +149,13 @@ (defn start-common-states [] (start* #{#'metrics/statsd-reporter #'sentry-reporter - #'nrepl-server/server - #'tracer/tracer})) + #'nrepl-server/server})) (defn stop-common-states [] (mount/stop #'config/config #'sentry-reporter #'metrics/statsd-reporter - #'nrepl-server/server - #'tracer/tracer)) + #'nrepl-server/server)) (defn start "Starts up Ziggurat's config, reporters, actor fn, rabbitmq connection and then streams, server etc" diff --git a/src/ziggurat/messaging/connection_helper.clj b/src/ziggurat/messaging/connection_helper.clj index 88a0a35b..d7f5e054 100644 --- a/src/ziggurat/messaging/connection_helper.clj +++ b/src/ziggurat/messaging/connection_helper.clj @@ -5,12 +5,10 @@ [ziggurat.config :refer [ziggurat-config]] [ziggurat.sentry :refer [sentry-reporter]] [ziggurat.channel :refer [get-keys-for-topic]] - [ziggurat.tracer :refer [tracer]] [ziggurat.messaging.util :as util] [ziggurat.util.error :refer [report-error]]) (:import [com.rabbitmq.client ShutdownListener ConnectionFactory AddressResolver] [java.util.concurrent Executors ExecutorService] - [io.opentracing.contrib.rabbitmq TracingConnectionFactory] [com.rabbitmq.client.impl DefaultCredentialsProvider])) (defn is-connection-required? [] @@ -48,10 +46,8 @@ (util/create-address-resolver rabbitmq-config) (:connection-name rabbitmq-config)))) -(defn create-connection [config tracer-enabled] - (if tracer-enabled - (create-rmq-connection (TracingConnectionFactory. tracer) config) - (create-rmq-connection (ConnectionFactory.) config))) +(defn create-connection [config] + (create-rmq-connection (ConnectionFactory.) config)) (defn- get-connection-config [is-producer?] @@ -70,8 +66,7 @@ (when (is-connection-required?) (try (let - [is-tracer-enabled? (get-in (ziggurat-config) [:tracer :enabled]) - connection (create-connection (get-connection-config is-producer?) is-tracer-enabled?)] + [connection (create-connection (get-connection-config is-producer?))] (log/info "Connection created " connection) (doto connection (.addShutdownListener diff --git a/src/ziggurat/producer.clj b/src/ziggurat/producer.clj index 40c68cda..478f7350 100644 --- a/src/ziggurat/producer.clj +++ b/src/ziggurat/producer.clj @@ -45,10 +45,8 @@ (:require [clojure.tools.logging :as log] [mount.core :refer [defstate]] [ziggurat.config :refer [build-producer-config-properties ziggurat-config]] - [ziggurat.tracer :refer [tracer]] [ziggurat.util.java-util :refer [get-key]]) - (:import (io.opentracing.contrib.kafka TracingKafkaProducer) - (org.apache.kafka.clients.producer KafkaProducer ProducerRecord)) + (:import (org.apache.kafka.clients.producer KafkaProducer ProducerRecord)) (:gen-class :methods [^{:static true} [send [String String Object Object] java.util.concurrent.Future] ^{:static true} [send [String String int Object Object] java.util.concurrent.Future]] @@ -70,9 +68,8 @@ (do (log/info "Starting Kafka producers ...") (reduce (fn [producers [stream-config-key properties]] (log/debug "Constructing Kafka producer associated with [" stream-config-key "] ") - (let [kp (KafkaProducer. properties) - tkp (TracingKafkaProducer. kp tracer)] - (assoc producers stream-config-key tkp))) + (let [kp (KafkaProducer. properties)] + (assoc producers stream-config-key kp))) {} (seq (producer-properties-map)))) (log/info "No producers found. Can not initiate start.")) @@ -85,7 +82,7 @@ (.flush) (.close))) (seq kafka-producers)))) - (log/info "No producers found.n Can not initiate stop."))) + (log/info "No producers found. Can not initiate stop."))) (defn send "A wrapper around `org.apache.kafka.clients.producer.KafkaProducer#send` which enables diff --git a/src/ziggurat/streams.clj b/src/ziggurat/streams.clj index f79006af..d867cd76 100644 --- a/src/ziggurat/streams.clj +++ b/src/ziggurat/streams.clj @@ -8,13 +8,9 @@ [ziggurat.message-payload :refer [->MessagePayload]] [ziggurat.metrics :as metrics] [ziggurat.timestamp-transformer :as timestamp-transformer] - [ziggurat.tracer :refer [tracer]] [ziggurat.util.map :as umap] [cambium.core :as clog]) - (:import [io.opentracing.contrib.kafka TracingKafkaUtils] - [io.opentracing.contrib.kafka.streams TracingKafkaClientSupplier] - [io.opentracing.tag Tags] - [java.time Duration] + (:import [java.time Duration] [java.util Properties] [java.util.regex Pattern] [org.apache.kafka.common.errors TimeoutException] @@ -126,22 +122,13 @@ (doseq [[topic-entity stream] streams] (close-stream topic-entity stream))) -(defn- traced-handler-fn [handler-fn channels message topic-entity] - (let [parent-ctx (TracingKafkaUtils/extractSpanContext (:headers message) tracer) - span (as-> tracer t - (.buildSpan t "Message-Handler") - (.withTag t (.getKey Tags/SPAN_KIND) Tags/SPAN_KIND_CONSUMER) - (.withTag t (.getKey Tags/COMPONENT) "ziggurat") - (if (nil? parent-ctx) - t - (.asChildOf t parent-ctx)) - (.start t))] - (try - ((mapper-func handler-fn channels) (-> (->MessagePayload (:value message) topic-entity) - (assoc :headers (:headers message)) - (assoc :metadata (:metadata message)))) - (finally - (.finish span))))) +(defn- mapped-handler-fn [handler-fn channels message topic-entity] + (try + ((mapper-func handler-fn channels) + (-> (->MessagePayload (:value message) topic-entity) + (assoc :headers (:headers message)) + (assoc :metadata (:metadata message)))) + (finally))) (defn- join-streams [oldest-processed-message-in-s topic-entity stream-1 stream-2] @@ -187,7 +174,7 @@ {stream :stream} (reduce (partial join-streams oldest-processed-message-in-s topic-entity) stream-map)] (->> stream (header-transform-values) - (map-values #(traced-handler-fn handler-fn channels % topic-entity))) + (map-values #(mapped-handler-fn handler-fn channels % topic-entity))) (.build builder)))) (defn- topology [handler-fn {:keys [origin-topic oldest-processed-message-in-s]} topic-entity channels] @@ -198,7 +185,7 @@ (timestamp-transform-values topic-entity-name oldest-processed-message-in-s) (header-transform-values) (map-values #(log-and-report-metrics topic-entity-name %)) - (map-values #(traced-handler-fn handler-fn channels % topic-entity))) + (map-values #(mapped-handler-fn handler-fn channels % topic-entity))) (.build builder))) (defn- start-stream* [handler-fn stream-config topic-entity channels] @@ -209,8 +196,7 @@ (when-not (nil? top) (KafkaStreams. ^Topology top - ^Properties (properties stream-config) - (new TracingKafkaClientSupplier tracer))))) + ^Properties (properties stream-config))))) (defn- merge-consumer-type-config [config] diff --git a/src/ziggurat/tracer.clj b/src/ziggurat/tracer.clj deleted file mode 100644 index 3b5cf854..00000000 --- a/src/ziggurat/tracer.clj +++ /dev/null @@ -1,92 +0,0 @@ -(ns ziggurat.tracer - "This namespace creates a [tracer](https://opentracing.io/docs/overview/tracers/) - that can be used to trace the various stages of the application workflow. - - The following flows are traced: - 1. Normal basic consume - 2. Retry via rabbitmq - 3. Produce to rabbitmq channel - 4. Produce to another kafka topic - - At the time of initialization, an instance of `io.opentracing Tracer` - is created based on the configuration. - - - If tracer is disabled, then a `NoopTracer` is created, - which will basically do nothing. - - - If tracer is enabled, then by default a [Jaeger](https://www.jaegertracing.io/) - tracer will be created based on the environment variables. Please refer - [Jaeger Configuration](https://github.com/jaegertracing/jaeger-client-java/tree/master/jaeger-core#configuration-via-environment) - and [Jaeger Architecture](https://www.jaegertracing.io/docs/1.13/architecture/) - to set the respective env variables. - - Example Jaeger Env Config: - ` - JAEGER_SERVICE_NAME: \"service-name\" - JAEGER_AGENT_HOST: \"localhost\" - JAEGER_AGENT_PORT: 6831 - ` - - - Custom tracer can be created by passing the name of a custom tracer - provider function as `:custom-provider`.The corresponding function - will be executed to create the tracer. - - In the event of any errors, a NoopTracer will be created - - Example tracer configuration: - - ` - :tracer {:enabled [true :bool]\n - :custom-provider \"\"} - ` - - Usage: - ` - In `ziggurat.streams/traced-handler-fn`, around the execution of the handler function, - a span is started, activated and finished. If there are trace-id headers in the kafka message, - this span will be tied to the same trace. If not, a new trace will be started. - - Any part of the handler function execution can be traced as a child span of this activated parent span. - Please refer to the [doc](https://github.com/opentracing/opentracing-java#starting-a-new-span) - to understand how to create child spans. - - The trace ids are propagated to rabbitmq using `io.opentracing.contrib.rabbitmq.TracingConnection`. - Hence rabbitmq flows are also traced. - - The trace ids are propagated back to kafka using `io.opentracing.contrib.kafka.TracingKafkaProducer`. - Hence push back to kafka flow is traced. - `" - (:require [mount.core :refer [defstate]] - [ziggurat.config :refer [ziggurat-config]] - [clojure.tools.logging :as log]) - (:import [io.jaegertracing Configuration] - [io.opentracing.noop NoopTracerFactory] - [io.opentracing Tracer])) - -(defn default-tracer-provider [] - (.getTracer (Configuration/fromEnv))) - -(defn create-tracer [] - (try - (let [tracer-config (:tracer (ziggurat-config)) - custom-provider (:custom-provider tracer-config)] - (if (or (nil? tracer-config) (false? (:enabled tracer-config))) - (NoopTracerFactory/create) - - (if (or (nil? custom-provider) (empty? custom-provider)) - (default-tracer-provider) - - (let [custom-tracer (apply (resolve (symbol custom-provider)) [])] - (if-not (instance? Tracer custom-tracer) - (throw (RuntimeException. "Tracer provider did not return a valid tracer")) - custom-tracer))))) - (catch Exception e - (log/error "Failed to create tracer with exception " e) - (log/info "Creating Noop tracer") - (NoopTracerFactory/create)))) - -(declare tracer) - -(defstate tracer - :start (create-tracer) - :stop #()) diff --git a/test/ziggurat/fixtures.clj b/test/ziggurat/fixtures.clj index e8bf673b..d38b3c33 100644 --- a/test/ziggurat/fixtures.clj +++ b/test/ziggurat/fixtures.clj @@ -14,10 +14,8 @@ [ziggurat.messaging.util :as util] [ziggurat.metrics :as metrics] [ziggurat.producer :as producer] - [ziggurat.server :refer [server]] - [ziggurat.tracer :as tracer]) - (:import (io.opentracing.mock MockTracer) - (java.util Properties) + [ziggurat.server :refer [server]]) + (:import (java.util Properties) (org.apache.kafka.clients.consumer ConsumerConfig) (org.apache.kafka.clients.producer ProducerConfig)) (:gen-class @@ -67,14 +65,8 @@ (f) (mount/stop #'metrics/statsd-reporter)) -(defn mount-tracer [] - (with-redefs [tracer/create-tracer (fn [] (MockTracer.))] - (-> (mount/only [#'tracer/tracer]) - (mount/start)))) - (defn mount-config-with-tracer [f] (mount-config) - (mount-tracer) (f) (mount/stop)) @@ -119,8 +111,6 @@ (let [stream-routes {:default {:handler-fn #(constantly nil) :channel-1 #(constantly nil)}}] (mount-config) - (mount-tracer) - (-> (mount/only [#'producer-connection #'consumer-connection #'channel-pool]) (mount/with-args {:stream-routes stream-routes}) @@ -163,7 +153,6 @@ (defn mount-producer-with-config-and-tracer [f] (mount-config) - (mount-tracer) (mount-producer) (binding [*bootstrap-servers* (get-in (config/ziggurat-config) [:stream-router :default :bootstrap-servers])] (binding [*consumer-properties* (doto (Properties.) diff --git a/test/ziggurat/init_test.clj b/test/ziggurat/init_test.clj index d7b97c95..7da3c2a0 100644 --- a/test/ziggurat/init_test.clj +++ b/test/ziggurat/init_test.clj @@ -11,11 +11,9 @@ [ziggurat.streams :as streams] [ziggurat.nrepl-server :as nrs] [ziggurat.server.test-utils :as tu] - [ziggurat.tracer :as tracer] [ziggurat.fixtures :refer [with-config]] [cambium.logback.json.flat-layout :as flat] - [cambium.codec :as codec]) - (:import (io.opentracing.mock MockTracer))) + [cambium.codec :as codec])) (defn exp [x n] (if (zero? n) 1 @@ -34,8 +32,7 @@ nrs/stop (constantly nil) rmqc/start-connection (fn [_] (do (reset! result (* @result 2)) nil)) rmqc/stop-connection (constantly nil) - cpool/destroy-channel-pool (constantly nil) - tracer/create-tracer (fn [] (MockTracer.))] + cpool/destroy-channel-pool (constantly nil)] (with-config (do (init/start #(reset! result (+ @result 3)) {} {} [] nil) (init/stop #() nil) @@ -49,8 +46,7 @@ server/stop (constantly nil) nrs/start (constantly nil) nrs/stop (constantly nil) - streams/stop-streams (fn [_] (reset! result (* @result 2))) - tracer/create-tracer (fn [] (MockTracer.))] + streams/stop-streams (fn [_] (reset! result (* @result 2)))] (with-config (do (init/start #() {} {} [] nil) (init/stop #(reset! result (+ @result 3)) nil) @@ -61,8 +57,7 @@ (let [result (atom 1)] (with-redefs [streams/start-streams (constantly nil) streams/stop-streams (constantly nil) - rmqc/stop-connection (fn [_] (reset! result (* @result 2))) - tracer/create-tracer (fn [] (MockTracer.))] + rmqc/stop-connection (fn [_] (reset! result (* @result 2)))] (with-config (do (init/start #() {} {} [] nil) (init/stop #(reset! result (+ @result 3)) nil) @@ -79,8 +74,7 @@ (swap! make-queues-called + 1) (is (= all-routes (merge expected-stream-routes expected-batch-routes)))) messaging-consumer/start-subscribers (constantly nil) - config/config-file "config.test.edn" - tracer/create-tracer (fn [] (MockTracer.))] + config/config-file "config.test.edn"] (with-config (do (init/start #() expected-stream-routes expected-batch-routes [] nil) (init/stop #() nil) @@ -98,8 +92,7 @@ (is (= stream-routes expected-stream-routes)) (is (= batch-routes expected-batch-routes))) messaging-producer/make-queues (constantly nil) - config/config-file "config.test.edn" - tracer/create-tracer (fn [] (MockTracer.))] + config/config-file "config.test.edn"] (with-config (do (init/start #() expected-stream-routes expected-batch-routes [] nil) @@ -204,8 +197,7 @@ (deftest ziggurat-routes-serve-actor-routes-test (testing "The routes added by actor should be served along with ziggurat-routes" (with-redefs [streams/start-streams (constantly nil) - streams/stop-streams (constantly nil) - tracer/create-tracer (fn [] (MockTracer.))] + streams/stop-streams (constantly nil)] (with-config (do (init/start #() {} {} [["test-ping" (fn [_request] {:status 200 :body "pong"})]] nil) @@ -218,8 +210,7 @@ (testing "Deadset management and server api modes should run both actor and deadset management routes" (with-redefs [streams/start-streams (constantly nil) - streams/stop-streams (constantly nil) - tracer/create-tracer (fn [] (MockTracer.))] + streams/stop-streams (constantly nil)] (with-config (do (init/start #() {} {} [["test-ping" (fn [_request] {:status 200 :body "pong"})]] [:management-api :api-server]) @@ -232,8 +223,7 @@ (testing "The routes not added by actor should return 404" (with-redefs [streams/start-streams (constantly nil) - streams/stop-streams (constantly nil) - tracer/create-tracer (fn [] (MockTracer.))] + streams/stop-streams (constantly nil)] (with-config (do (init/start #() {} {} [] nil) (let [{:keys [status]} (tu/get (-> (config/ziggurat-config) :http-server :port) "/test-ping" true false)] @@ -242,8 +232,7 @@ (testing "The ziggurat routes should work fine when actor routes are not provided" (with-redefs [streams/start-streams (constantly nil) - streams/stop-streams (constantly nil) - tracer/create-tracer (fn [] (MockTracer.))] + streams/stop-streams (constantly nil)] (with-config (do (init/start #() {} {} [] nil) (let [{:keys [status]} (tu/get (-> (config/ziggurat-config) :http-server :port) "/ping" true false)] diff --git a/test/ziggurat/messaging/consumer_connection_test.clj b/test/ziggurat/messaging/consumer_connection_test.clj index f2cd1bc1..4771e545 100644 --- a/test/ziggurat/messaging/consumer_connection_test.clj +++ b/test/ziggurat/messaging/consumer_connection_test.clj @@ -3,9 +3,8 @@ [ziggurat.fixtures :as fix] [mount.core :as mount] [ziggurat.config :as config] - [ziggurat.messaging.connection-helper :as mc :refer [create-connection create-rmq-connection]] - [ziggurat.messaging.consumer-connection :refer [consumer-connection]] - [ziggurat.util.error :refer [report-error]])) + [ziggurat.messaging.connection-helper :as mc :refer [create-rmq-connection]] + [ziggurat.messaging.consumer-connection :refer [consumer-connection]])) (use-fixtures :once fix/mount-config-with-tracer) @@ -31,7 +30,7 @@ (mount/stop #'consumer-connection) (is @executor-present?))))) -(deftest start-connection-test-with-tracer-disabled +(deftest start-connection-test (testing "[consumer-connection] should provide the correct number of threads for the thread pool if channels are present" (let [thread-count (atom 0) orig-rmq-connect create-rmq-connection @@ -128,76 +127,4 @@ (mount/stop #'consumer-connection) (is (= 34 @thread-count)))))) -(deftest start-connection-test-with-tracer-enabled - (testing "[consumer-connection] should provide the correct number of threads for the thread pool if channels are present" - (let [thread-count (atom 0) - orig-create-conn mc/create-connection - create-connect-called? (atom false) - ziggurat-config (config/ziggurat-config) - stream-routes {:default {:handler-fn (constantly :channel-1) - :channel-1 (constantly :success)}}] - (with-redefs [mc/create-connection (fn [provided-config tracer-enabled] - (reset! create-connect-called? true) - (reset! thread-count (.getCorePoolSize (:executor provided-config))) - (orig-create-conn provided-config tracer-enabled)) - config/ziggurat-config (constantly (assoc ziggurat-config - :jobs {:instant {:worker-count 4}} - :retry {:enabled true} - :stream-router {:default {:channels {:channel-1 {:worker-count 10}}}}))] - (-> (mount/only #{#'consumer-connection}) - (mount/with-args {:stream-routes stream-routes}) - (mount/start)) - (mount/stop #'consumer-connection) - (is (= 22 @thread-count)) - (is @create-connect-called?)))) - (testing "should provide the correct number of threads for the thread pool for multiple channels" - (let [thread-count (atom 0) - orig-create-conn mc/create-connection - ziggurat-config (config/ziggurat-config) - stream-routes {:default {:handler-fn (constantly :success)}}] - (with-redefs [mc/create-connection (fn [provided-config tracer-enabled] - (reset! thread-count (.getCorePoolSize (:executor provided-config))) - (orig-create-conn provided-config tracer-enabled)) - config/ziggurat-config (constantly (assoc ziggurat-config - :jobs {:instant {:worker-count 4}} - :retry {:enabled true} - :stream-router {:default {:channels {:channel-1 {:worker-count 5} - :channel-2 {:worker-count 10}}}}))] - (-> (mount/only #{#'consumer-connection}) - (mount/with-args {:stream-routes stream-routes}) - (mount/start)) - (mount/stop #'consumer-connection) - (is (= 27 @thread-count))))) - - (testing "should provide the correct number of threads for the thread pool when channels are not present" - (let [thread-count (atom 0) - orig-create-conn mc/create-connection - ziggurat-config (config/ziggurat-config)] - (with-redefs [mc/create-connection (fn [provided-config tracer-enabled] - (reset! thread-count (.getCorePoolSize (:executor provided-config))) - (orig-create-conn provided-config tracer-enabled)) - config/ziggurat-config (constantly (assoc ziggurat-config - :jobs {:instant {:worker-count 4}} - :retry {:enabled true} - :stream-router {:default {}}))] - (mount/start (mount/only [#'consumer-connection])) - (mount/stop #'consumer-connection) - (is (= 12 @thread-count))))) - - (testing "should provide the correct number of threads for the thread pool for multiple stream routes" - (let [thread-count (atom 0) - orig-create-conn mc/create-connection - ziggurat-config (config/ziggurat-config)] - (with-redefs [mc/create-connection (fn [provided-config tracer-enabled] - (reset! thread-count (.getCorePoolSize (:executor provided-config))) - (orig-create-conn provided-config tracer-enabled)) - config/ziggurat-config (constantly (assoc ziggurat-config - :jobs {:instant {:worker-count 4}} - :retry {:enabled true} - :stream-router {:default {:channels {:channel-1 {:worker-count 10}}} - :default-1 {:channels {:channel-1 {:worker-count 8}}}}))] - (mount/start (mount/only [#'consumer-connection])) - (mount/stop #'consumer-connection) - (is (= 34 @thread-count)))))) - diff --git a/test/ziggurat/messaging/consumer_test.clj b/test/ziggurat/messaging/consumer_test.clj index 3c3c5f45..91341d0b 100644 --- a/test/ziggurat/messaging/consumer_test.clj +++ b/test/ziggurat/messaging/consumer_test.clj @@ -10,7 +10,6 @@ [ziggurat.messaging.consumer-connection :refer [consumer-connection]] [ziggurat.messaging.producer :as producer] [ziggurat.messaging.util :refer [prefixed-queue-name]] - [ziggurat.tracer :refer [tracer]] [ziggurat.util.error :refer [report-error]] [ziggurat.util.rabbitmq :as util]) (:import (com.rabbitmq.client Channel))) @@ -265,42 +264,6 @@ (consumer/start-channels-subscriber {channel channel-fn} topic-entity) (is (= expected-prefetch-count @prefetch-count-used))))))) -(deftest start-retry-subscriber-test - (testing "creates a span when tracer is enabled" - (fix/with-queues {topic-entity {:handler-fn #(constantly nil)}} - (let [retry-counter (atom 0) - call-counter (atom 0) - success-promise (promise) - retry-count 3 - message-payload (assoc (gen-message-payload topic-entity) :retry-count 3) - original-zig-config (ziggurat-config) - rmq-ch (lch/open consumer-connection)] - (.reset tracer) - (with-redefs [ziggurat-config (fn [] (-> original-zig-config - (update-in [:retry :count] (constantly retry-count)) - (update-in [:retry :enabled] (constantly true)) - (update-in [:jobs :instant :worker-count] (constantly 1))))] - - (consumer/start-retry-subscriber* (mock-mapper-fn {:retry-counter-atom retry-counter - :call-counter-atom call-counter - :retry-limit 0 - :success-promise success-promise}) topic-entity) - - (producer/publish-to-delay-queue message-payload) - (when-let [promise-success? (deref success-promise 5000 :timeout)] - (is (not (= :timeout promise-success?))) - (is (= true promise-success?))) - (util/close rmq-ch) - (Thread/sleep 500) - (let [finished-spans (.finishedSpans tracer)] - (is (= 2 (.size finished-spans))) - (is (= "send" (-> finished-spans - (.get 0) - (.operationName)))) - (is (= "receive" (-> finished-spans - (.get 1) - (.operationName)))))))))) - (deftest process-message-test (testing "process-message function should ack message after once processing finishes" (fix/with-queues {topic-entity {:handler-fn #(constantly nil)}} diff --git a/test/ziggurat/messaging/producer_connection_test.clj b/test/ziggurat/messaging/producer_connection_test.clj index 62ae27c9..aa2fceca 100644 --- a/test/ziggurat/messaging/producer_connection_test.clj +++ b/test/ziggurat/messaging/producer_connection_test.clj @@ -91,9 +91,9 @@ orig-create-conn connection-helper/create-connection ziggurat-config (config/ziggurat-config) stream-routes {:default {:handler-fn (constantly :success)}}] - (with-redefs [connection-helper/create-connection (fn [provided-config tracer-enabled] + (with-redefs [connection-helper/create-connection (fn [provided-config] (reset! create-connect-called? true) - (orig-create-conn provided-config tracer-enabled)) + (orig-create-conn provided-config)) config/ziggurat-config (constantly (assoc ziggurat-config :retry {:enabled true}))] (-> (mount/only #{#'producer-connection}) @@ -107,9 +107,9 @@ orig-create-conn connection-helper/create-connection ziggurat-config (config/ziggurat-config) stream-routes {:default {:handler-fn (constantly :success)}}] - (with-redefs [connection-helper/create-connection (fn [provided-config tracer-enabled] + (with-redefs [connection-helper/create-connection (fn [provided-config] (reset! create-connect-called? true) - (orig-create-conn provided-config tracer-enabled)) + (orig-create-conn provided-config)) config/ziggurat-config (constantly (assoc ziggurat-config :retry {:enabled false}))] (-> (mount/only #{#'producer-connection}) @@ -126,9 +126,9 @@ :channel-1 (constantly :success)} :default-1 {:handler-fn (constantly :channel-3) :channel-3 (constantly :success)}}] - (with-redefs [connection-helper/create-connection (fn [provided-config tracer-enabled] + (with-redefs [connection-helper/create-connection (fn [provided-config] (reset! create-connect-called? true) - (orig-create-conn provided-config tracer-enabled)) + (orig-create-conn provided-config)) config/ziggurat-config (constantly (assoc ziggurat-config :retry {:enabled false}))] (-> (mount/only #{#'producer-connection}) diff --git a/test/ziggurat/messaging/producer_test.clj b/test/ziggurat/messaging/producer_test.clj index 03c94999..625e40c8 100644 --- a/test/ziggurat/messaging/producer_test.clj +++ b/test/ziggurat/messaging/producer_test.clj @@ -12,7 +12,6 @@ [ziggurat.util.rabbitmq :as rmq] [langohr.basic :as lb] [ziggurat.config :as config] - [ziggurat.tracer :refer [tracer]] [ziggurat.message-payload :refer [->MessagePayload]] [ziggurat.metrics :as metrics]) (:import [org.apache.kafka.common.header.internals RecordHeaders RecordHeader] @@ -676,38 +675,6 @@ (producer/publish "random-exchange" {:topic-entity "hello"} 12345) (is (= 0 @publish-called)))))) -(deftest publish-to-delay-queue-test - (testing "creates a span when tracer is enabled" - (let [stream-routes {:default {:handler-fn #(constantly nil) - :channel-1 #(constantly nil)}}] - (with-redefs [metrics/multi-ns-report-histogram (fn [_ _ _] nil)] - (.reset tracer) - (fix/with-queues - stream-routes - (do - (producer/retry message-payload) - (let [finished-spans (.finishedSpans tracer)] - (is (= 1 (.size finished-spans))) - (is (= "send" (-> finished-spans - (.get 0) - (.operationName))))))))))) - -(deftest publish-to-channel-instant-queue-test - (testing "creates a span when tracer is enabled" - (let [stream-routes {:default {:handler-fn #(constantly nil) - :channel-1 #(constantly nil)}}] - (with-redefs [metrics/multi-ns-report-histogram (fn [_ _ _] nil)] - (.reset tracer) - (fix/with-queues - stream-routes - (do - (producer/publish-to-channel-instant-queue :channel-1 message-payload) - (let [finished-spans (.finishedSpans tracer)] - (is (= 1 (.size finished-spans))) - (is (= "send" (-> finished-spans - (.get 0) - (.operationName))))))))))) - (deftest get-channel-queue-timeout-ms-test (let [message (assoc message-payload :retry-count 2)] (testing "when retries are enabled" diff --git a/test/ziggurat/producer_test.clj b/test/ziggurat/producer_test.clj index 81ac623e..824d14e8 100644 --- a/test/ziggurat/producer_test.clj +++ b/test/ziggurat/producer_test.clj @@ -4,18 +4,12 @@ [clojure.test.check.generators :as gen] [ziggurat.config :refer [ziggurat-config]] [ziggurat.fixtures :as fix :refer [*producer-properties* *consumer-properties*]] - [ziggurat.producer :refer [producer-properties-map send kafka-producers -send]] - [ziggurat.tracer :refer [tracer]]) - (:import [io.opentracing.contrib.kafka TracingKafkaProducer] - [org.apache.kafka.clients.producer KafkaProducer] + [ziggurat.producer :refer [producer-properties-map send kafka-producers -send]]) + (:import [org.apache.kafka.clients.producer KafkaProducer] [org.apache.kafka.streams.integration.utils IntegrationTestUtils])) (use-fixtures :once fix/mount-producer-with-config-and-tracer) -(def valid-config {:key-serializer-class "org.apache.kafka.common.serialization.StringSerializer" - :value-serializer-class "org.apache.kafka.common.serialization.StringSerializer" - :bootstrap-servers "valid_bootstrap_server1, valid_bootstrap_server2, valid_bootstrap_server3"}) - (defn stream-router-config-without-producer []) (:stream-router {:default {:application-id "test" :bootstrap-servers (get-in (ziggurat-config) [:stream-router :default :bootstrap-servers]) @@ -65,22 +59,6 @@ ; valid producer configs. (is (seq (producer-properties-map)))) -(deftest send-data-with-tracer-enabled - (with-redefs [kafka-producers (hash-map :default (TracingKafkaProducer. (KafkaProducer. *producer-properties*) tracer))] - (let [alphanum-gen (gen/such-that #(not (blank? %)) gen/string-alphanumeric) - topic (gen/generate alphanum-gen 10) - key "message" - value "Hello World!!"] - (.reset tracer) - (send :default topic key value) - (let [result (IntegrationTestUtils/waitUntilMinKeyValueRecordsReceived *consumer-properties* topic 1 7000) - finished-spans (.finishedSpans tracer)] - (is (= value (.value (first result)))) - (is (= 1 (.size finished-spans))) - (is (= (str "To_" topic) (-> finished-spans - (.get 0) - (.operationName)))))))) - (deftest java-send-test (let [stream-config-key ":entity" expected-stream-config-key (keyword (subs stream-config-key 1)) diff --git a/test/ziggurat/streams_test.clj b/test/ziggurat/streams_test.clj index 834efa70..efd3a5e3 100644 --- a/test/ziggurat/streams_test.clj +++ b/test/ziggurat/streams_test.clj @@ -8,10 +8,8 @@ [ziggurat.middleware.json :as json-middleware] [ziggurat.middleware.stream-joins :as stream-joins-middleware] [ziggurat.streams :refer [add-stream-thread get-stream-thread-count remove-stream-thread start-streams stop-streams stop-stream start-stream]] - [ziggurat.streams :refer [handle-uncaught-exception start-stream start-streams stop-stream stop-streams]] - [ziggurat.tracer :refer [tracer]]) + [ziggurat.streams :refer [handle-uncaught-exception start-stream start-streams stop-stream stop-streams]]) (:import [com.gojek.test.proto Example$Photo] - [io.opentracing.tag Tags] [java.util Properties] [org.apache.kafka.clients.producer ProducerConfig] (org.apache.kafka.common.utils MockTime) @@ -349,34 +347,6 @@ (stop-streams streams) (is (= times @message-received-count)))) -(deftest start-streams-test-with-tracer - (let [message-received-count (atom 0) - mapped-fn (get-mapped-fn message-received-count) - times 1 - kvs (repeat times message-key-value) - handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default) - streams (start-streams {:default {:handler-fn handler-fn}} - (-> (ziggurat-config) - (assoc-in [:stream-router :default :application-id] (rand-application-id)) - (assoc-in [:stream-router :default :changelog-topic-replication-factor] changelog-topic-replication-factor)))] - (Thread/sleep 10000) ;;waiting for streams to start - (IntegrationTestUtils/produceKeyValuesSynchronously (get-in (ziggurat-config) [:stream-router :default :origin-topic]) - kvs - (props) - (MockTime.)) - (Thread/sleep 10000) ;;wating for streams to consume messages - (stop-streams streams) - (is (= times @message-received-count)) - (let [finished-spans (.finishedSpans tracer) - tags (-> finished-spans - (.get 1) - (.tags))] - (is (= 2 (.size finished-spans))) ;;2 spans - one from the TracingKafkaClientSupplier and one for the actual handler function - (is (= "Message-Handler" (-> finished-spans - (.get 1) - (.operationName)))) - (is (= {(.getKey Tags/SPAN_KIND) Tags/SPAN_KIND_CONSUMER, (.getKey Tags/COMPONENT) "ziggurat"} tags))))) - (deftest start-streams-test-when-tracer-is-not-configured (let [message-received-count (atom 0) mapped-fn (get-mapped-fn message-received-count) diff --git a/test/ziggurat/tracer_test.clj b/test/ziggurat/tracer_test.clj deleted file mode 100644 index a00299e7..00000000 --- a/test/ziggurat/tracer_test.clj +++ /dev/null @@ -1,77 +0,0 @@ -(ns ziggurat.tracer-test - (:require [clojure.test :refer :all] - [mount.core :as mount] - [ziggurat.config :refer [ziggurat-config]] - [ziggurat.fixtures :as fix] - [ziggurat.tracer :as tracer]) - (:import [io.opentracing.mock MockTracer] - (io.jaegertracing.internal JaegerTracer$Builder))) - -(use-fixtures :once fix/silence-logging) - -(defn custom-tracer-provider [] - (MockTracer.)) - -(deftest mount-tracer-test - (testing "should start JaegerTracer when tracer is enabled and tracer provider is empty" - (fix/mount-config) - (with-redefs [tracer/default-tracer-provider (fn [] (.build (JaegerTracer$Builder. "test")))] - (mount/start (mount/only [#'tracer/tracer])) - (is (= "JaegerTracer" (.getSimpleName (.getClass tracer/tracer))))) - (mount/stop)) - - (testing "should start JaegerTracer when tracer is enabled and tracer provider is nil" - (fix/mount-config) - (with-redefs [tracer/default-tracer-provider (fn [] (.build (JaegerTracer$Builder. "test")))] - (mount/start (mount/only [#'tracer/tracer])) - (is (= "JaegerTracer" (.getSimpleName (.getClass tracer/tracer))))) - (mount/stop)) - - (testing "should execute create custom tracer when tracer is enabled and tracer provider is set" - (fix/mount-config) - (with-redefs [ziggurat-config (fn [] {:tracer {:enabled true - :custom-provider "ziggurat.tracer-test/custom-tracer-provider"}})] - (mount/start (mount/only [#'tracer/tracer])) - (is (= "MockTracer" (.getSimpleName (.getClass tracer/tracer))))) - (mount/stop)) - - (testing "should handle gracefully when custom tracer provider returns nil and create NoopTracer" - (fix/mount-config) - (with-redefs [custom-tracer-provider (fn [] nil) - ziggurat-config (fn [] {:tracer {:enabled true - :custom-provider "ziggurat.tracer-test/custom-tracer-provider"}})] - (mount/start (mount/only [#'tracer/tracer])) - (is (= "NoopTracerImpl" (.getSimpleName (.getClass tracer/tracer))))) - (mount/stop)) - - (testing "should handle gracefully when custom tracer provider returns non tracer instance and create NoopTracer" - (fix/mount-config) - (with-redefs [custom-tracer-provider (fn [] "") - ziggurat-config (fn [] {:tracer {:enabled true - :custom-provider "ziggurat.tracer-test/custom-tracer-provider"}})] - (mount/start (mount/only [#'tracer/tracer])) - (is (= "NoopTracerImpl" (.getSimpleName (.getClass tracer/tracer))))) - (mount/stop)) - - (testing "should start NoopTracer when tracer is not enabled" - (fix/mount-config) - (with-redefs [ziggurat-config (fn [] {:tracer {:enabled false}})] - (mount/start (mount/only [#'tracer/tracer])) - (is (= "NoopTracerImpl" (.getSimpleName (.getClass tracer/tracer))))) - (mount/stop)) - - (testing "should start NoopTracer when tracer is not configured" - (fix/mount-config) - (with-redefs [ziggurat-config (fn [] {})] - (mount/start (mount/only [#'tracer/tracer])) - (is (= "NoopTracerImpl" (.getSimpleName (.getClass tracer/tracer))))) - (mount/stop)) - - (testing "should handle create tracer exception gracefully and create NoopTracer" - (fix/mount-config) - (with-redefs [custom-tracer-provider (fn [] (throw (RuntimeException.))) - ziggurat-config (fn [] {:tracer {:enabled true - :custom-provider "ziggurat.tracer-test/custom-tracer-provider"}})] - (mount/start (mount/only [#'tracer/tracer])) - (is (= "NoopTracerImpl" (.getSimpleName (.getClass tracer/tracer))))) - (mount/stop))) diff --git a/test/ziggurat/util/rabbitmq.clj b/test/ziggurat/util/rabbitmq.clj index 15ccb95c..0aef927f 100644 --- a/test/ziggurat/util/rabbitmq.clj +++ b/test/ziggurat/util/rabbitmq.clj @@ -8,7 +8,6 @@ [ziggurat.messaging.util :refer [prefixed-channel-name]] [ziggurat.messaging.producer :refer [delay-queue-name]] [ziggurat.messaging.util :as rutil] - [ziggurat.tracer :refer [tracer]] [clojure.tools.logging :as log]) (:import (com.rabbitmq.client AlreadyClosedException Channel)))