Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: isolate faulty channels and retry channel task on faults #1

Merged
merged 9 commits into from
Feb 23, 2022
60 changes: 36 additions & 24 deletions agents/kathy/src/kathy.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{sync::Arc, time::Duration};

use color_eyre::{eyre::bail, Result};
use color_eyre::Result;

use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
Expand All @@ -10,20 +10,20 @@ use tracing::{info, Instrument};

use ethers::core::types::H256;

use nomad_base::{decl_agent, AgentCore, NomadAgent};
use nomad_base::{decl_agent, decl_channel, AgentCore, CachingHome, CachingReplica, NomadAgent};
use nomad_core::{Common, Home, Message, Replica};

use crate::settings::KathySettings as Settings;

decl_agent!(Kathy {
duration: u64,
interval: u64,
generator: ChatGenerator,
home_lock: Arc<Mutex<()>>,
messages_dispatched: prometheus::IntCounterVec,
});

impl Kathy {
pub fn new(duration: u64, generator: ChatGenerator, core: AgentCore) -> Self {
pub fn new(interval: u64, generator: ChatGenerator, core: AgentCore) -> Self {
let messages_dispatched = core
.metrics
.new_int_counter(
Expand All @@ -34,7 +34,7 @@ impl Kathy {
.expect("failed to register messages_dispatched_count metric");

Self {
duration,
interval,
generator,
core,
home_lock: Arc::new(Mutex::new(())),
Expand All @@ -43,12 +43,21 @@ impl Kathy {
}
}

decl_channel!(Kathy {
home_lock: Arc<Mutex<()>>,
generator: ChatGenerator,
messages_dispatched: prometheus::IntCounter,
interval: u64,
});

#[async_trait::async_trait]
impl NomadAgent for Kathy {
const AGENT_NAME: &'static str = "kathy";

type Settings = Settings;

type Channel = KathyChannel;

async fn from_settings(settings: Settings) -> Result<Self> {
Ok(Self::new(
settings.interval.parse().expect("invalid u64"),
Expand All @@ -57,26 +66,29 @@ impl NomadAgent for Kathy {
))
}

#[tracing::instrument]
fn run(&self, name: &str) -> Instrumented<JoinHandle<Result<()>>> {
let replica_opt = self.replica_by_name(name);
let name = name.to_owned();
let home = self.home();
let home_lock = self.home_lock.clone();

let mut generator = self.generator.clone();
let duration = Duration::from_secs(self.duration);

let messages_dispatched =
self.messages_dispatched
.with_label_values(&[home.name(), &name, Self::AGENT_NAME]);
fn build_channel(&self, replica: &str) -> Self::Channel {
Self::Channel {
base: self.channel_base(replica),
home_lock: self.home_lock.clone(),
generator: self.generator.clone(),
messages_dispatched: self.messages_dispatched.with_label_values(&[
self.home().name(),
replica,
Self::AGENT_NAME,
]),
interval: self.interval,
}
}

#[tracing::instrument]
fn run(channel: Self::Channel) -> Instrumented<JoinHandle<Result<()>>> {
tokio::spawn(async move {
if replica_opt.is_none() {
bail!("No replica named {}", name);
}
let replica = replica_opt.unwrap();
let destination = replica.local_domain();
let home = channel.home();
let destination = channel.replica().local_domain();
let mut generator = channel.generator;
let home_lock = channel.home_lock;
let messages_dispatched = channel.messages_dispatched;
let interval = channel.interval;

loop {
let msg = generator.gen_chat();
Expand Down Expand Up @@ -110,7 +122,7 @@ impl NomadAgent for Kathy {
}
}

sleep(duration).await;
sleep(Duration::from_secs(interval)).await;
}
})
.in_current_span()
Expand Down
94 changes: 46 additions & 48 deletions agents/processor/src/processor.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use async_trait::async_trait;
use color_eyre::{
eyre::{bail, eyre},
Result,
};
use color_eyre::{eyre::bail, Result};
use ethers::prelude::H256;
use futures_util::future::select_all;
use std::{
Expand All @@ -14,8 +11,8 @@ use tokio::{sync::RwLock, task::JoinHandle, time::sleep};
use tracing::{debug, error, info, info_span, instrument, instrument::Instrumented, Instrument};

use nomad_base::{
cancel_task, decl_agent, AgentCore, CachingHome, CachingReplica, ContractSyncMetrics,
IndexDataTypes, NomadAgent, NomadDB, ProcessorError,
cancel_task, decl_agent, decl_channel, AgentCore, CachingHome, CachingReplica,
ContractSyncMetrics, IndexDataTypes, NomadAgent, NomadDB, ProcessorError,
};
use nomad_core::{
accumulator::merkle::Proof, CommittedMessage, Common, Home, HomeEvents, MessageStatus,
Expand Down Expand Up @@ -45,7 +42,7 @@ pub(crate) struct Replica {
db: NomadDB,
allowed: Option<Arc<HashSet<H256>>>,
denied: Option<Arc<HashSet<H256>>>,
next_message_nonce: Arc<prometheus::IntGaugeVec>,
next_message_nonce: prometheus::IntGauge,
}

impl std::fmt::Display for Replica {
Expand Down Expand Up @@ -81,9 +78,7 @@ impl Replica {
.map(|n: u32| n + 1)
.unwrap_or_default();

self.next_message_nonce
.with_label_values(&[self.home.name(), self.replica.name(), AGENT_NAME])
.set(next_message_nonce as i64);
self.next_message_nonce.set(next_message_nonce as i64);

info!(
replica_domain,
Expand Down Expand Up @@ -114,13 +109,7 @@ impl Replica {
.store_keyed_encodable(CURRENT_NONCE, &replica_domain, &next_message_nonce)?;

next_message_nonce += 1;
self.next_message_nonce
.with_label_values(&[
self.home.name(),
self.replica.name(),
AGENT_NAME,
])
.set(next_message_nonce as i64);
self.next_message_nonce.set(next_message_nonce as i64);
}
Ok(Flow::Repeat) => {
// there was some fault, let's wait and then try again later when state may have moved
Expand Down Expand Up @@ -305,7 +294,7 @@ decl_agent!(
allowed: Option<Arc<HashSet<H256>>>,
denied: Option<Arc<HashSet<H256>>>,
index_only: bool,
next_message_nonce: Arc<prometheus::IntGaugeVec>,
next_message_nonces: prometheus::IntGaugeVec,
config: Option<S3Config>,
}
);
Expand All @@ -320,36 +309,44 @@ impl Processor {
index_only: bool,
config: Option<S3Config>,
) -> Self {
let next_message_nonce = Arc::new(
core.metrics
.new_int_gauge(
"next_message_nonce",
"Index of the next message to inspect",
&["home", "replica", "agent"],
)
.expect("processor metric already registered -- should have be a singleton"),
);
let next_message_nonces = core
.metrics
.new_int_gauge(
"next_message_nonce",
"Index of the next message to inspect",
&["home", "replica", "agent"],
)
.expect("processor metric already registered -- should have be a singleton");

Self {
interval,
core,
replica_tasks: Default::default(),
allowed: allowed.map(Arc::new),
denied: denied.map(Arc::new),
next_message_nonce,
next_message_nonces,
index_only,
config,
}
}
}

decl_channel!(Processor {
next_message_nonce: prometheus::IntGauge,
allowed: Option<Arc<HashSet<H256>>>,
denied: Option<Arc<HashSet<H256>>>,
interval: u64,
});

#[async_trait]
#[allow(clippy::unit_arg)]
impl NomadAgent for Processor {
const AGENT_NAME: &'static str = AGENT_NAME;

type Settings = Settings;

type Channel = ProcessorChannel;

async fn from_settings(settings: Self::Settings) -> Result<Self>
where
Self: Sized,
Expand All @@ -364,29 +361,30 @@ impl NomadAgent for Processor {
))
}

fn run(&self, name: &str) -> Instrumented<JoinHandle<Result<()>>> {
let home = self.home();
let next_message_nonce = self.next_message_nonce.clone();
let interval = self.interval;
let db = NomadDB::new(home.name(), self.db());

let replica_opt = self.replica_by_name(name);
let name = name.to_owned();

let allowed = self.allowed.clone();
let denied = self.denied.clone();
fn build_channel(&self, replica: &str) -> Self::Channel {
Self::Channel {
base: self.channel_base(replica),
next_message_nonce: self.next_message_nonces.with_label_values(&[
self.home().name(),
replica,
Self::AGENT_NAME,
]),
allowed: self.allowed.clone(),
denied: self.denied.clone(),
interval: self.interval,
}
}

fn run(channel: Self::Channel) -> Instrumented<JoinHandle<Result<()>>> {
tokio::spawn(async move {
let replica = replica_opt.ok_or_else(|| eyre!("No replica named {}", name))?;

Replica {
interval,
replica,
home,
db,
allowed,
denied,
next_message_nonce,
interval: channel.interval,
replica: channel.replica(),
home: channel.home(),
db: channel.db(),
allowed: channel.allowed,
denied: channel.denied,
next_message_nonce: channel.next_message_nonce,
}
.main()
.await?
Expand Down
Loading