Skip to content

Commit

Permalink
feat: task clean-up with new sans-io-runtime (#179)
Browse files Browse the repository at this point in the history
* feat: task clean-up with new sans-io-runtime

* update tool chain to 1.82 for fixing some libs

* fix clippy warns

* fix clippy warns
  • Loading branch information
giangndm authored Nov 8, 2024
1 parent 5d16fd6 commit 794aecc
Show file tree
Hide file tree
Showing 29 changed files with 772 additions and 133 deletions.
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ members = [
]

[workspace.dependencies]
bincode = "1.3.3"
bincode = "1.3"
serde = { version = "1.0", features = ["derive"] }
thiserror = "1.0"
log = "0.4"
rand = "0.8"
parking_lot = "0.12"
env_logger = "0.11"
clap = { version = "4.4", features = ["derive", "env"] }
mockall = "0.12.1"
num_enum = "0.7.2"
mockall = "0.13"
num_enum = "0.7"
convert-enum = "0.1.0"
sans-io-runtime = { version = "0.2", default-features = false }
sans-io-runtime = { version = "0.3", default-features = false }
2 changes: 1 addition & 1 deletion bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ clap.workspace = true
serde.workspace = true
atm0s-sdn = { path = "../packages/runner", version = "0.2.2", features = ["vpn"] }
tokio = { version = "1", features = ["full"] }
poem = { version = "2.0", features = ["embed", "static-files", "websocket"] }
poem = { version = "3.0", features = ["embed", "static-files", "websocket"] }
rust-embed = { version = "8.2", optional = true }

futures-util = "0.3"
Expand Down
2 changes: 1 addition & 1 deletion deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ allow = [
"MIT",
"Unicode-DFS-2016",
"WTFPL",

"Unicode-3.0",
]
confidence-threshold = 0.8
exceptions = [
Expand Down
6 changes: 6 additions & 0 deletions packages/network/src/base/feature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ pub enum FeatureOutput<UserData, Event, ToWorker> {
SendRoute(RouteRule, NetOutgoingMeta, Buffer),
NeighboursConnectTo(NodeAddr),
NeighboursDisconnectFrom(NodeId),
OnResourceEmpty,
}

impl<UserData, Event, ToWorker> FeatureOutput<UserData, Event, ToWorker> {
Expand All @@ -137,6 +138,7 @@ impl<UserData, Event, ToWorker> FeatureOutput<UserData, Event, ToWorker> {
FeatureOutput::SendRoute(rule, ttl, buf) => FeatureOutput::SendRoute(rule, ttl, buf),
FeatureOutput::NeighboursConnectTo(addr) => FeatureOutput::NeighboursConnectTo(addr),
FeatureOutput::NeighboursDisconnectFrom(id) => FeatureOutput::NeighboursDisconnectFrom(id),
FeatureOutput::OnResourceEmpty => FeatureOutput::OnResourceEmpty,
}
}
}
Expand All @@ -149,6 +151,7 @@ pub struct FeatureContext {
pub trait Feature<UserData, Control, Event, ToController, ToWorker>: TaskSwitcherChild<FeatureOutput<UserData, Event, ToWorker>> {
fn on_shared_input(&mut self, _ctx: &FeatureContext, _now: u64, _input: FeatureSharedInput);
fn on_input(&mut self, _ctx: &FeatureContext, now_ms: u64, input: FeatureInput<'_, UserData, Control, ToController>);
fn on_shutdown(&mut self, _ctx: &FeatureContext, _now: u64);
}

pub enum FeatureWorkerInput<UserData, Control, ToWorker> {
Expand Down Expand Up @@ -176,6 +179,7 @@ pub enum FeatureWorkerOutput<UserData, Control, Event, ToController> {
RawBroadcast2(Vec<NetPair>, Buffer),
#[cfg(feature = "vpn")]
TunPkt(Buffer),
OnResourceEmpty,
}

impl<UserData, Control, Event, ToController> FeatureWorkerOutput<UserData, Control, Event, ToController> {
Expand All @@ -200,6 +204,7 @@ impl<UserData, Control, Event, ToController> FeatureWorkerOutput<UserData, Contr
FeatureWorkerOutput::RawBroadcast2(conns, buf) => FeatureWorkerOutput::RawBroadcast2(conns, buf),
#[cfg(feature = "vpn")]
FeatureWorkerOutput::TunPkt(buf) => FeatureWorkerOutput::TunPkt(buf),
FeatureWorkerOutput::OnResourceEmpty => FeatureWorkerOutput::OnResourceEmpty,
}
}
}
Expand All @@ -217,4 +222,5 @@ pub trait FeatureWorker<UserData, SdkControl, SdkEvent, ToController, ToWorker>:
self.on_input(ctx, now, FeatureWorkerInput::Network(conn, (&header).into(), buf));
}
fn on_input(&mut self, _ctx: &mut FeatureWorkerContext, _now: u64, input: FeatureWorkerInput<UserData, SdkControl, ToWorker>);
fn on_shutdown(&mut self, _ctx: &mut FeatureWorkerContext, _now: u64);
}
25 changes: 24 additions & 1 deletion packages/network/src/base/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use super::ConnectionEvent;
simple_pub_type!(ServiceId, u8);

/// First part is Service, which is running inside the controller.
#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq)]
pub enum ServiceControlActor<UserData> {
Controller(UserData),
Expand All @@ -32,6 +31,7 @@ pub enum ServiceOutput<UserData, FeaturesControl, ServiceEvent, ToWorker> {
Event(ServiceControlActor<UserData>, ServiceEvent),
FeatureControl(FeaturesControl),
BroadcastWorkers(ToWorker),
OnResourceEmpty,
}

pub struct ServiceCtx {
Expand All @@ -40,17 +40,28 @@ pub struct ServiceCtx {
}

pub trait Service<UserData, FeaturesControl, FeaturesEvent, ServiceControl, ServiceEvent, ToController, ToWorker> {
fn is_service_empty(&self) -> bool;
fn service_id(&self) -> u8;
fn service_name(&self) -> &str;
fn on_shared_input(&mut self, _ctx: &ServiceCtx, _now: u64, _input: ServiceSharedInput);
fn on_input(&mut self, _ctx: &ServiceCtx, _now: u64, input: ServiceInput<UserData, FeaturesEvent, ServiceControl, ToController>);
fn on_shutdown(&mut self, _ctx: &ServiceCtx, _now: u64);
fn pop_output2(&mut self, _now: u64) -> Option<ServiceOutput<UserData, FeaturesControl, ServiceEvent, ToWorker>>;
}

impl<UserData, FeaturesControl, FeaturesEvent, ServiceControl, ServiceEvent, ToController, ToWorker> TaskSwitcherChild<ServiceOutput<UserData, FeaturesControl, ServiceEvent, ToWorker>>
for Box<dyn Service<UserData, FeaturesControl, FeaturesEvent, ServiceControl, ServiceEvent, ToController, ToWorker>>
{
type Time = u64;

fn empty_event(&self) -> ServiceOutput<UserData, FeaturesControl, ServiceEvent, ToWorker> {
ServiceOutput::OnResourceEmpty
}

fn is_empty(&self) -> bool {
self.is_service_empty()
}

fn pop_output(&mut self, now: u64) -> Option<ServiceOutput<UserData, FeaturesControl, ServiceEvent, ToWorker>> {
self.pop_output2(now)
}
Expand All @@ -70,17 +81,20 @@ pub enum ServiceWorkerOutput<UserData, FeaturesControl, FeaturesEvent, ServiceCo
ToController(ToController),
FeatureControl(FeaturesControl),
Event(ServiceControlActor<UserData>, ServiceEvent),
OnResourceEmpty,
}

pub struct ServiceWorkerCtx {
pub node_id: NodeId,
}

pub trait ServiceWorker<UserData, FeaturesControl, FeaturesEvent, ServiceControl, ServiceEvent, ToController, ToWorker> {
fn is_service_empty(&self) -> bool;
fn service_id(&self) -> u8;
fn service_name(&self) -> &str;
fn on_tick(&mut self, _ctx: &ServiceWorkerCtx, _now: u64, _tick_count: u64);
fn on_input(&mut self, _ctx: &ServiceWorkerCtx, _now: u64, input: ServiceWorkerInput<UserData, FeaturesEvent, ServiceControl, ToWorker>);
fn on_shutdown(&mut self, _ctx: &ServiceWorkerCtx, _now: u64);
fn pop_output2(&mut self, _now: u64) -> Option<ServiceWorkerOutput<UserData, FeaturesControl, FeaturesEvent, ServiceControl, ServiceEvent, ToController>>;
}

Expand All @@ -89,6 +103,15 @@ impl<UserData, FeaturesControl, FeaturesEvent, ServiceControl, ServiceEvent, ToC
for Box<dyn ServiceWorker<UserData, FeaturesControl, FeaturesEvent, ServiceControl, ServiceEvent, ToController, ToWorker>>
{
type Time = u64;

fn empty_event(&self) -> ServiceWorkerOutput<UserData, FeaturesControl, FeaturesEvent, ServiceControl, ServiceEvent, ToController> {
ServiceWorkerOutput::OnResourceEmpty
}

fn is_empty(&self) -> bool {
self.is_service_empty()
}

fn pop_output(&mut self, now: u64) -> Option<ServiceWorkerOutput<UserData, FeaturesControl, FeaturesEvent, ServiceControl, ServiceEvent, ToController>> {
self.pop_output2(now)
}
Expand Down
61 changes: 52 additions & 9 deletions packages/network/src/controller_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,14 @@ mod services;
pub enum Input<UserData, SC, SE, TC> {
Ext(ExtIn<UserData, SC>),
Control(LogicControl<UserData, SC, SE, TC>),
#[convert_enum(optout)]
ShutdownRequest,
}

#[derive(Debug, Clone, convert_enum::From)]
pub enum Output<UserData, SE, TW> {
Ext(ExtOut<UserData, SE>),
Event(LogicEvent<UserData, SE, TW>),
#[convert_enum(optout)]
ShutdownSuccess,
OnResourceEmpty,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, num_enum::TryFromPrimitive, num_enum::IntoPrimitive)]
Expand Down Expand Up @@ -65,6 +63,7 @@ pub struct ControllerPlane<UserData, SC, SE, TC, TW> {
services: TaskSwitcherBranch<ServiceManager<UserData, SC, SE, TC, TW>, services::Output<UserData, SE, TW>>,
switcher: TaskSwitcher,
queue: VecDeque<Output<UserData, SE, TW>>,
shutdown: bool,
history: Arc<dyn ShadowRouterHistory>,
}

Expand Down Expand Up @@ -98,6 +97,7 @@ where
services: TaskSwitcherBranch::new(ServiceManager::new(cfg.services), TaskType::Service),
switcher: TaskSwitcher::new(3), //3 types: Neighbours, Feature, Service
queue: VecDeque::new(),
shutdown: false,
history: cfg.history,
}
}
Expand Down Expand Up @@ -174,12 +174,20 @@ where
Input::Control(LogicControl::ExtServicesEvent(service, userdata, event)) => {
self.queue.push_back(Output::Ext(ExtOut::ServicesEvent(service, userdata, event)));
}
Input::ShutdownRequest => {
self.neighbours.input(&mut self.switcher).on_input(now_ms, neighbours::Input::ShutdownRequest);
}
}
}

pub fn on_shutdown(&mut self, now_ms: u64) {
if self.shutdown {
return;
}
log::info!("[ControllerPlane] Shutdown");
self.features.input(&mut self.switcher).on_shutdown(&self.feature_ctx, now_ms);
self.services.input(&mut self.switcher).on_shutdown(&self.service_ctx, now_ms);
self.neighbours.input(&mut self.switcher).on_shutdown(now_ms);
self.shutdown = true;
}

fn pop_neighbours(&mut self, now_ms: u64) {
let out = return_if_none!(self.neighbours.pop_output(now_ms, &mut self.switcher));
match out {
Expand All @@ -197,12 +205,23 @@ where
ConnectionEvent::Disconnected(ctx) => self.queue.push_back(Output::Event(LogicEvent::UnPin(ctx.conn))),
}
}
neighbours::Output::ShutdownResponse => self.queue.push_back(Output::ShutdownSuccess),
neighbours::Output::OnResourceEmpty => {
log::info!("[ControllerPlane] Neighbours OnResourceEmpty");
}
}
}

fn pop_features(&mut self, now_ms: u64) {
let (feature, out) = return_if_none!(self.features.pop_output(now_ms, &mut self.switcher));
let out = return_if_none!(self.features.pop_output(now_ms, &mut self.switcher));

let (feature, out) = match out {
features::Output::Output(feature, out) => (feature, out),
features::Output::Shutdown => {
log::info!("[ControllerPlane] Features Shutdown");
return;
}
};

match out {
FeatureOutput::ToWorker(is_broadcast, to) => self.queue.push_back(Output::Event(LogicEvent::Feature(is_broadcast, to))),
FeatureOutput::Event(actor, event) => {
Expand Down Expand Up @@ -230,11 +249,23 @@ where
FeatureOutput::NeighboursDisconnectFrom(node) => {
self.neighbours.input(&mut self.switcher).on_input(now_ms, neighbours::Input::DisconnectFrom(node));
}
FeatureOutput::OnResourceEmpty => {
log::info!("[ControllerPlane] Feature {feature:?} OnResourceEmpty");
}
}
}

fn pop_services(&mut self, now_ms: u64) {
let (service, out) = return_if_none!(self.services.pop_output(now_ms, &mut self.switcher));
let out = return_if_none!(self.services.pop_output(now_ms, &mut self.switcher));

let (service, out) = match out {
services::Output::Output(service, out) => (service, out),
services::Output::OnResourceEmpty => {
log::info!("[ControllerPlane] Services OnResourceEmpty");
return;
}
};

match out {
ServiceOutput::FeatureControl(control) => {
self.features
Expand All @@ -246,6 +277,9 @@ where
ServiceControlActor::Worker(worker, userdata) => self.queue.push_back(Output::Event(LogicEvent::ExtServicesEvent(worker, service, userdata, event))),
},
ServiceOutput::BroadcastWorkers(to) => self.queue.push_back(Output::Event(LogicEvent::Service(service, to))),
ServiceOutput::OnResourceEmpty => {
log::info!("[ControllerPlane] Service {service} OnResourceEmpty");
}
}
}
}
Expand All @@ -255,6 +289,15 @@ where
UserData: 'static + Hash + Copy + Eq + Debug,
{
type Time = u64;

fn empty_event(&self) -> Output<UserData, SE, TW> {
Output::OnResourceEmpty
}

fn is_empty(&self) -> bool {
self.shutdown && self.queue.is_empty() && self.neighbours.is_empty() && self.features.is_empty() && self.services.is_empty()
}

fn pop_output(&mut self, now_ms: u64) -> Option<Output<UserData, SE, TW>> {
return_if_some!(self.queue.pop_front());

Expand Down
Loading

0 comments on commit 794aecc

Please sign in to comment.