Skip to content

Commit

Permalink
refactor: use decl_channel! macro
Browse files Browse the repository at this point in the history
  • Loading branch information
luketchang committed Feb 8, 2022
1 parent 5804034 commit fadf0b6
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 57 deletions.
20 changes: 8 additions & 12 deletions agents/kathy/src/kathy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tracing::{info, Instrument};

use ethers::core::types::H256;

use nomad_base::{decl_agent, AgentCore, ChannelBase, NomadAgent};
use nomad_base::{decl_agent, decl_channel, AgentCore, CachingHome, CachingReplica, NomadAgent};
use nomad_core::{Common, Home, Message, Replica};

use crate::settings::KathySettings as Settings;
Expand Down Expand Up @@ -43,14 +43,12 @@ impl Kathy {
}
}

#[derive(Debug, Clone)]
pub struct KathyChannel {
base: ChannelBase,
decl_channel!(Kathy {
home_lock: Arc<Mutex<()>>,
generator: ChatGenerator,
messages_dispatched: prometheus::IntCounter,
interval: u64,
}
});

#[async_trait::async_trait]
impl NomadAgent for Kathy {
Expand All @@ -69,15 +67,13 @@ impl NomadAgent for Kathy {
}

fn build_channel(&self, replica: &str) -> Self::Channel {
let base = self.channel_base(replica);

Self::Channel {
base: base.clone(),
base: self.channel_base(replica),
home_lock: self.home_lock.clone(),
generator: self.generator.clone(),
messages_dispatched: self.messages_dispatched.with_label_values(&[
base.home.name(),
base.replica.name(),
self.home().name(),
replica,
Self::AGENT_NAME,
]),
interval: self.interval,
Expand All @@ -87,8 +83,8 @@ impl NomadAgent for Kathy {
#[tracing::instrument]
fn run(channel: Self::Channel) -> Instrumented<JoinHandle<Result<()>>> {
tokio::spawn(async move {
let home = channel.base.home;
let destination = channel.base.replica.local_domain();
let home = channel.home();
let destination = channel.replica().local_domain();
let mut generator = channel.generator;
let home_lock = channel.home_lock;
let messages_dispatched = channel.messages_dispatched;
Expand Down
14 changes: 6 additions & 8 deletions agents/processor/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio::{sync::RwLock, task::JoinHandle, time::sleep};
use tracing::{debug, error, info, info_span, instrument, instrument::Instrumented, Instrument};

use nomad_base::{
cancel_task, decl_agent, AgentCore, CachingHome, CachingReplica, ChannelBase,
cancel_task, decl_agent, decl_channel, AgentCore, CachingHome, CachingReplica,
ContractSyncMetrics, IndexDataTypes, NomadAgent, NomadDB, ProcessorError,
};
use nomad_core::{
Expand Down Expand Up @@ -323,14 +323,12 @@ impl Processor {
}
}

#[derive(Debug, Clone)]
pub struct ProcessorChannel {
base: ChannelBase,
decl_channel!(Processor {
next_message_nonce: prometheus::IntGauge,
allowed: Option<Arc<HashSet<H256>>>,
denied: Option<Arc<HashSet<H256>>>,
interval: u64,
}
});

#[async_trait]
#[allow(clippy::unit_arg)]
Expand Down Expand Up @@ -373,9 +371,9 @@ impl NomadAgent for Processor {
tokio::spawn(async move {
Replica {
interval: channel.interval,
replica: channel.base.replica,
home: channel.base.home,
db: channel.base.db,
replica: channel.replica(),
home: channel.home(),
db: channel.db(),
allowed: channel.allowed,
denied: channel.denied,
next_message_nonce: channel.next_message_nonce,
Expand Down
30 changes: 8 additions & 22 deletions agents/relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{sync::Arc, time::Duration};
use tokio::{sync::Mutex, task::JoinHandle, time::sleep};
use tracing::{info, instrument::Instrumented, Instrument};

use nomad_base::{AgentCore, CachingHome, CachingReplica, ChannelBase, NomadAgent};
use nomad_base::{decl_agent, decl_channel, AgentCore, CachingHome, CachingReplica, NomadAgent};
use nomad_core::{Common, CommonEvents};

use crate::settings::RelayerSettings as Settings;
Expand Down Expand Up @@ -100,19 +100,10 @@ impl UpdatePoller {
}
}

/// A relayer agent
#[derive(Debug)]
pub struct Relayer {
interval: u64,
core: AgentCore,
decl_agent!(Relayer {
updates_relayed_counts: prometheus::IntCounterVec,
}

impl AsRef<AgentCore> for Relayer {
fn as_ref(&self) -> &AgentCore {
&self.core
}
}
interval: u64,
});

#[allow(clippy::unit_arg)]
impl Relayer {
Expand All @@ -135,12 +126,10 @@ impl Relayer {
}
}

#[derive(Debug, Clone)]
pub struct RelayerChannel {
base: ChannelBase,
decl_channel!(Relayer {
updates_relayed_count: prometheus::IntCounter,
interval: u64,
}
});

#[async_trait]
#[allow(clippy::unit_arg)]
Expand Down Expand Up @@ -175,13 +164,10 @@ impl NomadAgent for Relayer {

#[tracing::instrument]
fn run(channel: Self::Channel) -> Instrumented<JoinHandle<Result<()>>> {
let home = channel.base.home;
let replica = channel.base.replica;

tokio::spawn(async move {
let update_poller = UpdatePoller::new(
home.clone(),
replica.clone(),
channel.home(),
channel.replica(),
channel.interval,
channel.updates_relayed_count,
);
Expand Down
25 changes: 10 additions & 15 deletions nomad-base/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ use crate::{
cancel_task,
metrics::CoreMetrics,
settings::{IndexSettings, Settings},
BaseError, CachingHome, CachingReplica, ContractSyncMetrics, IndexDataTypes, NomadDB,
BaseError, CachingHome, CachingReplica, ChannelBase, ContractSyncMetrics, IndexDataTypes,
NomadDB,
};
use async_trait::async_trait;
use color_eyre::{eyre::WrapErr, Result};
use futures_util::future::select_all;
use nomad_core::{db::DB, Common};
use tracing::instrument::Instrumented;
use tracing::{error, info_span, Instrument};
use tracing::{error, info, info_span, Instrument};

use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::{task::JoinHandle, time::sleep};
Expand All @@ -31,17 +32,6 @@ pub struct AgentCore {
pub settings: crate::settings::Settings,
}

/// Data shared across all agent run tasks
#[derive(Debug, Clone)]
pub struct ChannelBase {
/// Home
pub home: Arc<CachingHome>,
/// Replica
pub replica: Arc<CachingReplica>,
/// NomadDB keyed by home
pub db: NomadDB,
}

/// A trait for an application:
/// that runs on a replica
/// and:
Expand Down Expand Up @@ -113,11 +103,16 @@ pub trait NomadAgent: Send + Sync + Sized + std::fmt::Debug + AsRef<AgentCore> {
let handle = Self::run(channel.clone()).in_current_span();
let res = handle
.await?
.wrap_err(format!("Task for replica named {} failed", replica));
.wrap_err(format!("Task for replica named {} failed", &replica));

match res {
Ok(_) => return Ok(()),
Err(e) => {
error!("Channel errored out! Error: {:?}", e);
error!(
"Channel for replica {} errored out! Error: {:?}",
&replica, e
);
info!("Restarting channel to {}", &replica);
}
}
}
Expand Down
64 changes: 64 additions & 0 deletions nomad-base/src/channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use crate::{CachingHome, CachingReplica, NomadDB};
use std::sync::Arc;

#[derive(Debug, Clone)]
/// Commmon data needed for a single agent channel
pub struct ChannelBase {
/// Home
pub home: Arc<CachingHome>,
/// Replica
pub replica: Arc<CachingReplica>,
/// NomadDB keyed by home
pub db: NomadDB,
}

#[macro_export]
/// Declare a new channel block
/// ### Usage
///
/// ```ignore
/// decl_agent!(Relayer {
/// updates_relayed_counts: prometheus::IntCounterVec,
/// interval: u64,
/// });

/// ```
macro_rules! decl_channel {
(
$name:ident {
$($(#[$tags:meta])* $prop:ident: $type:ty,)*
}
) => {
paste::paste! {
#[derive(Debug, Clone)]
#[doc = "Channel for `" $name]
pub struct [<$name Channel>] {
pub(crate) base: nomad_base::ChannelBase,
$(
$(#[$tags])*
pub(crate) $prop: $type,
)*
}

impl AsRef<nomad_base::ChannelBase> for [<$name Channel>] {
fn as_ref(&self) -> &nomad_base::ChannelBase {
&self.base
}
}

impl [<$name Channel>] {
pub fn home(&self) -> Arc<CachingHome> {
self.as_ref().home.clone()
}

pub fn replica(&self) -> Arc<CachingReplica> {
self.as_ref().replica.clone()
}

pub fn db(&self) -> nomad_base::NomadDB {
self.as_ref().db.clone()
}
}
}
}
}
3 changes: 3 additions & 0 deletions nomad-base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ pub use settings::*;
mod agent;
pub use agent::*;

mod channel;
pub use channel::*;

#[doc(hidden)]
#[cfg_attr(tarpaulin, skip)]
#[macro_use]
Expand Down

0 comments on commit fadf0b6

Please sign in to comment.