|
3 | 3 | [ziggurat.config :refer :all]
|
4 | 4 | [ziggurat.messaging.producer :as producer]
|
5 | 5 | [ziggurat.message-payload :refer [map->MessagePayload]]
|
6 |
| - [ziggurat.metrics :as metrics]) |
| 6 | + [ziggurat.metrics :as metrics] |
| 7 | + [cambium.core :as clog]) |
7 | 8 | (:import (org.apache.kafka.common.errors WakeupException)
|
8 | 9 | (java.time Duration Instant)
|
9 | 10 | (tech.gojek.ziggurat.internal InvalidReturnTypeException)
|
|
26 | 27 | (producer/retry batch-payload))
|
27 | 28 | ([batch current-retry-count topic-entity]
|
28 | 29 | (when (pos? (count batch))
|
29 |
| - (let [message (map->MessagePayload {:message batch |
30 |
| - :retry-count current-retry-count |
| 30 | + (let [message (map->MessagePayload {:message batch |
| 31 | + :retry-count current-retry-count |
31 | 32 | :topic-entity topic-entity})]
|
32 | 33 | (producer/retry message)))))
|
33 | 34 |
|
|
49 | 50 | batch-size (count batch)]
|
50 | 51 | (try
|
51 | 52 | (when (not-empty batch)
|
52 |
| - (log/infof "[Consumer Group: %s] Processing the batch with %d messages" topic-entity batch-size) |
53 |
| - (let [start-time (Instant/now) |
54 |
| - result (batch-handler batch) |
55 |
| - time-taken-in-millis (.toMillis (Duration/between start-time (Instant/now)))] |
| 53 | + (clog/info {:batch-size batch-size} (format "[Consumer Group: %s] Processing the batch with %d messages" topic-entity batch-size)) |
| 54 | + (let [start-time (Instant/now) |
| 55 | + result (batch-handler batch) |
| 56 | + time-taken-in-millis (.toMillis (Duration/between start-time (Instant/now)))] |
56 | 57 | (validate-batch-processing-result result)
|
57 | 58 | (let [messages-to-be-retried (:retry result)
|
58 | 59 | to-be-retried-count (count messages-to-be-retried)
|
59 | 60 | skip-count (count (:skip result))
|
60 | 61 | success-count (- batch-size (+ to-be-retried-count skip-count))]
|
61 |
| - (log/infof "[Consumer Group: %s] Processed the batch with success: [%d], skip: [%d] and retries: [%d] \n" |
62 |
| - topic-entity success-count skip-count to-be-retried-count) |
| 62 | + |
| 63 | + (clog/info {:messages-successfully-processed success-count :messages-skipped skip-count :messages-to-be-retried to-be-retried-count} (format "[Consumer Group: %s] Processed the batch with success: [%d], skip: [%d] and retries: [%d] \n" topic-entity success-count skip-count to-be-retried-count)) |
63 | 64 | (publish-batch-process-metrics topic-entity batch-size success-count skip-count to-be-retried-count time-taken-in-millis)
|
64 | 65 | (retry messages-to-be-retried current-retry-count topic-entity))))
|
65 | 66 | (catch InvalidReturnTypeException e
|
|
78 | 79 |
|
79 | 80 | (defn poll-for-messages
|
80 | 81 | [^Consumer consumer handler-fn topic-entity consumer-config]
|
81 |
| - (try |
82 |
| - (loop [records []] |
83 |
| - (when (not-empty records) |
84 |
| - (let [batch-payload (create-batch-payload records topic-entity)] |
85 |
| - (process handler-fn batch-payload))) |
86 |
| - (recur (seq (.poll consumer (Duration/ofMillis (or (:poll-timeout-ms-config consumer-config) DEFAULT_POLL_TIMEOUT_MS_CONFIG)))))) |
87 |
| - (catch WakeupException e |
88 |
| - (log/errorf e "WakeupException while polling for messages for: %s" topic-entity)) |
89 |
| - (catch Exception e |
90 |
| - (log/errorf e "Exception while polling for messages for: %s" topic-entity)) |
91 |
| - (finally (do (log/info "Closing the Kafka Consumer for: " topic-entity) |
92 |
| - (.close consumer))))) |
| 82 | + (clog/with-logging-context {:consumer-group topic-entity} |
| 83 | + (try |
| 84 | + (loop [records []] |
| 85 | + (when (not-empty records) |
| 86 | + (let [batch-payload (create-batch-payload records topic-entity)] |
| 87 | + (process handler-fn batch-payload))) |
| 88 | + (recur (seq (.poll consumer (Duration/ofMillis (or (:poll-timeout-ms-config consumer-config) DEFAULT_POLL_TIMEOUT_MS_CONFIG)))))) |
| 89 | + (catch WakeupException e |
| 90 | + (log/errorf e "WakeupException while polling for messages for: %s" topic-entity)) |
| 91 | + (catch Exception e |
| 92 | + (log/errorf e "Exception while polling for messages for: %s" topic-entity)) |
| 93 | + (finally (do (log/info "Closing the Kafka Consumer for: " topic-entity) |
| 94 | + (.close consumer)))))) |
93 | 95 |
|
0 commit comments