Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Page over stream #159

Merged
merged 5 commits into from
May 14, 2024
Merged

Page over stream #159

merged 5 commits into from
May 14, 2024

Conversation

mmmries
Copy link
Collaborator

@mmmries mmmries commented Apr 25, 2024

I did a bit of experimenting with having a functionality interface for a pull consumer that I think might be useful for people. The basic idea is that the caller calls Pager.init(...) and then repeatedly calls Pager.page to get the next "page" of messages. When the page function returns {:done, messages} it means the consumer has reached the current end of the stream and the client should then call Pager.cleanup to immediately cleanup the subscription and consumer.

def init(conn, stream_name, opts) do
name = "gnat_stream_pager_#{Util.nuid()}"

first_seq = Keyword.fetch!(opts, :from_seq)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I imagine that we would end up allowing other options like since: utc_datetime or limit: 1000 in these opts

description: "Gnat Stream Pager",
opt_start_seq: first_seq,
replay_policy: :instant,
inactive_threshold: 30_000_000_000
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jetstream will auto-cleanup our consumer after 30sec of inactivity, so the final cleanup function call is not crucial

consumer_name: name,
domain: nil,
inbox: inbox,
batch: 10,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also allow for domain and batch to be passed in via opts

@@ -0,0 +1,75 @@
defmodule Gnat.Jetstream.Pager do
@moduledoc false
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping this undocumented until we feel comfortable with the API

@@ -10,7 +10,9 @@ Sends `AckAck` acknowledgement to the server.
Acknowledges a message was completely handled.
"""
@spec ack(message :: Gnat.message()) :: :ok
def ack(message)
def ack(%{reply_to: nil}) do
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran into a case where I got a %{status: "408", reply_to: nil} message at the end of the stream and my code tried to ack the message which resulted in a TCP error and it killed my gnat connection. So I thought it would be good to validate the reply_to here with a more useful error message

@@ -25,14 +25,18 @@ defmodule Gnat do
* `sid` - The subscription ID corresponding to this message. You generally won't need to use this value directly.
* `reply_to` - A topic supplied for expected replies
* `headers` - A set of NATS message headers on the message
* `status` - Similar to an HTTP status, this is present for messages with headers and can indicate the specific purpose of a message. Example `status: "408"`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we indicate whether this will always be a number , or a string, etc? Or maybe a link to the NATS reference with a list of status replies, etc?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmmm I can find several examples in the ADRs, but I don't think we can provide an exhaustive list since clients can publish their own messages with headers and provide their own status codes

@mmmries mmmries merged commit 297ae7b into main May 14, 2024
3 checks passed
@mmmries mmmries deleted the page_over_stream branch May 14, 2024 04:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants