Skip to content

Commit

Permalink
feat(reflection): Expose ReflectionService (#2066)
Browse files Browse the repository at this point in the history
  • Loading branch information
tottoto authored Nov 26, 2024
1 parent bdccf58 commit 94587ce
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 10 deletions.
1 change: 1 addition & 0 deletions tonic-reflection/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ server = ["prost-types", "dep:tokio", "dep:tokio-stream"]
default = ["server"]

[dependencies]
pin-project = "1"
prost = "0.13"
prost-types = {version = "0.13", optional = true}
tokio = { version = "1.0", features = ["sync", "rt"], optional = true }
Expand Down
41 changes: 36 additions & 5 deletions tonic-reflection/src/server/v1.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::sync::Arc;
use std::{fmt, sync::Arc};

use pin_project::pin_project;
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tonic::{Request, Response, Status, Streaming};

use super::ReflectionServiceState;
Expand All @@ -13,14 +14,15 @@ use crate::pb::v1::{
ServerReflectionResponse, ServiceResponse,
};

/// An implementation for `ServerReflection`.
#[derive(Debug)]
pub(super) struct ReflectionService {
pub struct ReflectionService {
state: Arc<ReflectionServiceState>,
}

#[tonic::async_trait]
impl ServerReflection for ReflectionService {
type ServerReflectionInfoStream = ReceiverStream<Result<ServerReflectionResponse, Status>>;
type ServerReflectionInfoStream = ServerReflectionInfoStream;

async fn server_reflection_info(
&self,
Expand Down Expand Up @@ -91,7 +93,9 @@ impl ServerReflection for ReflectionService {
}
});

Ok(Response::new(ReceiverStream::new(resp_rx)))
Ok(Response::new(ServerReflectionInfoStream(
ReceiverStream::new(resp_rx),
)))
}
}

Expand All @@ -102,3 +106,30 @@ impl From<ReflectionServiceState> for ReflectionService {
}
}
}

/// A response stream.
#[pin_project]
pub struct ServerReflectionInfoStream(
#[pin] ReceiverStream<Result<ServerReflectionResponse, Status>>,
);

impl Stream for ServerReflectionInfoStream {
type Item = Result<ServerReflectionResponse, Status>;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.project().0.poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.0.size_hint()
}
}

impl fmt::Debug for ServerReflectionInfoStream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("ServerReflectionInfoStream").finish()
}
}
41 changes: 36 additions & 5 deletions tonic-reflection/src/server/v1alpha.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::sync::Arc;
use std::{fmt, sync::Arc};

use pin_project::pin_project;
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tonic::{Request, Response, Status, Streaming};

use super::ReflectionServiceState;
Expand All @@ -13,14 +14,15 @@ use crate::pb::v1alpha::{
ServerReflectionResponse, ServiceResponse,
};

/// An implementation for `ServerReflection`.
#[derive(Debug)]
pub(super) struct ReflectionService {
pub struct ReflectionService {
state: Arc<ReflectionServiceState>,
}

#[tonic::async_trait]
impl ServerReflection for ReflectionService {
type ServerReflectionInfoStream = ReceiverStream<Result<ServerReflectionResponse, Status>>;
type ServerReflectionInfoStream = ServerReflectionInfoStream;

async fn server_reflection_info(
&self,
Expand Down Expand Up @@ -91,7 +93,9 @@ impl ServerReflection for ReflectionService {
}
});

Ok(Response::new(ReceiverStream::new(resp_rx)))
Ok(Response::new(ServerReflectionInfoStream(
ReceiverStream::new(resp_rx),
)))
}
}

Expand All @@ -102,3 +106,30 @@ impl From<ReflectionServiceState> for ReflectionService {
}
}
}

/// A response stream.
#[pin_project]
pub struct ServerReflectionInfoStream(
#[pin] ReceiverStream<Result<ServerReflectionResponse, Status>>,
);

impl Stream for ServerReflectionInfoStream {
type Item = Result<ServerReflectionResponse, Status>;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.project().0.poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.0.size_hint()
}
}

impl fmt::Debug for ServerReflectionInfoStream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("ServerReflectionInfoStream").finish()
}
}

0 comments on commit 94587ce

Please sign in to comment.