Skip to content

Commit

Permalink
add support for GRPC compression
Browse files Browse the repository at this point in the history
  • Loading branch information
sunli829 committed Sep 2, 2024
1 parent 21b8301 commit 810b6b6
Show file tree
Hide file tree
Showing 16 changed files with 708 additions and 112 deletions.
23 changes: 23 additions & 0 deletions examples/grpc/helloworld_compressed/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "example-grpc-helloworld-compressed"
version.workspace = true
edition.workspace = true
publish.workspace = true

[dependencies]
poem.workspace = true
poem-grpc = { workspace = true, features = [
"gzip",
"deflate",
"brotli",
"zstd",
] }
prost.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }

[build-dependencies]
poem-grpc-build.workspace = true

[[bin]]
name = "grpc-helloworld-client"
path = "src/client.rs"
7 changes: 7 additions & 0 deletions examples/grpc/helloworld_compressed/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use std::io::Result;

use poem_grpc_build::compile_protos;

fn main() -> Result<()> {
compile_protos(&["./proto/helloworld.proto"], &["./proto"])
}
37 changes: 37 additions & 0 deletions examples/grpc/helloworld_compressed/proto/helloworld.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";

package helloworld;

// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
string name = 1;
}

// The response message containing the greetings
message HelloReply {
string message = 1;
}
22 changes: 22 additions & 0 deletions examples/grpc/helloworld_compressed/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use poem_grpc::{ClientConfig, CompressionEncoding, Request};

poem_grpc::include_proto!("helloworld");

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let mut client = GreeterClient::new(
ClientConfig::builder()
.uri("http://localhost:3000")
.build()
.unwrap(),
);
client.set_send_compressed(CompressionEncoding::GZIP);
client.set_accept_compressed([CompressionEncoding::GZIP]);

let request = Request::new(HelloRequest {
name: "Poem".into(),
});
let response = client.say_hello(request).await?;
println!("RESPONSE={response:?}");
Ok(())
}
35 changes: 35 additions & 0 deletions examples/grpc/helloworld_compressed/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use poem::{listener::TcpListener, Server};
use poem_grpc::{CompressionEncoding, Request, Response, RouteGrpc, Status};

poem_grpc::include_proto!("helloworld");

struct GreeterService;

impl Greeter for GreeterService {
async fn say_hello(
&self,
request: Request<HelloRequest>,
) -> Result<Response<HelloReply>, Status> {
let reply = HelloReply {
message: format!("Hello {}!", request.into_inner().name),
};
Ok(Response::new(reply))
}
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
let route = RouteGrpc::new().add_service(
GreeterServer::new(GreeterService)
.send_compressed(CompressionEncoding::GZIP)
.accept_compressed([
CompressionEncoding::GZIP,
CompressionEncoding::DEFLATE,
CompressionEncoding::BROTLI,
CompressionEncoding::ZSTD,
]),
);
Server::new(TcpListener::bind("0.0.0.0:3000"))
.run(route)
.await
}
10 changes: 10 additions & 0 deletions poem-grpc-build/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,16 @@ pub(crate) fn generate(config: &GrpcConfig, service: &Service, buf: &mut String)
self
}

/// Set the compression encoding for sending
pub fn set_send_compressed(&mut self, encoding: #crate_name::CompressionEncoding) {
self.cli.set_send_compressed(encoding);
}

/// Set the compression encodings for accepting
pub fn set_accept_compressed(&mut self, encodings: impl ::std::convert::Into<::std::sync::Arc<[#crate_name::CompressionEncoding]>>) {
self.cli.set_accept_compressed(encodings);
}

#(
#[allow(dead_code)]
#methods
Expand Down
65 changes: 50 additions & 15 deletions poem-grpc-build/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,22 @@ pub(crate) fn generate(config: &GrpcConfig, service: &Service, buf: &mut String)
}

#[allow(unused_imports)]
#[derive(Clone)]
pub struct #server_ident<T>(::std::sync::Arc<T>);
pub struct #server_ident<T> {
inner: ::std::sync::Arc<T>,
send_compressd: ::std::option::Option<#crate_name::CompressionEncoding>,
accept_compressed: ::std::sync::Arc<[#crate_name::CompressionEncoding]>,
}

impl<T> ::std::clone::Clone for #server_ident<T> {
#[inline]
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
send_compressd: self.send_compressd,
accept_compressed: self.accept_compressed.clone(),
}
}
}

impl<T: #service_ident> #crate_name::Service for #server_ident<T> {
const NAME: &'static str = #service_name;
Expand All @@ -109,8 +123,29 @@ pub(crate) fn generate(config: &GrpcConfig, service: &Service, buf: &mut String)

#[allow(dead_code)]
impl<T> #server_ident<T> {
/// Create a new GRPC server
pub fn new(service: T) -> Self {
Self(::std::sync::Arc::new(service))
Self {
inner: ::std::sync::Arc::new(service),
send_compressd: ::std::option::Option::None,
accept_compressed: ::std::default::Default::default(),
}
}

/// Set the compression encoding for sending
pub fn send_compressed(self, encoding: #crate_name::CompressionEncoding) -> Self {
Self {
send_compressd: Some(encoding),
..self
}
}

/// Set the compression encodings for accepting
pub fn accept_compressed(self, encodings: impl ::std::convert::Into<::std::sync::Arc<[#crate_name::CompressionEncoding]>>) -> Self {
Self {
accept_compressed: encodings.into(),
..self
}
}
}

Expand Down Expand Up @@ -191,7 +226,7 @@ fn generate_unary(codec_list: &[Path], method_info: MethodInfo) -> TokenStream {
crate_name,
codec_list,
quote! {
#crate_name::server::GrpcServer::new(codec).unary(#proxy_service_ident(svc.clone()), req).await
#crate_name::server::GrpcServer::new(codec, server.send_compressd, &server.accept_compressed).unary(#proxy_service_ident(server.inner.clone()), req).await
},
);

Expand All @@ -211,9 +246,9 @@ fn generate_unary(codec_list: &[Path], method_info: MethodInfo) -> TokenStream {
}

route = route.at(#path, ::poem::endpoint::make({
let svc = self.0.clone();
let server = self.clone();
move |req| {
let svc = svc.clone();
let server = server.clone();
async move { #call }
}
}));
Expand All @@ -235,7 +270,7 @@ fn generate_client_streaming(codec_list: &[Path], method_info: MethodInfo) -> To
crate_name,
codec_list,
quote! {
#crate_name::server::GrpcServer::new(codec).client_streaming(#proxy_service_ident(svc.clone()), req).await
#crate_name::server::GrpcServer::new(codec, server.send_compressd, &server.accept_compressed).client_streaming(#proxy_service_ident(server.inner.clone()), req).await
},
);

Expand All @@ -255,9 +290,9 @@ fn generate_client_streaming(codec_list: &[Path], method_info: MethodInfo) -> To
}

route = route.at(#path, ::poem::endpoint::make({
let svc = self.0.clone();
let server = self.clone();
move |req| {
let svc = svc.clone();
let server = server.clone();
async move { #call }
}
}));
Expand All @@ -279,7 +314,7 @@ fn generate_server_streaming(codec_list: &[Path], method_info: MethodInfo) -> To
crate_name,
codec_list,
quote! {
#crate_name::server::GrpcServer::new(codec).server_streaming(#proxy_service_ident(svc.clone()), req).await
#crate_name::server::GrpcServer::new(codec, server.send_compressd, &server.accept_compressed).server_streaming(#proxy_service_ident(server.inner.clone()), req).await
},
);

Expand All @@ -299,9 +334,9 @@ fn generate_server_streaming(codec_list: &[Path], method_info: MethodInfo) -> To
}

route = route.at(#path, ::poem::endpoint::make({
let svc = self.0.clone();
let server = self.clone();
move |req| {
let svc = svc.clone();
let server = server.clone();
async move { #call }
}
}));
Expand All @@ -323,7 +358,7 @@ fn generate_bidirectional_streaming(codec_list: &[Path], method_info: MethodInfo
crate_name,
codec_list,
quote! {
#crate_name::server::GrpcServer::new(codec).bidirectional_streaming(#proxy_service_ident(svc.clone()), req).await
#crate_name::server::GrpcServer::new(codec, server.send_compressd, &server.accept_compressed).bidirectional_streaming(#proxy_service_ident(server.inner.clone()), req).await
},
);

Expand All @@ -343,9 +378,9 @@ fn generate_bidirectional_streaming(codec_list: &[Path], method_info: MethodInfo
}

route = route.at(#path, ::poem::endpoint::make({
let svc = self.0.clone();
let server = self.clone();
move |req| {
let svc = svc.clone();
let server = server.clone();
async move { #call }
}
}));
Expand Down
8 changes: 7 additions & 1 deletion poem-grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,18 @@ categories = ["network-programming", "asynchronous"]
[features]
default = []
json-codec = ["serde", "serde_json"]
gzip = ["async-compression/gzip"]
deflate = ["async-compression/deflate"]
brotli = ["async-compression/brotli"]
zstd = ["async-compression/zstd"]
example_generated = []

[dependencies]
poem = { workspace = true, default-features = true }

futures-util.workspace = true
async-stream = "0.3.3"
tokio = { workspace = true, features = ["io-util", "rt", "sync", "net"] }
flate2 = "1.0.24"
itoa = "1.0.2"
percent-encoding = "2.1.0"
bytes.workspace = true
Expand All @@ -43,6 +47,8 @@ http-body-util = "0.1.0"
tokio-rustls.workspace = true
tower-service = "0.3.2"
webpki-roots = "0.26"
async-compression = { version = "0.4.0", optional = true, features = ["tokio"] }
sync_wrapper = { version = "1.0.0", features = ["futures"] }

[build-dependencies]
poem-grpc-build.workspace = true
Expand Down
9 changes: 8 additions & 1 deletion poem-grpc/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,12 @@ fn main() -> Result<()> {
// for test
poem_grpc_build::Config::new()
.internal()
.compile(&["proto/test_harness.proto"], &["proto/"])
.compile(&["proto/test_harness.proto"], &["proto/"])?;

// example
poem_grpc_build::Config::new()
.internal()
.compile(&["src/example_generated/routeguide.proto"], &[] as &[&str])?;

Ok(())
}
Loading

0 comments on commit 810b6b6

Please sign in to comment.