Skip to content

Commit

Permalink
feat: support blocking subscriptions (#56)
Browse files Browse the repository at this point in the history
  • Loading branch information
heilhead authored Jan 17, 2024
1 parent 4d05390 commit d2a772f
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 10 deletions.
36 changes: 33 additions & 3 deletions relay_client/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,22 @@ impl Client {
.map(|_| ())
}

/// Subscribes on topic to receive messages.
/// Subscribes on topic to receive messages. The request is resolved
/// optimistically as soon as the relay receives it.
pub async fn subscribe(&self, topic: Topic) -> Response<rpc::Subscribe> {
self.request(rpc::Subscribe { topic }).await
self.request(rpc::Subscribe {
topic,
block: false,
})
.await
}

/// Subscribes on topic to receive messages. The request is resolved only
/// when fully processed by the relay.
/// Note: This function is experimental and will likely be removed in the
/// future.
pub async fn subscribe_blocking(&self, topic: Topic) -> Response<rpc::Subscribe> {
self.request(rpc::Subscribe { topic, block: true }).await
}

/// Unsubscribes from a topic.
Expand Down Expand Up @@ -223,13 +236,30 @@ impl Client {
self.request(payload).await
}

/// Subscribes on multiple topics to receive messages.
/// Subscribes on multiple topics to receive messages. The request is
/// resolved optimistically as soon as the relay receives it.
pub async fn batch_subscribe(
&self,
topics: impl Into<Vec<Topic>>,
) -> Response<rpc::BatchSubscribe> {
self.request(rpc::BatchSubscribe {
topics: topics.into(),
block: false,
})
.await
}

/// Subscribes on multiple topics to receive messages. The request is
/// resolved only when fully processed by the relay.
/// Note: This function is experimental and will likely be removed in the
/// future.
pub async fn batch_subscribe_blocking(
&self,
topics: impl Into<Vec<Topic>>,
) -> Response<rpc::BatchSubscribe> {
self.request(rpc::BatchSubscribe {
topics: topics.into(),
block: true,
})
.await
}
Expand Down
42 changes: 39 additions & 3 deletions relay_client/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,25 @@ impl Client {
EmptyResponseFuture::new(response)
}

/// Subscribes on topic to receive messages.
/// Subscribes on topic to receive messages. The request is resolved
/// optimistically as soon as the relay receives it.
pub fn subscribe(&self, topic: Topic) -> ResponseFuture<Subscribe> {
let (request, response) = create_request(Subscribe { topic });
let (request, response) = create_request(Subscribe {
topic,
block: false,
});

self.request(request);

response
}

/// Subscribes on topic to receive messages. The request is resolved only
/// when fully processed by the relay.
/// Note: This function is experimental and will likely be removed in the
/// future.
pub fn subscribe_blocking(&self, topic: Topic) -> ResponseFuture<Subscribe> {
let (request, response) = create_request(Subscribe { topic, block: true });

self.request(request);

Expand Down Expand Up @@ -204,10 +220,30 @@ impl Client {
FetchMessageStream::new(self.clone(), topics.into())
}

/// Subscribes on multiple topics to receive messages.
/// Subscribes on multiple topics to receive messages. The request is
/// resolved optimistically as soon as the relay receives it.
pub fn batch_subscribe(&self, topics: impl Into<Vec<Topic>>) -> ResponseFuture<BatchSubscribe> {
let (request, response) = create_request(BatchSubscribe {
topics: topics.into(),
block: false,
});

self.request(request);

response
}

/// Subscribes on multiple topics to receive messages. The request is
/// resolved only when fully processed by the relay.
/// Note: This function is experimental and will likely be removed in the
/// future.
pub fn batch_subscribe_blocking(
&self,
topics: impl Into<Vec<Topic>>,
) -> ResponseFuture<BatchSubscribe> {
let (request, response) = create_request(BatchSubscribe {
topics: topics.into(),
block: true,
});

self.request(request);
Expand Down
10 changes: 10 additions & 0 deletions relay_rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,11 @@ pub struct ErrorData {
pub struct Subscribe {
/// The topic to subscribe to.
pub topic: Topic,

/// Whether to disable optimistic response. By default optimistic response
/// is enabled.
#[serde(default)]
pub block: bool,
}

impl RequestPayload for Subscribe {
Expand Down Expand Up @@ -403,6 +408,11 @@ pub struct FetchResponse {
pub struct BatchSubscribe {
/// The topics to subscribe to.
pub topics: Vec<Topic>,

/// Whether to disable optimistic response. By default optimistic response
/// is enabled.
#[serde(default)]
pub block: bool,
}

impl RequestPayload for BatchSubscribe {
Expand Down
20 changes: 16 additions & 4 deletions relay_rpc/src/rpc/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ fn subscribe() {
1659980684711969.into(),
Params::Subscribe(Subscribe {
topic: "c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9840".into(),
block: false,
}),
));

let serialized = serde_json::to_string(&payload).unwrap();

assert_eq!(
&serialized,
r#"{"id":1659980684711969,"jsonrpc":"2.0","method":"irn_subscribe","params":{"topic":"c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9840"}}"#
r#"{"id":1659980684711969,"jsonrpc":"2.0","method":"irn_subscribe","params":{"topic":"c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9840","block":false}}"#
);

let deserialized: Payload = serde_json::from_str(&serialized).unwrap();
Expand Down Expand Up @@ -206,7 +207,8 @@ fn deserialize_batch_methods() {
topics: vec![
Topic::from("c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9840"),
Topic::from("c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9841")
]
],
block: false
})
})
);
Expand Down Expand Up @@ -332,6 +334,7 @@ fn validation() {
jsonrpc: jsonrpc.clone(),
params: Params::Subscribe(Subscribe {
topic: topic.clone(),
block: false,
}),
};
assert_eq!(request.validate(), Ok(()));
Expand All @@ -342,6 +345,7 @@ fn validation() {
jsonrpc: jsonrpc.clone(),
params: Params::Subscribe(Subscribe {
topic: Topic::from("invalid"),
block: false,
}),
};
assert_eq!(
Expand Down Expand Up @@ -459,6 +463,7 @@ fn validation() {
jsonrpc: jsonrpc.clone(),
params: Params::BatchSubscribe(BatchSubscribe {
topics: vec![topic.clone()],
block: false,
}),
};
assert_eq!(request.validate(), Ok(()));
Expand All @@ -467,7 +472,10 @@ fn validation() {
let request = Request {
id,
jsonrpc: jsonrpc.clone(),
params: Params::BatchSubscribe(BatchSubscribe { topics: vec![] }),
params: Params::BatchSubscribe(BatchSubscribe {
topics: vec![],
block: false,
}),
};
assert_eq!(request.validate(), Err(ValidationError::BatchEmpty));

Expand All @@ -478,7 +486,10 @@ fn validation() {
let request = Request {
id,
jsonrpc: jsonrpc.clone(),
params: Params::BatchSubscribe(BatchSubscribe { topics }),
params: Params::BatchSubscribe(BatchSubscribe {
topics,
block: false,
}),
};
assert_eq!(
request.validate(),
Expand All @@ -496,6 +507,7 @@ fn validation() {
topics: vec![Topic::from(
"c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c98401",
)],
block: false,
}),
};
assert_eq!(
Expand Down

0 comments on commit d2a772f

Please sign in to comment.