Skip to content

Commit

Permalink
auto shut down
Browse files Browse the repository at this point in the history
  • Loading branch information
vxcall committed Sep 5, 2024
1 parent cc96d05 commit edfa240
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 31 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ tokio = { version = "1.40.0", features = ["full"] }
aws-config = { version= "1.5.5", features = ["behavior-version-latest"] }
aws-sdk-dynamodb = "1.43.0"
aws-sdk-bedrockruntime = { version = "1.47.0", features = ["behavior-version-latest"] }
futures-util = "0.3"

[dependencies.uuid]
version = "1.10.0"
Expand Down
105 changes: 94 additions & 11 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use actix_web::dev::{forward_ready, ServiceRequest, ServiceResponse, Transform};
use actix_web::Error;
use actix_web::{dev::Service, middleware::Logger, web, App, HttpServer};
use anyhow::Result;

use actix_web::{middleware::Logger, web, App, HttpServer};
use aws_sdk_bedrockruntime as bedrock;
use aws_sdk_dynamodb::Client;
use futures_util::future::LocalBoxFuture;
use redis::aio::MultiplexedConnection;
use utils::app_state::AppState;
use aws_sdk_bedrockruntime as bedrock;

mod routes;
mod utils;
Expand All @@ -30,6 +33,67 @@ impl RedisClient {
}
}

struct LastActivityTime(Mutex<Instant>);

struct InactivityMiddleware {
last_activity: Arc<LastActivityTime>,
shutdown_duration: Duration,
}

impl<S, B> Transform<S, ServiceRequest> for InactivityMiddleware
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
S::Future: 'static,
B: 'static,
{
type Response = ServiceResponse<B>;
type Error = Error;
type InitError = ();
type Transform = InactivityMiddlewareService<S>;
type Future = std::future::Ready<Result<Self::Transform, Self::InitError>>;

fn new_transform(&self, service: S) -> Self::Future {
std::future::ready(Ok(InactivityMiddlewareService {
service,
last_activity: self.last_activity.clone(),
shutdown_duration: self.shutdown_duration,
}))
}
}

struct InactivityMiddlewareService<S> {
service: S,
last_activity: Arc<LastActivityTime>,
#[allow(dead_code)]
shutdown_duration: Duration,
}

impl<S, B> Service<ServiceRequest> for InactivityMiddlewareService<S>
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
S::Future: 'static,
B: 'static,
{
type Response = ServiceResponse<B>;
type Error = Error;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;

forward_ready!(service);

fn call(&self, req: ServiceRequest) -> Self::Future {
let last_activity = self.last_activity.clone();
let mut last_activity = last_activity.0.lock().unwrap();
*last_activity = Instant::now();
drop(last_activity);

let fut = self.service.call(req);
Box::pin(async move {
let res = fut.await?;
Ok(res)
})
}
}

#[actix_web::main]
async fn main() -> Result<()> {
if std::env::var_os("RUST_LOG").is_none() {
Expand All @@ -40,7 +104,7 @@ async fn main() -> Result<()> {
dotenv::dotenv().ok();
env_logger::init();

println!("[+] env initialized succesfully");
println!("[+] env initialized successfully");
let address = (utils::environment_variables::ADDRESS).clone();
let port = (utils::environment_variables::PORT).clone();

Expand All @@ -54,22 +118,41 @@ async fn main() -> Result<()> {

println!("[+] server start listening...");

HttpServer::new(move || {
let last_activity = Arc::new(LastActivityTime(Mutex::new(Instant::now())));
let last_activity_clone = last_activity.clone();

let server = HttpServer::new(move || {
App::new()
.app_data(web::Data::new(AppState {
redis_client: redis_client.clone(),
dynamo_client: Arc::clone(&dynamo_client),
bedrock_client: Arc::clone(&bedrock_client),
}))
.wrap(Logger::default())
.wrap(InactivityMiddleware {
last_activity: last_activity_clone.clone(),
shutdown_duration: Duration::from_secs(300), // 5 minutes
})
.configure(routes::auth_routes::config)
.configure(routes::user_routes::config)
.configure(routes::index_routes::config)
.configure(routes::map_routes::config)
})
.bind((address, port))
.map_err(anyhow::Error::from)?
.run()
.await
.map_err(anyhow::Error::from)
.bind((address, port))?;

let server_handle = server.run();

// Spawn a task to check for inactivity
actix_web::rt::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(60)).await;
let last_activity = last_activity.0.lock().unwrap();
if last_activity.elapsed() > Duration::from_secs(300) {
println!("[-] No activity for 5 minutes. Shutting down.");
std::process::exit(0);
}
}
});

server_handle.await.map_err(anyhow::Error::from)
}
38 changes: 22 additions & 16 deletions src/routes/handlers/map_handlers.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use actix_web::{post, web};
use crate::utils::{api_response::ApiResponse, app_state};
use actix_web::{post, web};
use aws_sdk_bedrockruntime::primitives::Blob;
use serde_json::json;

Expand All @@ -13,28 +13,34 @@ struct VibeRequest {
const REQUEST_MAX_TOKENS: u32 = 200;

#[post("/map")]
pub async fn index(app_state: web::Data<app_state::AppState>,
vibe:web::Json<VibeRequest>) -> Result<ApiResponse, ApiResponse> {

pub async fn index(
app_state: web::Data<app_state::AppState>,
vibe: web::Json<VibeRequest>,
) -> Result<ApiResponse, ApiResponse> {
println!("[+] /map - invoked {:#?}", vibe.0);

let mut request_body = json!({
"inputText": vibe.input_text,
"textGenerationConfig": {}
});
request_body["textGenerationConfig"]["maxTokenCount"] = json!(vibe.max_tokens.unwrap_or(REQUEST_MAX_TOKENS));
request_body["textGenerationConfig"]["maxTokenCount"] =
json!(vibe.max_tokens.unwrap_or(REQUEST_MAX_TOKENS));

let result = app_state.bedrock_client
.invoke_model()
.model_id("amazon.titan-text-express-v1")
.content_type("application/json")
.body(Blob::new(serde_json::to_string(&request_body).unwrap()))
.send()
.await
.map_err(|err| {
println!("map_handlers /map response error: {:#?}", err);
ApiResponse::new(500, "The service is unable to respond at this time".to_string())}
)?;
let result = app_state
.bedrock_client
.invoke_model()
.model_id("amazon.titan-text-express-v1")
.content_type("application/json")
.body(Blob::new(serde_json::to_string(&request_body).unwrap()))
.send()
.await
.map_err(|err| {
println!("map_handlers /map response error: {:#?}", err);
ApiResponse::new(
500,
"The service is unable to respond at this time".to_string(),
)
})?;

let output = std::str::from_utf8(result.body().as_ref()).unwrap();
println!("[+] /map - invoked - got response {}", output);
Expand Down
2 changes: 1 addition & 1 deletion src/routes/handlers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pub mod auth_handlers;
pub mod index_handlers;
pub mod map_handlers;
pub mod user_handlers;
pub mod map_handlers;
2 changes: 1 addition & 1 deletion src/routes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ pub mod middlewares;

pub mod auth_routes;
pub mod index_routes;
pub mod map_routes;
pub mod user_routes;
pub mod map_routes;
4 changes: 2 additions & 2 deletions src/utils/app_state.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::sync::Arc;

use actix_web::web;
use aws_sdk_dynamodb::Client;
use aws_sdk_bedrockruntime::Client as BedrockClient;
use aws_sdk_dynamodb::Client;

use crate::RedisClient;

pub struct AppState {
pub redis_client: web::Data<RedisClient>,
pub dynamo_client: Arc<Client>,
pub bedrock_client: Arc<BedrockClient>
pub bedrock_client: Arc<BedrockClient>,
}

0 comments on commit edfa240

Please sign in to comment.