diff --git a/.gitignore b/.gitignore index cf83efd..30cc0c7 100644 --- a/.gitignore +++ b/.gitignore @@ -20,4 +20,6 @@ Cargo.lock test.json -.vscode \ No newline at end of file +.vscode + +*.zip \ No newline at end of file diff --git a/README.md b/README.md index 505d5e3..2055756 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # rupring -![](https://img.shields.io/badge/language-Rust-red) ![](https://img.shields.io/badge/version-0.11.0-brightgreen) [![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://github.com/myyrakle/rupring/blob/master/LICENSE) +![](https://img.shields.io/badge/language-Rust-red) ![](https://img.shields.io/badge/version-0.12.0-brightgreen) [![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://github.com/myyrakle/rupring/blob/master/LICENSE) Spring Comes to Rust diff --git a/application.properties b/application.properties deleted file mode 100644 index e058ea9..0000000 --- a/application.properties +++ /dev/null @@ -1,7 +0,0 @@ -server.port=8080 -server.shutdown=graceful -server.timeout-per-shutdown-phase=3s -server.compression.enabled=true -server.compression.mime-types=application/json,application/xml,text/html,text/xml,text/plain -server.compression.min-response-size=1024 -server.compression.algorithm=gzip \ No newline at end of file diff --git a/rupring/Cargo.toml b/rupring/Cargo.toml index ce6d83a..f975615 100644 --- a/rupring/Cargo.toml +++ b/rupring/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rupring" -version = "0.11.0" +version = "0.12.0" edition = "2021" license = "MIT" authors = ["myyrakle "] @@ -36,4 +36,10 @@ features = [ ] [target.'cfg(target_os = "linux")'.dependencies] -signal-hook = "0.3.17" \ No newline at end of file +signal-hook = "0.3.17" + +[features] +default = [] + +full = ["aws-lambda"] +aws-lambda = [] \ No newline at end of file diff --git a/rupring/src/core/boot.rs b/rupring/src/core/boot.rs index 1392039..f0ba91e 100644 --- a/rupring/src/core/boot.rs +++ b/rupring/src/core/boot.rs @@ -21,3 +21,13 @@ where app.listen().unwrap(); } + +#[cfg(feature = "aws-lambda")] +pub fn run_on_aws_lambda(root_module: T) +where + T: IModule + Clone + Copy + Sync + Send + 'static, +{ + let app = crate::RupringFactory::create(root_module); + + app.listen_on_aws_lambda().unwrap(); +} diff --git a/rupring/src/core/bootings/aws_lambda.rs b/rupring/src/core/bootings/aws_lambda.rs new file mode 100644 index 0000000..c5e7625 --- /dev/null +++ b/rupring/src/core/bootings/aws_lambda.rs @@ -0,0 +1,80 @@ +use std::{collections::HashMap, str::FromStr}; + +use hyper::Uri; +use serde::Serialize; + +use crate::utils; + +// Reference: https://docs.aws.amazon.com/ko_kr/lambda/latest/dg/runtimes-api.html +fn get_aws_lambda_runtime_api() -> Option { + std::env::var("AWS_LAMBDA_RUNTIME_API").ok() +} + +#[derive(Debug, Default, Clone)] +pub struct LambdaRequestContext { + pub aws_request_id: String, + pub response_body: String, +} + +pub async fn get_request_context() -> anyhow::Result { + let aws_lambda_runtime_api = match get_aws_lambda_runtime_api() { + Some(api) => api, + None => return Err(anyhow::anyhow!("AWS_LAMBDA_RUNTIME_API is not set")), + }; + + let url = Uri::from_str( + format!("http://{aws_lambda_runtime_api}/2018-06-01/runtime/invocation/next").as_str(), + )?; + + let mut headers = HashMap::new(); + headers.insert(hyper::header::HOST, "localhost".to_owned()); + + let response = + utils::hyper::send_http_request(url, hyper::Method::GET, headers, "".to_owned()).await?; + + let mut request_context = LambdaRequestContext::default(); + + if let Some(aws_request_id) = response.headers.get("Lambda-Runtime-Aws-Request-Id") { + request_context.aws_request_id = aws_request_id.to_str()?.to_string(); + } + + request_context.response_body = response.body; + + Ok(request_context) +} + +#[derive(Debug, Default, Clone, Serialize)] +pub struct LambdaReponse { + #[serde(rename = "statusCode")] + pub status_code: u16, + #[serde(rename = "headers")] + pub headers: HashMap, + #[serde(rename = "body")] + pub body: String, +} + +pub async fn send_response_to_lambda( + aws_request_id: String, + response: LambdaReponse, +) -> anyhow::Result<()> { + let response = serde_json::to_string(&response)?; + + let aws_lambda_runtime_api = match get_aws_lambda_runtime_api() { + Some(api) => api, + None => return Err(anyhow::anyhow!("AWS_LAMBDA_RUNTIME_API is not set")), + }; + + let url = Uri::from_str( + format!( + "http://{aws_lambda_runtime_api}/2018-06-01/runtime/invocation/{aws_request_id}/response" + ) + .as_str(), + )?; + + let mut headers = HashMap::new(); + headers.insert(hyper::header::HOST, "localhost".to_owned()); + + let _ = utils::hyper::send_http_request(url, hyper::Method::POST, headers, response).await?; + + Ok(()) +} diff --git a/rupring/src/core/bootings/mod.rs b/rupring/src/core/bootings/mod.rs new file mode 100644 index 0000000..187fd95 --- /dev/null +++ b/rupring/src/core/bootings/mod.rs @@ -0,0 +1 @@ +pub mod aws_lambda; diff --git a/rupring/src/core/compression.rs b/rupring/src/core/compression.rs new file mode 100644 index 0000000..5124493 --- /dev/null +++ b/rupring/src/core/compression.rs @@ -0,0 +1,20 @@ +pub fn compress_with_gzip(body: &[u8]) -> anyhow::Result> { + use std::io::Write; + + let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default()); + encoder.write_all(body)?; + let compressed = encoder.finish()?; + + Ok(compressed) +} + +pub fn compress_with_deflate(body: &[u8]) -> anyhow::Result> { + use std::io::Write; + + let mut encoder = + flate2::write::DeflateEncoder::new(Vec::new(), flate2::Compression::default()); + encoder.write_all(body)?; + let compressed = encoder.finish()?; + + Ok(compressed) +} diff --git a/rupring/src/core/graceful.rs b/rupring/src/core/graceful.rs index 8e25963..0ee323a 100644 --- a/rupring/src/core/graceful.rs +++ b/rupring/src/core/graceful.rs @@ -1,4 +1,11 @@ -use std::sync::{atomic::AtomicBool, Arc}; +use std::sync::{ + atomic::{AtomicBool, AtomicU64}, + Arc, +}; + +use log::Level; + +use crate::{application_properties, logger::print_system_log}; #[derive(Debug, Clone)] pub struct SignalFlags { @@ -24,3 +31,68 @@ impl SignalFlags { Ok(()) } } + +pub fn handle_graceful_shutdown( + application_properties: &application_properties::ApplicationProperties, + service_avaliable: Arc, + running_task_count: Arc, +) { + let signal_flags = SignalFlags::new(); + let shutdown_timeout_duration = application_properties.server.shutdown_timeout_duration(); + + if let Err(error) = signal_flags.register_hooks() { + print_system_log( + Level::Error, + format!("Error registering signal hooks: {:?}", error).as_str(), + ); + } else { + print_system_log(Level::Info, "Graceful shutdown enabled"); + + let service_avaliable = Arc::clone(&service_avaliable); + let running_task_count = Arc::clone(&running_task_count); + tokio::spawn(async move { + let sigterm = Arc::clone(&signal_flags.sigterm); + let sigint = Arc::clone(&signal_flags.sigint); + + loop { + if sigterm.load(std::sync::atomic::Ordering::Relaxed) { + print_system_log( + Level::Info, + "SIGTERM received. Try to shutdown gracefully...", + ); + service_avaliable.store(false, std::sync::atomic::Ordering::Release); + break; + } + + if sigint.load(std::sync::atomic::Ordering::Relaxed) { + print_system_log( + Level::Info, + "SIGINT received. Try to shutdown gracefully...", + ); + service_avaliable.store(false, std::sync::atomic::Ordering::Release); + break; + } + + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + } + + let shutdown_request_time = std::time::Instant::now(); + + loop { + if running_task_count.load(std::sync::atomic::Ordering::Relaxed) == 0 { + print_system_log(Level::Info, "All tasks are done. Shutting down..."); + std::process::exit(0); + } + + // timeout 지나면 강제로 종료 + let now = std::time::Instant::now(); + if now.duration_since(shutdown_request_time) >= shutdown_timeout_duration { + print_system_log(Level::Info, "Shutdown timeout reached. Forcing shutdown..."); + std::process::exit(0); + } + + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + } + }); + } +} diff --git a/rupring/src/core/mod.rs b/rupring/src/core/mod.rs index 7042c6c..e63fa4e 100644 --- a/rupring/src/core/mod.rs +++ b/rupring/src/core/mod.rs @@ -1,5 +1,7 @@ mod banner; pub mod boot; +mod bootings; +mod compression; mod graceful; mod parse; @@ -16,6 +18,7 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicU64; use std::sync::Arc; +use bootings::aws_lambda::LambdaReponse; use http_body_util::BodyExt; use http_body_util::Full; use hyper::body::Bytes; @@ -32,71 +35,6 @@ use crate::logger::print_system_log; use crate::swagger::context::SwaggerContext; use crate::IModule; -pub fn handle_graceful_shutdown( - application_properties: &application_properties::ApplicationProperties, - service_avaliable: Arc, - running_task_count: Arc, -) { - let signal_flags = graceful::SignalFlags::new(); - let shutdown_timeout_duration = application_properties.server.shutdown_timeout_duration(); - - if let Err(error) = signal_flags.register_hooks() { - print_system_log( - Level::Error, - format!("Error registering signal hooks: {:?}", error).as_str(), - ); - } else { - print_system_log(Level::Info, "Graceful shutdown enabled"); - - let service_avaliable = Arc::clone(&service_avaliable); - let running_task_count = Arc::clone(&running_task_count); - tokio::spawn(async move { - let sigterm = Arc::clone(&signal_flags.sigterm); - let sigint = Arc::clone(&signal_flags.sigint); - - loop { - if sigterm.load(std::sync::atomic::Ordering::Relaxed) { - print_system_log( - Level::Info, - "SIGTERM received. Try to shutdown gracefully...", - ); - service_avaliable.store(false, std::sync::atomic::Ordering::Release); - break; - } - - if sigint.load(std::sync::atomic::Ordering::Relaxed) { - print_system_log( - Level::Info, - "SIGINT received. Try to shutdown gracefully...", - ); - service_avaliable.store(false, std::sync::atomic::Ordering::Release); - break; - } - - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; - } - - let shutdown_request_time = std::time::Instant::now(); - - loop { - if running_task_count.load(std::sync::atomic::Ordering::Relaxed) == 0 { - print_system_log(Level::Info, "All tasks are done. Shutting down..."); - std::process::exit(0); - } - - // timeout 지나면 강제로 종료 - let now = std::time::Instant::now(); - if now.duration_since(shutdown_request_time) >= shutdown_timeout_duration { - print_system_log(Level::Info, "Shutdown timeout reached. Forcing shutdown..."); - std::process::exit(0); - } - - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; - } - }); - } -} - pub async fn run_server( application_properties: application_properties::ApplicationProperties, root_module: impl IModule + Clone + Copy + Send + Sync + 'static, @@ -131,7 +69,7 @@ pub async fn run_server( let running_task_count = Arc::new(AtomicU64::new(0)); if is_graceful_shutdown { - handle_graceful_shutdown( + graceful::handle_graceful_shutdown( &application_properties, Arc::clone(&service_avaliable), Arc::clone(&running_task_count), @@ -178,13 +116,18 @@ pub async fn run_server( // `service_fn` converts our function in a `Service` .serve_connection( io, - service_fn(|req: Request| { + service_fn(|request: Request| { let di_context = Arc::clone(&di_context); let application_properties = Arc::clone(&application_properties); async move { - process_request(application_properties, di_context, root_module, req) - .await + process_request( + application_properties, + di_context, + root_module, + request, + ) + .await } }), ) @@ -200,6 +143,90 @@ pub async fn run_server( } } +#[cfg(feature = "aws-lambda")] +pub async fn run_server_on_aws_lambda( + application_properties: application_properties::ApplicationProperties, + root_module: impl IModule + Clone + Copy + Send + Sync + 'static, +) -> Result<(), Box> { + // 1. DI Context Initialize + let mut di_context = di::DIContext::new(); + di_context.initialize(Box::new(root_module.clone())); + let di_context = Arc::new(di_context); + + // 2. Prepare Swagger Serving, if enabled + if let Some(swagger_context) = di_context.get::() { + swagger_context.initialize_from_module(root_module.clone()); + } + + // 3. ready, set, go! + banner::print_banner(); + + let socket_address = make_address(&application_properties)?; + + print_system_log( + Level::Info, + format!("Starting Application on {}", socket_address).as_str(), + ); + + let application_properties = Arc::new(application_properties); + + // 4. extract request context from AWS Lambda event + let lambda_request_context = bootings::aws_lambda::get_request_context().await?; + + let hyper_request = hyper::Request::builder() + .method("GET") + .uri("http://localhost/") + .body("".to_string()) + .unwrap(); + + // 5. process request + let mut response = process_request( + application_properties, + di_context, + root_module, + hyper_request, + ) + .await?; + + // 6. convert response to AWS Lambda response format + let status_code = response.status(); + let headermap = response.headers(); + + let mut headers = HashMap::new(); + + for (header_name, header_value) in headermap { + let header_name = header_name.to_string(); + let header_value = header_value.to_str().unwrap_or("").to_string(); + + headers.insert(header_name, header_value); + } + + let response_body = match response.body_mut().collect().await { + Ok(body) => { + let body = body.to_bytes(); + let body = String::from_utf8(body.to_vec()).unwrap_or("".to_string()); + + body + } + Err(err) => { + return Err(Box::new(err)); + } + }; + + // 7. send response to AWS Lambda + bootings::aws_lambda::send_response_to_lambda( + lambda_request_context.aws_request_id, + LambdaReponse { + status_code: status_code.as_u16(), + headers, + body: response_body, + }, + ) + .await?; + + return Ok(()); +} + fn make_address( application_properties: &application_properties::ApplicationProperties, ) -> anyhow::Result { @@ -228,18 +255,21 @@ fn default_404_handler() -> Result>, Infallible> { return Ok::>, Infallible>(response); } -async fn process_request( +async fn process_request( application_properties: Arc, di_context: Arc, root_module: impl IModule + Clone + Copy + Send + Sync + 'static, - req: Request, -) -> Result>, Infallible> { + request: Request, +) -> Result>, Infallible> +where + T: hyper::body::Body, +{ // 1. Prepare URI matching let di_context = Arc::clone(&di_context); - let uri = req.uri(); + let uri = request.uri(); let request_path = uri.path(); - let request_method = req.method(); + let request_method = request.method(); print_system_log( Level::Info, @@ -270,7 +300,7 @@ async fn process_request( // 3.2. Parse Headers let mut headers = HashMap::new(); - for (header_name, header_value) in req.headers() { + for (header_name, header_value) in request.headers() { let header_name = header_name.to_string(); let header_value = header_value.to_str().unwrap_or("").to_string(); @@ -284,16 +314,16 @@ async fn process_request( let request_method = request_method.to_owned(); let request_path = request_path.to_owned(); - let request_body = match req.collect().await { + let request_body = match request.collect().await { Ok(body) => { let body = body.to_bytes(); let body = String::from_utf8(body.to_vec()).unwrap_or("".to_string()); body } - Err(err) => { + Err(_) => { return Ok::>, Infallible>(Response::new(Full::new(Bytes::from( - format!("Error reading request body: {:?}", err), + format!("Error reading request body"), )))); } }; @@ -406,7 +436,7 @@ fn post_process_response( match application_properties.server.compression.algorithm { CompressionAlgorithm::Gzip => { // compression - let compressed_bytes = compress_with_gzip(&response.body); + let compressed_bytes = compression::compress_with_gzip(&response.body); let compressed_bytes = match compressed_bytes { Ok(compressed_bytes) => compressed_bytes, @@ -430,7 +460,7 @@ fn post_process_response( } CompressionAlgorithm::Deflate => { // compression - let compressed_bytes = compress_with_deflate(&response.body); + let compressed_bytes = compression::compress_with_deflate(&response.body); let compressed_bytes = match compressed_bytes { Ok(compressed_bytes) => compressed_bytes, @@ -457,24 +487,3 @@ fn post_process_response( response } - -fn compress_with_gzip(body: &[u8]) -> anyhow::Result> { - use std::io::Write; - - let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default()); - encoder.write_all(body)?; - let compressed = encoder.finish()?; - - Ok(compressed) -} - -fn compress_with_deflate(body: &[u8]) -> anyhow::Result> { - use std::io::Write; - - let mut encoder = - flate2::write::DeflateEncoder::new(Vec::new(), flate2::Compression::default()); - encoder.write_all(body)?; - let compressed = encoder.finish()?; - - Ok(compressed) -} diff --git a/rupring/src/lib.rs b/rupring/src/lib.rs index 6df8a72..9144c85 100644 --- a/rupring/src/lib.rs +++ b/rupring/src/lib.rs @@ -174,10 +174,48 @@ impl rupring::IProvider for HomeService { # Application Properties - rupring provides various execution options through a special configuration file called application.properties. - Please refer to the corresponding [document](crate::application_properties) for more details. + +# AWS Lambda +- rupring provides the option to run on AWS Lambda. +- Supported Lambda Runtimes + 1. Amazon Linux 2 + 2. Amazon Linux 2023 + +## How to use +1. Enable the "aws-lambda" feature flag. +```ignore +rupring={ version = "0.12.0", features=["aws-lambda"] } +``` + +2. Use the `rupring::run_on_aws_lambda` function instead of `rupring::run`. +```rust,ignore +fn main() { + rupring::run_on_aws_lambda(RootModule {}) +} +``` + +3. Compile and create an executable file. (x86_64-unknown-linux-musl) +```bash +rustup target add x86_64-unknown-linux-musl +cargo build --release --target x86_64-unknown-linux-musl +``` + +3. Zip the executable file and upload it to the AWS console. +- The name of the executable file must be `bootstrap`. +```bash +zip -j bootstrap.zip ./target/x86_64-unknown-linux-musl/release/bootstrap +``` + +4. ...and upload it as a file to the AWS console */ pub(crate) mod core; +pub(crate) mod utils; pub use core::boot::run; + +// #[cfg(feature = "aws_lambda")] +pub use core::boot::run_on_aws_lambda; + pub mod di; /// header constants @@ -457,6 +495,27 @@ impl RupringFactory { return result; } + + #[cfg(feature = "aws-lambda")] + pub fn listen_on_aws_lambda(self) -> Result<(), Box> { + use tokio::runtime::Builder; + + let mut runtime_builder = Builder::new_multi_thread(); + + runtime_builder.enable_all(); + + if let Some(thread_limit) = self.application_properties.server.thread_limit { + runtime_builder.worker_threads(thread_limit); + } + + let runtime = Builder::new_multi_thread().enable_all().build()?; + + let result = runtime.block_on(async { + core::run_server_on_aws_lambda(self.application_properties, self.root_module).await + }); + + return result; + } } /// RupringDto derive macro diff --git a/rupring/src/utils/hyper.rs b/rupring/src/utils/hyper.rs new file mode 100644 index 0000000..615ffed --- /dev/null +++ b/rupring/src/utils/hyper.rs @@ -0,0 +1,65 @@ +use std::collections::HashMap; + +use http_body_util::BodyExt; +use hyper::{header::HeaderName, Method, Uri}; +use hyper_util::rt::TokioIo; +use tokio::net::TcpStream; + +#[allow(dead_code)] +pub struct HTTPResponse { + pub status_code: u16, + pub headers: hyper::HeaderMap, + pub body: String, +} + +pub async fn send_http_request( + url: Uri, + method: Method, + headers: HashMap, + request_body: String, +) -> anyhow::Result { + let host = url + .authority() + .ok_or(anyhow::anyhow!("host:port is not set"))?; + + let stream = TcpStream::connect(host.as_str()).await?; + + let (mut sender, connection) = + hyper::client::conn::http1::handshake(TokioIo::new(stream)).await?; + + tokio::task::spawn(async move { + if let Err(err) = connection.await { + println!("Connection failed: {:?}", err); + } + }); + + let mut hyper_request = hyper::Request::builder().method(method).uri(url); + + for (key, value) in headers.iter() { + hyper_request = hyper_request.header(key, value); + } + + let hyper_request = hyper_request.body(request_body).unwrap(); + + let mut response = sender.send_request(hyper_request).await?; + + let headers = response.headers().to_owned(); + + let response_body = match response.body_mut().collect().await { + Ok(body) => { + let body = body.to_bytes(); + let body = String::from_utf8(body.to_vec()).unwrap_or("".to_string()); + + body + } + Err(err) => { + return Err(anyhow::anyhow!("Failed to read response body: {:?}", err)); + } + }; + + Ok(HTTPResponse { + status_code: response.status().as_u16(), + headers, + body: response_body, + }) +} diff --git a/rupring/src/utils/mod.rs b/rupring/src/utils/mod.rs new file mode 100644 index 0000000..4342b3c --- /dev/null +++ b/rupring/src/utils/mod.rs @@ -0,0 +1 @@ +pub mod hyper; diff --git a/rupring_example/Cargo.toml b/rupring_example/Cargo.toml index 4429287..b7b4c39 100644 --- a/rupring_example/Cargo.toml +++ b/rupring_example/Cargo.toml @@ -2,9 +2,17 @@ name = "rupring_example" version = "0.1.0" edition = "2021" +default-run = "example" [dependencies] mockall = "0.13.1" -rupring={ version = "0.11.0" } -# rupring={ path="../rupring" } +rupring={ version = "0.12.0", path="../rupring", features=["full"] } serde = { version="1.0.193", features=["derive"] } + +[[bin]] +name = "example" +path = "src/main.rs" + +[[bin]] +name = "bootstrap" +path = "src/lambda.rs" diff --git a/rupring_example/README.md b/rupring_example/README.md index 3eac399..5233b42 100644 --- a/rupring_example/README.md +++ b/rupring_example/README.md @@ -10,12 +10,31 @@ very easy cargo run ``` -## Docker +## Run with Docker ```bash +cd ./rupring_example sudo docker build -t test . sudo docker run -p 8080:8000 test # ... curl http://localhost:8080 ``` + +## Deploy to AWS Lambda + +1. Create an AWS Lambda. The runtime must be either Amazon Linux 2, Amazon Linux 2023. + +2. Compile and create an executable file. + +```bash +rustup target add x86_64-unknown-linux-musl +cargo build --release --target x86_64-unknown-linux-musl +``` + +3. Zip the executable file and upload it to the AWS console. + +```bash +zip -j bootstrap.zip ./target/x86_64-unknown-linux-musl/release/bootstrap +# ...and upload it as a file to the AWS console +``` diff --git a/rupring_example/src/lambda.rs b/rupring_example/src/lambda.rs new file mode 100644 index 0000000..974251d --- /dev/null +++ b/rupring_example/src/lambda.rs @@ -0,0 +1,8 @@ +use domains::root::module::RootModule; + +pub(crate) mod domains; +pub(crate) mod middlewares; + +fn main() { + rupring::run_on_aws_lambda(RootModule {}) +}