diff --git a/CHANGELOG.md b/CHANGELOG.md index 80b3108..ebaa603 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,8 @@ # Change Log ## [Unreleased] +### Added +* Add `:visibility-timeout-in-heartbeat` option to control visibility timeout in heartbeat mode. ## 0.4.91 ### Fixed diff --git a/README.md b/README.md index 2b2f760..a5566f4 100644 --- a/README.md +++ b/README.md @@ -65,7 +65,8 @@ See [API docs](https://cljdoc.org/d/toyokumo/gluttony/CURRENT) for detail. ### Heartbeat If you don't know how long it takes to process a message, pass `:hearbeat` and `:heartbeat-timeout` options. -Then Gluttony extends the message visibility per `:hearbeat` seconds to `:heartbeat-timeout` seconds. +Then Gluttony extends the message visibility per `:hearbeat` seconds to `:hearbeat + 1` seconds. +(Extended seconds is configurable by `:visibility-timeout-in-heartbeat` option) See [AWS documents](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/working-with-messages.html) for more detail. diff --git a/build.edn b/build.edn index e97ee38..b98f729 100644 --- a/build.edn +++ b/build.edn @@ -1,5 +1,5 @@ {:lib toyokumo/gluttony - :version "0.4.{{git/commit-count}}" + :version "0.5.{{git/commit-count}}" :documents [{:file "CHANGELOG.md" :match "Unreleased" :action :append-after diff --git a/src/gluttony/core.clj b/src/gluttony/core.clj index ecdbc87..3090e22 100644 --- a/src/gluttony/core.clj +++ b/src/gluttony/core.clj @@ -39,43 +39,46 @@ (raise 10)))) Optional arguments: - :client - the SQS client, which is the instance of cognitect.aws.client.Client. - if missing, cognitect.aws.client.api/client would be called. - :num-workers - the number of workers processing messages concurrently. - default: (Runtime/availableProcessors) - 1 - :num-receivers - the number of receivers polling from sqs. - default: (num-workers / 10) because each receiver is able to receive - up to 10 messages at a time. - :message-channel-size - the number of messages to prefetch from sqs. - default: 20 * num-receivers - :receive-limit - the number of messages to receive at a time. 1 to 10. - default: 10 - :consume-limit - the number of processing messages at the same time. 0 to 1024 - If the consume run asynchronously, for instance inside go block, - you may want to use this option. - default: 0, which means gluttony doesn't care about how many message - are processed simultaneously. - :long-polling-duration - the duration (in seconds) for which the call waits for a message to - arrive in the queue before returning. 0 to 20. - default: 20 - :exceptional-poll-delay-ms - when an Exception is received while polling, receiver wait for the - number of ms until polling again. - default: 10000 (10 seconds). - :heartbeat - the duration (in seconds) for which the consumer extends message - visibility if the message is being processed. 1 to 43199. - default: nil - If it isn't set, heartbeat doesn't work. - If it's set, :heartbeat-timeout is required. - Refer to AWS documents for more detail: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/working-with-messages.html - If you set this option and :consume-limit, recommend to make - :consume-limit bigger than :message-channel-size so as not to block - heartbeat requests. - :heartbeat-timeout - the timeout (in seconds) of heartbeat. - If your consume function doesn't call respond or raise within heartbeat - timeout, the consumer doesn't extend message visibility any more. - 2 to 43200. - default: nil - :heartbeat-timeout must be longer than :heartbeat. + :client - the SQS client, which is the instance of cognitect.aws.client.Client. + if missing, cognitect.aws.client.api/client would be called. + :num-workers - the number of workers processing messages concurrently. + default: (Runtime/availableProcessors) - 1 + :num-receivers - the number of receivers polling from sqs. + default: (num-workers / 10) because each receiver is able to receive + up to 10 messages at a time. + :message-channel-size - the number of messages to prefetch from sqs. + default: 20 * num-receivers + :receive-limit - the number of messages to receive at a time. 1 to 10. + default: 10 + :consume-limit - the number of processing messages at the same time. 0 to 1024 + If the consume run asynchronously, for instance inside go block, + you may want to use this option. + default: 0, which means gluttony doesn't care about how many message + are processed simultaneously. + :long-polling-duration - the duration (in seconds) for which the call waits for a message to + arrive in the queue before returning. 0 to 20. + default: 20 + :exceptional-poll-delay-ms - when an Exception is received while polling, receiver wait for the + number of ms until polling again. + default: 10000 (10 seconds). + :heartbeat - the duration (in seconds) for which the consumer extends message + visibility if the message is being processed. 1 to 43199. + default: nil + If it isn't set, heartbeat doesn't work. + If it's set, :heartbeat-timeout is required. + Refer to AWS documents for more detail: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/working-with-messages.html + If you set this option and :consume-limit, recommend to make + :consume-limit bigger than :message-channel-size so as not to block + heartbeat requests. + :heartbeat-timeout - the timeout (in seconds) of heartbeat. + If your consume function doesn't call respond or raise within heartbeat + timeout, the consumer doesn't extend message visibility any more. + 2 to 43200. + default: nil + :heartbeat-timeout must be longer than :heartbeat. + :visibility-timeout-in-heartbeat - control visibility timeout (in seconds) in heartbeat. + default: :heartbeat + 1 + :visibility-timeout-in-heartbeat must be longer than :heartbeat. Output: a instance of gluttony.record.consumer.Consumer" ^Consumer [queue-url consume & [opts]] @@ -96,6 +99,9 @@ 20) exceptional-poll-delay-ms (or (:exceptional-poll-delay-ms opts) 10000) + visibility-timeout-in-heartbeat (or (:visibility-timeout-in-heartbeat opts) + (when (:heartbeat opts) + (inc (:heartbeat opts)))) consumer (c/new-consumer {:queue-url queue-url :consume consume :client client @@ -109,6 +115,7 @@ :exceptional-poll-delay-ms exceptional-poll-delay-ms :heartbeat (:heartbeat opts) :heartbeat-timeout (:heartbeat-timeout opts) + :visibility-timeout-in-heartbeat visibility-timeout-in-heartbeat :receiver-enabled (atom true)})] (p/-start consumer))) diff --git a/src/gluttony/record/consumer.clj b/src/gluttony/record/consumer.clj index 11670d0..9333813 100644 --- a/src/gluttony/record/consumer.clj +++ b/src/gluttony/record/consumer.clj @@ -79,7 +79,7 @@ (defn- heartbeat* "When heartbeat parameter is set, a heartbeat process start after the first heartbeat" - [{:keys [client queue-url heartbeat heartbeat-timeout]} p message] + [{:keys [client queue-url heartbeat heartbeat-timeout visibility-timeout-in-heartbeat]} p message] (when heartbeat (let [heartbeat-msecs (* heartbeat 1000) start (System/currentTimeMillis)] @@ -91,7 +91,7 @@ (log/debugf "message-id:%s heartbeat" (:message-id message)) (sqs/change-message-visibility client {:queue-url queue-url :receipt-handle (:receipt-handle message) - :visibility-timeout (inc heartbeat)}) + :visibility-timeout visibility-timeout-in-heartbeat}) (a/Consumer m)) diff --git a/test/gluttony/core_test.clj b/test/gluttony/core_test.clj index 1912366..61fc2ec 100644 --- a/test/gluttony/core_test.clj +++ b/test/gluttony/core_test.clj @@ -38,7 +38,8 @@ :exceptional-poll-delay-ms 10000 :consume-chan nil :heartbeat nil - :heartbeat-timeout nil} + :heartbeat-timeout nil + :visibility-timeout-in-heartbeat nil} (dissoc consumer :message-chan :receiver-enabled))) (stop-consumer consumer))) @@ -66,7 +67,8 @@ :long-polling-duration 20 :exceptional-poll-delay-ms 10000 :heartbeat 60 - :heartbeat-timeout 300} + :heartbeat-timeout 300 + :visibility-timeout-in-heartbeat 61} (dissoc consumer :message-chan :consume-chan :receiver-enabled))) (stop-consumer consumer)))) diff --git a/test/gluttony/record/consumer_test.clj b/test/gluttony/record/consumer_test.clj index 3810d79..61ef7f2 100644 --- a/test/gluttony/record/consumer_test.clj +++ b/test/gluttony/record/consumer_test.clj @@ -163,7 +163,8 @@ :consume-limit 0 :long-polling-duration 20 :exceptional-poll-delay-ms 0 - :heartbeat 60})) + :heartbeat 60 + :visibility-timeout-in-heartbeat 61})) "heartbeat is set but heartbeat-timeout isn't set") (is (thrown? AssertionError (new-consumer {:queue-url "https://ap..." @@ -176,7 +177,39 @@ :receive-limit 10 :consume-limit 0 :long-polling-duration 20 - :exceptional-poll-delay-ms 0 + :exceptional-poll-delay-ms 1000 + :heartbeat 60 + :heartbeat-timeout 10 + :visibility-timeout-in-heartbeat 61})) + "heartbeat is bigger than heartbeat-timeout") + (is (thrown? AssertionError + (new-consumer {:queue-url "https://ap..." + :consume (fn [_ _ _]) + :client client + :given-client? true + :num-workers 1 + :num-receivers 1 + :message-channel-size 10 + :receive-limit 10 + :consume-limit 0 + :long-polling-duration 20 + :exceptional-poll-delay-ms 1000 + :heartbeat 60 + :heartbeat-timeout 300})) + "heartbeat is set but visibility-timeout-in-heartbeat isn't set") + (is (thrown? AssertionError + (new-consumer {:queue-url "https://ap..." + :consume (fn [_ _ _]) + :client client + :given-client? true + :num-workers 1 + :num-receivers 1 + :message-channel-size 10 + :receive-limit 10 + :consume-limit 0 + :long-polling-duration 20 + :exceptional-poll-delay-ms 1000 :heartbeat 60 - :heartbeat-timeout 10})) - "heartbeat is bigger than heartbeat-timeout"))) + :heartbeat-timeout 300 + :visibility-timeout-in-heartbeat 59})) + "heartbeat is bigger than visibility-timeout-in-heartbeat")))