diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index ee6301999e..7aedde9b0b 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -55,7 +55,7 @@ jobs:
# Switch to stable Rust
- uses: actions-rs/toolchain@v1
with:
- toolchain: 1.75.0
+ toolchain: 1.76.0
components: rustfmt, clippy
override: true
- name: Cache Rust
@@ -91,7 +91,7 @@ jobs:
# Switch to stable Rust
- uses: actions-rs/toolchain@v1
with:
- toolchain: 1.75.0
+ toolchain: 1.76.0
components: rustfmt, clippy
- name: Cache Rust
uses: Swatinem/rust-cache@v2
diff --git a/.github/workflows/code-coverage.yml b/.github/workflows/code-coverage.yml
index f2bba3f2bc..88fdfc2fb2 100644
--- a/.github/workflows/code-coverage.yml
+++ b/.github/workflows/code-coverage.yml
@@ -21,7 +21,7 @@ jobs:
- name: Install Stable Toolchain
uses: dtolnay/rust-toolchain@stable
with:
- toolchain: 1.75.0
+ toolchain: 1.76.0
components: rustfmt
- name: Cache Rust
uses: Swatinem/rust-cache@v2
diff --git a/.rustfmt.toml b/.rustfmt.toml
index b2cfe19264..f87905ebe0 100644
--- a/.rustfmt.toml
+++ b/.rustfmt.toml
@@ -2,7 +2,7 @@ edition = "2021"
newline_style = "unix"
# comments
normalize_comments = true
-#wrap_comments=true
+wrap_comments = true
format_code_in_doc_comments = true
# imports
imports_granularity = "Crate"
diff --git a/Cargo.toml b/Cargo.toml
index ebdc7fbadf..0291523482 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,10 +20,10 @@ repository = "https://github.com/poem-web/poem"
rust-version = "1.75"
[workspace.dependencies]
-poem = { path = "poem", version = "3.0.4", default-features = false }
-poem-derive = { path = "poem-derive", version = "3.0.4" }
-poem-openapi-derive = { path = "poem-openapi-derive", version = "5.0.3" }
-poem-grpc-build = { path = "poem-grpc-build", version = "0.4.2" }
+poem = { path = "poem", version = "3.1.0", default-features = false }
+poem-derive = { path = "poem-derive", version = "3.1.0" }
+poem-openapi-derive = { path = "poem-openapi-derive", version = "5.1.0" }
+poem-grpc-build = { path = "poem-grpc-build", version = "0.5.0" }
proc-macro-crate = "3.0.0"
proc-macro2 = "1.0.29"
@@ -31,6 +31,7 @@ quote = "1.0.9"
syn = { version = "2.0" }
tokio = "1.39.1"
serde_json = "1.0.68"
+sonic-rs = "0.3.5"
serde = { version = "1.0.130", features = ["derive"] }
thiserror = "1.0.30"
regex = "1.5.5"
diff --git a/README.md b/README.md
index a853c09552..7a9003b6a2 100644
--- a/README.md
+++ b/README.md
@@ -9,9 +9,9 @@
-
-
+
+
diff --git a/examples/grpc/helloworld_compressed/Cargo.toml b/examples/grpc/helloworld_compressed/Cargo.toml
new file mode 100644
index 0000000000..d1723b85f5
--- /dev/null
+++ b/examples/grpc/helloworld_compressed/Cargo.toml
@@ -0,0 +1,23 @@
+[package]
+name = "example-grpc-helloworld-compressed"
+version.workspace = true
+edition.workspace = true
+publish.workspace = true
+
+[dependencies]
+poem.workspace = true
+poem-grpc = { workspace = true, features = [
+ "gzip",
+ "deflate",
+ "brotli",
+ "zstd",
+] }
+prost.workspace = true
+tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
+
+[build-dependencies]
+poem-grpc-build.workspace = true
+
+[[bin]]
+name = "grpc-helloworld-client"
+path = "src/client.rs"
diff --git a/examples/grpc/helloworld_compressed/build.rs b/examples/grpc/helloworld_compressed/build.rs
new file mode 100644
index 0000000000..a388ebfa5d
--- /dev/null
+++ b/examples/grpc/helloworld_compressed/build.rs
@@ -0,0 +1,7 @@
+use std::io::Result;
+
+use poem_grpc_build::compile_protos;
+
+fn main() -> Result<()> {
+ compile_protos(&["./proto/helloworld.proto"], &["./proto"])
+}
diff --git a/examples/grpc/helloworld_compressed/proto/helloworld.proto b/examples/grpc/helloworld_compressed/proto/helloworld.proto
new file mode 100644
index 0000000000..8de5d08ef4
--- /dev/null
+++ b/examples/grpc/helloworld_compressed/proto/helloworld.proto
@@ -0,0 +1,37 @@
+// Copyright 2015 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "io.grpc.examples.helloworld";
+option java_outer_classname = "HelloWorldProto";
+
+package helloworld;
+
+// The greeting service definition.
+service Greeter {
+ // Sends a greeting
+ rpc SayHello (HelloRequest) returns (HelloReply) {}
+}
+
+// The request message containing the user's name.
+message HelloRequest {
+ string name = 1;
+}
+
+// The response message containing the greetings
+message HelloReply {
+ string message = 1;
+}
\ No newline at end of file
diff --git a/examples/grpc/helloworld_compressed/src/client.rs b/examples/grpc/helloworld_compressed/src/client.rs
new file mode 100644
index 0000000000..7f9abeaea9
--- /dev/null
+++ b/examples/grpc/helloworld_compressed/src/client.rs
@@ -0,0 +1,22 @@
+use poem_grpc::{ClientConfig, CompressionEncoding, Request};
+
+poem_grpc::include_proto!("helloworld");
+
+#[tokio::main]
+async fn main() -> Result<(), Box> {
+ let mut client = GreeterClient::new(
+ ClientConfig::builder()
+ .uri("http://localhost:3000")
+ .build()
+ .unwrap(),
+ );
+ client.set_send_compressed(CompressionEncoding::GZIP);
+ client.set_accept_compressed([CompressionEncoding::GZIP]);
+
+ let request = Request::new(HelloRequest {
+ name: "Poem".into(),
+ });
+ let response = client.say_hello(request).await?;
+ println!("RESPONSE={response:?}");
+ Ok(())
+}
diff --git a/examples/grpc/helloworld_compressed/src/main.rs b/examples/grpc/helloworld_compressed/src/main.rs
new file mode 100644
index 0000000000..f6f4c918a4
--- /dev/null
+++ b/examples/grpc/helloworld_compressed/src/main.rs
@@ -0,0 +1,35 @@
+use poem::{listener::TcpListener, Server};
+use poem_grpc::{CompressionEncoding, Request, Response, RouteGrpc, Status};
+
+poem_grpc::include_proto!("helloworld");
+
+struct GreeterService;
+
+impl Greeter for GreeterService {
+ async fn say_hello(
+ &self,
+ request: Request,
+ ) -> Result, Status> {
+ let reply = HelloReply {
+ message: format!("Hello {}!", request.into_inner().name),
+ };
+ Ok(Response::new(reply))
+ }
+}
+
+#[tokio::main]
+async fn main() -> Result<(), std::io::Error> {
+ let route = RouteGrpc::new().add_service(
+ GreeterServer::new(GreeterService)
+ .send_compressed(CompressionEncoding::GZIP)
+ .accept_compressed([
+ CompressionEncoding::GZIP,
+ CompressionEncoding::DEFLATE,
+ CompressionEncoding::BROTLI,
+ CompressionEncoding::ZSTD,
+ ]),
+ );
+ Server::new(TcpListener::bind("0.0.0.0:3000"))
+ .run(route)
+ .await
+}
diff --git a/poem-derive/Cargo.toml b/poem-derive/Cargo.toml
index 3e713e854b..122ba3c392 100644
--- a/poem-derive/Cargo.toml
+++ b/poem-derive/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "poem-derive"
-version = "3.0.4"
+version = "3.1.0"
authors.workspace = true
edition.workspace = true
license.workspace = true
diff --git a/poem-grpc-build/Cargo.toml b/poem-grpc-build/Cargo.toml
index dfa382455f..34500d3355 100644
--- a/poem-grpc-build/Cargo.toml
+++ b/poem-grpc-build/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "poem-grpc-build"
-version = "0.4.2"
+version = "0.5.0"
authors.workspace = true
edition.workspace = true
license.workspace = true
diff --git a/poem-grpc-build/src/client.rs b/poem-grpc-build/src/client.rs
index a27fea099f..89554d16e0 100644
--- a/poem-grpc-build/src/client.rs
+++ b/poem-grpc-build/src/client.rs
@@ -117,6 +117,16 @@ pub(crate) fn generate(config: &GrpcConfig, service: &Service, buf: &mut String)
self
}
+ /// Set the compression encoding for sending
+ pub fn set_send_compressed(&mut self, encoding: #crate_name::CompressionEncoding) {
+ self.cli.set_send_compressed(encoding);
+ }
+
+ /// Set the compression encodings for accepting
+ pub fn set_accept_compressed(&mut self, encodings: impl ::std::convert::Into<::std::sync::Arc<[#crate_name::CompressionEncoding]>>) {
+ self.cli.set_accept_compressed(encodings);
+ }
+
#(
#[allow(dead_code)]
#methods
diff --git a/poem-grpc-build/src/server.rs b/poem-grpc-build/src/server.rs
index 73e5cde804..a8f1196548 100644
--- a/poem-grpc-build/src/server.rs
+++ b/poem-grpc-build/src/server.rs
@@ -99,8 +99,22 @@ pub(crate) fn generate(config: &GrpcConfig, service: &Service, buf: &mut String)
}
#[allow(unused_imports)]
- #[derive(Clone)]
- pub struct #server_ident(::std::sync::Arc);
+ pub struct #server_ident {
+ inner: ::std::sync::Arc,
+ send_compressd: ::std::option::Option<#crate_name::CompressionEncoding>,
+ accept_compressed: ::std::sync::Arc<[#crate_name::CompressionEncoding]>,
+ }
+
+ impl ::std::clone::Clone for #server_ident {
+ #[inline]
+ fn clone(&self) -> Self {
+ Self {
+ inner: self.inner.clone(),
+ send_compressd: self.send_compressd,
+ accept_compressed: self.accept_compressed.clone(),
+ }
+ }
+ }
impl #crate_name::Service for #server_ident {
const NAME: &'static str = #service_name;
@@ -109,8 +123,29 @@ pub(crate) fn generate(config: &GrpcConfig, service: &Service, buf: &mut String)
#[allow(dead_code)]
impl #server_ident {
+ /// Create a new GRPC server
pub fn new(service: T) -> Self {
- Self(::std::sync::Arc::new(service))
+ Self {
+ inner: ::std::sync::Arc::new(service),
+ send_compressd: ::std::option::Option::None,
+ accept_compressed: ::std::sync::Arc::new([]),
+ }
+ }
+
+ /// Set the compression encoding for sending
+ pub fn send_compressed(self, encoding: #crate_name::CompressionEncoding) -> Self {
+ Self {
+ send_compressd: Some(encoding),
+ ..self
+ }
+ }
+
+ /// Set the compression encodings for accepting
+ pub fn accept_compressed(self, encodings: impl ::std::convert::Into<::std::sync::Arc<[#crate_name::CompressionEncoding]>>) -> Self {
+ Self {
+ accept_compressed: encodings.into(),
+ ..self
+ }
}
}
@@ -191,7 +226,7 @@ fn generate_unary(codec_list: &[Path], method_info: MethodInfo) -> TokenStream {
crate_name,
codec_list,
quote! {
- #crate_name::server::GrpcServer::new(codec).unary(#proxy_service_ident(svc.clone()), req).await
+ #crate_name::server::GrpcServer::new(codec, server.send_compressd, &server.accept_compressed).unary(#proxy_service_ident(server.inner.clone()), req).await
},
);
@@ -211,9 +246,9 @@ fn generate_unary(codec_list: &[Path], method_info: MethodInfo) -> TokenStream {
}
route = route.at(#path, ::poem::endpoint::make({
- let svc = self.0.clone();
+ let server = self.clone();
move |req| {
- let svc = svc.clone();
+ let server = server.clone();
async move { #call }
}
}));
@@ -235,7 +270,7 @@ fn generate_client_streaming(codec_list: &[Path], method_info: MethodInfo) -> To
crate_name,
codec_list,
quote! {
- #crate_name::server::GrpcServer::new(codec).client_streaming(#proxy_service_ident(svc.clone()), req).await
+ #crate_name::server::GrpcServer::new(codec, server.send_compressd, &server.accept_compressed).client_streaming(#proxy_service_ident(server.inner.clone()), req).await
},
);
@@ -255,9 +290,9 @@ fn generate_client_streaming(codec_list: &[Path], method_info: MethodInfo) -> To
}
route = route.at(#path, ::poem::endpoint::make({
- let svc = self.0.clone();
+ let server = self.clone();
move |req| {
- let svc = svc.clone();
+ let server = server.clone();
async move { #call }
}
}));
@@ -279,7 +314,7 @@ fn generate_server_streaming(codec_list: &[Path], method_info: MethodInfo) -> To
crate_name,
codec_list,
quote! {
- #crate_name::server::GrpcServer::new(codec).server_streaming(#proxy_service_ident(svc.clone()), req).await
+ #crate_name::server::GrpcServer::new(codec, server.send_compressd, &server.accept_compressed).server_streaming(#proxy_service_ident(server.inner.clone()), req).await
},
);
@@ -299,9 +334,9 @@ fn generate_server_streaming(codec_list: &[Path], method_info: MethodInfo) -> To
}
route = route.at(#path, ::poem::endpoint::make({
- let svc = self.0.clone();
+ let server = self.clone();
move |req| {
- let svc = svc.clone();
+ let server = server.clone();
async move { #call }
}
}));
@@ -323,7 +358,7 @@ fn generate_bidirectional_streaming(codec_list: &[Path], method_info: MethodInfo
crate_name,
codec_list,
quote! {
- #crate_name::server::GrpcServer::new(codec).bidirectional_streaming(#proxy_service_ident(svc.clone()), req).await
+ #crate_name::server::GrpcServer::new(codec, server.send_compressd, &server.accept_compressed).bidirectional_streaming(#proxy_service_ident(server.inner.clone()), req).await
},
);
@@ -343,9 +378,9 @@ fn generate_bidirectional_streaming(codec_list: &[Path], method_info: MethodInfo
}
route = route.at(#path, ::poem::endpoint::make({
- let svc = self.0.clone();
+ let server = self.clone();
move |req| {
- let svc = svc.clone();
+ let server = server.clone();
async move { #call }
}
}));
diff --git a/poem-grpc/CHANGELOG.md b/poem-grpc/CHANGELOG.md
index 0c6dfd82cf..88ab5917bc 100644
--- a/poem-grpc/CHANGELOG.md
+++ b/poem-grpc/CHANGELOG.md
@@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+# [0.5.0] 2024-09-08
+
+- add support for GRPC compression
+
# [0.4.2] 2024-07-19
- Fix #840: Grpc build emit package when package is empty [#841](https://github.com/poem-web/poem/pull/841)
diff --git a/poem-grpc/Cargo.toml b/poem-grpc/Cargo.toml
index 0efd641ae7..01a6aa3a12 100644
--- a/poem-grpc/Cargo.toml
+++ b/poem-grpc/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "poem-grpc"
-version = "0.4.3"
+version = "0.5.0"
authors.workspace = true
edition.workspace = true
license.workspace = true
@@ -16,6 +16,11 @@ categories = ["network-programming", "asynchronous"]
[features]
default = []
json-codec = ["serde", "serde_json"]
+gzip = ["async-compression/gzip"]
+deflate = ["async-compression/deflate"]
+brotli = ["async-compression/brotli"]
+zstd = ["async-compression/zstd"]
+example_generated = []
[dependencies]
poem = { workspace = true, default-features = true }
@@ -23,7 +28,6 @@ poem = { workspace = true, default-features = true }
futures-util.workspace = true
async-stream = "0.3.3"
tokio = { workspace = true, features = ["io-util", "rt", "sync", "net"] }
-flate2 = "1.0.24"
itoa = "1.0.2"
percent-encoding = "2.1.0"
bytes.workspace = true
@@ -43,6 +47,8 @@ http-body-util = "0.1.0"
tokio-rustls.workspace = true
tower-service = "0.3.2"
webpki-roots = "0.26"
+async-compression = { version = "0.4.0", optional = true, features = ["tokio"] }
+sync_wrapper = { version = "1.0.0", features = ["futures"] }
[build-dependencies]
poem-grpc-build.workspace = true
diff --git a/poem-grpc/README.md b/poem-grpc/README.md
index 54729377c5..c8d4ba8105 100644
--- a/poem-grpc/README.md
+++ b/poem-grpc/README.md
@@ -20,9 +20,9 @@
-
-
+
+
@@ -63,7 +63,7 @@ This crate uses `#![forbid(unsafe_code)]` to ensure everything is implemented in
## MSRV
-The minimum supported Rust version for this crate is `1.75.0`.
+The minimum supported Rust version for this crate is `1.76.0`.
## Contributing
diff --git a/poem-grpc/build.rs b/poem-grpc/build.rs
index 81dbd1a58c..661c6a4596 100644
--- a/poem-grpc/build.rs
+++ b/poem-grpc/build.rs
@@ -13,5 +13,12 @@ fn main() -> Result<()> {
// for test
poem_grpc_build::Config::new()
.internal()
- .compile(&["proto/test_harness.proto"], &["proto/"])
+ .compile(&["proto/test_harness.proto"], &["proto/"])?;
+
+ // example
+ poem_grpc_build::Config::new()
+ .internal()
+ .compile(&["src/example_generated/routeguide.proto"], &[] as &[&str])?;
+
+ Ok(())
}
diff --git a/poem-grpc/src/client.rs b/poem-grpc/src/client.rs
index e42db405cc..f5ff77c46c 100644
--- a/poem-grpc/src/client.rs
+++ b/poem-grpc/src/client.rs
@@ -18,9 +18,10 @@ use rustls::ClientConfig as TlsClientConfig;
use crate::{
codec::Codec,
+ compression::get_incoming_encodings,
connector::HttpsConnector,
encoding::{create_decode_response_body, create_encode_request_body},
- Code, Metadata, Request, Response, Status, Streaming,
+ Code, CompressionEncoding, Metadata, Request, Response, Status, Streaming,
};
pub(crate) type BoxBody = http_body_util::combinators::BoxBody;
@@ -155,6 +156,8 @@ impl ClientConfigBuilder {
#[derive(Clone)]
pub struct GrpcClient {
ep: Arc + 'static>,
+ send_compressd: Option,
+ accept_compressed: Arc<[CompressionEncoding]>,
}
impl GrpcClient {
@@ -162,6 +165,8 @@ impl GrpcClient {
pub fn new(config: ClientConfig) -> Self {
Self {
ep: create_client_endpoint(config),
+ send_compressd: None,
+ accept_compressed: Arc::new([]),
}
}
@@ -173,9 +178,19 @@ impl GrpcClient {
{
Self {
ep: Arc::new(ToDynEndpoint(ep.map_to_response())),
+ send_compressd: None,
+ accept_compressed: Arc::new([]),
}
}
+ pub fn set_send_compressed(&mut self, encoding: CompressionEncoding) {
+ self.send_compressd = Some(encoding);
+ }
+
+ pub fn set_accept_compressed(&mut self, encodings: impl Into>) {
+ self.accept_compressed = encodings.into();
+ }
+
pub fn with(mut self, middleware: M) -> Self
where
M: Middleware + 'static>>,
@@ -198,10 +213,12 @@ impl GrpcClient {
message,
extensions,
} = request;
- let mut http_request = create_http_request::(path, metadata, extensions);
+ let mut http_request =
+ create_http_request::(path, metadata, extensions, self.send_compressd);
http_request.set_body(create_encode_request_body(
codec.encoder(),
Streaming::new(futures_util::stream::once(async move { Ok(message) })),
+ self.send_compressd,
));
let mut resp = self
@@ -218,7 +235,9 @@ impl GrpcClient {
}
let body = resp.take_body();
- let mut stream = create_decode_response_body(codec.decoder(), resp.headers(), body)?;
+ let incoming_encoding = get_incoming_encodings(resp.headers(), &self.accept_compressed)?;
+ let mut stream =
+ create_decode_response_body(codec.decoder(), resp.headers(), body, incoming_encoding)?;
let message = stream
.try_next()
@@ -243,8 +262,13 @@ impl GrpcClient {
message,
extensions,
} = request;
- let mut http_request = create_http_request::(path, metadata, extensions);
- http_request.set_body(create_encode_request_body(codec.encoder(), message));
+ let mut http_request =
+ create_http_request::(path, metadata, extensions, self.send_compressd);
+ http_request.set_body(create_encode_request_body(
+ codec.encoder(),
+ message,
+ self.send_compressd,
+ ));
let mut resp = self
.ep
@@ -260,7 +284,9 @@ impl GrpcClient {
}
let body = resp.take_body();
- let mut stream = create_decode_response_body(codec.decoder(), resp.headers(), body)?;
+ let incoming_encoding = get_incoming_encodings(resp.headers(), &self.accept_compressed)?;
+ let mut stream =
+ create_decode_response_body(codec.decoder(), resp.headers(), body, incoming_encoding)?;
let message = stream
.try_next()
@@ -285,10 +311,12 @@ impl GrpcClient {
message,
extensions,
} = request;
- let mut http_request = create_http_request::(path, metadata, extensions);
+ let mut http_request =
+ create_http_request::(path, metadata, extensions, self.send_compressd);
http_request.set_body(create_encode_request_body(
codec.encoder(),
Streaming::new(futures_util::stream::once(async move { Ok(message) })),
+ self.send_compressd,
));
let mut resp = self
@@ -305,7 +333,9 @@ impl GrpcClient {
}
let body = resp.take_body();
- let stream = create_decode_response_body(codec.decoder(), resp.headers(), body)?;
+ let incoming_encoding = get_incoming_encodings(resp.headers(), &self.accept_compressed)?;
+ let stream =
+ create_decode_response_body(codec.decoder(), resp.headers(), body, incoming_encoding)?;
Ok(Response {
metadata: Metadata {
@@ -326,8 +356,13 @@ impl GrpcClient {
message,
extensions,
} = request;
- let mut http_request = create_http_request::(path, metadata, extensions);
- http_request.set_body(create_encode_request_body(codec.encoder(), message));
+ let mut http_request =
+ create_http_request::(path, metadata, extensions, self.send_compressd);
+ http_request.set_body(create_encode_request_body(
+ codec.encoder(),
+ message,
+ self.send_compressd,
+ ));
let mut resp = self
.ep
@@ -343,7 +378,9 @@ impl GrpcClient {
}
let body = resp.take_body();
- let stream = create_decode_response_body(codec.decoder(), resp.headers(), body)?;
+ let incoming_encoding = get_incoming_encodings(resp.headers(), &self.accept_compressed)?;
+ let stream =
+ create_decode_response_body(codec.decoder(), resp.headers(), body, incoming_encoding)?;
Ok(Response {
metadata: Metadata {
@@ -358,6 +395,7 @@ fn create_http_request(
path: &str,
metadata: Metadata,
extensions: Extensions,
+ send_compressd: Option,
) -> HttpRequest {
let mut http_request = HttpRequest::builder()
.uri_str(path)
@@ -368,6 +406,12 @@ fn create_http_request(
.finish();
http_request.headers_mut().extend(metadata.headers);
*http_request.extensions_mut() = extensions;
+ if let Some(send_compressd) = send_compressd {
+ http_request.headers_mut().insert(
+ "grpc-encoding",
+ HeaderValue::from_str(send_compressd.as_str()).expect("BUG: invalid encoding"),
+ );
+ }
http_request
}
diff --git a/poem-grpc/src/compression.rs b/poem-grpc/src/compression.rs
new file mode 100644
index 0000000000..71fd738260
--- /dev/null
+++ b/poem-grpc/src/compression.rs
@@ -0,0 +1,186 @@
+use std::{io::Result as IoResult, str::FromStr};
+
+use http::HeaderMap;
+
+use crate::{Code, Metadata, Status};
+
+/// The compression encodings.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum CompressionEncoding {
+ /// gzip
+ #[cfg(feature = "gzip")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "gzip")))]
+ GZIP,
+ /// deflate
+ #[cfg(feature = "deflate")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "deflate")))]
+ DEFLATE,
+ /// brotli
+ #[cfg(feature = "brotli")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "brotli")))]
+ BROTLI,
+ /// zstd
+ #[cfg(feature = "zstd")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "zstd")))]
+ ZSTD,
+}
+
+impl FromStr for CompressionEncoding {
+ type Err = ();
+
+ #[inline]
+ fn from_str(s: &str) -> Result {
+ match s {
+ #[cfg(feature = "gzip")]
+ "gzip" => Ok(CompressionEncoding::GZIP),
+ #[cfg(feature = "deflate")]
+ "deflate" => Ok(CompressionEncoding::DEFLATE),
+ #[cfg(feature = "brotli")]
+ "br" => Ok(CompressionEncoding::BROTLI),
+ #[cfg(feature = "zstd")]
+ "zstd" => Ok(CompressionEncoding::ZSTD),
+ _ => Err(()),
+ }
+ }
+}
+
+impl CompressionEncoding {
+ /// Returns the encoding name.
+ #[allow(unreachable_patterns)]
+ pub fn as_str(&self) -> &'static str {
+ match self {
+ #[cfg(feature = "gzip")]
+ CompressionEncoding::GZIP => "gzip",
+ #[cfg(feature = "deflate")]
+ CompressionEncoding::DEFLATE => "deflate",
+ #[cfg(feature = "brotli")]
+ CompressionEncoding::BROTLI => "br",
+ #[cfg(feature = "zstd")]
+ CompressionEncoding::ZSTD => "zstd",
+ _ => unreachable!(),
+ }
+ }
+
+ #[allow(
+ unreachable_code,
+ unused_imports,
+ unused_mut,
+ unused_variables,
+ unreachable_patterns
+ )]
+ pub(crate) async fn encode(&self, data: &[u8]) -> IoResult> {
+ use tokio::io::AsyncReadExt;
+
+ let mut buf = Vec::new();
+
+ match self {
+ #[cfg(feature = "gzip")]
+ CompressionEncoding::GZIP => {
+ async_compression::tokio::bufread::GzipEncoder::new(data)
+ .read_to_end(&mut buf)
+ .await?;
+ }
+ #[cfg(feature = "deflate")]
+ CompressionEncoding::DEFLATE => {
+ async_compression::tokio::bufread::DeflateEncoder::new(data)
+ .read_to_end(&mut buf)
+ .await?;
+ }
+ #[cfg(feature = "brotli")]
+ CompressionEncoding::BROTLI => {
+ async_compression::tokio::bufread::BrotliEncoder::new(data)
+ .read_to_end(&mut buf)
+ .await?;
+ }
+ #[cfg(feature = "zstd")]
+ CompressionEncoding::ZSTD => {
+ async_compression::tokio::bufread::ZstdEncoder::new(data)
+ .read_to_end(&mut buf)
+ .await?;
+ }
+ _ => unreachable!(),
+ }
+
+ Ok(buf)
+ }
+
+ #[allow(
+ unreachable_code,
+ unused_imports,
+ unused_mut,
+ unused_variables,
+ unreachable_patterns
+ )]
+ pub(crate) async fn decode(&self, data: &[u8]) -> IoResult> {
+ use tokio::io::AsyncReadExt;
+
+ let mut buf = Vec::new();
+
+ match self {
+ #[cfg(feature = "gzip")]
+ CompressionEncoding::GZIP => {
+ async_compression::tokio::bufread::GzipDecoder::new(data)
+ .read_to_end(&mut buf)
+ .await?;
+ }
+ #[cfg(feature = "deflate")]
+ CompressionEncoding::DEFLATE => {
+ async_compression::tokio::bufread::DeflateDecoder::new(data)
+ .read_to_end(&mut buf)
+ .await?;
+ }
+ #[cfg(feature = "brotli")]
+ CompressionEncoding::BROTLI => {
+ async_compression::tokio::bufread::BrotliDecoder::new(data)
+ .read_to_end(&mut buf)
+ .await?;
+ }
+ #[cfg(feature = "zstd")]
+ CompressionEncoding::ZSTD => {
+ async_compression::tokio::bufread::ZstdDecoder::new(data)
+ .read_to_end(&mut buf)
+ .await?;
+ }
+ _ => unreachable!(),
+ }
+
+ Ok(buf)
+ }
+}
+
+fn unimplemented(accept_compressed: &[CompressionEncoding]) -> Status {
+ let mut md = Metadata::new();
+ let mut accept_encoding = String::new();
+ let mut iter = accept_compressed.iter();
+ if let Some(encoding) = iter.next() {
+ accept_encoding.push_str(encoding.as_str());
+ }
+ for encoding in iter {
+ accept_encoding.push_str(", ");
+ accept_encoding.push_str(encoding.as_str());
+ }
+ md.append("grpc-accept-encoding", accept_encoding);
+ Status::new(Code::Unimplemented)
+ .with_metadata(md)
+ .with_message("unsupported encoding")
+}
+
+pub(crate) fn get_incoming_encodings(
+ headers: &HeaderMap,
+ accept_compressed: &[CompressionEncoding],
+) -> Result