diff --git a/Cargo.lock b/Cargo.lock index d7354afc..12c64698 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4499,6 +4499,7 @@ dependencies = [ "reqwest 0.12.5", "serde_json", "simple_moving_average", + "sled", "snops-common", "tarpc", "tokio", @@ -4530,6 +4531,8 @@ name = "snops-common" version = "0.1.0" dependencies = [ "anyhow", + "bincode", + "bytes", "checkpoint", "chrono", "clap", @@ -4545,10 +4548,12 @@ dependencies = [ "regex", "serde", "serde_json", + "sled", "strum_macros", "tarpc", "thiserror", "tokio", + "tracing", "url", "wildmatch", ] diff --git a/crates/snops-agent/Cargo.toml b/crates/snops-agent/Cargo.toml index 69e21229..bca66f71 100644 --- a/crates/snops-agent/Cargo.toml +++ b/crates/snops-agent/Cargo.toml @@ -27,6 +27,7 @@ nix = { workspace = true, features = ["signal"] } reqwest = { workspace = true, features = ["json", "stream"] } serde_json.workspace = true simple_moving_average.workspace = true +sled.workspace = true snops-common = { workspace = true, features = ["aot_cmds"] } tarpc.workspace = true tokio = { workspace = true, features = [ diff --git a/crates/snops-agent/src/db.rs b/crates/snops-agent/src/db.rs new file mode 100644 index 00000000..f0da398b --- /dev/null +++ b/crates/snops-agent/src/db.rs @@ -0,0 +1,80 @@ +use std::{ + io::{Read, Write}, + path::PathBuf, + sync::Mutex, +}; + +use snops_common::{ + db::{error::DatabaseError, tree::DbTree, Database as DatabaseTrait}, + format::{DataFormat, DataReadError, DataWriteError}, +}; + +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] +#[repr(u8)] +pub enum AgentDbString { + /// JSON web token of agent. + Jwt, + /// Process ID of node. Used to keep track of zombie node processes. + NodePid, +} + +impl DataFormat for AgentDbString { + type Header = u8; + const LATEST_HEADER: Self::Header = 1; + + fn read_data(reader: &mut R, header: &Self::Header) -> Result { + if *header != Self::LATEST_HEADER { + return Err(DataReadError::unsupported( + "AgentDbString", + Self::LATEST_HEADER, + header, + )); + } + + Ok(match u8::read_data(reader, &())? { + 0 => Self::Jwt, + 1 => Self::NodePid, + _ => return Err(DataReadError::custom("invalid agent DB string type")), + }) + } + + fn write_data(&self, writer: &mut W) -> Result { + (*self as u8).write_data(writer) + } +} + +pub struct Database { + #[allow(unused)] + pub db: sled::Db, + + pub jwt_mutex: Mutex>, + pub strings: DbTree, +} + +impl DatabaseTrait for Database { + fn open(path: &PathBuf) -> Result { + let db = sled::open(path)?; + let strings = DbTree::new(db.open_tree(b"v1/strings")?); + let jwt_mutex = Mutex::new(strings.restore(&AgentDbString::Jwt)?); + + Ok(Self { + db, + jwt_mutex, + strings, + }) + } +} + +impl Database { + pub fn jwt(&self) -> Option { + self.jwt_mutex.lock().unwrap().clone() + } + + pub fn set_jwt(&self, jwt: Option) -> Result<(), DatabaseError> { + let mut lock = self.jwt_mutex.lock().unwrap(); + self.strings + .save_option(&AgentDbString::Jwt, jwt.as_ref())?; + *lock = jwt; + Ok(()) + } +} diff --git a/crates/snops-agent/src/main.rs b/crates/snops-agent/src/main.rs index 32d751af..d7bc4e96 100644 --- a/crates/snops-agent/src/main.rs +++ b/crates/snops-agent/src/main.rs @@ -1,5 +1,6 @@ mod api; mod cli; +mod db; mod metrics; mod net; mod reconcile; @@ -22,7 +23,9 @@ use futures_util::stream::{FuturesUnordered, StreamExt}; use http::HeaderValue; use snops_common::{ constant::{ENV_AGENT_KEY, HEADER_AGENT_KEY}, + db::Database, rpc::{agent::AgentService, control::ControlServiceClient, RpcTransport}, + util::OpaqueDebug, }; use tarpc::server::Channel; use tokio::{ @@ -36,7 +39,7 @@ use tokio_tungstenite::{ use tracing::{error, info, level_filters::LevelFilter, warn}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; -use crate::rpc::{AgentRpcServer, MuxedMessageIncoming, MuxedMessageOutgoing, JWT_FILE}; +use crate::rpc::{AgentRpcServer, MuxedMessageIncoming, MuxedMessageOutgoing}; use crate::state::GlobalState; const PING_HEADER: &[u8] = b"snops-agent"; @@ -108,10 +111,8 @@ async fn main() { .await .expect("failed to create data path"); - // get the JWT from the file, if possible - let jwt = tokio::fs::read_to_string(args.path.join(JWT_FILE)) - .await - .ok(); + // open the database + let db = db::Database::open(&args.path.join("store")).expect("failed to open database"); // create rpc channels let (client_response_in, client_transport, mut client_request_out) = RpcTransport::new(); @@ -134,14 +135,14 @@ async fn main() { // create the client state let state = Arc::new(GlobalState { + client, + db: OpaqueDebug(db), started: Instant::now(), connected: Mutex::new(Instant::now()), - client, external_addr, internal_addrs, cli: args, endpoint, - jwt: Mutex::new(jwt), loki: Default::default(), env_info: Default::default(), agent_state: Default::default(), @@ -193,15 +194,12 @@ async fn main() { state.env_info.write().await.take(); // attach JWT if we have one - { - let jwt = state.jwt.lock().expect("failed to acquire jwt"); - if let Some(jwt) = jwt.as_deref() { - req.headers_mut().insert( - "Authorization", - HeaderValue::from_bytes(format!("Bearer {jwt}").as_bytes()) - .expect("attach authorization header"), - ); - } + if let Some(jwt) = state.db.jwt() { + req.headers_mut().insert( + "Authorization", + HeaderValue::from_bytes(format!("Bearer {jwt}").as_bytes()) + .expect("attach authorization header"), + ); } // attach agent key if one is set in env vars diff --git a/crates/snops-agent/src/rpc.rs b/crates/snops-agent/src/rpc.rs index 47ed3198..c441d1df 100644 --- a/crates/snops-agent/src/rpc.rs +++ b/crates/snops-agent/src/rpc.rs @@ -19,9 +19,6 @@ use tracing::{debug, error, info, trace, warn}; use crate::{api, metrics::MetricComputer, reconcile, state::AppState}; -/// The JWT file name. -pub const JWT_FILE: &str = "jwt"; - /// A multiplexed message, incoming on the websocket. pub type MuxedMessageIncoming = MuxMessage, ClientMessage>; @@ -53,14 +50,9 @@ impl AgentService for AgentRpcServer { if let Some(token) = handshake.jwt { // cache the JWT in the state JWT mutex self.state - .jwt - .lock() - .expect("failed to acquire JWT lock") - .replace(token.to_owned()); - - tokio::fs::write(self.state.cli.path.join(JWT_FILE), token) - .await - .expect("failed to write jwt file"); + .db + .set_jwt(Some(token)) + .map_err(|_| ReconcileError::Database)?; } // store loki server URL diff --git a/crates/snops-agent/src/state.rs b/crates/snops-agent/src/state.rs index aff7012b..8cabf29b 100644 --- a/crates/snops-agent/src/state.rs +++ b/crates/snops-agent/src/state.rs @@ -12,6 +12,7 @@ use snops_common::{ api::EnvInfo, rpc::control::ControlServiceClient, state::{AgentId, AgentPeer, AgentState, EnvId, TransferId, TransferStatus}, + util::OpaqueDebug, }; use tarpc::context; use tokio::{ @@ -22,7 +23,7 @@ use tokio::{ }; use tracing::info; -use crate::{cli::Cli, metrics::Metrics, transfers::TransferTx}; +use crate::{cli::Cli, db::Database, metrics::Metrics, transfers::TransferTx}; pub const NODE_GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10); @@ -31,6 +32,7 @@ pub type AppState = Arc; /// Global state for this agent runner. pub struct GlobalState { pub client: ControlServiceClient, + pub db: OpaqueDebug, pub started: Instant, pub connected: Mutex, @@ -39,7 +41,6 @@ pub struct GlobalState { pub status_api_port: u16, pub cli: Cli, pub endpoint: String, - pub jwt: Mutex>, pub loki: Mutex>, pub agent_state: RwLock, pub env_info: RwLock>, diff --git a/crates/snops-common/Cargo.toml b/crates/snops-common/Cargo.toml index 53ba1181..1b2c2588 100644 --- a/crates/snops-common/Cargo.toml +++ b/crates/snops-common/Cargo.toml @@ -11,6 +11,8 @@ mangen = ["anyhow", "clap_mangen"] [dependencies] anyhow = { workspace = true, optional = true } +bincode.workspace = true +bytes.workspace = true checkpoint = { workspace = true, features = ["serde"] } chrono = { workspace = true, features = ["serde"] } clap.workspace = true @@ -26,10 +28,12 @@ regex.workspace = true rand.workspace = true serde.workspace = true serde_json.workspace = true +sled.workspace = true strum_macros.workspace = true tarpc.workspace = true thiserror.workspace = true tokio = { workspace = true, features = ["process"] } +tracing.workspace = true url.workspace = true wildmatch.workspace = true diff --git a/crates/snops/src/db/error.rs b/crates/snops-common/src/db/error.rs similarity index 88% rename from crates/snops/src/db/error.rs rename to crates/snops-common/src/db/error.rs index 2274d263..7760ba39 100644 --- a/crates/snops/src/db/error.rs +++ b/crates/snops-common/src/db/error.rs @@ -19,7 +19,7 @@ pub enum DatabaseError { #[error("transaction error: {0}")] TransactionError(#[from] sled::transaction::TransactionError), #[error("error writing data: {0}")] - DataWriteError(#[from] snops_common::format::DataWriteError), + DataWriteError(#[from] crate::format::DataWriteError), #[error("error reading data: {0}")] - DataReadError(#[from] snops_common::format::DataReadError), + DataReadError(#[from] crate::format::DataReadError), } diff --git a/crates/snops-common/src/db/mod.rs b/crates/snops-common/src/db/mod.rs new file mode 100644 index 00000000..183d5d14 --- /dev/null +++ b/crates/snops-common/src/db/mod.rs @@ -0,0 +1,10 @@ +use std::path::PathBuf; + +use self::error::DatabaseError; + +pub mod error; +pub mod tree; + +pub trait Database: Sized { + fn open(path: &PathBuf) -> Result; +} diff --git a/crates/snops/src/db/tree.rs b/crates/snops-common/src/db/tree.rs similarity index 77% rename from crates/snops/src/db/tree.rs rename to crates/snops-common/src/db/tree.rs index 9e60bcb9..00df8184 100644 --- a/crates/snops/src/db/tree.rs +++ b/crates/snops-common/src/db/tree.rs @@ -1,14 +1,14 @@ use bytes::Buf; -use snops_common::format::{read_dataformat, write_dataformat, DataFormat}; use super::error::DatabaseError; +use crate::format::{read_dataformat, write_dataformat, DataFormat}; -pub struct DbTree { +pub struct DbTree { tree: sled::Tree, - _phantom: std::marker::PhantomData<(Key, Value)>, + _phantom: std::marker::PhantomData<(K, V)>, } -impl DbTree { +impl DbTree { pub fn new(tree: sled::Tree) -> Self { Self { tree, @@ -16,7 +16,7 @@ impl DbTree { } } - pub fn read_all(&self) -> impl Iterator { + pub fn read_all(&self) -> impl Iterator { self.tree.iter().filter_map(|row| { let (key_bytes, value_bytes) = match row { Ok((key, value)) => (key, value), @@ -26,7 +26,7 @@ impl DbTree { } }; - let key = match Key::read_data(&mut key_bytes.reader(), &Key::LATEST_HEADER) { + let key = match K::read_data(&mut key_bytes.reader(), &K::LATEST_HEADER) { Ok(key) => key, Err(e) => { tracing::error!("Error parsing key from store: {e}"); @@ -49,7 +49,7 @@ impl DbTree { pub fn read_with_prefix( &self, prefix: &Prefix, - ) -> Result, DatabaseError> { + ) -> Result, DatabaseError> { Ok(self .tree .scan_prefix(prefix.to_byte_vec()?) @@ -62,7 +62,7 @@ impl DbTree { } }; - let key = match Key::read_data(&mut key_bytes.reader(), &Key::LATEST_HEADER) { + let key = match K::read_data(&mut key_bytes.reader(), &K::LATEST_HEADER) { Ok(key) => key, Err(e) => { tracing::error!("Error parsing key from store: {e}"); @@ -82,7 +82,7 @@ impl DbTree { })) } - pub fn restore(&self, key: &Key) -> Result, DatabaseError> { + pub fn restore(&self, key: &K) -> Result, DatabaseError> { Ok(self .tree .get(key.to_byte_vec()?)? @@ -90,7 +90,7 @@ impl DbTree { .transpose()?) } - pub fn save(&self, key: &Key, value: &Value) -> Result<(), DatabaseError> { + pub fn save(&self, key: &K, value: &V) -> Result<(), DatabaseError> { let key_bytes = key.to_byte_vec()?; let mut value_bytes = Vec::new(); write_dataformat(&mut value_bytes, value)?; @@ -98,7 +98,14 @@ impl DbTree { Ok(()) } - pub fn delete(&self, key: &Key) -> Result { + pub fn save_option(&self, key: &K, value: Option<&V>) -> Result<(), DatabaseError> { + match value { + Some(value) => self.save(key, value), + None => self.delete(key).map(|_| ()), + } + } + + pub fn delete(&self, key: &K) -> Result { Ok(self.tree.remove(key.to_byte_vec()?)?.is_some()) } @@ -118,7 +125,7 @@ impl DbTree { } }; - let key = match Key::read_data(&mut key_bytes.reader(), &Key::LATEST_HEADER) { + let key = match K::read_data(&mut key_bytes.reader(), &K::LATEST_HEADER) { Ok(key) => key, Err(e) => { tracing::error!("Error parsing key from store: {e}"); diff --git a/crates/snops-common/src/lib.rs b/crates/snops-common/src/lib.rs index 9f88d853..08c11be4 100644 --- a/crates/snops-common/src/lib.rs +++ b/crates/snops-common/src/lib.rs @@ -7,9 +7,11 @@ pub mod state; pub use lasso; pub mod api; pub mod constant; +pub mod db; pub mod format; pub mod key_source; pub mod node_targets; +pub mod util; #[cfg(feature = "clipages")] pub mod clipages; @@ -20,6 +22,7 @@ pub mod prelude { pub use crate::rpc::*; pub use crate::set::*; pub use crate::state::*; + pub use crate::util::*; } lazy_static::lazy_static! { diff --git a/crates/snops-common/src/rpc/error.rs b/crates/snops-common/src/rpc/error.rs index 3fb17a0a..c6145ad4 100644 --- a/crates/snops-common/src/rpc/error.rs +++ b/crates/snops-common/src/rpc/error.rs @@ -122,6 +122,8 @@ pub enum ReconcileError { CheckpointLoadError, #[error("agent did not provide a local private key")] NoLocalPrivateKey, + #[error("generic database error")] + Database, #[error("unknown error")] Unknown, } diff --git a/crates/snops/src/util.rs b/crates/snops-common/src/util.rs similarity index 100% rename from crates/snops/src/util.rs rename to crates/snops-common/src/util.rs diff --git a/crates/snops/src/db/mod.rs b/crates/snops/src/db.rs similarity index 56% rename from crates/snops/src/db/mod.rs rename to crates/snops/src/db.rs index 589ff05e..c9a6a7d3 100644 --- a/crates/snops/src/db/mod.rs +++ b/crates/snops/src/db.rs @@ -1,16 +1,15 @@ use std::path::PathBuf; -use snops_common::state::{AgentId, EnvId, NetworkId, StorageId}; +use snops_common::{ + db::{error::DatabaseError, tree::DbTree, Database as DatabaseTrait}, + state::{AgentId, EnvId, NetworkId, StorageId}, +}; -use self::{error::DatabaseError, tree::DbTree}; use crate::{ persist::{PersistEnv, PersistStorage}, state::Agent, }; -pub mod error; -pub mod tree; - pub struct Database { #[allow(unused)] pub(crate) db: sled::Db, @@ -23,16 +22,18 @@ pub struct Database { pub(crate) agents: DbTree, } -impl Database { - pub fn open(path: &PathBuf) -> Result { +impl DatabaseTrait for Database { + fn open(path: &PathBuf) -> Result { let db = sled::open(path)?; + let envs = DbTree::new(db.open_tree(b"v2/envs")?); + let storage = DbTree::new(db.open_tree(b"v2/storage")?); + let agents = DbTree::new(db.open_tree(b"v2/agents")?); Ok(Self { - envs: DbTree::new(db.open_tree(b"v2/envs")?), - storage: DbTree::new(db.open_tree(b"v2/storage")?), - agents: DbTree::new(db.open_tree(b"v2/agents")?), - db, + envs, + storage, + agents, }) } } diff --git a/crates/snops/src/main.rs b/crates/snops/src/main.rs index 5b74f961..1b8225a4 100644 --- a/crates/snops/src/main.rs +++ b/crates/snops/src/main.rs @@ -15,7 +15,6 @@ pub mod persist; pub mod schema; pub mod server; pub mod state; -pub mod util; #[tokio::main] async fn main() { diff --git a/crates/snops/src/server/error.rs b/crates/snops/src/server/error.rs index 4cf727bc..04827488 100644 --- a/crates/snops/src/server/error.rs +++ b/crates/snops/src/server/error.rs @@ -2,12 +2,13 @@ use axum::{response::IntoResponse, Json}; use http::StatusCode; use serde::{ser::SerializeStruct, Serialize, Serializer}; use serde_json::json; -use snops_common::{aot_cmds::AotCmdError, impl_into_status_code, impl_into_type_str}; +use snops_common::{ + aot_cmds::AotCmdError, db::error::DatabaseError, impl_into_status_code, impl_into_type_str, +}; use thiserror::Error; use crate::{ cannon::error::CannonError, - db::error::DatabaseError, env::error::{EnvError, EnvRequestError, ExecutionError}, error::DeserializeError, schema::error::SchemaError, diff --git a/crates/snops/src/server/mod.rs b/crates/snops/src/server/mod.rs index dea45483..4cc1cd1a 100644 --- a/crates/snops/src/server/mod.rs +++ b/crates/snops/src/server/mod.rs @@ -18,6 +18,7 @@ use prometheus_http_query::Client as PrometheusClient; use serde::Deserialize; use snops_common::{ constant::HEADER_AGENT_KEY, + db::Database, prelude::*, rpc::{ agent::{AgentServiceClient, Handshake}, diff --git a/crates/snops/src/state/global.rs b/crates/snops/src/state/global.rs index 5ae06faa..2922ed94 100644 --- a/crates/snops/src/state/global.rs +++ b/crates/snops/src/state/global.rs @@ -11,6 +11,7 @@ use snops_common::{ node_targets::NodeTargets, rpc::error::SnarkosRequestError, state::{AgentId, AgentPeer, AgentState, EnvId, LatestBlockInfo, NetworkId, StorageId}, + util::OpaqueDebug, }; use tokio::sync::{Mutex, Semaphore}; use tracing::info; @@ -23,7 +24,6 @@ use crate::{ error::StateError, schema::storage::{LoadedStorage, STORAGE_DIR}, server::{error::StartError, prometheus::HttpsdResponse}, - util::OpaqueDebug, }; lazy_static::lazy_static! {