Skip to content

Commit

Permalink
Merge pull request #173 from myyrakle/feat/#147
Browse files Browse the repository at this point in the history
[#147] AWS Lambda용 부팅 옵션 추가
  • Loading branch information
myyrakle authored Dec 10, 2024
2 parents cbd77ca + 3aeb973 commit 5dc533d
Show file tree
Hide file tree
Showing 16 changed files with 469 additions and 116 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,6 @@ Cargo.lock

test.json

.vscode
.vscode

*.zip
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# rupring

![](https://img.shields.io/badge/language-Rust-red) ![](https://img.shields.io/badge/version-0.11.0-brightgreen) [![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://github.com/myyrakle/rupring/blob/master/LICENSE)
![](https://img.shields.io/badge/language-Rust-red) ![](https://img.shields.io/badge/version-0.12.0-brightgreen) [![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://github.com/myyrakle/rupring/blob/master/LICENSE)

Spring Comes to Rust

Expand Down
7 changes: 0 additions & 7 deletions application.properties

This file was deleted.

10 changes: 8 additions & 2 deletions rupring/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rupring"
version = "0.11.0"
version = "0.12.0"
edition = "2021"
license = "MIT"
authors = ["myyrakle <[email protected]>"]
Expand Down Expand Up @@ -36,4 +36,10 @@ features = [
]

[target.'cfg(target_os = "linux")'.dependencies]
signal-hook = "0.3.17"
signal-hook = "0.3.17"

[features]
default = []

full = ["aws-lambda"]
aws-lambda = []
10 changes: 10 additions & 0 deletions rupring/src/core/boot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,13 @@ where

app.listen().unwrap();
}

#[cfg(feature = "aws-lambda")]
pub fn run_on_aws_lambda<T>(root_module: T)
where
T: IModule + Clone + Copy + Sync + Send + 'static,
{
let app = crate::RupringFactory::create(root_module);

app.listen_on_aws_lambda().unwrap();
}
80 changes: 80 additions & 0 deletions rupring/src/core/bootings/aws_lambda.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use std::{collections::HashMap, str::FromStr};

use hyper::Uri;
use serde::Serialize;

use crate::utils;

// Reference: https://docs.aws.amazon.com/ko_kr/lambda/latest/dg/runtimes-api.html
fn get_aws_lambda_runtime_api() -> Option<String> {
std::env::var("AWS_LAMBDA_RUNTIME_API").ok()
}

#[derive(Debug, Default, Clone)]
pub struct LambdaRequestContext {
pub aws_request_id: String,
pub response_body: String,
}

pub async fn get_request_context() -> anyhow::Result<LambdaRequestContext> {
let aws_lambda_runtime_api = match get_aws_lambda_runtime_api() {
Some(api) => api,
None => return Err(anyhow::anyhow!("AWS_LAMBDA_RUNTIME_API is not set")),
};

let url = Uri::from_str(
format!("http://{aws_lambda_runtime_api}/2018-06-01/runtime/invocation/next").as_str(),
)?;

let mut headers = HashMap::new();
headers.insert(hyper::header::HOST, "localhost".to_owned());

let response =
utils::hyper::send_http_request(url, hyper::Method::GET, headers, "".to_owned()).await?;

let mut request_context = LambdaRequestContext::default();

if let Some(aws_request_id) = response.headers.get("Lambda-Runtime-Aws-Request-Id") {
request_context.aws_request_id = aws_request_id.to_str()?.to_string();
}

request_context.response_body = response.body;

Ok(request_context)
}

#[derive(Debug, Default, Clone, Serialize)]
pub struct LambdaReponse {
#[serde(rename = "statusCode")]
pub status_code: u16,
#[serde(rename = "headers")]
pub headers: HashMap<String, String>,
#[serde(rename = "body")]
pub body: String,
}

pub async fn send_response_to_lambda(
aws_request_id: String,
response: LambdaReponse,
) -> anyhow::Result<()> {
let response = serde_json::to_string(&response)?;

let aws_lambda_runtime_api = match get_aws_lambda_runtime_api() {
Some(api) => api,
None => return Err(anyhow::anyhow!("AWS_LAMBDA_RUNTIME_API is not set")),
};

let url = Uri::from_str(
format!(
"http://{aws_lambda_runtime_api}/2018-06-01/runtime/invocation/{aws_request_id}/response"
)
.as_str(),
)?;

let mut headers = HashMap::new();
headers.insert(hyper::header::HOST, "localhost".to_owned());

let _ = utils::hyper::send_http_request(url, hyper::Method::POST, headers, response).await?;

Ok(())
}
1 change: 1 addition & 0 deletions rupring/src/core/bootings/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod aws_lambda;
20 changes: 20 additions & 0 deletions rupring/src/core/compression.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
pub fn compress_with_gzip(body: &[u8]) -> anyhow::Result<Vec<u8>> {
use std::io::Write;

let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
encoder.write_all(body)?;
let compressed = encoder.finish()?;

Ok(compressed)
}

pub fn compress_with_deflate(body: &[u8]) -> anyhow::Result<Vec<u8>> {
use std::io::Write;

let mut encoder =
flate2::write::DeflateEncoder::new(Vec::new(), flate2::Compression::default());
encoder.write_all(body)?;
let compressed = encoder.finish()?;

Ok(compressed)
}
74 changes: 73 additions & 1 deletion rupring/src/core/graceful.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
use std::sync::{atomic::AtomicBool, Arc};
use std::sync::{
atomic::{AtomicBool, AtomicU64},
Arc,
};

use log::Level;

use crate::{application_properties, logger::print_system_log};

#[derive(Debug, Clone)]
pub struct SignalFlags {
Expand All @@ -24,3 +31,68 @@ impl SignalFlags {
Ok(())
}
}

pub fn handle_graceful_shutdown(
application_properties: &application_properties::ApplicationProperties,
service_avaliable: Arc<AtomicBool>,
running_task_count: Arc<AtomicU64>,
) {
let signal_flags = SignalFlags::new();
let shutdown_timeout_duration = application_properties.server.shutdown_timeout_duration();

if let Err(error) = signal_flags.register_hooks() {
print_system_log(
Level::Error,
format!("Error registering signal hooks: {:?}", error).as_str(),
);
} else {
print_system_log(Level::Info, "Graceful shutdown enabled");

let service_avaliable = Arc::clone(&service_avaliable);
let running_task_count = Arc::clone(&running_task_count);
tokio::spawn(async move {
let sigterm = Arc::clone(&signal_flags.sigterm);
let sigint = Arc::clone(&signal_flags.sigint);

loop {
if sigterm.load(std::sync::atomic::Ordering::Relaxed) {
print_system_log(
Level::Info,
"SIGTERM received. Try to shutdown gracefully...",
);
service_avaliable.store(false, std::sync::atomic::Ordering::Release);
break;
}

if sigint.load(std::sync::atomic::Ordering::Relaxed) {
print_system_log(
Level::Info,
"SIGINT received. Try to shutdown gracefully...",
);
service_avaliable.store(false, std::sync::atomic::Ordering::Release);
break;
}

tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
}

let shutdown_request_time = std::time::Instant::now();

loop {
if running_task_count.load(std::sync::atomic::Ordering::Relaxed) == 0 {
print_system_log(Level::Info, "All tasks are done. Shutting down...");
std::process::exit(0);
}

// timeout 지나면 강제로 종료
let now = std::time::Instant::now();
if now.duration_since(shutdown_request_time) >= shutdown_timeout_duration {
print_system_log(Level::Info, "Shutdown timeout reached. Forcing shutdown...");
std::process::exit(0);
}

tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
}
});
}
}
Loading

0 comments on commit 5dc533d

Please sign in to comment.