From ad882fd3adc3f428ca602f10f15027a2ed6ad9f5 Mon Sep 17 00:00:00 2001 From: Michael Ries Date: Thu, 25 Apr 2024 09:05:42 -0700 Subject: [PATCH] Check reply_to before attempting an ack --- lib/gnat/jetstream/jetstream.ex | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/lib/gnat/jetstream/jetstream.ex b/lib/gnat/jetstream/jetstream.ex index 8d9fd4c..61e5f41 100644 --- a/lib/gnat/jetstream/jetstream.ex +++ b/lib/gnat/jetstream/jetstream.ex @@ -10,7 +10,9 @@ Sends `AckAck` acknowledgement to the server. Acknowledges a message was completely handled. """ @spec ack(message :: Gnat.message()) :: :ok -def ack(message) +def ack(%{reply_to: nil}) do + {:error, "Cannot ack message with no reply-to"} +end def ack(%{gnat: gnat, reply_to: reply_to}) do Gnat.pub(gnat, reply_to, "") @@ -23,7 +25,9 @@ Acknowledges the message was handled and requests delivery of the next message t subject. Only applies to Pull-mode. """ @spec ack_next(message :: Gnat.message(), consumer_subject :: binary()) :: :ok -def ack_next(message, consumer_subject) +def ack_next(%{reply_to: nil}, consumer_subject) do + {:error, "Cannot ack message with no reply-to"} +end def ack_next(%{gnat: gnat, reply_to: reply_to}, consumer_subject) do Gnat.pub(gnat, reply_to, "+NXT", reply_to: consumer_subject) @@ -36,7 +40,9 @@ Signals that the message will not be processed now and processing can move onto NAK'd message will be retried. """ @spec nack(message :: Gnat.message()) :: :ok -def nack(message) +def nack(%{reply_to: nil}) do + {:error, "Cannot ack message with no reply-to"} +end def nack(%{gnat: gnat, reply_to: reply_to}) do Gnat.pub(gnat, reply_to, "-NAK") @@ -49,7 +55,9 @@ When sent before the `AckWait` period indicates that work is ongoing and the per extended by another equal to `AckWait`. """ @spec ack_progress(message :: Gnat.message()) :: :ok -def ack_progress(message) +def ack_progress(%{reply_to: nil}) do + {:error, "Cannot ack message with no reply-to"} +end def ack_progress(%{gnat: gnat, reply_to: reply_to}) do Gnat.pub(gnat, reply_to, "+WPI") @@ -62,7 +70,9 @@ Instructs the server to stop redelivery of a message without acknowledging it as processed. """ @spec ack_term(message :: Gnat.message()) :: :ok -def ack_term(message) +def ack_term(%{reply_to: nil}) do + {:error, "Cannot ack message with no reply-to"} +end def ack_term(%{gnat: gnat, reply_to: reply_to}) do Gnat.pub(gnat, reply_to, "+TERM")