Skip to content

Commit

Permalink
proto: add more convert functions (#264)
Browse files Browse the repository at this point in the history
(cherry picked from commit bc235ed)
  • Loading branch information
fanatid committed Dec 10, 2023
1 parent 4783b60 commit 6283315
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 103 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ The minor version will be incremented upon a breaking change and the patch versi

### Features

- proto: add more convert functions ([#264](https://github.com/rpcpool/yellowstone-grpc/pull/264))

### Breaking

## 2023-12-08
Expand Down
39 changes: 25 additions & 14 deletions yellowstone-grpc-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ use {
};

#[derive(Debug, Clone)]
struct InterceptorFn {
pub struct InterceptorXToken {
x_token: Option<AsciiMetadataValue>,
}

impl Interceptor for InterceptorFn {
impl Interceptor for InterceptorXToken {
fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
if let Some(x_token) = self.x_token.clone() {
request.metadata_mut().insert("x-token", x_token);
Expand Down Expand Up @@ -65,11 +65,15 @@ pub struct GeyserGrpcClient<F> {
}

impl GeyserGrpcClient<()> {
pub const fn max_decoding_message_size() -> usize {
64 * 1024 * 1024 // 64 MiB
}

fn connect2<E, T>(
endpoint: E,
tls_config: Option<ClientTlsConfig>,
x_token: Option<T>,
) -> GeyserGrpcClientResult<(Endpoint, InterceptorFn)>
) -> GeyserGrpcClientResult<(Endpoint, InterceptorXToken)>
where
E: Into<Bytes>,
T: TryInto<AsciiMetadataValue, Error = InvalidMetadataValue>,
Expand All @@ -91,7 +95,7 @@ impl GeyserGrpcClient<()> {
}
_ => {}
}
let interceptor = InterceptorFn { x_token };
let interceptor = InterceptorXToken { x_token };

Ok((endpoint, interceptor))
}
Expand All @@ -107,11 +111,11 @@ impl GeyserGrpcClient<()> {
{
let (endpoint, interceptor) = Self::connect2(endpoint, tls_config, x_token)?;
let channel = endpoint.connect_lazy();
Ok(GeyserGrpcClient {
health: HealthClient::with_interceptor(channel.clone(), interceptor.clone()),
geyser: GeyserClient::with_interceptor(channel, interceptor)
.max_decoding_message_size(64 * 1024 * 1024), // 64 MiB
})
Ok(GeyserGrpcClient::new(
HealthClient::with_interceptor(channel.clone(), interceptor.clone()),
GeyserClient::with_interceptor(channel, interceptor)
.max_decoding_message_size(Self::max_decoding_message_size()),
))
}

pub async fn connect_with_timeout<E, T>(
Expand Down Expand Up @@ -140,15 +144,22 @@ impl GeyserGrpcClient<()> {
endpoint.connect().await?
};

Ok(GeyserGrpcClient {
health: HealthClient::with_interceptor(channel.clone(), interceptor.clone()),
geyser: GeyserClient::with_interceptor(channel, interceptor)
.max_decoding_message_size(64 * 1024 * 1024), // 64 MiB
})
Ok(GeyserGrpcClient::new(
HealthClient::with_interceptor(channel.clone(), interceptor.clone()),
GeyserClient::with_interceptor(channel, interceptor)
.max_decoding_message_size(Self::max_decoding_message_size()),
))
}
}

impl<F: Interceptor> GeyserGrpcClient<F> {
pub fn new(
health: HealthClient<InterceptedService<Channel, F>>,
geyser: GeyserClient<InterceptedService<Channel, F>>,
) -> Self {
Self { health, geyser }
}

pub async fn health_check(&mut self) -> GeyserGrpcClientResult<HealthCheckResponse> {
let request = HealthCheckRequest {
service: "geyser.Geyser".to_owned(),
Expand Down
4 changes: 2 additions & 2 deletions yellowstone-grpc-geyser/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ impl<'a> MessageRef<'a> {
Self::Block(message) => UpdateOneof::Block(SubscribeUpdateBlock {
slot: message.slot,
blockhash: message.blockhash.clone(),
rewards: Some(convert_to::create_rewards(message.rewards.as_slice())),
rewards: Some(convert_to::create_rewards_obj(message.rewards.as_slice())),
block_time: message.block_time.map(convert_to::create_timestamp),
block_height: message.block_height.map(convert_to::create_block_height),
parent_slot: message.parent_slot,
Expand All @@ -443,7 +443,7 @@ impl<'a> MessageRef<'a> {
Self::BlockMeta(message) => UpdateOneof::BlockMeta(SubscribeUpdateBlockMeta {
slot: message.slot,
blockhash: message.blockhash.clone(),
rewards: Some(convert_to::create_rewards(message.rewards.as_slice())),
rewards: Some(convert_to::create_rewards_obj(message.rewards.as_slice())),
block_time: message.block_time.map(convert_to::create_timestamp),
block_height: message.block_height.map(convert_to::create_block_height),
parent_slot: message.parent_slot,
Expand Down
Loading

0 comments on commit 6283315

Please sign in to comment.