Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#159] server.http2.enabled 기능 구현 #193

Merged
merged 3 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions rupring/src/application_properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,17 @@ impl Default for Http1 {
}
}

#[derive(Debug, PartialEq, Clone)]
pub struct Http2 {
pub enabled: bool,
}

impl Default for Http2 {
fn default() -> Self {
Http2 { enabled: false }
}
}

// Reference: https://docs.spring.io/spring-boot/appendix/application-properties/index.html#appendix.application-properties.server
#[derive(Debug, PartialEq, Clone)]
pub struct Server {
Expand All @@ -169,6 +180,7 @@ pub struct Server {
pub thread_limit: Option<usize>,
pub request_timeout: Option<Duration>,
pub http1: Http1,
pub http2: Http2,
}

impl Default for Server {
Expand All @@ -182,6 +194,7 @@ impl Default for Server {
thread_limit: None,
request_timeout: None,
http1: Http1::default(),
http2: Http2::default(),
}
}
}
Expand Down Expand Up @@ -313,6 +326,11 @@ impl ApplicationProperties {
server.http1.keep_alive = value;
}
}
"server.http2.enabled" => {
if let Ok(value) = value.parse::<bool>() {
server.http2.enabled = value;
}
}
"environment" => {
environment = value.to_string();
}
Expand Down
175 changes: 94 additions & 81 deletions rupring/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod parse;

#[cfg(feature = "aws-lambda")]
use bootings::aws_lambda::LambdaRequestEvent;
use hyper_util::rt::TokioExecutor;
use tokio::time::error::Elapsed;
use tokio::time::Instant;

Expand All @@ -27,7 +28,6 @@ use std::sync::Arc;
use http_body_util::BodyExt;
use http_body_util::Full;
use hyper::body::Bytes;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::StatusCode;
use hyper::{Request, Response};
Expand Down Expand Up @@ -82,6 +82,7 @@ pub async fn run_server(
}

let keep_alive = application_properties.server.http1.keep_alive.to_owned();
let http2_enabled = application_properties.server.http2.enabled.to_owned();

// 5. Main Server Loop
// Spawns a new async Task for each request.
Expand Down Expand Up @@ -114,95 +115,107 @@ pub async fn run_server(

// 6. create tokio task per HTTP request
tokio::task::spawn(async move {
if let Err(err) = http1::Builder::new()
.keep_alive(keep_alive)
// `service_fn` converts our function in a `Service`
.serve_connection(
io,
service_fn(move |request: Request<hyper::body::Incoming>| {
let di_context = Arc::clone(&di_context);
let application_properties = Arc::clone(&application_properties);

let running_task_count = Arc::clone(&running_task_count);

async move {
if let Some(timeout_duration) =
application_properties.server.request_timeout
{
let now = Instant::now();

let handle = tokio::time::timeout_at(
now + timeout_duration,
tokio::task::spawn(async move {
if is_graceful_shutdown {
running_task_count
.fetch_add(1, std::sync::atomic::Ordering::Release);
}

let result = process_request(
application_properties,
di_context,
root_module,
request,
)
.await;

if is_graceful_shutdown {
running_task_count
.fetch_sub(1, std::sync::atomic::Ordering::Release);
}

result
}),
);

match handle.await {
Ok(Ok(response)) => response,
Ok(Err(error)) => default_join_error_handler(error),
Err(error) => default_timeout_handler(error),
let service = service_fn(move |request: Request<hyper::body::Incoming>| {
let di_context = Arc::clone(&di_context);
let application_properties = Arc::clone(&application_properties);

let running_task_count = Arc::clone(&running_task_count);

async move {
if let Some(timeout_duration) = application_properties.server.request_timeout {
let now = Instant::now();

let handle = tokio::time::timeout_at(
now + timeout_duration,
tokio::task::spawn(async move {
if is_graceful_shutdown {
running_task_count
.fetch_add(1, std::sync::atomic::Ordering::Release);
}
} else {
let _running_task_count = Arc::clone(&running_task_count);

let handle = tokio::spawn(async move {
let running_task_count = _running_task_count;
let result = process_request(
application_properties,
di_context,
root_module,
request,
)
.await;

if is_graceful_shutdown {
running_task_count
.fetch_sub(1, std::sync::atomic::Ordering::Release);
}

if is_graceful_shutdown {
running_task_count
.fetch_add(1, std::sync::atomic::Ordering::Release);
}
result
}),
);

let result = process_request(
application_properties,
di_context,
root_module,
request,
)
.await;
match handle.await {
Ok(Ok(response)) => response,
Ok(Err(error)) => default_join_error_handler(error),
Err(error) => default_timeout_handler(error),
}
} else {
let _running_task_count = Arc::clone(&running_task_count);

if is_graceful_shutdown {
running_task_count
.fetch_sub(1, std::sync::atomic::Ordering::Release);
}
let handle = tokio::spawn(async move {
let running_task_count = _running_task_count;

result
});
if is_graceful_shutdown {
running_task_count
.fetch_add(1, std::sync::atomic::Ordering::Release);
}

let result = handle.await;
let result = process_request(
application_properties,
di_context,
root_module,
request,
)
.await;

if is_graceful_shutdown {
running_task_count
.fetch_sub(1, std::sync::atomic::Ordering::Release);
}

let response = match result {
Ok(response) => response,
Err(error) => default_join_error_handler(error),
};
result
});

return response;
}
}
}),
)
.await
{
println!("Error serving connection: {:?}", err);
let result = handle.await;

let response = match result {
Ok(response) => response,
Err(error) => default_join_error_handler(error),
};

return response;
}
}
});

if http2_enabled {
let mut http_builder =
hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());

http_builder.http2().enable_connect_protocol();

if let Err(err) = http_builder
.serve_connection_with_upgrades(io, service)
.await
{
println!("Error serving connection: {:?}", err);
}
} else {
let mut http_builder = hyper::server::conn::http1::Builder::new();

if keep_alive {
http_builder.keep_alive(keep_alive);
}

if let Err(err) = http_builder.serve_connection(io, service).await {
println!("Error serving connection: {:?}", err);
}
}
});
}
Expand Down
3 changes: 2 additions & 1 deletion rupring_example/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ server.compression.mime-types=application/json,application/xml,text/html,text/xm
server.compression.min-response-size=1024
server.compression.algorithm=gzip
server.request-timeout=3s
server.http1.keep-alive=false
server.http1.keep-alive=true
server.http2.enabled=true
Loading