Skip to content

Commit 03f2218

Browse files
macalimlimmjayprateek
authored andcommitted
when nippy fails, publish to dead set instead of rejecting
1 parent ed467ca commit 03f2218

File tree

4 files changed

+204
-175
lines changed

4 files changed

+204
-175
lines changed

src/ziggurat/messaging/consumer.clj

+41-28
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,43 @@
11
(ns ziggurat.messaging.consumer
2-
(:require [ziggurat.mapper :as mpr]
3-
[clojure.tools.logging :as log]
2+
(:require [clojure.tools.logging :as log]
43
[langohr.basic :as lb]
54
[langohr.channel :as lch]
65
[langohr.consumers :as lcons]
7-
[ziggurat.kafka-consumer.consumer-handler :as ch]
86
[taoensso.nippy :as nippy]
9-
[ziggurat.config :refer [get-in-config]]
7+
[ziggurat.config :refer [get-in-config rabbitmq-config]]
8+
[ziggurat.kafka-consumer.consumer-handler :as ch]
9+
[ziggurat.mapper :as mpr]
1010
[ziggurat.messaging.connection :refer [connection]]
11-
[ziggurat.sentry :refer [sentry-reporter]]
12-
[ziggurat.messaging.util :refer :all]
11+
[ziggurat.messaging.util :as util]
1312
[ziggurat.metrics :as metrics]
1413
[ziggurat.util.error :refer [report-error]]))
1514

15+
(defn- reject-message
16+
[ch delivery-tag]
17+
(lb/reject ch delivery-tag))
18+
19+
(defn- publish-to-dead-set
20+
[ch delivery-tag topic-entity payload]
21+
(let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config))
22+
exchange (util/prefixed-queue-name topic-entity exchange-name)]
23+
(try
24+
(lb/publish ch exchange "" payload)
25+
(catch Exception e
26+
(log/error e "Exception was encountered while publishing to RabbitMQ")
27+
(reject-message ch delivery-tag)))))
28+
1629
(defn convert-and-ack-message
1730
"De-serializes the message payload (`payload`) using `nippy/thaw` and converts it to `MessagePayload`. Acks the message
1831
if `ack?` is true."
19-
[ch {:keys [delivery-tag] :as meta} ^bytes payload ack? topic-entity]
32+
[ch {:keys [delivery-tag]} ^bytes payload ack? topic-entity]
2033
(try
2134
(let [message (nippy/thaw payload)]
2235
(when ack?
2336
(lb/ack ch delivery-tag))
2437
message)
2538
(catch Exception e
26-
(lb/reject ch delivery-tag false)
27-
(report-error e "Error while decoding message")
39+
(report-error e "Error while decoding message, publishing to dead queue...")
40+
(publish-to-dead-set ch delivery-tag topic-entity payload)
2841
(metrics/increment-count ["rabbitmq-message" "conversion"] "failure" {:topic_name (name topic-entity)})
2942
nil)))
3043

@@ -33,16 +46,16 @@
3346
(lb/ack ch delivery-tag))
3447

3548
(defn process-message-from-queue [ch meta payload topic-entity processing-fn]
36-
(let [delivery-tag (:delivery-tag meta)
37-
message-payload (convert-and-ack-message ch meta payload false topic-entity)]
49+
(let [delivery-tag (:delivery-tag meta)
50+
message-payload (convert-and-ack-message ch meta payload false topic-entity)]
3851
(when message-payload
3952
(log/infof "Processing message [%s] from RabbitMQ " message-payload)
4053
(try
4154
(log/debug "Calling processor-fn with the message-payload - " message-payload " with retry count - " (:retry-count message-payload))
4255
(processing-fn message-payload)
4356
(ack-message ch delivery-tag)
4457
(catch Exception e
45-
(lb/reject ch delivery-tag true)
58+
(publish-to-dead-set ch delivery-tag topic-entity payload)
4659
(report-error e "Error while processing message-payload from RabbitMQ")
4760
(metrics/increment-count ["rabbitmq-message" "process"] "failure" {:topic_name (name topic-entity)}))))))
4861

@@ -60,8 +73,8 @@
6073
(construct-queue-name topic-entity nil))
6174
([topic-entity channel]
6275
(if (nil? channel)
63-
(prefixed-queue-name topic-entity (get-in-config [:rabbit-mq :dead-letter :queue-name]))
64-
(prefixed-channel-name topic-entity channel (get-in-config [:rabbit-mq :dead-letter :queue-name])))))
76+
(util/prefixed-queue-name topic-entity (get-in-config [:rabbit-mq :dead-letter :queue-name]))
77+
(util/prefixed-channel-name topic-entity channel (get-in-config [:rabbit-mq :dead-letter :queue-name])))))
6578

6679
(defn get-dead-set-messages
6780
"This method can be used to read and optionally ack messages in dead-letter queue, based on the value of `ack?`.
@@ -94,20 +107,20 @@
94107

95108
(defn- start-subscriber* [ch prefetch-count queue-name wrapped-mapper-fn topic-entity]
96109
(lb/qos ch prefetch-count)
97-
(let [consumer-tag (lcons/subscribe ch
98-
queue-name
99-
(message-handler wrapped-mapper-fn topic-entity)
100-
{:handle-shutdown-signal-fn (fn [consumer_tag reason]
101-
(log/infof "channel closed with consumer tag: %s, reason: %s " consumer_tag, reason))
102-
:handle-consume-ok-fn (fn [consumer_tag]
103-
(log/infof "consumer started for %s with consumer tag %s " queue-name consumer_tag))})]))
110+
(lcons/subscribe ch
111+
queue-name
112+
(message-handler wrapped-mapper-fn topic-entity)
113+
{:handle-shutdown-signal-fn (fn [consumer_tag reason]
114+
(log/infof "channel closed with consumer tag: %s, reason: %s " consumer_tag, reason))
115+
:handle-consume-ok-fn (fn [consumer_tag]
116+
(log/infof "consumer started for %s with consumer tag %s " queue-name consumer_tag))}))
104117

105118
(defn start-retry-subscriber* [handler-fn topic-entity]
106119
(when (get-in-config [:retry :enabled])
107120
(dotimes [_ (get-in-config [:jobs :instant :worker-count])]
108121
(start-subscriber* (lch/open connection)
109122
(get-in-config [:jobs :instant :prefetch-count])
110-
(prefixed-queue-name topic-entity (get-in-config [:rabbit-mq :instant :queue-name]))
123+
(util/prefixed-queue-name topic-entity (get-in-config [:rabbit-mq :instant :queue-name]))
111124
handler-fn
112125
topic-entity))))
113126

@@ -118,20 +131,20 @@
118131
(dotimes [_ (get-in-config [:stream-router topic-entity :channels channel-key :worker-count])]
119132
(start-subscriber* (lch/open connection)
120133
1
121-
(prefixed-channel-name topic-entity channel-key (get-in-config [:rabbit-mq :instant :queue-name]))
134+
(util/prefixed-channel-name topic-entity channel-key (get-in-config [:rabbit-mq :instant :queue-name]))
122135
(mpr/channel-mapper-func channel-handler-fn channel-key)
123136
topic-entity)))))
124137

125138
(defn start-subscribers
126139
"Starts the subscriber to the instant queue of the rabbitmq"
127140
[stream-routes batch-routes]
128141
(doseq [stream-route stream-routes]
129-
(let [topic-entity (first stream-route)
130-
handler (-> stream-route second :handler-fn)
131-
channels (-> stream-route second (dissoc :handler-fn))]
142+
(let [topic-entity (first stream-route)
143+
handler (-> stream-route second :handler-fn)
144+
channels (-> stream-route second (dissoc :handler-fn))]
132145
(start-channels-subscriber channels topic-entity)
133146
(start-retry-subscriber* (mpr/mapper-func handler (keys channels)) topic-entity)))
134147
(doseq [batch-route batch-routes]
135-
(let [topic-entity (first batch-route)
136-
handler (-> batch-route second :handler-fn)]
148+
(let [topic-entity (first batch-route)
149+
handler (-> batch-route second :handler-fn)]
137150
(start-retry-subscriber* (fn [message] (ch/process handler message)) topic-entity))))

0 commit comments

Comments
 (0)