From b5e663ef31935df4b0ba0bbdd730f85b46b15c35 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Mon, 2 Sep 2024 14:10:22 +0200 Subject: [PATCH] Add get_info to Stream Signed-off-by: Tomasz Pietrek --- async-nats/src/jetstream/stream.rs | 19 ++++++++++++++--- async-nats/tests/jetstream_tests.rs | 32 +++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/async-nats/src/jetstream/stream.rs b/async-nats/src/jetstream/stream.rs index e2e178a7d..ea648ca8d 100755 --- a/async-nats/src/jetstream/stream.rs +++ b/async-nats/src/jetstream/stream.rs @@ -118,6 +118,8 @@ impl Display for DeleteMessageErrorKind { pub type DeleteMessageError = Error; /// Handle to operations that can be performed on a `Stream`. +/// It's generic over the type of `info` field to allow `Stream` with or without +/// info contents. #[derive(Debug, Clone)] pub struct Stream { pub(crate) info: T, @@ -179,6 +181,17 @@ impl Stream { } impl Stream { + /// Retrieves `info` about [Stream] from the server. As this variant of [Stream] does not + /// contain or stores `Info`, it is returned directly. + pub async fn get_info(&self) -> Result { + let subject = format!("STREAM.INFO.{}", self.name); + + match self.context.request(subject, &json!({})).await? { + Response::Ok::(info) => Ok(info), + Response::Err { error } => Err(error.into()), + } + } + /// Gets next message for a [Stream]. /// /// Requires a [Stream] with `allow_direct` set to `true`. @@ -1234,7 +1247,7 @@ pub enum StorageType { } /// Shows config and current state for this stream. -#[derive(Debug, Deserialize, Clone)] +#[derive(Debug, Deserialize, Clone, PartialEq, Eq)] pub struct Info { /// The configuration associated with this stream. pub config: Config, @@ -1259,7 +1272,7 @@ pub struct DeleteStatus { } /// information about the given stream. -#[derive(Debug, Deserialize, Clone, Copy)] +#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)] pub struct State { /// The number of messages contained in this stream pub messages: u64, @@ -1454,7 +1467,7 @@ pub struct PeerInfo { pub lag: Option, } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] pub struct SourceInfo { /// Source name. pub name: String, diff --git a/async-nats/tests/jetstream_tests.rs b/async-nats/tests/jetstream_tests.rs index 15dfba495..6ce218f10 100755 --- a/async-nats/tests/jetstream_tests.rs +++ b/async-nats/tests/jetstream_tests.rs @@ -2303,6 +2303,38 @@ mod jetstream { assert!(messages.next().await.is_none()); } + #[tokio::test] + async fn stream_info() { + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let client = ConnectOptions::new() + .event_callback(|err| async move { println!("error: {err:?}") }) + .connect(server.client_url()) + .await + .unwrap(); + + let context = async_nats::jetstream::new(client); + + context + .create_stream(stream::Config { + name: "events".into(), + subjects: vec!["events".into()], + ..Default::default() + }) + .await + .unwrap(); + + let mut stream = context.get_stream("events").await.unwrap(); + assert_eq!( + stream.info().await.unwrap().clone(), + stream.cached_info().clone() + ); + + assert_eq!( + stream.get_info().await.unwrap().clone(), + stream.cached_info().clone() + ); + } + #[tokio::test] async fn consumer_info() { let server = nats_server::run_server("tests/configs/jetstream.conf");