Skip to content

Commit

Permalink
refactor: isolate faulty channels and retry channel task on faults (#1)
Browse files Browse the repository at this point in the history
* refactor: agents now build channels and give to run tasks

* refactor: channel-specific agent structs take counter and gauge instead of vecs

* refactor: use decl_channel! macro

* feature: add exponential retry for channel faults and metrics to track number of faults

* fix: move macro to macro.rs

* test: dirty attempt, not done yet

* chore: small fixes

* chore: cleaned

* chore: sanity check

Co-authored-by: Luke Tchang <[email protected]>
Co-authored-by: Daniil Naumetc <[email protected]>
  • Loading branch information
3 people authored Feb 23, 2022
1 parent d4bee91 commit 50267ae
Show file tree
Hide file tree
Showing 10 changed files with 453 additions and 135 deletions.
60 changes: 36 additions & 24 deletions agents/kathy/src/kathy.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{sync::Arc, time::Duration};

use color_eyre::{eyre::bail, Result};
use color_eyre::Result;

use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
Expand All @@ -10,20 +10,20 @@ use tracing::{info, Instrument};

use ethers::core::types::H256;

use nomad_base::{decl_agent, AgentCore, NomadAgent};
use nomad_base::{decl_agent, decl_channel, AgentCore, CachingHome, CachingReplica, NomadAgent};
use nomad_core::{Common, Home, Message, Replica};

use crate::settings::KathySettings as Settings;

decl_agent!(Kathy {
duration: u64,
interval: u64,
generator: ChatGenerator,
home_lock: Arc<Mutex<()>>,
messages_dispatched: prometheus::IntCounterVec,
});

impl Kathy {
pub fn new(duration: u64, generator: ChatGenerator, core: AgentCore) -> Self {
pub fn new(interval: u64, generator: ChatGenerator, core: AgentCore) -> Self {
let messages_dispatched = core
.metrics
.new_int_counter(
Expand All @@ -34,7 +34,7 @@ impl Kathy {
.expect("failed to register messages_dispatched_count metric");

Self {
duration,
interval,
generator,
core,
home_lock: Arc::new(Mutex::new(())),
Expand All @@ -43,12 +43,21 @@ impl Kathy {
}
}

// Per-replica channel state for Kathy. The `decl_channel!` definition is not
// visible in this file; judging by its use below it declares `KathyChannel`
// (bound as `NomadAgent::Channel`), which `build_channel` constructs from a
// `base` value plus the fields listed here.
decl_channel!(Kathy {
// Clone of the agent-wide `home_lock`, so every channel shares one mutex.
home_lock: Arc<Mutex<()>>,
// Chat message generator, cloned from the agent for each channel.
generator: ChatGenerator,
// Counter already bound to the (home name, replica, agent name) label values.
messages_dispatched: prometheus::IntCounter,
// Seconds slept between iterations of the dispatch loop in `run`.
interval: u64,
});

#[async_trait::async_trait]
impl NomadAgent for Kathy {
const AGENT_NAME: &'static str = "kathy";

type Settings = Settings;

type Channel = KathyChannel;

async fn from_settings(settings: Settings) -> Result<Self> {
Ok(Self::new(
settings.interval.parse().expect("invalid u64"),
Expand All @@ -57,26 +66,29 @@ impl NomadAgent for Kathy {
))
}

#[tracing::instrument]
fn run(&self, name: &str) -> Instrumented<JoinHandle<Result<()>>> {
let replica_opt = self.replica_by_name(name);
let name = name.to_owned();
let home = self.home();
let home_lock = self.home_lock.clone();

let mut generator = self.generator.clone();
let duration = Duration::from_secs(self.duration);

let messages_dispatched =
self.messages_dispatched
.with_label_values(&[home.name(), &name, Self::AGENT_NAME]);
/// Assembles the channel handed to [`Self::run`] for the replica named `replica`.
///
/// The channel carries the base produced by `channel_base`, clones of the
/// agent's shared `home_lock` and `generator`, the `messages_dispatched`
/// counter bound to the `(home name, replica, agent name)` label values, and
/// the agent's `interval`.
fn build_channel(&self, replica: &str) -> Self::Channel {
    // Field values are computed in the same order the struct literal would
    // evaluate them.
    let base = self.channel_base(replica);
    let home_lock = self.home_lock.clone();
    let generator = self.generator.clone();

    // Keep the home handle alive while its name is borrowed for the labels.
    let home = self.home();
    let messages_dispatched = self
        .messages_dispatched
        .with_label_values(&[home.name(), replica, Self::AGENT_NAME]);

    Self::Channel {
        base,
        home_lock,
        generator,
        messages_dispatched,
        interval: self.interval,
    }
}

#[tracing::instrument]
fn run(channel: Self::Channel) -> Instrumented<JoinHandle<Result<()>>> {
tokio::spawn(async move {
if replica_opt.is_none() {
bail!("No replica named {}", name);
}
let replica = replica_opt.unwrap();
let destination = replica.local_domain();
let home = channel.home();
let destination = channel.replica().local_domain();
let mut generator = channel.generator;
let home_lock = channel.home_lock;
let messages_dispatched = channel.messages_dispatched;
let interval = channel.interval;

loop {
let msg = generator.gen_chat();
Expand Down Expand Up @@ -110,7 +122,7 @@ impl NomadAgent for Kathy {
}
}

sleep(duration).await;
sleep(Duration::from_secs(interval)).await;
}
})
.in_current_span()
Expand Down
94 changes: 46 additions & 48 deletions agents/processor/src/processor.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use async_trait::async_trait;
use color_eyre::{
eyre::{bail, eyre},
Result,
};
use color_eyre::{eyre::bail, Result};
use ethers::prelude::H256;
use futures_util::future::select_all;
use std::{
Expand All @@ -14,8 +11,8 @@ use tokio::{sync::RwLock, task::JoinHandle, time::sleep};
use tracing::{debug, error, info, info_span, instrument, instrument::Instrumented, Instrument};

use nomad_base::{
cancel_task, decl_agent, AgentCore, CachingHome, CachingReplica, ContractSyncMetrics,
IndexDataTypes, NomadAgent, NomadDB, ProcessorError,
cancel_task, decl_agent, decl_channel, AgentCore, CachingHome, CachingReplica,
ContractSyncMetrics, IndexDataTypes, NomadAgent, NomadDB, ProcessorError,
};
use nomad_core::{
accumulator::merkle::Proof, CommittedMessage, Common, Home, HomeEvents, MessageStatus,
Expand Down Expand Up @@ -45,7 +42,7 @@ pub(crate) struct Replica {
db: NomadDB,
allowed: Option<Arc<HashSet<H256>>>,
denied: Option<Arc<HashSet<H256>>>,
next_message_nonce: Arc<prometheus::IntGaugeVec>,
next_message_nonce: prometheus::IntGauge,
}

impl std::fmt::Display for Replica {
Expand Down Expand Up @@ -81,9 +78,7 @@ impl Replica {
.map(|n: u32| n + 1)
.unwrap_or_default();

self.next_message_nonce
.with_label_values(&[self.home.name(), self.replica.name(), AGENT_NAME])
.set(next_message_nonce as i64);
self.next_message_nonce.set(next_message_nonce as i64);

info!(
replica_domain,
Expand Down Expand Up @@ -114,13 +109,7 @@ impl Replica {
.store_keyed_encodable(CURRENT_NONCE, &replica_domain, &next_message_nonce)?;

next_message_nonce += 1;
self.next_message_nonce
.with_label_values(&[
self.home.name(),
self.replica.name(),
AGENT_NAME,
])
.set(next_message_nonce as i64);
self.next_message_nonce.set(next_message_nonce as i64);
}
Ok(Flow::Repeat) => {
// there was some fault, let's wait and then try again later when state may have moved
Expand Down Expand Up @@ -305,7 +294,7 @@ decl_agent!(
allowed: Option<Arc<HashSet<H256>>>,
denied: Option<Arc<HashSet<H256>>>,
index_only: bool,
next_message_nonce: Arc<prometheus::IntGaugeVec>,
next_message_nonces: prometheus::IntGaugeVec,
config: Option<S3Config>,
}
);
Expand All @@ -320,36 +309,44 @@ impl Processor {
index_only: bool,
config: Option<S3Config>,
) -> Self {
let next_message_nonce = Arc::new(
core.metrics
.new_int_gauge(
"next_message_nonce",
"Index of the next message to inspect",
&["home", "replica", "agent"],
)
.expect("processor metric already registered -- should have be a singleton"),
);
let next_message_nonces = core
.metrics
.new_int_gauge(
"next_message_nonce",
"Index of the next message to inspect",
&["home", "replica", "agent"],
)
.expect("processor metric already registered -- should have be a singleton");

Self {
interval,
core,
replica_tasks: Default::default(),
allowed: allowed.map(Arc::new),
denied: denied.map(Arc::new),
next_message_nonce,
next_message_nonces,
index_only,
config,
}
}
}

// Per-replica channel state for the processor. The `decl_channel!` definition
// is not visible in this file; judging by its use below it declares
// `ProcessorChannel` (bound as `NomadAgent::Channel`), which `build_channel`
// constructs from a `base` value plus the fields listed here.
decl_channel!(Processor {
// Gauge already bound to the (home name, replica, agent name) label values;
// `Replica` sets it to the nonce of the next message to inspect.
next_message_nonce: prometheus::IntGauge,
// Optional shared set of `H256` values, cloned from the agent.
// NOTE(review): presumably an allow-list applied to messages -- confirm in `Replica`.
allowed: Option<Arc<HashSet<H256>>>,
// Optional shared set of `H256` values, cloned from the agent.
// NOTE(review): presumably a deny-list applied to messages -- confirm in `Replica`.
denied: Option<Arc<HashSet<H256>>>,
// Copied from the agent and forwarded to `Replica::interval`.
// NOTE(review): unit is not visible here (likely seconds) -- confirm.
interval: u64,
});

#[async_trait]
#[allow(clippy::unit_arg)]
impl NomadAgent for Processor {
const AGENT_NAME: &'static str = AGENT_NAME;

type Settings = Settings;

type Channel = ProcessorChannel;

async fn from_settings(settings: Self::Settings) -> Result<Self>
where
Self: Sized,
Expand All @@ -364,29 +361,30 @@ impl NomadAgent for Processor {
))
}

fn run(&self, name: &str) -> Instrumented<JoinHandle<Result<()>>> {
let home = self.home();
let next_message_nonce = self.next_message_nonce.clone();
let interval = self.interval;
let db = NomadDB::new(home.name(), self.db());

let replica_opt = self.replica_by_name(name);
let name = name.to_owned();

let allowed = self.allowed.clone();
let denied = self.denied.clone();
/// Assembles the channel handed to [`Self::run`] for the replica named `replica`.
///
/// The channel carries the base produced by `channel_base`, the
/// `next_message_nonce` gauge bound to the `(home name, replica, agent name)`
/// label values, clones of the optional `allowed` / `denied` sets, and the
/// agent's `interval`.
fn build_channel(&self, replica: &str) -> Self::Channel {
    // Field values are computed in the same order the struct literal would
    // evaluate them.
    let base = self.channel_base(replica);

    // Keep the home handle alive while its name is borrowed for the labels.
    let home = self.home();
    let next_message_nonce = self
        .next_message_nonces
        .with_label_values(&[home.name(), replica, Self::AGENT_NAME]);

    let allowed = self.allowed.clone();
    let denied = self.denied.clone();

    Self::Channel {
        base,
        next_message_nonce,
        allowed,
        denied,
        interval: self.interval,
    }
}

fn run(channel: Self::Channel) -> Instrumented<JoinHandle<Result<()>>> {
tokio::spawn(async move {
let replica = replica_opt.ok_or_else(|| eyre!("No replica named {}", name))?;

Replica {
interval,
replica,
home,
db,
allowed,
denied,
next_message_nonce,
interval: channel.interval,
replica: channel.replica(),
home: channel.home(),
db: channel.db(),
allowed: channel.allowed,
denied: channel.denied,
next_message_nonce: channel.next_message_nonce,
}
.main()
.await?
Expand Down
Loading

0 comments on commit 50267ae

Please sign in to comment.