From d2a772f2a23e6ed8219d2f16af7f88878097388a Mon Sep 17 00:00:00 2001 From: Ivan Reshetnikov Date: Wed, 17 Jan 2024 11:56:13 +0100 Subject: [PATCH] feat: support blocking subscriptions (#56) --- relay_client/src/http.rs | 36 +++++++++++++++++++++++++++--- relay_client/src/websocket.rs | 42 ++++++++++++++++++++++++++++++++--- relay_rpc/src/rpc.rs | 10 +++++++++ relay_rpc/src/rpc/tests.rs | 20 +++++++++++++---- 4 files changed, 98 insertions(+), 10 deletions(-) diff --git a/relay_client/src/http.rs b/relay_client/src/http.rs index 87e7af3..2e3df43 100644 --- a/relay_client/src/http.rs +++ b/relay_client/src/http.rs @@ -128,9 +128,22 @@ impl Client { .map(|_| ()) } - /// Subscribes on topic to receive messages. + /// Subscribes on topic to receive messages. The request is resolved + /// optimistically as soon as the relay receives it. pub async fn subscribe(&self, topic: Topic) -> Response { - self.request(rpc::Subscribe { topic }).await + self.request(rpc::Subscribe { + topic, + block: false, + }) + .await + } + + /// Subscribes on topic to receive messages. The request is resolved only + /// when fully processed by the relay. + /// Note: This function is experimental and will likely be removed in the + /// future. + pub async fn subscribe_blocking(&self, topic: Topic) -> Response { + self.request(rpc::Subscribe { topic, block: true }).await } /// Unsubscribes from a topic. @@ -223,13 +236,30 @@ impl Client { self.request(payload).await } - /// Subscribes on multiple topics to receive messages. + /// Subscribes on multiple topics to receive messages. The request is + /// resolved optimistically as soon as the relay receives it. pub async fn batch_subscribe( &self, topics: impl Into>, ) -> Response { self.request(rpc::BatchSubscribe { topics: topics.into(), + block: false, + }) + .await + } + + /// Subscribes on multiple topics to receive messages. The request is + /// resolved only when fully processed by the relay. + /// Note: This function is experimental and will likely be removed in the + /// future. + pub async fn batch_subscribe_blocking( + &self, + topics: impl Into>, + ) -> Response { + self.request(rpc::BatchSubscribe { + topics: topics.into(), + block: true, }) .await } diff --git a/relay_client/src/websocket.rs b/relay_client/src/websocket.rs index 4805f1d..c3f71c4 100644 --- a/relay_client/src/websocket.rs +++ b/relay_client/src/websocket.rs @@ -165,9 +165,25 @@ impl Client { EmptyResponseFuture::new(response) } - /// Subscribes on topic to receive messages. + /// Subscribes on topic to receive messages. The request is resolved + /// optimistically as soon as the relay receives it. pub fn subscribe(&self, topic: Topic) -> ResponseFuture { - let (request, response) = create_request(Subscribe { topic }); + let (request, response) = create_request(Subscribe { + topic, + block: false, + }); + + self.request(request); + + response + } + + /// Subscribes on topic to receive messages. The request is resolved only + /// when fully processed by the relay. + /// Note: This function is experimental and will likely be removed in the + /// future. + pub fn subscribe_blocking(&self, topic: Topic) -> ResponseFuture { + let (request, response) = create_request(Subscribe { topic, block: true }); self.request(request); @@ -204,10 +220,30 @@ impl Client { FetchMessageStream::new(self.clone(), topics.into()) } - /// Subscribes on multiple topics to receive messages. + /// Subscribes on multiple topics to receive messages. The request is + /// resolved optimistically as soon as the relay receives it. pub fn batch_subscribe(&self, topics: impl Into>) -> ResponseFuture { let (request, response) = create_request(BatchSubscribe { topics: topics.into(), + block: false, + }); + + self.request(request); + + response + } + + /// Subscribes on multiple topics to receive messages. The request is + /// resolved only when fully processed by the relay. + /// Note: This function is experimental and will likely be removed in the + /// future. + pub fn batch_subscribe_blocking( + &self, + topics: impl Into>, + ) -> ResponseFuture { + let (request, response) = create_request(BatchSubscribe { + topics: topics.into(), + block: true, }); self.request(request); diff --git a/relay_rpc/src/rpc.rs b/relay_rpc/src/rpc.rs index ebbf799..e50564d 100644 --- a/relay_rpc/src/rpc.rs +++ b/relay_rpc/src/rpc.rs @@ -306,6 +306,11 @@ pub struct ErrorData { pub struct Subscribe { /// The topic to subscribe to. pub topic: Topic, + + /// Whether to disable optimistic response. By default optimistic response + /// is enabled. + #[serde(default)] + pub block: bool, } impl RequestPayload for Subscribe { @@ -403,6 +408,11 @@ pub struct FetchResponse { pub struct BatchSubscribe { /// The topics to subscribe to. pub topics: Vec, + + /// Whether to disable optimistic response. By default optimistic response + /// is enabled. + #[serde(default)] + pub block: bool, } impl RequestPayload for BatchSubscribe { diff --git a/relay_rpc/src/rpc/tests.rs b/relay_rpc/src/rpc/tests.rs index 6fc6a01..074c492 100644 --- a/relay_rpc/src/rpc/tests.rs +++ b/relay_rpc/src/rpc/tests.rs @@ -31,6 +31,7 @@ fn subscribe() { 1659980684711969.into(), Params::Subscribe(Subscribe { topic: "c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9840".into(), + block: false, }), )); @@ -38,7 +39,7 @@ fn subscribe() { assert_eq!( &serialized, - r#"{"id":1659980684711969,"jsonrpc":"2.0","method":"irn_subscribe","params":{"topic":"c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9840"}}"# + r#"{"id":1659980684711969,"jsonrpc":"2.0","method":"irn_subscribe","params":{"topic":"c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9840","block":false}}"# ); let deserialized: Payload = serde_json::from_str(&serialized).unwrap(); @@ -206,7 +207,8 @@ fn deserialize_batch_methods() { topics: vec![ Topic::from("c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9840"), Topic::from("c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9841") - ] + ], + block: false }) }) ); @@ -332,6 +334,7 @@ fn validation() { jsonrpc: jsonrpc.clone(), params: Params::Subscribe(Subscribe { topic: topic.clone(), + block: false, }), }; assert_eq!(request.validate(), Ok(())); @@ -342,6 +345,7 @@ fn validation() { jsonrpc: jsonrpc.clone(), params: Params::Subscribe(Subscribe { topic: Topic::from("invalid"), + block: false, }), }; assert_eq!( @@ -459,6 +463,7 @@ fn validation() { jsonrpc: jsonrpc.clone(), params: Params::BatchSubscribe(BatchSubscribe { topics: vec![topic.clone()], + block: false, }), }; assert_eq!(request.validate(), Ok(())); @@ -467,7 +472,10 @@ fn validation() { let request = Request { id, jsonrpc: jsonrpc.clone(), - params: Params::BatchSubscribe(BatchSubscribe { topics: vec![] }), + params: Params::BatchSubscribe(BatchSubscribe { + topics: vec![], + block: false, + }), }; assert_eq!(request.validate(), Err(ValidationError::BatchEmpty)); @@ -478,7 +486,10 @@ fn validation() { let request = Request { id, jsonrpc: jsonrpc.clone(), - params: Params::BatchSubscribe(BatchSubscribe { topics }), + params: Params::BatchSubscribe(BatchSubscribe { + topics, + block: false, + }), }; assert_eq!( request.validate(), @@ -496,6 +507,7 @@ fn validation() { topics: vec![Topic::from( "c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c98401", )], + block: false, }), }; assert_eq!(