Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support blocking subscriptions #56

Merged
merged 2 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions relay_client/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,20 @@ impl Client {
.map(|_| ())
}

/// Subscribes on topic to receive messages.
/// Subscribes on topic to receive messages. The request is resolved
/// optimistically as soon as the relay receives it.
pub async fn subscribe(&self, topic: Topic) -> Response<rpc::Subscribe> {
self.request(rpc::Subscribe { topic }).await
self.request(rpc::Subscribe {
topic,
block: false,
})
.await
}

/// Subscribes on topic to receive messages. The request is resolved only
/// when fully processed by the relay.
pub async fn subscribe_blocking(&self, topic: Topic) -> Response<rpc::Subscribe> {
self.request(rpc::Subscribe { topic, block: true }).await
}
heilhead marked this conversation as resolved.
Show resolved Hide resolved

/// Unsubscribes from a topic.
Expand Down Expand Up @@ -223,13 +234,28 @@ impl Client {
self.request(payload).await
}

/// Subscribes on multiple topics to receive messages.
/// Subscribes on multiple topics to receive messages. The request is
/// resolved optimistically as soon as the relay receives it.
pub async fn batch_subscribe(
&self,
topics: impl Into<Vec<Topic>>,
) -> Response<rpc::BatchSubscribe> {
self.request(rpc::BatchSubscribe {
topics: topics.into(),
block: false,
})
.await
}

/// Subscribes on multiple topics to receive messages. The request is
/// resolved only when fully processed by the relay.
pub async fn batch_subscribe_blocking(
&self,
topics: impl Into<Vec<Topic>>,
) -> Response<rpc::BatchSubscribe> {
self.request(rpc::BatchSubscribe {
topics: topics.into(),
block: true,
})
.await
}
Expand Down
38 changes: 35 additions & 3 deletions relay_client/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,23 @@ impl Client {
EmptyResponseFuture::new(response)
}

/// Subscribes on topic to receive messages.
/// Subscribes on topic to receive messages. The request is resolved
/// optimistically as soon as the relay receives it.
pub fn subscribe(&self, topic: Topic) -> ResponseFuture<Subscribe> {
let (request, response) = create_request(Subscribe { topic });
let (request, response) = create_request(Subscribe {
topic,
block: false,
});

self.request(request);

response
}

/// Subscribes on topic to receive messages. The request is resolved only
/// when fully processed by the relay.
pub fn subscribe_blocking(&self, topic: Topic) -> ResponseFuture<Subscribe> {
let (request, response) = create_request(Subscribe { topic, block: true });

self.request(request);

Expand Down Expand Up @@ -204,10 +218,28 @@ impl Client {
FetchMessageStream::new(self.clone(), topics.into())
}

/// Subscribes on multiple topics to receive messages.
/// Subscribes on multiple topics to receive messages. The request is
/// resolved optimistically as soon as the relay receives it.
pub fn batch_subscribe(&self, topics: impl Into<Vec<Topic>>) -> ResponseFuture<BatchSubscribe> {
let (request, response) = create_request(BatchSubscribe {
topics: topics.into(),
block: false,
});

self.request(request);

response
}

/// Subscribes on multiple topics to receive messages. The request is
/// resolved only when fully processed by the relay.
pub fn batch_subscribe_blocking(
&self,
topics: impl Into<Vec<Topic>>,
) -> ResponseFuture<BatchSubscribe> {
let (request, response) = create_request(BatchSubscribe {
topics: topics.into(),
block: true,
});

self.request(request);
Expand Down
10 changes: 10 additions & 0 deletions relay_rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,11 @@ pub struct ErrorData {
pub struct Subscribe {
/// The topic to subscribe to.
pub topic: Topic,

/// Whether to disable optimistic response. By default optimistic response
/// is enabled.
#[serde(default)]
pub block: bool,
}

impl RequestPayload for Subscribe {
Expand Down Expand Up @@ -403,6 +408,11 @@ pub struct FetchResponse {
pub struct BatchSubscribe {
/// The topics to subscribe to.
pub topics: Vec<Topic>,

/// Whether to disable optimistic response. By default optimistic response
/// is enabled.
#[serde(default)]
pub block: bool,
}

impl RequestPayload for BatchSubscribe {
Expand Down
20 changes: 16 additions & 4 deletions relay_rpc/src/rpc/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ fn subscribe() {
1659980684711969.into(),
Params::Subscribe(Subscribe {
topic: "c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9840".into(),
block: false,
}),
));

let serialized = serde_json::to_string(&payload).unwrap();

assert_eq!(
&serialized,
r#"{"id":1659980684711969,"jsonrpc":"2.0","method":"irn_subscribe","params":{"topic":"c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9840"}}"#
r#"{"id":1659980684711969,"jsonrpc":"2.0","method":"irn_subscribe","params":{"topic":"c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9840","block":false}}"#
);

let deserialized: Payload = serde_json::from_str(&serialized).unwrap();
Expand Down Expand Up @@ -206,7 +207,8 @@ fn deserialize_batch_methods() {
topics: vec![
Topic::from("c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9840"),
Topic::from("c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9841")
]
],
block: false
})
})
);
Expand Down Expand Up @@ -332,6 +334,7 @@ fn validation() {
jsonrpc: jsonrpc.clone(),
params: Params::Subscribe(Subscribe {
topic: topic.clone(),
block: false,
}),
};
assert_eq!(request.validate(), Ok(()));
Expand All @@ -342,6 +345,7 @@ fn validation() {
jsonrpc: jsonrpc.clone(),
params: Params::Subscribe(Subscribe {
topic: Topic::from("invalid"),
block: false,
}),
};
assert_eq!(
Expand Down Expand Up @@ -459,6 +463,7 @@ fn validation() {
jsonrpc: jsonrpc.clone(),
params: Params::BatchSubscribe(BatchSubscribe {
topics: vec![topic.clone()],
block: false,
}),
};
assert_eq!(request.validate(), Ok(()));
Expand All @@ -467,7 +472,10 @@ fn validation() {
let request = Request {
id,
jsonrpc: jsonrpc.clone(),
params: Params::BatchSubscribe(BatchSubscribe { topics: vec![] }),
params: Params::BatchSubscribe(BatchSubscribe {
topics: vec![],
block: false,
}),
};
assert_eq!(request.validate(), Err(ValidationError::BatchEmpty));

Expand All @@ -478,7 +486,10 @@ fn validation() {
let request = Request {
id,
jsonrpc: jsonrpc.clone(),
params: Params::BatchSubscribe(BatchSubscribe { topics }),
params: Params::BatchSubscribe(BatchSubscribe {
topics,
block: false,
}),
};
assert_eq!(
request.validate(),
Expand All @@ -496,6 +507,7 @@ fn validation() {
topics: vec![Topic::from(
"c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c98401",
)],
block: false,
}),
};
assert_eq!(
Expand Down
Loading