diff --git a/nats/src/jetstream/mod.rs b/nats/src/jetstream/mod.rs index 6c6615263..4af4c7b29 100644 --- a/nats/src/jetstream/mod.rs +++ b/nats/src/jetstream/mod.rs @@ -1764,6 +1764,76 @@ impl JetStream { .map(|dr| dr.success) } + /// Pause a `JetStream` consumer until the given time. + pub fn pause_consumer(&self, stream: S, consumer: C, pause_until: DateTime) -> io::Result + where + S: AsRef, + C: AsRef, + { + let stream = stream.as_ref(); + if stream.is_empty() { + return Err(io::Error::new( + ErrorKind::InvalidInput, + "the stream name must not be empty", + )); + } + let consumer = consumer.as_ref(); + if consumer.is_empty() { + return Err(io::Error::new( + ErrorKind::InvalidInput, + "the consumer name must not be empty", + )); + } + + let subject = format!( + "{}CONSUMER.PAUSE.{}.{}", + self.api_prefix(), + stream, + consumer + ); + + let req = serde_json::ser::to_vec(&PauseConsumerRequest { + pause_until: Some(pause_until), + })?; + + self.js_request::(&subject, &req) + } + + /// Resume a `JetStream` consumer. + pub fn resume_consumer(&self, stream: S, consumer: C) -> io::Result + where + S: AsRef, + C: AsRef, + { + let stream = stream.as_ref(); + if stream.is_empty() { + return Err(io::Error::new( + ErrorKind::InvalidInput, + "the stream name must not be empty", + )); + } + let consumer = consumer.as_ref(); + if consumer.is_empty() { + return Err(io::Error::new( + ErrorKind::InvalidInput, + "the consumer name must not be empty", + )); + } + + let subject = format!( + "{}CONSUMER.PAUSE.{}.{}", + self.api_prefix(), + stream, + consumer + ); + + let req = serde_json::ser::to_vec(&PauseConsumerRequest { + pause_until: None, + })?; + + self.js_request::(&subject, &req) + } + /// Query `JetStream` consumer information. pub fn consumer_info(&self, stream: S, consumer: C) -> io::Result where diff --git a/nats/src/jetstream/types.rs b/nats/src/jetstream/types.rs index 2740ec5b8..196170555 100644 --- a/nats/src/jetstream/types.rs +++ b/nats/src/jetstream/types.rs @@ -121,6 +121,18 @@ pub(crate) struct DeleteResponse { pub success: bool, } +#[derive(Deserialize)] +pub(crate) struct PauseResponse { + pub paused: bool, + pub pause_until: Option, + pub pause_remaining: Option, +} + +#[derive(Debug, Default, Serialize, Deserialize)] +pub(crate) struct PauseConsumerRequest { + pub pause_until: Option, +} + #[derive(Debug, Default, Serialize, Deserialize)] pub(crate) struct CreateConsumerRequest { pub stream_name: String, @@ -252,6 +264,8 @@ pub struct ConsumerConfig { /// Threshold for ephemeral consumer inactivity #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")] pub inactive_threshold: Duration, + /// PauseUntil is for suspending the consumer until the deadline. + pub pause_until: Option, } pub(crate) enum ConsumerKind { @@ -680,6 +694,11 @@ pub struct ConsumerInfo { /// Indicates if any client is connected and receiving messages from a push consumer #[serde(default)] pub push_bound: bool, + /// Paused indicates whether the consumer is paused. + pub paused: bool, + /// PauseRemaining contains the amount of time left until the consumer unpauses. It will only + /// be non-zero if the consumer is currently paused. + pub pause_remaining: Option, } /// Information about the stream's, consumer's associated `JetStream` cluster