Skip to content

Commit

Permalink
Merge pull request #67 from iawia002/batch-ops
Browse files Browse the repository at this point in the history
Support delete/stop/restart multiple programs at once
  • Loading branch information
iawia002 authored Apr 24, 2024
2 parents 4963c1c + c61d1eb commit 2dd1f25
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 65 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ Commands:
run Runs a WebAssembly program
serve Serves an HTTP WebAssembly program
list Lists running WebAssembly programs [aliases: ps]
stop Stops a WebAssembly program
restart Restarts a WebAssembly program
delete Deletes a WebAssembly program [aliases: rm]
stop Stops WebAssembly programs
restart Restarts WebAssembly programs
delete Deletes WebAssembly programs [aliases: rm]
logs Fetches logs of a program [aliases: log]
help Print this message or the help of the given subcommand(s)
Expand Down
8 changes: 4 additions & 4 deletions wacker-cli/src/commands/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ use wacker::{Client, DeleteRequest};

#[derive(Parser)]
pub struct DeleteCommand {
/// Program ID
#[arg(required = true)]
id: String,
/// Program IDs
#[arg(required = true, value_name = "IDs")]
ids: Vec<String>,
}

impl DeleteCommand {
pub async fn execute(self, mut client: Client<Channel>) -> Result<()> {
match client.delete(DeleteRequest { id: self.id }).await {
match client.delete(DeleteRequest { ids: self.ids }).await {
Ok(_) => Ok(()),
Err(err) => Err(anyhow!(err.message().to_string())),
}
Expand Down
8 changes: 4 additions & 4 deletions wacker-cli/src/commands/restart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ use wacker::{Client, RestartRequest};

#[derive(Parser)]
pub struct RestartCommand {
/// Program ID
#[arg(required = true)]
id: String,
/// Program IDs
#[arg(required = true, value_name = "IDs")]
ids: Vec<String>,
}

impl RestartCommand {
pub async fn execute(self, mut client: Client<Channel>) -> Result<()> {
match client.restart(RestartRequest { id: self.id }).await {
match client.restart(RestartRequest { ids: self.ids }).await {
Ok(_) => Ok(()),
Err(err) => Err(anyhow!(err.message().to_string())),
}
Expand Down
8 changes: 4 additions & 4 deletions wacker-cli/src/commands/stop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ use wacker::{Client, StopRequest};

#[derive(Parser)]
pub struct StopCommand {
/// Program ID
#[arg(required = true)]
id: String,
/// Program IDs
#[arg(required = true, value_name = "IDs")]
ids: Vec<String>,
}

impl StopCommand {
/// Executes the command.
pub async fn execute(self, mut client: Client<Channel>) -> Result<()> {
match client.stop(StopRequest { id: self.id }).await {
match client.stop(StopRequest { ids: self.ids }).await {
Ok(_) => Ok(()),
Err(err) => Err(anyhow!(err.message().to_string())),
}
Expand Down
6 changes: 3 additions & 3 deletions wacker-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ enum Subcommand {
/// Lists running WebAssembly programs
#[command(visible_alias = "ps")]
List(commands::ListCommand),
/// Stops a WebAssembly program
/// Stops WebAssembly programs
Stop(commands::StopCommand),
/// Restarts a WebAssembly program
/// Restarts WebAssembly programs
Restart(commands::RestartCommand),
/// Deletes a WebAssembly program
/// Deletes WebAssembly programs
#[command(visible_alias = "rm")]
Delete(commands::DeleteCommand),
/// Fetches logs of a program
Expand Down
6 changes: 3 additions & 3 deletions wacker/proto/wacker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ message ListResponse {
}

message StopRequest {
string id = 1;
repeated string ids = 1;
}

message RestartRequest {
string id = 1;
repeated string ids = 1;
}

message DeleteRequest {
string id = 1;
repeated string ids = 1;
}

message LogRequest {
Expand Down
88 changes: 48 additions & 40 deletions wacker/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,67 +223,75 @@ impl Wacker for Server {

async fn stop(&self, request: Request<StopRequest>) -> Result<Response<()>, Status> {
let req = request.into_inner();
info!("Stop the program: {}", req.id);

let mut programs = self.programs.lock();
match programs.get_mut(req.id.as_str()) {
Some(program) => {
if !program.handler.is_finished() {
program.handler.abort();
program.status = PROGRAM_STATUS_STOPPED;

for id in req.ids {
info!("Stop the program: {}", id);

match programs.get_mut(id.as_str()) {
Some(program) => {
if !program.handler.is_finished() {
program.handler.abort();
program.status = PROGRAM_STATUS_STOPPED;
}
}
Ok(Response::new(()))
None => return Err(Status::not_found(format!("program {} not exists", id))),
}
None => Err(Status::not_found(format!("program {} not exists", req.id))),
}
Ok(Response::new(()))
}

async fn restart(&self, request: Request<RestartRequest>) -> Result<Response<()>, Status> {
let req = request.into_inner();
info!("Restart the program: {}", req.id);

let meta = {
let programs = self.programs.lock();
let program = programs.get(req.id.as_str());
if program.is_none() {
return Err(Status::not_found(format!("program {} not exists", req.id)));
}
for id in req.ids {
info!("Restart the program: {}", id);

let program = program.unwrap();
if !program.handler.is_finished() {
program.handler.abort();
}
program.meta.clone()
};
let meta = {
let programs = self.programs.lock();
let program = programs.get(id.as_str());
if program.is_none() {
return Err(Status::not_found(format!("program {} not exists", id)));
}

let program = program.unwrap();
if !program.handler.is_finished() {
program.handler.abort();
}
program.meta.clone()
};

self.run_inner(req.id, meta).await.map_err(to_status)?;
self.run_inner(id, meta).await.map_err(to_status)?;
}
Ok(Response::new(()))
}

async fn delete(&self, request: Request<DeleteRequest>) -> Result<Response<()>, Status> {
let req = request.into_inner();
info!("Delete the program: {}", req.id);

let mut programs = self.programs.lock();
if let Some(program) = programs.get(req.id.as_str()) {
if !program.handler.is_finished() {
program.handler.abort();
}

if let Err(err) = remove_file(self.logs_dir.join(req.id.clone())) {
if err.kind() != ErrorKind::NotFound {
return Err(Status::internal(format!(
"failed to remove the log file for {}: {}",
req.id.clone(),
err
)));
for id in req.ids {
info!("Delete the program: {}", id);

if let Some(program) = programs.get(id.as_str()) {
if !program.handler.is_finished() {
program.handler.abort();
}

if let Err(err) = remove_file(self.logs_dir.join(id.as_str())) {
if err.kind() != ErrorKind::NotFound {
return Err(Status::internal(format!(
"failed to remove the log file for {}: {}",
id.as_str(),
err
)));
}
}
}

self.db.remove(req.id.clone()).map_err(to_status)?;
programs.remove(req.id.clone().as_str());
self.db.remove(id.as_str()).map_err(to_status)?;
programs.remove(id.as_str());
}
}

Ok(Response::new(()))
}

Expand Down
12 changes: 8 additions & 4 deletions wacker/tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ async fn stop() -> Result<()> {
.into_inner();
sleep(Duration::from_secs(1)).await;

client.stop(StopRequest { id: response.id }).await?;
client.stop(StopRequest { ids: vec![response.id] }).await?;
sleep(Duration::from_secs(1)).await;

let response = client.list(()).await?.into_inner();
Expand Down Expand Up @@ -139,9 +139,13 @@ async fn restart() -> Result<()> {
.into_inner();
sleep(Duration::from_secs(1)).await;

let response = client.restart(RestartRequest { id: run_resp.id }).await;
let response = client.restart(RestartRequest { ids: vec![run_resp.id] }).await;
assert!(response.is_ok());
let response = client.restart(RestartRequest { id: serve_resp.id }).await;
let response = client
.restart(RestartRequest {
ids: vec![serve_resp.id],
})
.await;
assert!(response.is_ok());

server.shutdown().await;
Expand All @@ -163,7 +167,7 @@ async fn delete() -> Result<()> {
.into_inner();
sleep(Duration::from_secs(1)).await;

client.delete(DeleteRequest { id: response.id }).await?;
client.delete(DeleteRequest { ids: vec![response.id] }).await?;

let response = client.list(()).await?.into_inner();
assert_eq!(response.programs.len(), 0);
Expand Down

0 comments on commit 2dd1f25

Please sign in to comment.