Skip to content

Commit

Permalink
Add stream field discard_new_per_subject (#163)
Browse files Browse the repository at this point in the history
Co-authored-by: Michael <[email protected]>
  • Loading branch information
0xAX and mmmries authored Jun 24, 2024
1 parent 3df7739 commit 1fa524e
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 2 deletions.
9 changes: 7 additions & 2 deletions lib/gnat/jetstream/api/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ defmodule Gnat.Jetstream.API.Stream do
* `:discard` - determines what happens when a Stream reaches its limits. It has the following options:
- `:old` - the default option. Old messages are deleted.
- `:new` - refuses new messages.
* `:discard_new_per_subject` - - allows to enable discarding new messages per subject when limits are reached.
Requires `discard: :new` and the `:max_msgs_per_subject` to be configured.
* `:domain` - JetStream domain, mainly used for leaf nodes.
See [JetStream on Leaf Nodes](https://docs.nats.io/running-a-nats-service/configuration/leafnodes/jetstream_leafnodes).
* `:duplicate_window` - the window within which to track duplicate messages, expressed in nanoseconds.
Expand Down Expand Up @@ -91,7 +93,8 @@ defmodule Gnat.Jetstream.API.Stream do
num_replicas: 1,
retention: :limits,
sealed: false,
storage: :file
storage: :file,
discard_new_per_subject: false
]

@type nanoseconds :: non_neg_integer()
Expand Down Expand Up @@ -127,7 +130,8 @@ defmodule Gnat.Jetstream.API.Stream do
sources: nil | list(source()),
storage: :file | :memory,
subjects: nil | list(binary()),
template_owner: nil | binary()
template_owner: nil | binary(),
discard_new_per_subject: boolean()
}

@typedoc """
Expand Down Expand Up @@ -505,6 +509,7 @@ defmodule Gnat.Jetstream.API.Stream do
|> put_if_exist(:allow_rollup_hdrs, stream, "allow_rollup_hdrs")
|> put_if_exist(:deny_delete, stream, "deny_delete")
|> put_if_exist(:deny_purge, stream, "deny_purge")
|> put_if_exist(:discard_new_per_subject, stream, "discard_new_per_subject")
|> put_if_exist(:mirror_direct, stream, "mirror_direct")
|> put_if_exist(:sealed, stream, "sealed")
end
Expand Down
46 changes: 46 additions & 0 deletions test/jetstream/api/stream_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,52 @@ defmodule Gnat.Jetstream.API.StreamTest do
assert :ok = Stream.delete(:gnat, "LIST_OFFSET_TEST_TWO")
end

test "create stream with discard_new_per_subject: true" do
stream = %Stream{name: "DISCARD_NEW_PER_SUBJECT_TEST",
subjects: ["STREAM_TEST"],
max_msgs_per_subject: 1,
discard_new_per_subject: true,
discard: :new}
assert {:ok, _response} = Stream.create(:gnat, stream)

assert {:ok, _} = Gnat.request(:gnat, "STREAM_TEST", "first message")
assert {:ok, response} =
Stream.get_message(:gnat, "DISCARD_NEW_PER_SUBJECT_TEST", %{
last_by_subj: "STREAM_TEST"})
%{
data: "first message",
hdrs: nil,
subject: "STREAM_TEST",
time: %DateTime{}
} = response

assert {:ok, _} = Gnat.request(:gnat, "STREAM_TEST", "second message")
assert {:ok, response} =
Stream.get_message(:gnat, "DISCARD_NEW_PER_SUBJECT_TEST", %{
last_by_subj: "STREAM_TEST"})
%{
data: "first message",
hdrs: nil,
subject: "STREAM_TEST",
time: %DateTime{}
} = response

Stream.purge(:gnat, "DISCARD_NEW_PER_SUBJECT_TEST")

assert {:ok, _} = Gnat.request(:gnat, "STREAM_TEST", "second message")
assert {:ok, response} =
Stream.get_message(:gnat, "DISCARD_NEW_PER_SUBJECT_TEST", %{
last_by_subj: "STREAM_TEST"})
%{
data: "second message",
hdrs: nil,
subject: "STREAM_TEST",
time: %DateTime{}
} = response

assert :ok = Stream.delete(:gnat, "DISCARD_NEW_PER_SUBJECT_TEST")
end

test "updating a stream" do
stream = %Stream{name: "UPDATE_TEST", subjects: ["STREAM_TEST"]}
assert {:ok, _response} = Stream.create(:gnat, stream)
Expand Down

0 comments on commit 1fa524e

Please sign in to comment.