diff --git a/Cargo.toml b/Cargo.toml index 40578c2..98dacd0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,8 +34,8 @@ prisma-client-rust-sdk = { git = "https://github.com/pauliesnug/prisma-client-ru # rspc dependencies rspc = { version = "1.0.0-rc.5", features = [ - "uuid", - "chrono" + "tracing", + "unstable" ] } specta = { version = "2.0.0-rc.6" } tauri-specta = { version = "2.0.0-rc.3" } @@ -46,12 +46,22 @@ swift-rs = { version = "1.0.6" } tracing = { git = "https://github.com/tokio-rs/tracing", rev = "f93cfa087e6ebdcbd8ecdcccca47d73c3a89ab94" } tracing-subscriber = { git = "https://github.com/tokio-rs/tracing", rev = "f93cfa087e6ebdcbd8ecdcccca47d73c3a89ab94", features = ["env-filter"] } tracing-appender = { git = "https://github.com/tokio-rs/tracing", rev = "f93cfa087e6ebdcbd8ecdcccca47d73c3a89ab94" } +interpulse = { git = "https://github.com/pulseflow/workers" } # general use packages -tokio = { version = "1.33.0", features = ["full"] } +tokio = { version = "1.33.0", features = [ + "full", + "sync", + "rt-multi-thread", + "io-util", + "macros", + "time", + "process", +] } uuid = { version = "1.4.1", features = ["v4", "serde"] } serde = { version = "1.0", features = ["derive"] } -reqwest = { version = "0.11", features = ["blocking", "json"] } +reqwest = { version = "0.11", features = ["blocking", "json", "native-tls-vendored"] } +tokio-util = { version = "0.7", features = ["io"] } clap = { version = "4.4", features = ["derive"] } chrono = { version = "0.4", features = ["serde"] } thiserror = { version = "1.0" } @@ -60,6 +70,9 @@ itertools = { version = "0.11" } dashmap = { version = "5.5", features = ["serde"] } regex = { version = "1.10" } bytes = { version = "1.5" } +once_cell = { version = "1.18" } +base64 = { version = "0.21" } +async-trait = { version = "0.1" } anyhow = { version = "1.0" } cargo_metadata = { version = "0.18" } serde_json = { version = "1.0" } diff --git a/core/Cargo.toml b/core/Cargo.toml index 55ff945..da88a54 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -10,7 +10,39 @@ homepage.workspace = true authors.workspace = true [dependencies] +notify = { version = "5.2", default-features = false, features = ["macos_fsevent"] } + nexus-prisma = { path = "../crates/prisma" } nexus-utils = { path = "../crates/utils"} +rspc.workspace = true +prisma-client-rust.workspace = true +interpulse.workspace = true +thiserror.workspace = true +chrono.workspace = true +tokio.workspace = true +base64.workspace = true +futures.workspace = true +uuid.workspace = true +async-trait.workspace = true +tracing.workspace = true +tracing-subscriber.workspace = true +tracing-appender.workspace = true +async-stream.workspace = true +once_cell.workspace = true +itertools.workspace = true +regex.workspace = true +bytes.workspace = true +reqwest.workspace = true tauri-specta.workspace = true + + +[dependencies.openssl] +version = "=0.10.57" +features = ["vendored"] +[dependencies.openssl-sys] +version = "=0.9.93" +features = ["vendored"] + +[target.'cfg(target_os = "macos")'.dependencies] +plist = "1" diff --git a/core/src/api/mod.rs b/core/src/api/mod.rs new file mode 100644 index 0000000..e69de29 diff --git a/core/src/env.rs b/core/src/env.rs new file mode 100644 index 0000000..c031bbd --- /dev/null +++ b/core/src/env.rs @@ -0,0 +1,4 @@ +pub struct Env { + pub api_url: String, + pub client_id: String, +} diff --git a/core/src/instance/mod.rs b/core/src/instance/mod.rs new file mode 100644 index 0000000..e69de29 diff --git a/core/src/internal/mod.rs b/core/src/internal/mod.rs new file mode 100644 index 0000000..e69de29 diff --git a/core/src/job/mod.rs b/core/src/job/mod.rs new file mode 100644 index 0000000..e69de29 diff --git a/core/src/lib.rs b/core/src/lib.rs index e69de29..7de47f2 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -0,0 +1,146 @@ +#![warn(clippy::unwrap_used, clippy::panic)] + +use crate::api::notifications::{Notification, NotificationData, NotificationId}; +use crate::api::{CoreEvent, Router}; +pub use crate::env::Env; +use crate::node::config; +use crate::notifications::Notifications; + +use chrono::{DateTime, Utc}; +pub use nexus_prisma::*; +use reqwest::{RequestBuilder, Response}; +use std::path::{Path, PathBuf}; +use std::sync::{atomic::AtomicBool, Arc}; +use thiserror::Error; +use tokio::{fs, sync::broadcast}; +use tracing::{error, info, warn}; +use tracing_appender::non_blocking::{NonBlocking, WorkerGuard}; +use tracing_appender::rolling::{RollingFileAppender, Rotation}; +use tracing_subscriber::filter::{Directive, FromEnvError, LevelFilter}; +use tracing_subscriber::fmt as tracing_fmt; +use tracing_subscriber::{prelude::*, EnvFilter}; + +pub mod api; +pub mod env; +pub mod instance; +pub mod internal; +pub mod job; +pub mod library; +pub mod node; +pub mod preferences; +pub mod notifications; +pub mod utils; + +/// Represents a single running Instance. +/// Holds references to all services. +pub struct Node { + pub data_dir: PathBuf, + pub config: Arc, + pub jobs: Arc, + pub notifications: Notifications, + pub event_bus: (broadcast::Sender, broadcast::Receiver), + pub env: env::Env, + pub http: reqwest::Client, +} + +impl std::fmt::Debug for Node { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Node") + .field("data_dir", &self.data_dir) + .finish() + } +} + +impl Node { + pub async fn new( + data_dir: impl AsRef, + env: env::Env, + ) -> Result<(Arc, Arc), NodeError> { + let data_dir = data_dir.as_ref(); + + info!("starting core with data_dir '{}'", data_dir.display()); + + #[cfg(debug_assertions)] + let init_data = utils::debug_init::InitConfig::load(data_dir).await?; + + let _ = fs::create_dir_all(&data_dir).await; + + let event_bus = broadcast::channel(1024); + let config = config::Manager::new(data_dir.to_path_buf()) + .await + .map_err(NodeError::FailedToInitConfig)?; + + let (jobs, jobs_actor) = job::Jobs::new(); + let (instances, instances_actor) = instance::Instances::new(); + let node = Arc::new(Node { + data_dir: data_dir.to_path_buf(), + config, + jobs, + notifications: notifications::Notifications::new(), + event_bus, + env, + http: reqwest::Client::new(), + }); + + for feature in node.config.get().await.features { + feature.restore(&node); + } + + jobs_actor.start(node.clone()); + + let router = api::mount(); + + info!("nexus online"); + Ok((node, router)) + } + + pub async fn shutdown(&self) { + info!("nexus shutting down..."); + self.jobs.shutdown().await; + info!("nexus shutdown"); + } + + pub(crate) fn emit(&self, event: CoreEvent) { + if let Err(e) = self.event_bus.0.send(event) { + warn!("could not send event to event bus: {e:?}"); + } + } + + pub async fn emit_notification(&self, data: NotificationData, expires: Option>) { + let notification = Notification { + id: NotificationId::Node(self.notifications._internal_next_id()), + data, + read: false, + expires, + }; + + match self + .config + .write(|mut cfg| cfg.notifications.push(notification.clone())) + .await + { + Ok(_) => { + self.notifications._internal_send(notification); + } + Err(err) => { + error!("could not save notification to config: {:?}", err); + } + } + } +} + +/// Error type for Node related errors. +#[derive(Error, Debug)] +pub enum NodeError { + #[error("NodeError::FailedToInitConfig({0})")] + FailedToInitConfig(utils::migrator::MigratorError), + #[error("failed to initialize instance manager: {0}")] + FailedToInitInstanceManager(#[from] instance::InstanceManagerError), + #[error("failed to validate platform value: {0}")] + InvalidPlatformId(u8), + #[cfg(debug_assertions)] + #[error("failed to initialize configuration: {0}")] + InitConfig(#[from] utils::debug_init::InitConfigError), + #[error("failed to use logger: {0}")] + Logger(#[from] FromEnvError), +} diff --git a/core/src/library/mod.rs b/core/src/library/mod.rs new file mode 100644 index 0000000..e69de29 diff --git a/core/src/node/mod.rs b/core/src/node/mod.rs new file mode 100644 index 0000000..e69de29 diff --git a/core/src/notifications.rs b/core/src/notifications.rs new file mode 100644 index 0000000..afad66b --- /dev/null +++ b/core/src/notifications.rs @@ -0,0 +1,26 @@ +use std::sync::{atomic::AtomicU32, Arc}; +use tokio::sync::broadcast; +use crate::api::notifications::Notification; + +#[derive(Clone)] +pub struct Notifications(broadcast::Sender, Arc); + +impl Notifications { + #[allow(clippy::new_without_default)] + pub fn new() -> Self { + let (tx, _) = broadcast::channel(30); + Self(tx, Arc::new(AtomicU32::new(0))) + } + + pub fn subscribe(&self) -> broadcast::Receiver { + self.0.subscribe() + } + + pub fn _internal_send(&self, notif: Notification) { + self.0.send(notif).ok(); + } + + pub fn _internal_next_id(&self) -> u32 { + self.1.fetch_add(1, std::sync::atomic::Ordering::SeqCst, order) + } +} \ No newline at end of file diff --git a/core/src/preferences/mod.rs b/core/src/preferences/mod.rs new file mode 100644 index 0000000..e69de29 diff --git a/core/src/utils/mod.rs b/core/src/utils/mod.rs new file mode 100644 index 0000000..e69de29