Skip to content

Commit

Permalink
Merge pull request #56 from iawia002/config
Browse files Browse the repository at this point in the history
Adjust methods of getting the config
  • Loading branch information
iawia002 authored Apr 8, 2024
2 parents d7a722f + c0f03a9 commit 6947706
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 70 deletions.
22 changes: 1 addition & 21 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ tokio = { version = "1.36.0", features = ["rt", "rt-multi-thread", "macros"] }
tokio-stream = { version = "0.1.14", features = ["net"] }
tonic = { version = "0.11.0", features = ["zstd"] }
sled = "0.34.7"
once_cell = "1.19.0"
2 changes: 1 addition & 1 deletion wacker-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ clap.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
tonic.workspace = true
once_cell.workspace = true

tabled = "0.15.0"
once_cell = "1.18.0"
23 changes: 12 additions & 11 deletions wacker-daemon/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tokio::net::UnixListener;
use tokio::signal;
use tokio_stream::wrappers::UnixListenerStream;
use tonic::codec::CompressionEncoding;
use wacker::{Config, Server, WackerServer};
use wacker::{get_db_path, get_logs_dir, get_sock_path, Server, WackerServer};

#[derive(Parser)]
#[command(name = "wackerd")]
Expand All @@ -23,20 +23,21 @@ fn version() -> &'static str {

impl WackerDaemon {
async fn execute(self) -> Result<()> {
let config = Config::new()?;
if config.sock_path.exists() {
let sock_path = get_sock_path()?;
if sock_path.exists() {
bail!("wackerd socket file exists, is wackerd already running?");
}

let parent_path = config.sock_path.parent().unwrap();
let parent_path = sock_path.parent().unwrap();
if !parent_path.exists() {
create_dir_all(parent_path)?;
}
if !config.logs_dir.exists() {
create_dir(config.logs_dir.clone())?;
let logs_dir = get_logs_dir()?;
if !logs_dir.exists() {
create_dir(logs_dir)?;
}

let uds = UnixListener::bind(config.sock_path.clone())?;
let uds = UnixListener::bind(sock_path)?;
let uds_stream = UnixListenerStream::new(uds);

Builder::new()
Expand All @@ -54,20 +55,20 @@ impl WackerDaemon {
.write_style(WriteStyle::Never)
.init();

let db = sled::open(config.db_path.clone())?;
let db = sled::open(get_db_path()?)?;

let service = WackerServer::new(Server::new(config.clone(), db.clone()).await?)
let service = WackerServer::new(Server::new(db.clone(), logs_dir).await?)
.send_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Zstd);

info!("server listening on {:?}", config.sock_path);
info!("server listening on {:?}", sock_path);
tonic::transport::Server::builder()
.add_service(service)
.serve_with_incoming_shutdown(uds_stream, async {
signal::ctrl_c().await.expect("failed to listen for event");
println!();
info!("Shutting down the server");
remove_file(config.sock_path).expect("failed to remove existing socket file");
remove_file(sock_path).expect("failed to remove existing socket file");
db.flush_async().await.expect("failed to flush the db");
})
.await?;
Expand Down
2 changes: 1 addition & 1 deletion wacker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ tokio.workspace = true
tokio-stream.workspace = true
tonic.workspace = true
sled.workspace = true
once_cell.workspace = true

dirs = "5.0.1"
wasi-common = { version = "19.0.0", features = ["tokio"] }
Expand All @@ -30,7 +31,6 @@ cap-std = "3.0.0"
rand = "0.8.5"
tower = "0.4.13"
prost = "0.12.3"
const_format = "0.2.32"
async-stream = "0.3.5"
hyper = "1.1.0"
http = "1.1.0"
Expand Down
50 changes: 28 additions & 22 deletions wacker/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,36 @@
use anyhow::{bail, Result};
use const_format::concatcp;
use anyhow::{anyhow, Result};
use once_cell::sync::OnceCell;
use std::path::PathBuf;

const MAIN_DIR: &str = ".wacker";
const SOCK_PATH: &str = concatcp!(MAIN_DIR, "/wacker.sock");
const LOGS_DIR: &str = concatcp!(MAIN_DIR, "/logs");
const DB_PATH: &str = concatcp!(MAIN_DIR, "/db");

#[derive(Clone)]
pub struct Config {
pub sock_path: PathBuf,
pub logs_dir: PathBuf,
pub db_path: PathBuf,
static SOCK_PATH: OnceCell<PathBuf> = OnceCell::new();
static LOGS_DIR: OnceCell<PathBuf> = OnceCell::new();
static DB_PATH: OnceCell<PathBuf> = OnceCell::new();

pub fn get_sock_path() -> Result<&'static PathBuf> {
SOCK_PATH.get_or_try_init(|| -> Result<PathBuf> {
match dirs::home_dir() {
Some(home_dir) => Ok(home_dir.join(MAIN_DIR).join("wacker.sock")),
None => Err(anyhow!("can't get home dir")),
}
})
}

pub fn get_logs_dir() -> Result<&'static PathBuf> {
LOGS_DIR.get_or_try_init(|| -> Result<PathBuf> {
match dirs::home_dir() {
Some(home_dir) => Ok(home_dir.join(MAIN_DIR).join("logs")),
None => Err(anyhow!("can't get home dir")),
}
})
}

impl Config {
pub fn new() -> Result<Self> {
let home_dir = dirs::home_dir();
if home_dir.is_none() {
bail!("can't get home dir");
pub fn get_db_path() -> Result<&'static PathBuf> {
DB_PATH.get_or_try_init(|| -> Result<PathBuf> {
match dirs::home_dir() {
Some(home_dir) => Ok(home_dir.join(MAIN_DIR).join("db")),
None => Err(anyhow!("can't get home dir")),
}
let home_dir = home_dir.unwrap();
Ok(Self {
sock_path: home_dir.join(SOCK_PATH),
logs_dir: home_dir.join(LOGS_DIR),
db_path: home_dir.join(DB_PATH),
})
}
})
}
4 changes: 2 additions & 2 deletions wacker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ pub const PROGRAM_STATUS_ERROR: u32 = 2;
pub const PROGRAM_STATUS_STOPPED: u32 = 3;

pub async fn new_client() -> Result<WackerClient<Channel>> {
let config = Config::new()?;
let sock_path = get_sock_path()?;

let channel = Endpoint::try_from("http://[::]:50051")?
.connect_with_connector(service_fn(move |_| {
// Connect to a Uds socket
UnixStream::connect(config.sock_path.to_str().unwrap().to_string())
UnixStream::connect(sock_path)
}))
.await?;

Expand Down
9 changes: 5 additions & 4 deletions wacker/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ use std::collections::HashMap;
use std::fs::File;
use std::sync::Arc;

pub use self::{http::HttpEngine, wasi::WasiEngine};

#[derive(Clone, Default, Serialize, Deserialize)]
pub struct ProgramMeta {
pub path: String,
Expand All @@ -28,8 +26,11 @@ pub const PROGRAM_TYPE_HTTP: u32 = 1;
pub fn new_engines() -> Result<HashMap<u32, Arc<dyn Engine>>> {
let wasmtime_engine = new_wasmtime_engine()?;
let mut engines: HashMap<u32, Arc<dyn Engine>> = HashMap::new();
engines.insert(PROGRAM_TYPE_WASI, Arc::new(WasiEngine::new(wasmtime_engine.clone())));
engines.insert(PROGRAM_TYPE_HTTP, Arc::new(HttpEngine::new(wasmtime_engine)));
engines.insert(
PROGRAM_TYPE_WASI,
Arc::new(wasi::WasiEngine::new(wasmtime_engine.clone())),
);
engines.insert(PROGRAM_TYPE_HTTP, Arc::new(http::HttpEngine::new(wasmtime_engine)));

Ok(engines)
}
Expand Down
15 changes: 7 additions & 8 deletions wacker/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::config::*;
use crate::runtime::{new_engines, Engine, ProgramMeta, PROGRAM_TYPE_HTTP, PROGRAM_TYPE_WASI};
use crate::utils::generate_random_string;
use crate::{
Expand All @@ -14,7 +13,7 @@ use std::collections::HashMap;
use std::fmt::Display;
use std::fs::{remove_file, OpenOptions};
use std::io::{ErrorKind, SeekFrom, Write};
use std::path::Path;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::time::Duration;
Expand All @@ -31,7 +30,7 @@ pub struct Server {
db: Db,
engines: HashMap<u32, Arc<dyn Engine>>,
programs: Arc<Mutex<HashMap<String, InnerProgram>>>,
config: Config,
logs_dir: &'static PathBuf,
}

struct InnerProgram {
Expand All @@ -58,12 +57,12 @@ impl TryFrom<&mut InnerProgram> for Program {
}

impl Server {
pub async fn new(config: Config, db: Db) -> Result<Self, Error> {
pub async fn new(db: Db, logs_dir: &'static PathBuf) -> Result<Self, Error> {
let service = Self {
db,
engines: new_engines()?,
programs: Arc::new(Mutex::new(HashMap::new())),
config,
logs_dir,
};
service.load_from_db().await?;

Expand Down Expand Up @@ -92,7 +91,7 @@ impl Server {
let mut stdout = OpenOptions::new()
.create(true)
.append(true)
.open(self.config.logs_dir.join(id.clone()))?;
.open(self.logs_dir.join(id.clone()))?;
let stdout_clone = stdout.try_clone()?;

programs.insert(
Expand Down Expand Up @@ -266,7 +265,7 @@ impl Wacker for Server {
program.handler.abort();
}

if let Err(err) = remove_file(self.config.logs_dir.join(req.id.clone())) {
if let Err(err) = remove_file(self.logs_dir.join(req.id.clone())) {
if err.kind() != ErrorKind::NotFound {
return Err(Status::internal(format!(
"failed to remove the log file for {}: {}",
Expand All @@ -288,7 +287,7 @@ impl Wacker for Server {
async fn logs(&self, request: Request<LogRequest>) -> Result<Response<Self::LogsStream>, Status> {
let req = request.into_inner();

let mut file = File::open(self.config.logs_dir.join(req.id)).await?;
let mut file = File::open(self.logs_dir.join(req.id)).await?;
let mut contents = String::new();
let last_position = file.read_to_string(&mut contents).await?;
let lines: Vec<&str> = contents.split_inclusive('\n').collect();
Expand Down

0 comments on commit 6947706

Please sign in to comment.