Skip to content

Commit

Permalink
Merge pull request #14 from toyokumo/add-visibility-timeout-in-heartb…
Browse files Browse the repository at this point in the history
…eat-option

Add visibility timeout in heartbeat option
  • Loading branch information
egs33 authored Mar 1, 2023
2 parents 20d2384 + 98a3131 commit 958b905
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 49 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Change Log

## [Unreleased]
### Added
* Add `:visibility-timeout-in-heartbeat` option to control visibility timeout in heartbeat mode.

## 0.4.91
### Fixed
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ See [API docs](https://cljdoc.org/d/toyokumo/gluttony/CURRENT) for detail.
### Heartbeat
If you don't know how long it takes to process a message, pass `:hearbeat` and `:heartbeat-timeout` options.

Then Gluttony extends the message visibility per `:hearbeat` seconds to `:heartbeat-timeout` seconds.
Then Gluttony extends the message visibility per `:hearbeat` seconds to `:hearbeat + 1` seconds.
(Extended seconds is configurable by `:visibility-timeout-in-heartbeat` option)

See [AWS documents](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/working-with-messages.html)
for more detail.
Expand Down
2 changes: 1 addition & 1 deletion build.edn
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{:lib toyokumo/gluttony
:version "0.4.{{git/commit-count}}"
:version "0.5.{{git/commit-count}}"
:documents [{:file "CHANGELOG.md"
:match "Unreleased"
:action :append-after
Expand Down
81 changes: 44 additions & 37 deletions src/gluttony/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -39,43 +39,46 @@
(raise 10))))
Optional arguments:
:client - the SQS client, which is the instance of cognitect.aws.client.Client.
if missing, cognitect.aws.client.api/client would be called.
:num-workers - the number of workers processing messages concurrently.
default: (Runtime/availableProcessors) - 1
:num-receivers - the number of receivers polling from sqs.
default: (num-workers / 10) because each receiver is able to receive
up to 10 messages at a time.
:message-channel-size - the number of messages to prefetch from sqs.
default: 20 * num-receivers
:receive-limit - the number of messages to receive at a time. 1 to 10.
default: 10
:consume-limit - the number of processing messages at the same time. 0 to 1024
If the consume run asynchronously, for instance inside go block,
you may want to use this option.
default: 0, which means gluttony doesn't care about how many message
are processed simultaneously.
:long-polling-duration - the duration (in seconds) for which the call waits for a message to
arrive in the queue before returning. 0 to 20.
default: 20
:exceptional-poll-delay-ms - when an Exception is received while polling, receiver wait for the
number of ms until polling again.
default: 10000 (10 seconds).
:heartbeat - the duration (in seconds) for which the consumer extends message
visibility if the message is being processed. 1 to 43199.
default: nil
If it isn't set, heartbeat doesn't work.
If it's set, :heartbeat-timeout is required.
Refer to AWS documents for more detail: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/working-with-messages.html
If you set this option and :consume-limit, recommend to make
:consume-limit bigger than :message-channel-size so as not to block
heartbeat requests.
:heartbeat-timeout - the timeout (in seconds) of heartbeat.
If your consume function doesn't call respond or raise within heartbeat
timeout, the consumer doesn't extend message visibility any more.
2 to 43200.
default: nil
:heartbeat-timeout must be longer than :heartbeat.
:client - the SQS client, which is the instance of cognitect.aws.client.Client.
if missing, cognitect.aws.client.api/client would be called.
:num-workers - the number of workers processing messages concurrently.
default: (Runtime/availableProcessors) - 1
:num-receivers - the number of receivers polling from sqs.
default: (num-workers / 10) because each receiver is able to receive
up to 10 messages at a time.
:message-channel-size - the number of messages to prefetch from sqs.
default: 20 * num-receivers
:receive-limit - the number of messages to receive at a time. 1 to 10.
default: 10
:consume-limit - the number of processing messages at the same time. 0 to 1024
If the consume run asynchronously, for instance inside go block,
you may want to use this option.
default: 0, which means gluttony doesn't care about how many message
are processed simultaneously.
:long-polling-duration - the duration (in seconds) for which the call waits for a message to
arrive in the queue before returning. 0 to 20.
default: 20
:exceptional-poll-delay-ms - when an Exception is received while polling, receiver wait for the
number of ms until polling again.
default: 10000 (10 seconds).
:heartbeat - the duration (in seconds) for which the consumer extends message
visibility if the message is being processed. 1 to 43199.
default: nil
If it isn't set, heartbeat doesn't work.
If it's set, :heartbeat-timeout is required.
Refer to AWS documents for more detail: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/working-with-messages.html
If you set this option and :consume-limit, recommend to make
:consume-limit bigger than :message-channel-size so as not to block
heartbeat requests.
:heartbeat-timeout - the timeout (in seconds) of heartbeat.
If your consume function doesn't call respond or raise within heartbeat
timeout, the consumer doesn't extend message visibility any more.
2 to 43200.
default: nil
:heartbeat-timeout must be longer than :heartbeat.
:visibility-timeout-in-heartbeat - control visibility timeout (in seconds) in heartbeat.
default: :heartbeat + 1
:visibility-timeout-in-heartbeat must be longer than :heartbeat.
Output:
a instance of gluttony.record.consumer.Consumer"
^Consumer [queue-url consume & [opts]]
Expand All @@ -96,6 +99,9 @@
20)
exceptional-poll-delay-ms (or (:exceptional-poll-delay-ms opts)
10000)
visibility-timeout-in-heartbeat (or (:visibility-timeout-in-heartbeat opts)
(when (:heartbeat opts)
(inc (:heartbeat opts))))
consumer (c/new-consumer {:queue-url queue-url
:consume consume
:client client
Expand All @@ -109,6 +115,7 @@
:exceptional-poll-delay-ms exceptional-poll-delay-ms
:heartbeat (:heartbeat opts)
:heartbeat-timeout (:heartbeat-timeout opts)
:visibility-timeout-in-heartbeat visibility-timeout-in-heartbeat
:receiver-enabled (atom true)})]
(p/-start consumer)))

Expand Down
13 changes: 9 additions & 4 deletions src/gluttony/record/consumer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@

(defn- heartbeat*
"When heartbeat parameter is set, a heartbeat process start after the first heartbeat"
[{:keys [client queue-url heartbeat heartbeat-timeout]} p message]
[{:keys [client queue-url heartbeat heartbeat-timeout visibility-timeout-in-heartbeat]} p message]
(when heartbeat
(let [heartbeat-msecs (* heartbeat 1000)
start (System/currentTimeMillis)]
Expand All @@ -91,7 +91,7 @@
(log/debugf "message-id:%s heartbeat" (:message-id message))
(sqs/change-message-visibility client {:queue-url queue-url
:receipt-handle (:receipt-handle message)
:visibility-timeout (inc heartbeat)})
:visibility-timeout visibility-timeout-in-heartbeat})
(a/<! (a/timeout heartbeat-msecs))
(recur)))))))

Expand Down Expand Up @@ -133,6 +133,7 @@
consume-chan
heartbeat
heartbeat-timeout
visibility-timeout-in-heartbeat
receiver-enabled]
p/IConsumer
(-start [this]
Expand Down Expand Up @@ -172,7 +173,8 @@
long-polling-duration
exceptional-poll-delay-ms
heartbeat
heartbeat-timeout]}]
heartbeat-timeout
visibility-timeout-in-heartbeat]}]
{:pre [(not (str/blank? queue-url))
(ifn? consume)
(instance? Client client)
Expand All @@ -187,5 +189,8 @@
(or (= nil heartbeat heartbeat-timeout)
(and (integer? heartbeat) (integer? heartbeat-timeout)
(<= 1 heartbeat 43199) (<= 2 heartbeat-timeout 43200)
(< heartbeat heartbeat-timeout)))]}
(< heartbeat heartbeat-timeout)))
(or (nil? heartbeat)
(and (integer? visibility-timeout-in-heartbeat)
(< heartbeat visibility-timeout-in-heartbeat)))]}
(map->Consumer m))
6 changes: 4 additions & 2 deletions test/gluttony/core_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
:exceptional-poll-delay-ms 10000
:consume-chan nil
:heartbeat nil
:heartbeat-timeout nil}
:heartbeat-timeout nil
:visibility-timeout-in-heartbeat nil}
(dissoc consumer :message-chan :receiver-enabled)))
(stop-consumer consumer)))

Expand Down Expand Up @@ -66,7 +67,8 @@
:long-polling-duration 20
:exceptional-poll-delay-ms 10000
:heartbeat 60
:heartbeat-timeout 300}
:heartbeat-timeout 300
:visibility-timeout-in-heartbeat 61}
(dissoc consumer :message-chan :consume-chan :receiver-enabled)))
(stop-consumer consumer))))

Expand Down
41 changes: 37 additions & 4 deletions test/gluttony/record/consumer_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@
:consume-limit 0
:long-polling-duration 20
:exceptional-poll-delay-ms 0
:heartbeat 60}))
:heartbeat 60
:visibility-timeout-in-heartbeat 61}))
"heartbeat is set but heartbeat-timeout isn't set")
(is (thrown? AssertionError
(new-consumer {:queue-url "https://ap..."
Expand All @@ -176,7 +177,39 @@
:receive-limit 10
:consume-limit 0
:long-polling-duration 20
:exceptional-poll-delay-ms 0
:exceptional-poll-delay-ms 1000
:heartbeat 60
:heartbeat-timeout 10
:visibility-timeout-in-heartbeat 61}))
"heartbeat is bigger than heartbeat-timeout")
(is (thrown? AssertionError
(new-consumer {:queue-url "https://ap..."
:consume (fn [_ _ _])
:client client
:given-client? true
:num-workers 1
:num-receivers 1
:message-channel-size 10
:receive-limit 10
:consume-limit 0
:long-polling-duration 20
:exceptional-poll-delay-ms 1000
:heartbeat 60
:heartbeat-timeout 300}))
"heartbeat is set but visibility-timeout-in-heartbeat isn't set")
(is (thrown? AssertionError
(new-consumer {:queue-url "https://ap..."
:consume (fn [_ _ _])
:client client
:given-client? true
:num-workers 1
:num-receivers 1
:message-channel-size 10
:receive-limit 10
:consume-limit 0
:long-polling-duration 20
:exceptional-poll-delay-ms 1000
:heartbeat 60
:heartbeat-timeout 10}))
"heartbeat is bigger than heartbeat-timeout")))
:heartbeat-timeout 300
:visibility-timeout-in-heartbeat 59}))
"heartbeat is bigger than visibility-timeout-in-heartbeat")))

0 comments on commit 958b905

Please sign in to comment.