Skip to content

Commit

Permalink
feature(rust): i hate package management
Browse files Browse the repository at this point in the history
  • Loading branch information
pauliesnug committed Oct 25, 2023
1 parent 13ccad3 commit ccbcd27
Show file tree
Hide file tree
Showing 13 changed files with 225 additions and 4 deletions.
21 changes: 17 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ prisma-client-rust-sdk = { git = "https://github.com/pauliesnug/prisma-client-ru

# rspc dependencies
rspc = { version = "1.0.0-rc.5", features = [
"uuid",
"chrono"
"tracing",
"unstable"
] }
specta = { version = "2.0.0-rc.6" }
tauri-specta = { version = "2.0.0-rc.3" }
Expand All @@ -46,12 +46,22 @@ swift-rs = { version = "1.0.6" }
tracing = { git = "https://github.com/tokio-rs/tracing", rev = "f93cfa087e6ebdcbd8ecdcccca47d73c3a89ab94" }
tracing-subscriber = { git = "https://github.com/tokio-rs/tracing", rev = "f93cfa087e6ebdcbd8ecdcccca47d73c3a89ab94", features = ["env-filter"] }
tracing-appender = { git = "https://github.com/tokio-rs/tracing", rev = "f93cfa087e6ebdcbd8ecdcccca47d73c3a89ab94" }
interpulse = { git = "https://github.com/pulseflow/workers" }

# general use packages
tokio = { version = "1.33.0", features = ["full"] }
tokio = { version = "1.33.0", features = [
"full",
"sync",
"rt-multi-thread",
"io-util",
"macros",
"time",
"process",
] }
uuid = { version = "1.4.1", features = ["v4", "serde"] }
serde = { version = "1.0", features = ["derive"] }
reqwest = { version = "0.11", features = ["blocking", "json"] }
reqwest = { version = "0.11", features = ["blocking", "json", "native-tls-vendored"] }
tokio-util = { version = "0.7", features = ["io"] }
clap = { version = "4.4", features = ["derive"] }
chrono = { version = "0.4", features = ["serde"] }
thiserror = { version = "1.0" }
Expand All @@ -60,6 +70,9 @@ itertools = { version = "0.11" }
dashmap = { version = "5.5", features = ["serde"] }
regex = { version = "1.10" }
bytes = { version = "1.5" }
once_cell = { version = "1.18" }
base64 = { version = "0.21" }
async-trait = { version = "0.1" }
anyhow = { version = "1.0" }
cargo_metadata = { version = "0.18" }
serde_json = { version = "1.0" }
Expand Down
32 changes: 32 additions & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,39 @@ homepage.workspace = true
authors.workspace = true

[dependencies]
notify = { version = "5.2", default-features = false, features = ["macos_fsevent"] }

nexus-prisma = { path = "../crates/prisma" }
nexus-utils = { path = "../crates/utils"}

rspc.workspace = true
prisma-client-rust.workspace = true
interpulse.workspace = true
thiserror.workspace = true
chrono.workspace = true
tokio.workspace = true
base64.workspace = true
futures.workspace = true
uuid.workspace = true
async-trait.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
tracing-appender.workspace = true
async-stream.workspace = true
once_cell.workspace = true
itertools.workspace = true
regex.workspace = true
bytes.workspace = true
reqwest.workspace = true
tauri-specta.workspace = true


[dependencies.openssl]
version = "=0.10.57"
features = ["vendored"]
[dependencies.openssl-sys]
version = "=0.9.93"
features = ["vendored"]

[target.'cfg(target_os = "macos")'.dependencies]
plist = "1"
Empty file added core/src/api/mod.rs
Empty file.
4 changes: 4 additions & 0 deletions core/src/env.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub struct Env {
pub api_url: String,
pub client_id: String,
}
Empty file added core/src/instance/mod.rs
Empty file.
Empty file added core/src/internal/mod.rs
Empty file.
Empty file added core/src/job/mod.rs
Empty file.
146 changes: 146 additions & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
#![warn(clippy::unwrap_used, clippy::panic)]

use crate::api::notifications::{Notification, NotificationData, NotificationId};
use crate::api::{CoreEvent, Router};
pub use crate::env::Env;
use crate::node::config;
use crate::notifications::Notifications;

use chrono::{DateTime, Utc};
pub use nexus_prisma::*;
use reqwest::{RequestBuilder, Response};
use std::path::{Path, PathBuf};
use std::sync::{atomic::AtomicBool, Arc};
use thiserror::Error;
use tokio::{fs, sync::broadcast};
use tracing::{error, info, warn};
use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
use tracing_appender::rolling::{RollingFileAppender, Rotation};
use tracing_subscriber::filter::{Directive, FromEnvError, LevelFilter};
use tracing_subscriber::fmt as tracing_fmt;
use tracing_subscriber::{prelude::*, EnvFilter};

pub mod api;
pub mod env;
pub mod instance;
pub mod internal;
pub mod job;
pub mod library;
pub mod node;
pub mod preferences;
pub mod notifications;
pub mod utils;

/// Represents a single running Instance.
/// Holds references to all services.
pub struct Node {
pub data_dir: PathBuf,
pub config: Arc<config::Manager>,
pub jobs: Arc<job::Jobs>,
pub notifications: Notifications,
pub event_bus: (broadcast::Sender<CoreEvent>, broadcast::Receiver<CoreEvent>),
pub env: env::Env,
pub http: reqwest::Client,
}

impl std::fmt::Debug for Node {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Node")
.field("data_dir", &self.data_dir)
.finish()
}
}

impl Node {
pub async fn new(
data_dir: impl AsRef<Path>,
env: env::Env,
) -> Result<(Arc<Node>, Arc<Router>), NodeError> {
let data_dir = data_dir.as_ref();

info!("starting core with data_dir '{}'", data_dir.display());

#[cfg(debug_assertions)]
let init_data = utils::debug_init::InitConfig::load(data_dir).await?;

let _ = fs::create_dir_all(&data_dir).await;

let event_bus = broadcast::channel(1024);
let config = config::Manager::new(data_dir.to_path_buf())
.await
.map_err(NodeError::FailedToInitConfig)?;

let (jobs, jobs_actor) = job::Jobs::new();
let (instances, instances_actor) = instance::Instances::new();
let node = Arc::new(Node {
data_dir: data_dir.to_path_buf(),
config,
jobs,
notifications: notifications::Notifications::new(),
event_bus,
env,
http: reqwest::Client::new(),
});

for feature in node.config.get().await.features {
feature.restore(&node);
}

jobs_actor.start(node.clone());

let router = api::mount();

info!("nexus online");
Ok((node, router))
}

pub async fn shutdown(&self) {
info!("nexus shutting down...");
self.jobs.shutdown().await;
info!("nexus shutdown");
}

pub(crate) fn emit(&self, event: CoreEvent) {
if let Err(e) = self.event_bus.0.send(event) {
warn!("could not send event to event bus: {e:?}");
}
}

pub async fn emit_notification(&self, data: NotificationData, expires: Option<DateTime<Utc>>) {
let notification = Notification {
id: NotificationId::Node(self.notifications._internal_next_id()),
data,
read: false,
expires,
};

match self
.config
.write(|mut cfg| cfg.notifications.push(notification.clone()))
.await
{
Ok(_) => {
self.notifications._internal_send(notification);
}
Err(err) => {
error!("could not save notification to config: {:?}", err);
}
}
}
}

/// Error type for Node related errors.
#[derive(Error, Debug)]
pub enum NodeError {
#[error("NodeError::FailedToInitConfig({0})")]
FailedToInitConfig(utils::migrator::MigratorError),
#[error("failed to initialize instance manager: {0}")]
FailedToInitInstanceManager(#[from] instance::InstanceManagerError),
#[error("failed to validate platform value: {0}")]
InvalidPlatformId(u8),
#[cfg(debug_assertions)]
#[error("failed to initialize configuration: {0}")]
InitConfig(#[from] utils::debug_init::InitConfigError),
#[error("failed to use logger: {0}")]
Logger(#[from] FromEnvError),
}
Empty file added core/src/library/mod.rs
Empty file.
Empty file added core/src/node/mod.rs
Empty file.
26 changes: 26 additions & 0 deletions core/src/notifications.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use std::sync::{atomic::AtomicU32, Arc};
use tokio::sync::broadcast;
use crate::api::notifications::Notification;

#[derive(Clone)]
pub struct Notifications(broadcast::Sender<Notification>, Arc<AtomicU32>);

impl Notifications {
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
let (tx, _) = broadcast::channel(30);
Self(tx, Arc::new(AtomicU32::new(0)))
}

pub fn subscribe(&self) -> broadcast::Receiver<Notification> {
self.0.subscribe()
}

pub fn _internal_send(&self, notif: Notification) {
self.0.send(notif).ok();
}

pub fn _internal_next_id(&self) -> u32 {
self.1.fetch_add(1, std::sync::atomic::Ordering::SeqCst, order)
}
}
Empty file added core/src/preferences/mod.rs
Empty file.
Empty file added core/src/utils/mod.rs
Empty file.

0 comments on commit ccbcd27

Please sign in to comment.