Skip to content

Commit

Permalink
Merge pull request #191 from myyrakle/feat/#174
Browse files Browse the repository at this point in the history
[#174] request-timeout 옵션 구현
  • Loading branch information
myyrakle authored Dec 15, 2024
2 parents 60a96a3 + 0453536 commit f226ede
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 23 deletions.
2 changes: 1 addition & 1 deletion rupring/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ homepage = "https://github.com/myyrakle/rupring/blob/master/README.md"
[dependencies]
rupring_macro={ version="0.11.0", path="../rupring_macro" }
hyper = { version = "1", features = ["full"] }
tokio = { version = "1", features = ["full"] }
tokio = { version = "1", features = ["rt", "net", "rt-multi-thread", "time"] }
http-body-util = "0.1.0"
hyper-util = { version = "0.1", features = ["full"] }
bytes = "1.5.0"
Expand Down
27 changes: 26 additions & 1 deletion rupring/src/application_properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@
| server.compression.min-response-size | The minimum response size to compress. (byte) | 2048 |
| server.compression.algorithm | The compression algorithm to use. (gzip,deflate) | gzip |
| server.thread.limit | The thread limit to use. | None(max) |
| server.request-timeout | The request timeout. (300 = 300 millisecond, 3s = 3 second, 2m = 2 minute) | No Timeout |
| banner.enabled | Whether to enable the banner. | true |
| banner.location | The location of the banner file. | None |
| banner.charset | The charset of the banner file. (UTF-8, UTF-16) | UTF-8 |
*/

use std::collections::HashMap;
use std::{collections::HashMap, time::Duration};

#[derive(Debug, PartialEq, Clone)]
pub struct ApplicationProperties {
Expand Down Expand Up @@ -154,6 +155,7 @@ pub struct Server {
pub shutdown: ShutdownType,
pub timeout_per_shutdown_phase: String,
pub thread_limit: Option<usize>,
pub request_timeout: Option<Duration>,
}

impl Default for Server {
Expand All @@ -165,6 +167,7 @@ impl Default for Server {
shutdown: ShutdownType::Immediate,
timeout_per_shutdown_phase: "30s".to_string(),
thread_limit: None,
request_timeout: None,
}
}
}
Expand Down Expand Up @@ -271,6 +274,28 @@ impl ApplicationProperties {
server.thread_limit = Some(value);
}
}
"server.request-timeout" => {
// * = millisecond
// *s = second
// *m = minute
let timeout = value.trim_end_matches(|c| !char::is_numeric(c));

if let Ok(timeout) = timeout.parse::<u64>() {
if timeout == 0 {
continue;
}

let duration = match value.chars().last() {
Some('s') => std::time::Duration::from_secs(timeout),
Some('m') => std::time::Duration::from_secs(timeout * 60),
_ => std::time::Duration::from_millis(timeout),
};

println!("timeout: {:?}", duration);

server.request_timeout = Some(duration);
}
}
"environment" => {
environment = value.to_string();
}
Expand Down
123 changes: 107 additions & 16 deletions rupring/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ mod parse;

#[cfg(feature = "aws-lambda")]
use bootings::aws_lambda::LambdaRequestEvent;
use tokio::time::error::Elapsed;
use tokio::time::Instant;

use crate::application_properties;
use crate::application_properties::CompressionAlgorithm;
Expand All @@ -16,6 +18,7 @@ pub(crate) mod route;

use std::collections::HashMap;
use std::convert::Infallible;
use std::error::Error;
use std::net::SocketAddr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
Expand Down Expand Up @@ -109,38 +112,96 @@ pub async fn run_server(

// 6. create tokio task per HTTP request
tokio::task::spawn(async move {
if is_graceful_shutdown {
running_task_count.fetch_add(1, std::sync::atomic::Ordering::Release);
}

if let Err(err) = http1::Builder::new()
.keep_alive(true)
// `service_fn` converts our function in a `Service`
.serve_connection(
io,
service_fn(|request: Request<hyper::body::Incoming>| {
service_fn(move |request: Request<hyper::body::Incoming>| {
let di_context = Arc::clone(&di_context);
let application_properties = Arc::clone(&application_properties);

let running_task_count = Arc::clone(&running_task_count);

async move {
process_request(
application_properties,
di_context,
root_module,
request,
)
.await
if let Some(timeout_duration) =
application_properties.server.request_timeout
{
let now = Instant::now();

let handle = tokio::time::timeout_at(
now + timeout_duration,
tokio::task::spawn(async move {
if is_graceful_shutdown {
running_task_count
.fetch_add(1, std::sync::atomic::Ordering::Release);
}

let result = process_request(
application_properties,
di_context,
root_module,
request,
)
.await;

if is_graceful_shutdown {
running_task_count
.fetch_sub(1, std::sync::atomic::Ordering::Release);
}

result
}),
);

match handle.await {
Ok(Ok(response)) => response,
Ok(Err(error)) => default_join_error_handler(error),
Err(error) => default_timeout_handler(error),
}
} else {
let _running_task_count = Arc::clone(&running_task_count);

let handle = tokio::spawn(async move {
let running_task_count = _running_task_count;

if is_graceful_shutdown {
running_task_count
.fetch_add(1, std::sync::atomic::Ordering::Release);
}

let result = process_request(
application_properties,
di_context,
root_module,
request,
)
.await;

if is_graceful_shutdown {
running_task_count
.fetch_sub(1, std::sync::atomic::Ordering::Release);
}

result
});

let result = handle.await;

let response = match result {
Ok(response) => response,
Err(error) => default_join_error_handler(error),
};

return response;
}
}
}),
)
.await
{
println!("Error serving connection: {:?}", err);
}

if is_graceful_shutdown {
running_task_count.fetch_sub(1, std::sync::atomic::Ordering::Release);
}
});
}
}
Expand Down Expand Up @@ -312,6 +373,36 @@ fn default_404_handler() -> Result<Response<Full<Bytes>>, Infallible> {
return Ok::<Response<Full<Bytes>>, Infallible>(response);
}

fn default_timeout_handler(error: Elapsed) -> Result<Response<Full<Bytes>>, Infallible> {
let mut response: hyper::Response<Full<Bytes>> = Response::builder()
.body(Full::new(Bytes::from(format!(
"Request Timeout: {}",
error.to_string()
))))
.unwrap();

if let Ok(status) = StatusCode::from_u16(500) {
*response.status_mut() = status;
}

return Ok::<Response<Full<Bytes>>, Infallible>(response);
}

fn default_join_error_handler(error: impl Error) -> Result<Response<Full<Bytes>>, Infallible> {
let mut response: hyper::Response<Full<Bytes>> = Response::builder()
.body(Full::new(Bytes::from(format!(
"Internal Server Error: {:?}",
error
))))
.unwrap();

if let Ok(status) = StatusCode::from_u16(500) {
*response.status_mut() = status;
}

return Ok::<Response<Full<Bytes>>, Infallible>(response);
}

async fn process_request<T>(
application_properties: Arc<application_properties::ApplicationProperties>,
di_context: Arc<di::DIContext>,
Expand Down
4 changes: 2 additions & 2 deletions rupring/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ impl<T: IModule + Clone + Copy + Sync + Send + 'static> RupringFactory<T> {
runtime_builder.worker_threads(thread_limit);
}

let runtime = Builder::new_multi_thread().enable_all().build()?;
let runtime = runtime_builder.build()?;

let result = runtime.block_on(async {
core::run_server(self.application_properties, self.root_module).await
Expand All @@ -508,7 +508,7 @@ impl<T: IModule + Clone + Copy + Sync + Send + 'static> RupringFactory<T> {
runtime_builder.worker_threads(thread_limit);
}

let runtime = Builder::new_multi_thread().enable_all().build()?;
let runtime = runtime_builder.build()?;

let result = runtime.block_on(async {
core::run_server_on_aws_lambda(self.application_properties, self.root_module).await
Expand Down
5 changes: 3 additions & 2 deletions rupring_example/application.properties
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
server.port=8080
server.shutdown=graceful
server.timeout-per-shutdown-phase=3s
server.timeout-per-shutdown-phase=30s
server.compression.enabled=true
server.compression.mime-types=application/json,application/xml,text/html,text/xml,text/plain
server.compression.min-response-size=1024
server.compression.algorithm=gzip
server.compression.algorithm=gzip
server.request-timeout=3s
12 changes: 11 additions & 1 deletion rupring_example/src/domains/root/controller.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#[derive(Debug, Clone)]
#[rupring::Controller(prefix=/, routes=[index], tags=["root"])]
#[rupring::Controller(prefix=/, routes=[index, slow], tags=["root"])]
pub struct RootController {}

#[rupring::Get(path = /)]
Expand All @@ -12,3 +12,13 @@ pub fn index(request: rupring::Request) -> rupring::Response {

rupring::Response::new().text("123214")
}

#[rupring::Get(path = /slow)]
#[summary = "그냥 느린 API입니다."]
#[description = "별다른 기능은 없습니다."]
#[tags = [root]]
pub fn slow(request: rupring::Request) -> rupring::Response {
std::thread::sleep(std::time::Duration::from_secs(30));

rupring::Response::new().text("Slow")
}

0 comments on commit f226ede

Please sign in to comment.