Skip to content

Commit

Permalink
Check reply_to before attempting an ack
Browse files Browse the repository at this point in the history
  • Loading branch information
mmmries committed Apr 25, 2024
1 parent e790ea4 commit ad882fd
Showing 1 changed file with 15 additions and 5 deletions.
20 changes: 15 additions & 5 deletions lib/gnat/jetstream/jetstream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ Sends `AckAck` acknowledgement to the server.
Acknowledges a message was completely handled.
"""
@spec ack(message :: Gnat.message()) :: :ok
def ack(message)
def ack(%{reply_to: nil}) do
{:error, "Cannot ack message with no reply-to"}
end

def ack(%{gnat: gnat, reply_to: reply_to}) do
Gnat.pub(gnat, reply_to, "")
Expand All @@ -23,7 +25,9 @@ Acknowledges the message was handled and requests delivery of the next message t
subject. Only applies to Pull-mode.
"""
@spec ack_next(message :: Gnat.message(), consumer_subject :: binary()) :: :ok
def ack_next(message, consumer_subject)
def ack_next(%{reply_to: nil}, consumer_subject) do
{:error, "Cannot ack message with no reply-to"}
end

def ack_next(%{gnat: gnat, reply_to: reply_to}, consumer_subject) do
Gnat.pub(gnat, reply_to, "+NXT", reply_to: consumer_subject)
Expand All @@ -36,7 +40,9 @@ Signals that the message will not be processed now and processing can move onto
NAK'd message will be retried.
"""
@spec nack(message :: Gnat.message()) :: :ok
def nack(message)
def nack(%{reply_to: nil}) do
{:error, "Cannot ack message with no reply-to"}
end

def nack(%{gnat: gnat, reply_to: reply_to}) do
Gnat.pub(gnat, reply_to, "-NAK")
Expand All @@ -49,7 +55,9 @@ When sent before the `AckWait` period indicates that work is ongoing and the per
extended by another equal to `AckWait`.
"""
@spec ack_progress(message :: Gnat.message()) :: :ok
def ack_progress(message)
def ack_progress(%{reply_to: nil}) do
{:error, "Cannot ack message with no reply-to"}
end

def ack_progress(%{gnat: gnat, reply_to: reply_to}) do
Gnat.pub(gnat, reply_to, "+WPI")
Expand All @@ -62,7 +70,9 @@ Instructs the server to stop redelivery of a message without acknowledging it as
processed.
"""
@spec ack_term(message :: Gnat.message()) :: :ok
def ack_term(message)
def ack_term(%{reply_to: nil}) do
{:error, "Cannot ack message with no reply-to"}
end

def ack_term(%{gnat: gnat, reply_to: reply_to}) do
Gnat.pub(gnat, reply_to, "+TERM")
Expand Down

0 comments on commit ad882fd

Please sign in to comment.