diff --git a/rupring/Cargo.toml b/rupring/Cargo.toml index de25d01..dfd1161 100644 --- a/rupring/Cargo.toml +++ b/rupring/Cargo.toml @@ -16,7 +16,7 @@ homepage = "https://github.com/myyrakle/rupring/blob/master/README.md" [dependencies] rupring_macro={ version="0.11.0", path="../rupring_macro" } hyper = { version = "1", features = ["full"] } -tokio = { version = "1", features = ["full"] } +tokio = { version = "1", features = ["rt", "net", "rt-multi-thread", "time"] } http-body-util = "0.1.0" hyper-util = { version = "0.1", features = ["full"] } bytes = "1.5.0" diff --git a/rupring/src/application_properties.rs b/rupring/src/application_properties.rs index 95f32f0..5dd49d9 100644 --- a/rupring/src/application_properties.rs +++ b/rupring/src/application_properties.rs @@ -27,12 +27,13 @@ | server.compression.min-response-size | The minimum response size to compress. (byte) | 2048 | | server.compression.algorithm | The compression algorithm to use. (gzip,deflate) | gzip | | server.thread.limit | The thread limit to use. | None(max) | +| server.request-timeout | The request timeout. (300 = 300 millisecond, 3s = 3 second, 2m = 2 minute) | No Timeout | | banner.enabled | Whether to enable the banner. | true | | banner.location | The location of the banner file. | None | | banner.charset | The charset of the banner file. (UTF-8, UTF-16) | UTF-8 | */ -use std::collections::HashMap; +use std::{collections::HashMap, time::Duration}; #[derive(Debug, PartialEq, Clone)] pub struct ApplicationProperties { @@ -154,6 +155,7 @@ pub struct Server { pub shutdown: ShutdownType, pub timeout_per_shutdown_phase: String, pub thread_limit: Option, + pub request_timeout: Option, } impl Default for Server { @@ -165,6 +167,7 @@ impl Default for Server { shutdown: ShutdownType::Immediate, timeout_per_shutdown_phase: "30s".to_string(), thread_limit: None, + request_timeout: None, } } } @@ -271,6 +274,28 @@ impl ApplicationProperties { server.thread_limit = Some(value); } } + "server.request-timeout" => { + // * = millisecond + // *s = second + // *m = minute + let timeout = value.trim_end_matches(|c| !char::is_numeric(c)); + + if let Ok(timeout) = timeout.parse::() { + if timeout == 0 { + continue; + } + + let duration = match value.chars().last() { + Some('s') => std::time::Duration::from_secs(timeout), + Some('m') => std::time::Duration::from_secs(timeout * 60), + _ => std::time::Duration::from_millis(timeout), + }; + + println!("timeout: {:?}", duration); + + server.request_timeout = Some(duration); + } + } "environment" => { environment = value.to_string(); } diff --git a/rupring/src/core/mod.rs b/rupring/src/core/mod.rs index e961a5c..8af2ab0 100644 --- a/rupring/src/core/mod.rs +++ b/rupring/src/core/mod.rs @@ -7,6 +7,8 @@ mod parse; #[cfg(feature = "aws-lambda")] use bootings::aws_lambda::LambdaRequestEvent; +use tokio::time::error::Elapsed; +use tokio::time::Instant; use crate::application_properties; use crate::application_properties::CompressionAlgorithm; @@ -16,6 +18,7 @@ pub(crate) mod route; use std::collections::HashMap; use std::convert::Infallible; +use std::error::Error; use std::net::SocketAddr; use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicU64; @@ -109,27 +112,89 @@ pub async fn run_server( // 6. create tokio task per HTTP request tokio::task::spawn(async move { - if is_graceful_shutdown { - running_task_count.fetch_add(1, std::sync::atomic::Ordering::Release); - } - if let Err(err) = http1::Builder::new() .keep_alive(true) // `service_fn` converts our function in a `Service` .serve_connection( io, - service_fn(|request: Request| { + service_fn(move |request: Request| { let di_context = Arc::clone(&di_context); let application_properties = Arc::clone(&application_properties); + let running_task_count = Arc::clone(&running_task_count); + async move { - process_request( - application_properties, - di_context, - root_module, - request, - ) - .await + if let Some(timeout_duration) = + application_properties.server.request_timeout + { + let now = Instant::now(); + + let handle = tokio::time::timeout_at( + now + timeout_duration, + tokio::task::spawn(async move { + if is_graceful_shutdown { + running_task_count + .fetch_add(1, std::sync::atomic::Ordering::Release); + } + + let result = process_request( + application_properties, + di_context, + root_module, + request, + ) + .await; + + if is_graceful_shutdown { + running_task_count + .fetch_sub(1, std::sync::atomic::Ordering::Release); + } + + result + }), + ); + + match handle.await { + Ok(Ok(response)) => response, + Ok(Err(error)) => default_join_error_handler(error), + Err(error) => default_timeout_handler(error), + } + } else { + let _running_task_count = Arc::clone(&running_task_count); + + let handle = tokio::spawn(async move { + let running_task_count = _running_task_count; + + if is_graceful_shutdown { + running_task_count + .fetch_add(1, std::sync::atomic::Ordering::Release); + } + + let result = process_request( + application_properties, + di_context, + root_module, + request, + ) + .await; + + if is_graceful_shutdown { + running_task_count + .fetch_sub(1, std::sync::atomic::Ordering::Release); + } + + result + }); + + let result = handle.await; + + let response = match result { + Ok(response) => response, + Err(error) => default_join_error_handler(error), + }; + + return response; + } } }), ) @@ -137,10 +202,6 @@ pub async fn run_server( { println!("Error serving connection: {:?}", err); } - - if is_graceful_shutdown { - running_task_count.fetch_sub(1, std::sync::atomic::Ordering::Release); - } }); } } @@ -312,6 +373,36 @@ fn default_404_handler() -> Result>, Infallible> { return Ok::>, Infallible>(response); } +fn default_timeout_handler(error: Elapsed) -> Result>, Infallible> { + let mut response: hyper::Response> = Response::builder() + .body(Full::new(Bytes::from(format!( + "Request Timeout: {}", + error.to_string() + )))) + .unwrap(); + + if let Ok(status) = StatusCode::from_u16(500) { + *response.status_mut() = status; + } + + return Ok::>, Infallible>(response); +} + +fn default_join_error_handler(error: impl Error) -> Result>, Infallible> { + let mut response: hyper::Response> = Response::builder() + .body(Full::new(Bytes::from(format!( + "Internal Server Error: {:?}", + error + )))) + .unwrap(); + + if let Ok(status) = StatusCode::from_u16(500) { + *response.status_mut() = status; + } + + return Ok::>, Infallible>(response); +} + async fn process_request( application_properties: Arc, di_context: Arc, diff --git a/rupring/src/lib.rs b/rupring/src/lib.rs index 7a19e33..3a2b89c 100644 --- a/rupring/src/lib.rs +++ b/rupring/src/lib.rs @@ -487,7 +487,7 @@ impl RupringFactory { runtime_builder.worker_threads(thread_limit); } - let runtime = Builder::new_multi_thread().enable_all().build()?; + let runtime = runtime_builder.build()?; let result = runtime.block_on(async { core::run_server(self.application_properties, self.root_module).await @@ -508,7 +508,7 @@ impl RupringFactory { runtime_builder.worker_threads(thread_limit); } - let runtime = Builder::new_multi_thread().enable_all().build()?; + let runtime = runtime_builder.build()?; let result = runtime.block_on(async { core::run_server_on_aws_lambda(self.application_properties, self.root_module).await diff --git a/rupring_example/application.properties b/rupring_example/application.properties index e058ea9..1c044da 100644 --- a/rupring_example/application.properties +++ b/rupring_example/application.properties @@ -1,7 +1,8 @@ server.port=8080 server.shutdown=graceful -server.timeout-per-shutdown-phase=3s +server.timeout-per-shutdown-phase=30s server.compression.enabled=true server.compression.mime-types=application/json,application/xml,text/html,text/xml,text/plain server.compression.min-response-size=1024 -server.compression.algorithm=gzip \ No newline at end of file +server.compression.algorithm=gzip +server.request-timeout=3s \ No newline at end of file diff --git a/rupring_example/src/domains/root/controller.rs b/rupring_example/src/domains/root/controller.rs index df78513..d63356c 100644 --- a/rupring_example/src/domains/root/controller.rs +++ b/rupring_example/src/domains/root/controller.rs @@ -1,5 +1,5 @@ #[derive(Debug, Clone)] -#[rupring::Controller(prefix=/, routes=[index], tags=["root"])] +#[rupring::Controller(prefix=/, routes=[index, slow], tags=["root"])] pub struct RootController {} #[rupring::Get(path = /)] @@ -12,3 +12,13 @@ pub fn index(request: rupring::Request) -> rupring::Response { rupring::Response::new().text("123214") } + +#[rupring::Get(path = /slow)] +#[summary = "그냥 느린 API입니다."] +#[description = "별다른 기능은 없습니다."] +#[tags = [root]] +pub fn slow(request: rupring::Request) -> rupring::Response { + std::thread::sleep(std::time::Duration::from_secs(30)); + + rupring::Response::new().text("Slow") +}