From a0959ebb37393a31387b04486583428a6f89fa37 Mon Sep 17 00:00:00 2001 From: Joshua Oladele Date: Thu, 15 Feb 2024 09:57:45 +0100 Subject: [PATCH 1/4] Introduce Chaindexing Nodes Currently, nodes elect the latest node as the leader. This allows consumer apps to update their indexing logic with guarantees that the latest logic is being used, and stale ones totally disregarded --- chaindexing/src/diesels/schema.rs | 8 +++++ chaindexing/src/nodes.rs | 36 +++++++++++++++++++ .../src/repos/postgres_repo/migrations.rs | 4 +++ 3 files changed, 48 insertions(+) create mode 100644 chaindexing/src/nodes.rs diff --git a/chaindexing/src/diesels/schema.rs b/chaindexing/src/diesels/schema.rs index 2a2c5d2..818bbae 100644 --- a/chaindexing/src/diesels/schema.rs +++ b/chaindexing/src/diesels/schema.rs @@ -1,5 +1,13 @@ // @generated automatically by Diesel CLI. +diesel::table! { + chaindexing_nodes(id) { + id -> Int4, + last_active_at -> Int8, + inserted_at -> Int8, + } +} + diesel::table! { chaindexing_contract_addresses (id) { id -> Int4, diff --git a/chaindexing/src/nodes.rs b/chaindexing/src/nodes.rs new file mode 100644 index 0000000..332c704 --- /dev/null +++ b/chaindexing/src/nodes.rs @@ -0,0 +1,36 @@ +use diesel::{prelude::Insertable, Queryable}; +use serde::Deserialize; + +use crate::diesels::schema::chaindexing_nodes; + +#[derive(Debug, Deserialize, Clone, PartialEq, Eq, Insertable, Queryable)] +#[diesel(table_name = chaindexing_nodes)] +pub struct Node { + pub id: i32, + last_active_at: i64, + inserted_at: i64, +} + +impl Node { + pub const ELECTION_RATE_SECS: u64 = 5; + pub const ELECTION_RATE_MS: u64 = Node::ELECTION_RATE_SECS * 1_000; + + pub fn get_min_active_at() -> i64 { + let now = chrono::Utc::now().timestamp(); + + now - (Node::ELECTION_RATE_SECS * 2) as i64 + } +} + +pub fn elect_leader(nodes: &Vec) -> Node { + let mut nodes_iter = nodes.iter(); + let mut leader: Option<&Node> = nodes_iter.next(); + + while let Some(node) = nodes_iter.next() { + if node.inserted_at > leader.unwrap().inserted_at { + leader = Some(node); + } + } + + leader.unwrap().clone() +} diff --git a/chaindexing/src/repos/postgres_repo/migrations.rs b/chaindexing/src/repos/postgres_repo/migrations.rs index 1fc050c..857dbab 100644 --- a/chaindexing/src/repos/postgres_repo/migrations.rs +++ b/chaindexing/src/repos/postgres_repo/migrations.rs @@ -1,6 +1,10 @@ use crate::{Migratable, PostgresRepo, RepoMigrations, SQLikeMigrations}; impl RepoMigrations for PostgresRepo { + fn create_nodes_migration() -> &'static [&'static str] { + SQLikeMigrations::create_nodes() + } + fn create_contract_addresses_migration() -> &'static [&'static str] { SQLikeMigrations::create_contract_addresses() } From a88e737a0fa2cc65f1bc942fd7b5f8e397091517 Mon Sep 17 00:00:00 2001 From: Joshua Oladele Date: Thu, 15 Feb 2024 09:58:56 +0100 Subject: [PATCH 2/4] Add APIs for orchestrating nodes in Repo --- chaindexing/src/repos/postgres_repo.rs | 32 ++++++++++++++++++++++++++ chaindexing/src/repos/repo.rs | 14 +++++++++++ 2 files changed, 46 insertions(+) diff --git a/chaindexing/src/repos/postgres_repo.rs b/chaindexing/src/repos/postgres_repo.rs index 0aaf041..8a9ecdc 100644 --- a/chaindexing/src/repos/postgres_repo.rs +++ b/chaindexing/src/repos/postgres_repo.rs @@ -6,6 +6,7 @@ mod raw_queries; use crate::{ contracts::{ContractAddress, ContractAddressID, UnsavedContractAddress}, events::Event, + nodes::Node, ReorgedBlock, ResetCount, Streamable, UnsavedReorgedBlock, }; use diesel_async::RunQueryDsl; @@ -211,6 +212,37 @@ impl Repo for PostgresRepo { chaindexing_reset_counts.load(conn).await.unwrap() } + + async fn create_node<'a>(conn: &mut Self::Conn<'a>) -> Node { + use crate::diesels::schema::chaindexing_nodes::dsl::*; + + diesel::insert_into(chaindexing_nodes) + .default_values() + .get_result(conn) + .await + .unwrap() + } + async fn get_active_nodes<'a>(conn: &mut Self::Conn<'a>) -> Vec { + use crate::diesels::schema::chaindexing_nodes::dsl::*; + + chaindexing_nodes + .filter(last_active_at.gt(Node::get_min_active_at())) + .load(conn) + .await + .unwrap() + } + async fn keep_node_active<'a>(conn: &mut Self::Conn<'a>, node: &Node) { + use crate::diesels::schema::chaindexing_nodes::dsl::*; + + let now = chrono::offset::Utc::now().timestamp(); + + diesel::update(chaindexing_nodes) + .filter(id.eq(node.id)) + .set(last_active_at.eq(now)) + .execute(conn) + .await + .unwrap(); + } } impl Streamable for PostgresRepo { diff --git a/chaindexing/src/repos/repo.rs b/chaindexing/src/repos/repo.rs index b9dfea6..eca47b7 100644 --- a/chaindexing/src/repos/repo.rs +++ b/chaindexing/src/repos/repo.rs @@ -9,6 +9,7 @@ use tokio::sync::Mutex; use crate::{ contracts::{ContractAddressID, UnsavedContractAddress}, events::{Event, PartialEvent}, + nodes::Node, ContractAddress, ReorgedBlock, ResetCount, UnsavedReorgedBlock, }; @@ -74,6 +75,10 @@ pub trait Repo: async fn create_reset_count<'a>(conn: &mut Self::Conn<'a>); async fn get_reset_counts<'a>(conn: &mut Self::Conn<'a>) -> Vec; + + async fn create_node<'a>(conn: &mut Self::Conn<'a>) -> Node; + async fn get_active_nodes<'a>(conn: &mut Self::Conn<'a>) -> Vec; + async fn keep_node_active<'a>(conn: &mut Self::Conn<'a>, node: &Node); } #[async_trait::async_trait] @@ -147,6 +152,7 @@ pub trait Streamable { } pub trait RepoMigrations: Migratable { + fn create_nodes_migration() -> &'static [&'static str]; fn create_contract_addresses_migration() -> &'static [&'static str]; fn drop_contract_addresses_migration() -> &'static [&'static str]; fn create_events_migration() -> &'static [&'static str]; @@ -189,6 +195,14 @@ pub trait Migratable: ExecutesWithRawQuery + Sync + Send { pub struct SQLikeMigrations; impl SQLikeMigrations { + pub fn create_nodes() -> &'static [&'static str] { + &["CREATE TABLE IF NOT EXISTS chaindexing_nodes ( + id SERIAL PRIMARY KEY, + last_active_at BIGINT DEFAULT (EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::BIGINT, + inserted_at BIGINT DEFAULT (EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::BIGINT + )"] + } + pub fn create_contract_addresses() -> &'static [&'static str] { &[ "CREATE TABLE IF NOT EXISTS chaindexing_contract_addresses ( From f7d1b26f52cc67d26c708c533b68218287130124 Mon Sep 17 00:00:00 2001 From: Joshua Oladele Date: Thu, 15 Feb 2024 10:01:41 +0100 Subject: [PATCH 3/4] Ensure only leader node does indexing This change ensures peer nodes abort their indexing process to allow leader node reset and re-index on a clean slate --- chaindexing/src/event_handlers.rs | 8 +-- chaindexing/src/events_ingester.rs | 6 +-- chaindexing/src/lib.rs | 84 +++++++++++++++++++++++++----- 3 files changed, 80 insertions(+), 18 deletions(-) diff --git a/chaindexing/src/event_handlers.rs b/chaindexing/src/event_handlers.rs index 28a8ed3..83c6989 100644 --- a/chaindexing/src/event_handlers.rs +++ b/chaindexing/src/event_handlers.rs @@ -4,7 +4,7 @@ use std::{sync::Arc, time::Duration}; mod handle_events; mod maybe_handle_chain_reorg; -use tokio::{sync::Mutex, time::interval}; +use tokio::{sync::Mutex, task, time::interval}; use crate::{contracts::Contracts, events::Event, ChaindexingRepo, Config, Repo}; use crate::{ChaindexingRepoRawQueryTxnClient, HasRawQueryClient}; @@ -47,7 +47,9 @@ pub trait EventHandler: Send + Sync { pub struct EventHandlers; impl EventHandlers { - pub fn start(config: &Config) { + pub fn start( + config: &Config, + ) -> task::JoinHandle<()> { let config = config.clone(); tokio::spawn(async move { let pool = config.repo.get_pool(1).await; @@ -79,6 +81,6 @@ impl EventHandlers { ) .await; } - }); + }) } } diff --git a/chaindexing/src/events_ingester.rs b/chaindexing/src/events_ingester.rs index 551b1c9..79d0998 100644 --- a/chaindexing/src/events_ingester.rs +++ b/chaindexing/src/events_ingester.rs @@ -12,8 +12,8 @@ use ethers::types::{Address, Filter as EthersFilter, Log}; use futures_util::future::try_join_all; use futures_util::StreamExt; use std::cmp::min; -use tokio::sync::Mutex; use tokio::time::{interval, sleep}; +use tokio::{sync::Mutex, task}; use crate::chain_reorg::Execution; use crate::contracts::Contract; @@ -94,7 +94,7 @@ impl From for EventsIngesterError { pub struct EventsIngester; impl EventsIngester { - pub fn start(config: &Config) { + pub fn start(config: &Config) -> task::JoinHandle<()> { let config = config.clone(); tokio::spawn(async move { let pool = config.repo.get_pool(1).await; @@ -125,7 +125,7 @@ impl EventsIngester { .unwrap(); } } - }); + }) } pub async fn ingest<'a, S: Send + Sync + Clone>( diff --git a/chaindexing/src/lib.rs b/chaindexing/src/lib.rs index bab9095..692e064 100644 --- a/chaindexing/src/lib.rs +++ b/chaindexing/src/lib.rs @@ -7,10 +7,12 @@ mod diesels; mod event_handlers; mod events; mod events_ingester; +mod nodes; mod repos; mod reset_counts; use std::fmt::Debug; +use std::time::Duration; pub use chain_reorg::{MinConfirmationCount, ReorgedBlock, ReorgedBlocks, UnsavedReorgedBlock}; pub use chains::Chains; @@ -22,6 +24,7 @@ pub use ethers::prelude::Chain; pub use event_handlers::{EventHandler, EventHandlerContext as EventContext, EventHandlers}; pub use events::{Event, Events}; pub use events_ingester::{EventsIngester, EventsIngesterJsonRpc}; +use nodes::Node; pub use repos::*; pub use reset_counts::ResetCount; @@ -45,6 +48,7 @@ pub type ChaindexingRepoRawQueryTxnClient<'a> = PostgresRepoRawQueryTxnClient<'a #[cfg(feature = "postgres")] pub use repos::PostgresRepoAsyncConnection as ChaindexingRepoAsyncConnection; +use tokio::{task, time}; pub enum ChaindexingError { Config(ConfigError), @@ -73,36 +77,92 @@ impl Chaindexing { config: &Config, ) -> Result<(), ChaindexingError> { config.validate()?; - Self::setup(config).await?; - EventsIngester::start(config); - EventHandlers::start(config); + + let Config { repo, .. } = config; + let query_client = repo.get_raw_query_client().await; + let pool = repo.get_pool(1).await; + let mut conn = ChaindexingRepo::get_conn(&pool).await; + + Self::setup_for_nodes(&query_client).await; + + let node = ChaindexingRepo::create_node(&mut conn).await; + + Self::wait_for_other_nodes_to_pause().await; + + Self::setup_for_indexing(config, &mut conn, &query_client).await?; + + let mut indexing_tasks = Self::start_indexing_tasks(&config); + let mut tasks_are_aborted = false; + + let config = config.clone(); + tokio::spawn(async move { + let mut interval = time::interval(Duration::from_millis(Node::ELECTION_RATE_MS)); + + let pool = config.repo.get_pool(1).await; + let mut conn = ChaindexingRepo::get_conn(&pool).await; + + loop { + let active_nodes = ChaindexingRepo::get_active_nodes(&mut conn).await; + let leader_node = nodes::elect_leader(&active_nodes); + + if node.id == leader_node.id { + if tasks_are_aborted { + indexing_tasks = Self::start_indexing_tasks(&config); + tasks_are_aborted = false; + } + } else { + if !tasks_are_aborted { + for task in &indexing_tasks { + task.abort(); + } + tasks_are_aborted = true; + } + } + + ChaindexingRepo::keep_node_active(&mut conn, &node).await; + + interval.tick().await; + } + }); + Ok(()) } + async fn setup_for_nodes(client: &ChaindexingRepoRawQueryClient) { + ChaindexingRepo::migrate(client, ChaindexingRepo::create_nodes_migration().to_vec()).await; + } + async fn wait_for_other_nodes_to_pause() { + time::sleep(Duration::from_millis(Node::ELECTION_RATE_MS)).await; + } + fn start_indexing_tasks( + config: &Config, + ) -> Vec> { + let event_ingester = EventsIngester::start(config); + let event_handlers = EventHandlers::start(config); - pub async fn setup(config: &Config) -> Result<(), ChaindexingError> { + vec![event_ingester, event_handlers] + } + pub async fn setup_for_indexing<'a, S: Sync + Send + Clone>( + config: &Config, + conn: &mut ChaindexingRepoConn<'a>, + client: &ChaindexingRepoRawQueryClient, + ) -> Result<(), ChaindexingError> { let Config { - repo, contracts, reset_count, reset_queries, .. } = config; - let client = repo.get_raw_query_client().await; - let pool = repo.get_pool(1).await; - let mut conn = ChaindexingRepo::get_conn(&pool).await; - Self::run_migrations_for_resets(&client).await; - Self::maybe_reset(reset_count, reset_queries, contracts, &client, &mut conn).await; + Self::maybe_reset(reset_count, reset_queries, contracts, &client, conn).await; Self::run_internal_migrations(&client).await; Self::run_migrations_for_contract_states(&client, contracts).await; let contract_addresses = contracts.clone().into_iter().flat_map(|c| c.addresses).collect(); - ChaindexingRepo::create_contract_addresses(&mut conn, &contract_addresses).await; + ChaindexingRepo::create_contract_addresses(conn, &contract_addresses).await; Ok(()) } - pub async fn maybe_reset<'a, S: Send + Sync + Clone>( reset_count: &u8, reset_queries: &Vec, From 7f7326a847dc006ce45c0c30ec26d423d72dadd3 Mon Sep 17 00:00:00 2001 From: Joshua Oladele Date: Thu, 15 Feb 2024 10:30:01 +0100 Subject: [PATCH 4/4] Reduce timing precision since active nodes have two elections grace period --- chaindexing/src/lib.rs | 8 ++++---- chaindexing/src/nodes.rs | 2 +- chaindexing/src/repos/repo.rs | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/chaindexing/src/lib.rs b/chaindexing/src/lib.rs index 692e064..abfdade 100644 --- a/chaindexing/src/lib.rs +++ b/chaindexing/src/lib.rs @@ -87,7 +87,7 @@ impl Chaindexing { let node = ChaindexingRepo::create_node(&mut conn).await; - Self::wait_for_other_nodes_to_pause().await; + Self::wait_for_tasks_of_nodes_to_abort().await; Self::setup_for_indexing(config, &mut conn, &query_client).await?; @@ -96,7 +96,7 @@ impl Chaindexing { let config = config.clone(); tokio::spawn(async move { - let mut interval = time::interval(Duration::from_millis(Node::ELECTION_RATE_MS)); + let mut interval = time::interval(Duration::from_secs(Node::ELECTION_RATE_SECS)); let pool = config.repo.get_pool(1).await; let mut conn = ChaindexingRepo::get_conn(&pool).await; @@ -130,8 +130,8 @@ impl Chaindexing { async fn setup_for_nodes(client: &ChaindexingRepoRawQueryClient) { ChaindexingRepo::migrate(client, ChaindexingRepo::create_nodes_migration().to_vec()).await; } - async fn wait_for_other_nodes_to_pause() { - time::sleep(Duration::from_millis(Node::ELECTION_RATE_MS)).await; + async fn wait_for_tasks_of_nodes_to_abort() { + time::sleep(Duration::from_secs(Node::ELECTION_RATE_SECS)).await; } fn start_indexing_tasks( config: &Config, diff --git a/chaindexing/src/nodes.rs b/chaindexing/src/nodes.rs index 332c704..e6ab40b 100644 --- a/chaindexing/src/nodes.rs +++ b/chaindexing/src/nodes.rs @@ -13,11 +13,11 @@ pub struct Node { impl Node { pub const ELECTION_RATE_SECS: u64 = 5; - pub const ELECTION_RATE_MS: u64 = Node::ELECTION_RATE_SECS * 1_000; pub fn get_min_active_at() -> i64 { let now = chrono::Utc::now().timestamp(); + // Not active if not kept active at least 2 elections away now - (Node::ELECTION_RATE_SECS * 2) as i64 } } diff --git a/chaindexing/src/repos/repo.rs b/chaindexing/src/repos/repo.rs index eca47b7..b2743df 100644 --- a/chaindexing/src/repos/repo.rs +++ b/chaindexing/src/repos/repo.rs @@ -198,8 +198,8 @@ impl SQLikeMigrations { pub fn create_nodes() -> &'static [&'static str] { &["CREATE TABLE IF NOT EXISTS chaindexing_nodes ( id SERIAL PRIMARY KEY, - last_active_at BIGINT DEFAULT (EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::BIGINT, - inserted_at BIGINT DEFAULT (EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::BIGINT + last_active_at BIGINT DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)::BIGINT, + inserted_at BIGINT DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)::BIGINT )"] }