Skip to content

Commit

Permalink
Merge pull request #193 from myyrakle/feat/#159
Browse files Browse the repository at this point in the history
[#159] server.http2.enabled 기능 구현
  • Loading branch information
myyrakle authored Dec 19, 2024
2 parents ce2c840 + a61eacd commit 12a90b5
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 82 deletions.
18 changes: 18 additions & 0 deletions rupring/src/application_properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,17 @@ impl Default for Http1 {
}
}

#[derive(Debug, PartialEq, Clone)]
pub struct Http2 {
pub enabled: bool,
}

impl Default for Http2 {
fn default() -> Self {
Http2 { enabled: false }
}
}

// Reference: https://docs.spring.io/spring-boot/appendix/application-properties/index.html#appendix.application-properties.server
#[derive(Debug, PartialEq, Clone)]
pub struct Server {
Expand All @@ -169,6 +180,7 @@ pub struct Server {
pub thread_limit: Option<usize>,
pub request_timeout: Option<Duration>,
pub http1: Http1,
pub http2: Http2,
}

impl Default for Server {
Expand All @@ -182,6 +194,7 @@ impl Default for Server {
thread_limit: None,
request_timeout: None,
http1: Http1::default(),
http2: Http2::default(),
}
}
}
Expand Down Expand Up @@ -313,6 +326,11 @@ impl ApplicationProperties {
server.http1.keep_alive = value;
}
}
"server.http2.enabled" => {
if let Ok(value) = value.parse::<bool>() {
server.http2.enabled = value;
}
}
"environment" => {
environment = value.to_string();
}
Expand Down
175 changes: 94 additions & 81 deletions rupring/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod parse;

#[cfg(feature = "aws-lambda")]
use bootings::aws_lambda::LambdaRequestEvent;
use hyper_util::rt::TokioExecutor;
use tokio::time::error::Elapsed;
use tokio::time::Instant;

Expand All @@ -27,7 +28,6 @@ use std::sync::Arc;
use http_body_util::BodyExt;
use http_body_util::Full;
use hyper::body::Bytes;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::StatusCode;
use hyper::{Request, Response};
Expand Down Expand Up @@ -82,6 +82,7 @@ pub async fn run_server(
}

let keep_alive = application_properties.server.http1.keep_alive.to_owned();
let http2_enabled = application_properties.server.http2.enabled.to_owned();

// 5. Main Server Loop
// Spawns a new async Task for each request.
Expand Down Expand Up @@ -114,95 +115,107 @@ pub async fn run_server(

// 6. create tokio task per HTTP request
tokio::task::spawn(async move {
if let Err(err) = http1::Builder::new()
.keep_alive(keep_alive)
// `service_fn` converts our function in a `Service`
.serve_connection(
io,
service_fn(move |request: Request<hyper::body::Incoming>| {
let di_context = Arc::clone(&di_context);
let application_properties = Arc::clone(&application_properties);

let running_task_count = Arc::clone(&running_task_count);

async move {
if let Some(timeout_duration) =
application_properties.server.request_timeout
{
let now = Instant::now();

let handle = tokio::time::timeout_at(
now + timeout_duration,
tokio::task::spawn(async move {
if is_graceful_shutdown {
running_task_count
.fetch_add(1, std::sync::atomic::Ordering::Release);
}

let result = process_request(
application_properties,
di_context,
root_module,
request,
)
.await;

if is_graceful_shutdown {
running_task_count
.fetch_sub(1, std::sync::atomic::Ordering::Release);
}

result
}),
);

match handle.await {
Ok(Ok(response)) => response,
Ok(Err(error)) => default_join_error_handler(error),
Err(error) => default_timeout_handler(error),
let service = service_fn(move |request: Request<hyper::body::Incoming>| {
let di_context = Arc::clone(&di_context);
let application_properties = Arc::clone(&application_properties);

let running_task_count = Arc::clone(&running_task_count);

async move {
if let Some(timeout_duration) = application_properties.server.request_timeout {
let now = Instant::now();

let handle = tokio::time::timeout_at(
now + timeout_duration,
tokio::task::spawn(async move {
if is_graceful_shutdown {
running_task_count
.fetch_add(1, std::sync::atomic::Ordering::Release);
}
} else {
let _running_task_count = Arc::clone(&running_task_count);

let handle = tokio::spawn(async move {
let running_task_count = _running_task_count;
let result = process_request(
application_properties,
di_context,
root_module,
request,
)
.await;

if is_graceful_shutdown {
running_task_count
.fetch_sub(1, std::sync::atomic::Ordering::Release);
}

if is_graceful_shutdown {
running_task_count
.fetch_add(1, std::sync::atomic::Ordering::Release);
}
result
}),
);

let result = process_request(
application_properties,
di_context,
root_module,
request,
)
.await;
match handle.await {
Ok(Ok(response)) => response,
Ok(Err(error)) => default_join_error_handler(error),
Err(error) => default_timeout_handler(error),
}
} else {
let _running_task_count = Arc::clone(&running_task_count);

if is_graceful_shutdown {
running_task_count
.fetch_sub(1, std::sync::atomic::Ordering::Release);
}
let handle = tokio::spawn(async move {
let running_task_count = _running_task_count;

result
});
if is_graceful_shutdown {
running_task_count
.fetch_add(1, std::sync::atomic::Ordering::Release);
}

let result = handle.await;
let result = process_request(
application_properties,
di_context,
root_module,
request,
)
.await;

if is_graceful_shutdown {
running_task_count
.fetch_sub(1, std::sync::atomic::Ordering::Release);
}

let response = match result {
Ok(response) => response,
Err(error) => default_join_error_handler(error),
};
result
});

return response;
}
}
}),
)
.await
{
println!("Error serving connection: {:?}", err);
let result = handle.await;

let response = match result {
Ok(response) => response,
Err(error) => default_join_error_handler(error),
};

return response;
}
}
});

if http2_enabled {
let mut http_builder =
hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());

http_builder.http2().enable_connect_protocol();

if let Err(err) = http_builder
.serve_connection_with_upgrades(io, service)
.await
{
println!("Error serving connection: {:?}", err);
}
} else {
let mut http_builder = hyper::server::conn::http1::Builder::new();

if keep_alive {
http_builder.keep_alive(keep_alive);
}

if let Err(err) = http_builder.serve_connection(io, service).await {
println!("Error serving connection: {:?}", err);
}
}
});
}
Expand Down
3 changes: 2 additions & 1 deletion rupring_example/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ server.compression.mime-types=application/json,application/xml,text/html,text/xm
server.compression.min-response-size=1024
server.compression.algorithm=gzip
server.request-timeout=3s
server.http1.keep-alive=false
server.http1.keep-alive=true
server.http2.enabled=true

0 comments on commit 12a90b5

Please sign in to comment.