Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wacker: use fuzzy id matching in delete/stop/restart/logs operations #68

Merged
merged 1 commit into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions wacker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ env_logger = "0.11.3"
chrono = "0.4.38"
sled = "0.34.7"
log = "0.4.21"
rayon = "1.10.0"

[build-dependencies]
anyhow.workspace = true
Expand Down
79 changes: 49 additions & 30 deletions wacker/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use async_stream::try_stream;
use async_trait::async_trait;
use log::{error, info, warn};
use parking_lot::Mutex;
use rayon::prelude::*;
use sled::Db;
use std::fmt::Display;
use std::fs::{remove_file, OpenOptions};
Expand Down Expand Up @@ -135,6 +136,23 @@ impl Server {
Err(err) => Err(Status::internal(err.to_string())),
}
}

fn get_program_keys(&self) -> Vec<String> {
let programs = self.programs.lock();
programs.par_iter().map(|(key, _)| key.clone()).collect()
}
}

fn search_id(keys: &Vec<String>, id: &str) -> Result<String> {
let positions: Vec<_> = keys.par_iter().positions(|s| s.starts_with(id)).collect();
match positions.len() {
0 => Err(anyhow!("program {} not found", id)),
1 => Ok(keys[positions[0]].clone()),
_ => Err(anyhow!(
"ambiguous program id {}, more than one program starts with this id",
id
)),
}
}

fn to_status<E: Display>(err: E) -> Status {
Expand Down Expand Up @@ -223,38 +241,35 @@ impl Wacker for Server {

async fn stop(&self, request: Request<StopRequest>) -> Result<Response<()>, Status> {
let req = request.into_inner();
let keys = self.get_program_keys();
let mut programs = self.programs.lock();

for id in req.ids {
let id = search_id(keys.as_ref(), id.as_str()).map_err(to_status)?;

info!("Stop the program: {}", id);

match programs.get_mut(id.as_str()) {
Some(program) => {
if !program.handler.is_finished() {
program.handler.abort();
program.status = PROGRAM_STATUS_STOPPED;
}
}
None => return Err(Status::not_found(format!("program {} not exists", id))),
let program = programs.get_mut(id.as_str()).unwrap();
if !program.handler.is_finished() {
program.handler.abort();
program.status = PROGRAM_STATUS_STOPPED;
}
}
Ok(Response::new(()))
}

async fn restart(&self, request: Request<RestartRequest>) -> Result<Response<()>, Status> {
let req = request.into_inner();
let keys = self.get_program_keys();

for id in req.ids {
let id = search_id(keys.as_ref(), id.as_str()).map_err(to_status)?;

info!("Restart the program: {}", id);

let meta = {
let programs = self.programs.lock();
let program = programs.get(id.as_str());
if program.is_none() {
return Err(Status::not_found(format!("program {} not exists", id)));
}

let program = program.unwrap();
let program = programs.get(id.as_str()).unwrap();
if !program.handler.is_finished() {
program.handler.abort();
}
Expand All @@ -268,29 +283,31 @@ impl Wacker for Server {

async fn delete(&self, request: Request<DeleteRequest>) -> Result<Response<()>, Status> {
let req = request.into_inner();
let keys = self.get_program_keys();
let mut programs = self.programs.lock();

for id in req.ids {
let id = search_id(keys.as_ref(), id.as_str()).map_err(to_status)?;

info!("Delete the program: {}", id);

if let Some(program) = programs.get(id.as_str()) {
if !program.handler.is_finished() {
program.handler.abort();
}
let program = programs.get(id.as_str()).unwrap();
if !program.handler.is_finished() {
program.handler.abort();
}

if let Err(err) = remove_file(self.logs_dir.join(id.as_str())) {
if err.kind() != ErrorKind::NotFound {
return Err(Status::internal(format!(
"failed to remove the log file for {}: {}",
id.as_str(),
err
)));
}
if let Err(err) = remove_file(self.logs_dir.join(id.as_str())) {
if err.kind() != ErrorKind::NotFound {
return Err(Status::internal(format!(
"failed to remove the log file for {}: {}",
id.as_str(),
err
)));
}

self.db.remove(id.as_str()).map_err(to_status)?;
programs.remove(id.as_str());
}

self.db.remove(id.as_str()).map_err(to_status)?;
programs.remove(id.as_str());
}
Ok(Response::new(()))
}
Expand All @@ -299,8 +316,10 @@ impl Wacker for Server {

async fn logs(&self, request: Request<LogRequest>) -> Result<Response<Self::LogsStream>, Status> {
let req = request.into_inner();
let keys = self.get_program_keys();
let id = search_id(keys.as_ref(), req.id.as_str()).map_err(to_status)?;

let mut file = File::open(self.logs_dir.join(req.id)).await?;
let mut file = File::open(self.logs_dir.join(id)).await?;
let mut contents = String::new();
let last_position = file.read_to_string(&mut contents).await?;
let lines: Vec<&str> = contents.split_inclusive('\n').collect();
Expand Down
68 changes: 53 additions & 15 deletions wacker/tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,16 +98,19 @@ async fn stop() -> Result<()> {
server.start().await;

let mut client = server.client().await;
let response = client
client
.run(RunRequest {
path: "./tests/wasm/time.wasm".parse()?,
args: vec![],
})
.await?
.into_inner();
.await?;
sleep(Duration::from_secs(1)).await;

client.stop(StopRequest { ids: vec![response.id] }).await?;
client
.stop(StopRequest {
ids: vec!["t".to_string()],
})
.await?;
sleep(Duration::from_secs(1)).await;

let response = client.list(()).await?.into_inner();
Expand Down Expand Up @@ -139,11 +142,9 @@ async fn restart() -> Result<()> {
.into_inner();
sleep(Duration::from_secs(1)).await;

let response = client.restart(RestartRequest { ids: vec![run_resp.id] }).await;
assert!(response.is_ok());
let response = client
.restart(RestartRequest {
ids: vec![serve_resp.id],
ids: vec![run_resp.id, serve_resp.id],
})
.await;
assert!(response.is_ok());
Expand All @@ -158,16 +159,19 @@ async fn delete() -> Result<()> {
server.start().await;

let mut client = server.client().await;
let response = client
client
.run(RunRequest {
path: "./tests/wasm/hello.wasm".parse()?,
args: vec![],
})
.await?
.into_inner();
.await?;
sleep(Duration::from_secs(1)).await;

client.delete(DeleteRequest { ids: vec![response.id] }).await?;
client
.delete(DeleteRequest {
ids: vec!["h".to_string()],
})
.await?;

let response = client.list(()).await?.into_inner();
assert_eq!(response.programs.len(), 0);
Expand All @@ -177,23 +181,57 @@ async fn delete() -> Result<()> {
}

#[tokio::test(flavor = "multi_thread")]
async fn logs() -> Result<()> {
async fn delete_ambiguous_id() -> Result<()> {
let mut server = TestServer::new();
server.start().await;

let mut client = server.client().await;
client
.run(RunRequest {
path: "./tests/wasm/hello.wasm".parse()?,
args: vec![],
})
.await?;
client
.run(RunRequest {
path: "./tests/wasm/hello.wasm".parse()?,
args: vec![],
})
.await?;
sleep(Duration::from_secs(1)).await;

let response = client
.delete(DeleteRequest {
ids: vec!["hello".to_string()],
})
.await;
assert!(response.is_err());
assert_eq!(
response.err().unwrap().message(),
"ambiguous program id hello, more than one program starts with this id"
);

server.shutdown().await;
Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn logs() -> Result<()> {
let mut server = TestServer::new();
server.start().await;

let mut client = server.client().await;
client
.run(RunRequest {
path: "./tests/wasm/hello.wasm".parse()?,
args: vec![],
})
.await?
.into_inner();
.await?;
sleep(Duration::from_secs(3)).await;

let mut response = client
.logs(LogRequest {
id: response.id,
id: "hello".to_string(),
follow: false,
tail: 1,
})
Expand Down
Loading