Skip to content

Commit

Permalink
feat: add pause consumer
Browse files Browse the repository at this point in the history
Signed-off-by: Yordis Prieto <[email protected]>
  • Loading branch information
yordis committed Mar 14, 2024
1 parent 177c4c7 commit 62e2703
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 0 deletions.
70 changes: 70 additions & 0 deletions nats/src/jetstream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1764,6 +1764,76 @@ impl JetStream {
.map(|dr| dr.success)
}

/// Pause a `JetStream` consumer until the given time.
pub fn pause_consumer<S, C>(&self, stream: S, consumer: C, pause_until: DateTime) -> io::Result<PauseResponse>
where
S: AsRef<str>,
C: AsRef<str>,
{
let stream = stream.as_ref();
if stream.is_empty() {
return Err(io::Error::new(
ErrorKind::InvalidInput,
"the stream name must not be empty",
));
}
let consumer = consumer.as_ref();
if consumer.is_empty() {
return Err(io::Error::new(
ErrorKind::InvalidInput,
"the consumer name must not be empty",
));
}

let subject = format!(
"{}CONSUMER.PAUSE.{}.{}",
self.api_prefix(),
stream,
consumer
);

let req = serde_json::ser::to_vec(&PauseConsumerRequest {
pause_until: Some(pause_until),
})?;

self.js_request::<PauseResponse>(&subject, &req)
}

/// Resume a `JetStream` consumer.
pub fn resume_consumer<S, C>(&self, stream: S, consumer: C) -> io::Result<PauseResponse>
where
S: AsRef<str>,
C: AsRef<str>,
{
let stream = stream.as_ref();
if stream.is_empty() {
return Err(io::Error::new(
ErrorKind::InvalidInput,
"the stream name must not be empty",
));
}
let consumer = consumer.as_ref();
if consumer.is_empty() {
return Err(io::Error::new(
ErrorKind::InvalidInput,
"the consumer name must not be empty",
));
}

let subject = format!(
"{}CONSUMER.PAUSE.{}.{}",
self.api_prefix(),
stream,
consumer
);

let req = serde_json::ser::to_vec(&PauseConsumerRequest {
pause_until: None,
})?;

self.js_request::<PauseResponse>(&subject, &req)
}

/// Query `JetStream` consumer information.
pub fn consumer_info<S, C>(&self, stream: S, consumer: C) -> io::Result<ConsumerInfo>
where
Expand Down
19 changes: 19 additions & 0 deletions nats/src/jetstream/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,18 @@ pub(crate) struct DeleteResponse {
pub success: bool,
}

#[derive(Deserialize)]
pub(crate) struct PauseResponse {
pub paused: bool,
pub pause_until: Option<DateTime>,
pub pause_remaining: Option<Duration>,
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub(crate) struct PauseConsumerRequest {
pub pause_until: Option<DateTime>,
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub(crate) struct CreateConsumerRequest {
pub stream_name: String,
Expand Down Expand Up @@ -252,6 +264,8 @@ pub struct ConsumerConfig {
/// Threshold for ephemeral consumer inactivity
#[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
pub inactive_threshold: Duration,
/// PauseUntil is for suspending the consumer until the deadline.
pub pause_until: Option<DateTime>,
}

pub(crate) enum ConsumerKind {
Expand Down Expand Up @@ -680,6 +694,11 @@ pub struct ConsumerInfo {
/// Indicates if any client is connected and receiving messages from a push consumer
#[serde(default)]
pub push_bound: bool,
/// Paused indicates whether the consumer is paused.
pub paused: bool,
/// PauseRemaining contains the amount of time left until the consumer unpauses. It will only
/// be non-zero if the consumer is currently paused.
pub pause_remaining: Option<Duration>,
}

/// Information about the stream's, consumer's associated `JetStream` cluster
Expand Down
28 changes: 28 additions & 0 deletions nats/tests/jetstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -791,3 +791,31 @@ pub fn run_basic_jetstream() -> (nats_server::Server, Connection, JetStream) {

(s, nc, js)
}

#[test]
fn jetstream_create_paused_consumer() {
const CONSUMER_NAME: &str = "CONSUMER1";

let s = nats_server::run_server("tests/configs/jetstream.conf");
let nc = nats::Options::new()
.error_callback(|err| println!("error!: {err}"))
.connect(s.client_url())
.unwrap();
let js = nats::jetstream::new(nc.clone());
let pause_until = time::OffsetDateTime::now_utc() + Duration::from_secs(100);

js.add_consumer(
"TEST",
ConsumerConfig {
durable_name: Some(CONSUMER_NAME.to_string()),
pause_until: Some(pause_until),
..Default::default()
},
)
.unwrap();


let info = js.consumer_info("TEST", CONSUMER_NAME.to_string()).unwrap();
assert_eq!(info.paused, true);
assert_ne!(info.pause_remaining, Option::None);
}

0 comments on commit 62e2703

Please sign in to comment.