Skip to content

Commit

Permalink
feat(2781): support gRPC reflection custom headers (#2790)
Browse files Browse the repository at this point in the history
  • Loading branch information
beelchester authored Sep 25, 2024
1 parent c8b1f2c commit 8e3e965
Show file tree
Hide file tree
Showing 12 changed files with 123 additions and 19 deletions.
2 changes: 1 addition & 1 deletion examples/grpc-reflection.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
schema
@server(port: 8000)
@upstream(baseURL: "http://localhost:50051", httpCache: 42, batch: {delay: 10})
@link(src: "http://localhost:50051", type: Grpc) {
@link(src: "http://localhost:50051", type: Grpc, headers: [{key: "authorization", value: "Bearer 123"}]) {
query: Query
}

Expand Down
4 changes: 4 additions & 0 deletions generated/.tailcallrc.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ The @link directive allows you to import external resources, such as configurati
will be later used by `@grpc` directive –.
"""
directive @link(
"""
Custom headers for gRPC reflection server.
"""
headers: [KeyValue]
"""
The id of the link. It is used to reference the link in the schema.
"""
Expand Down
10 changes: 10 additions & 0 deletions generated/.tailcallrc.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,16 @@
"description": "The @link directive allows you to import external resources, such as configuration – which will be merged into the config importing it –, or a .proto file – which will be later used by `@grpc` directive –.",
"type": "object",
"properties": {
"headers": {
"description": "Custom headers for gRPC reflection server.",
"type": [
"array",
"null"
],
"items": {
"$ref": "#/definitions/KeyValue"
}
},
"id": {
"description": "The id of the link. It is used to reference the link in the schema.",
"type": [
Expand Down
5 changes: 5 additions & 0 deletions src/core/config/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use serde::{Deserialize, Serialize};
use tailcall_macros::DirectiveDefinition;

use super::super::is_default;
use super::KeyValue;

#[derive(
Default,
Expand Down Expand Up @@ -57,6 +58,10 @@ pub struct Link {
/// The type of the link. It can be `Config`, or `Protobuf`.
#[serde(default, skip_serializing_if = "is_default", rename = "type")]
pub type_of: LinkType,
///
/// Custom headers for gRPC reflection server.
#[serde(default, skip_serializing_if = "is_default")]
pub headers: Option<Vec<KeyValue>>,
/// Additional metadata pertaining to the linked resource.
#[serde(default, skip_serializing_if = "is_default")]
pub meta: Option<serde_json::Value>,
Expand Down
5 changes: 4 additions & 1 deletion src/core/config/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,10 @@ impl ConfigReader {
})
}
LinkType::Grpc => {
let meta = self.proto_reader.fetch(link.src.as_str()).await?;
let meta = self
.proto_reader
.fetch(link.src.as_str(), link.headers.clone())
.await?;

for m in meta {
extensions.add_proto(m);
Expand Down
1 change: 1 addition & 0 deletions src/core/generator/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ impl Generator {
id: None,
src: metadata.path.to_owned(),
type_of: LinkType::Protobuf,
headers: None,
meta: None,
});
Ok(config)
Expand Down
1 change: 1 addition & 0 deletions src/core/grpc/data_loader_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ mod tests {
id: None,
src: test_file.to_string(),
type_of: LinkType::Protobuf,
headers: None,
meta: None,
}]);
let method = GrpcMethod {
Expand Down
1 change: 1 addition & 0 deletions src/core/grpc/protobuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ pub mod tests {
id: Some(id.clone()),
src: path.to_string(),
type_of: LinkType::Protobuf,
headers: None,
meta: None,
}]);

Expand Down
1 change: 1 addition & 0 deletions src/core/grpc/request_template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ mod tests {
id: Some(id.clone()),
src: test_file.to_string(),
type_of: LinkType::Protobuf,
headers: None,
meta: None,
}]);
let method = GrpcMethod {
Expand Down
86 changes: 73 additions & 13 deletions src/core/proto_reader/fetch.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::str::FromStr;

use anyhow::{Context, Result};
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
Expand All @@ -9,7 +11,7 @@ use serde::{Deserialize, Serialize};
use serde_json::json;

use crate::core::blueprint::GrpcMethod;
use crate::core::config::ConfigReaderContext;
use crate::core::config::{ConfigReaderContext, KeyValue};
use crate::core::grpc::protobuf::ProtobufSet;
use crate::core::grpc::request_template::RequestBody;
use crate::core::grpc::RequestTemplate;
Expand Down Expand Up @@ -72,11 +74,16 @@ struct ReflectionResponse {
pub struct GrpcReflection {
server_reflection_method: GrpcMethod,
url: String,
headers: Option<Vec<KeyValue>>,
target_runtime: TargetRuntime,
}

impl GrpcReflection {
pub fn new<T: AsRef<str>>(url: T, target_runtime: TargetRuntime) -> Self {
pub fn new<T: AsRef<str>>(
url: T,
headers: Option<Vec<KeyValue>>,
target_runtime: TargetRuntime,
) -> Self {
let server_reflection_method = GrpcMethod {
package: "grpc.reflection.v1alpha".to_string(),
service: "ServerReflection".to_string(),
Expand All @@ -85,6 +92,7 @@ impl GrpcReflection {
Self {
server_reflection_method,
url: url.as_ref().to_string(),
headers,
target_runtime,
}
}
Expand Down Expand Up @@ -135,16 +143,28 @@ impl GrpcReflection {
)
.as_str(),
);

let mut headers = vec![];
if let Some(custom_headers) = &self.headers {
for header in custom_headers {
headers.push((
HeaderName::from_str(&header.key)?,
Mustache::parse(header.value.as_str()),
));
}
}
headers.push((
HeaderName::from_static("content-type"),
Mustache::parse("application/grpc+proto"),
));
let body_ = Some(RequestBody {
mustache: Some(Mustache::parse(body.to_string().as_str())),
value: Default::default(),
});
let req_template = RequestTemplate {
url: Mustache::parse(url.as_str()),
headers: vec![(
HeaderName::from_static("content-type"),
Mustache::parse("application/grpc+proto"),
)],
body: Some(RequestBody {
mustache: Some(Mustache::parse(body.to_string().as_str())),
value: Default::default(),
}),
headers,
body: body_,
operation: operation.clone(),
operation_type: Default::default(),
};
Expand Down Expand Up @@ -230,6 +250,7 @@ mod grpc_fetch {

let grpc_reflection = GrpcReflection::new(
format!("http://localhost:{}", server.port()),
None,
crate::core::runtime::test::init(None),
);

Expand Down Expand Up @@ -258,6 +279,7 @@ mod grpc_fetch {

let grpc_reflection = GrpcReflection::new(
format!("http://localhost:{}", server.port()),
None,
crate::core::runtime::test::init(None),
);

Expand Down Expand Up @@ -290,7 +312,7 @@ mod grpc_fetch {
let runtime = crate::core::runtime::test::init(None);

let grpc_reflection =
GrpcReflection::new(format!("http://localhost:{}", server.port()), runtime);
GrpcReflection::new(format!("http://localhost:{}", server.port()), None, runtime);

let resp = grpc_reflection.list_all_files().await?;

Expand Down Expand Up @@ -322,7 +344,7 @@ mod grpc_fetch {
let runtime = crate::core::runtime::test::init(None);

let grpc_reflection =
GrpcReflection::new(format!("http://localhost:{}", server.port()), runtime);
GrpcReflection::new(format!("http://localhost:{}", server.port()), None, runtime);

let resp = grpc_reflection.list_all_files().await;

Expand All @@ -349,7 +371,7 @@ mod grpc_fetch {
let runtime = crate::core::runtime::test::init(None);

let grpc_reflection =
GrpcReflection::new(format!("http://localhost:{}", server.port()), runtime);
GrpcReflection::new(format!("http://localhost:{}", server.port()), None, runtime);

let result = grpc_reflection.get_by_service("nonexistent.Service").await;
assert!(result.is_err());
Expand All @@ -358,4 +380,42 @@ mod grpc_fetch {

Ok(())
}

#[tokio::test]
async fn test_custom_headers_resp_list_all() -> Result<()> {
let server = start_mock_server();

let http_reflection_service_not_found = server.mock(|when, then| {
when.method(httpmock::Method::POST)
.path("/grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo")
.header("authorization", "Bearer 123");
then.status(200).body(get_fake_resp());
});

let runtime = crate::core::runtime::test::init(None);

let grpc_reflection = GrpcReflection::new(
format!("http://localhost:{}", server.port()),
Some(vec![KeyValue {
key: "authorization".to_string(),
value: "Bearer 123".to_string(),
}]),
runtime,
);

let resp = grpc_reflection.list_all_files().await?;

assert_eq!(
[
"news.NewsService".to_string(),
"grpc.reflection.v1alpha.ServerReflection".to_string()
]
.to_vec(),
resp
);

http_reflection_service_not_found.assert();

Ok(())
}
}
13 changes: 11 additions & 2 deletions src/core/proto_reader/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use futures_util::FutureExt;
use prost_reflect::prost_types::{FileDescriptorProto, FileDescriptorSet};
use protox::file::{FileResolver, GoogleFileResolver};

use crate::core::config::KeyValue;
use crate::core::proto_reader::fetch::GrpcReflection;
use crate::core::resource_reader::{Cached, ResourceReader};
use crate::core::runtime::TargetRuntime;
Expand All @@ -31,8 +32,16 @@ impl ProtoReader {
}

/// Fetches proto files from a grpc server (grpc reflection)
pub async fn fetch<T: AsRef<str>>(&self, url: T) -> anyhow::Result<Vec<ProtoMetadata>> {
let grpc_reflection = Arc::new(GrpcReflection::new(url.as_ref(), self.runtime.clone()));
pub async fn fetch<T: AsRef<str>>(
&self,
url: T,
headers: Option<Vec<KeyValue>>,
) -> anyhow::Result<Vec<ProtoMetadata>> {
let grpc_reflection = Arc::new(GrpcReflection::new(
url.as_ref(),
headers,
self.runtime.clone(),
));

let mut proto_metadata = vec![];
let service_list = grpc_reflection.list_all_files().await?;
Expand Down
13 changes: 11 additions & 2 deletions tailcall-upstream-grpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::{runtime, Resource};
use tonic::metadata::MetadataMap;
use tonic::service::interceptor::InterceptedService;
use tonic::transport::Server as TonicServer;
use tonic::{Response, Status};
use tonic::{Request, Response, Status};
use tonic_tracing_opentelemetry::middleware::server;
use tower::make::Shared;
use tracing_subscriber::layer::SubscriberExt;
Expand Down Expand Up @@ -215,6 +216,14 @@ fn init_tracer() -> Result<(), Error> {
Ok(())
}

/// Intercepts the request and checks if the token is valid.
fn intercept(req: Request<()>) -> Result<Request<()>, Status> {
match req.metadata().get("authorization") {
Some(token) if token == "Bearer 123" => Ok(req),
_ => Err(Status::permission_denied("Unauthorized")),
}
}

#[tokio::main]
async fn main() -> Result<(), Error> {
if std::env::var("HONEYCOMB_API_KEY").is_ok() {
Expand All @@ -234,7 +243,7 @@ async fn main() -> Result<(), Error> {
let tonic_service = TonicServer::builder()
.layer(server::OtelGrpcLayer::default())
.add_service(NewsServiceServer::new(news_service))
.add_service(service)
.add_service(InterceptedService::new(service, intercept))
.into_service();
let make_svc = Shared::new(tonic_service);
println!("Server listening on grpc://{}", addr);
Expand Down

1 comment on commit 8e3e965

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Running 30s test @ http://localhost:8000/graphql

4 threads and 100 connections

Thread Stats Avg Stdev Max +/- Stdev
Latency 11.58ms 4.51ms 116.13ms 87.35%
Req/Sec 2.19k 266.78 2.79k 79.25%

261584 requests in 30.03s, 1.31GB read

Requests/sec: 8712.03

Transfer/sec: 44.72MB

Please sign in to comment.