diff --git a/chaindexing/src/diesels/schema.rs b/chaindexing/src/diesels/schema.rs index 2a2c5d2..818bbae 100644 --- a/chaindexing/src/diesels/schema.rs +++ b/chaindexing/src/diesels/schema.rs @@ -1,5 +1,13 @@ // @generated automatically by Diesel CLI. +diesel::table! { + chaindexing_nodes(id) { + id -> Int4, + last_active_at -> Int8, + inserted_at -> Int8, + } +} + diesel::table! { chaindexing_contract_addresses (id) { id -> Int4, diff --git a/chaindexing/src/event_handlers.rs b/chaindexing/src/event_handlers.rs index 28a8ed3..83c6989 100644 --- a/chaindexing/src/event_handlers.rs +++ b/chaindexing/src/event_handlers.rs @@ -4,7 +4,7 @@ use std::{sync::Arc, time::Duration}; mod handle_events; mod maybe_handle_chain_reorg; -use tokio::{sync::Mutex, time::interval}; +use tokio::{sync::Mutex, task, time::interval}; use crate::{contracts::Contracts, events::Event, ChaindexingRepo, Config, Repo}; use crate::{ChaindexingRepoRawQueryTxnClient, HasRawQueryClient}; @@ -47,7 +47,9 @@ pub trait EventHandler: Send + Sync { pub struct EventHandlers; impl EventHandlers { - pub fn start(config: &Config) { + pub fn start( + config: &Config, + ) -> task::JoinHandle<()> { let config = config.clone(); tokio::spawn(async move { let pool = config.repo.get_pool(1).await; @@ -79,6 +81,6 @@ impl EventHandlers { ) .await; } - }); + }) } } diff --git a/chaindexing/src/events_ingester.rs b/chaindexing/src/events_ingester.rs index 551b1c9..79d0998 100644 --- a/chaindexing/src/events_ingester.rs +++ b/chaindexing/src/events_ingester.rs @@ -12,8 +12,8 @@ use ethers::types::{Address, Filter as EthersFilter, Log}; use futures_util::future::try_join_all; use futures_util::StreamExt; use std::cmp::min; -use tokio::sync::Mutex; use tokio::time::{interval, sleep}; +use tokio::{sync::Mutex, task}; use crate::chain_reorg::Execution; use crate::contracts::Contract; @@ -94,7 +94,7 @@ impl From for EventsIngesterError { pub struct EventsIngester; impl EventsIngester { - pub fn start(config: &Config) { + pub fn start(config: &Config) -> task::JoinHandle<()> { let config = config.clone(); tokio::spawn(async move { let pool = config.repo.get_pool(1).await; @@ -125,7 +125,7 @@ impl EventsIngester { .unwrap(); } } - }); + }) } pub async fn ingest<'a, S: Send + Sync + Clone>( diff --git a/chaindexing/src/lib.rs b/chaindexing/src/lib.rs index bab9095..abfdade 100644 --- a/chaindexing/src/lib.rs +++ b/chaindexing/src/lib.rs @@ -7,10 +7,12 @@ mod diesels; mod event_handlers; mod events; mod events_ingester; +mod nodes; mod repos; mod reset_counts; use std::fmt::Debug; +use std::time::Duration; pub use chain_reorg::{MinConfirmationCount, ReorgedBlock, ReorgedBlocks, UnsavedReorgedBlock}; pub use chains::Chains; @@ -22,6 +24,7 @@ pub use ethers::prelude::Chain; pub use event_handlers::{EventHandler, EventHandlerContext as EventContext, EventHandlers}; pub use events::{Event, Events}; pub use events_ingester::{EventsIngester, EventsIngesterJsonRpc}; +use nodes::Node; pub use repos::*; pub use reset_counts::ResetCount; @@ -45,6 +48,7 @@ pub type ChaindexingRepoRawQueryTxnClient<'a> = PostgresRepoRawQueryTxnClient<'a #[cfg(feature = "postgres")] pub use repos::PostgresRepoAsyncConnection as ChaindexingRepoAsyncConnection; +use tokio::{task, time}; pub enum ChaindexingError { Config(ConfigError), @@ -73,36 +77,92 @@ impl Chaindexing { config: &Config, ) -> Result<(), ChaindexingError> { config.validate()?; - Self::setup(config).await?; - EventsIngester::start(config); - EventHandlers::start(config); + + let Config { repo, .. } = config; + let query_client = repo.get_raw_query_client().await; + let pool = repo.get_pool(1).await; + let mut conn = ChaindexingRepo::get_conn(&pool).await; + + Self::setup_for_nodes(&query_client).await; + + let node = ChaindexingRepo::create_node(&mut conn).await; + + Self::wait_for_tasks_of_nodes_to_abort().await; + + Self::setup_for_indexing(config, &mut conn, &query_client).await?; + + let mut indexing_tasks = Self::start_indexing_tasks(&config); + let mut tasks_are_aborted = false; + + let config = config.clone(); + tokio::spawn(async move { + let mut interval = time::interval(Duration::from_secs(Node::ELECTION_RATE_SECS)); + + let pool = config.repo.get_pool(1).await; + let mut conn = ChaindexingRepo::get_conn(&pool).await; + + loop { + let active_nodes = ChaindexingRepo::get_active_nodes(&mut conn).await; + let leader_node = nodes::elect_leader(&active_nodes); + + if node.id == leader_node.id { + if tasks_are_aborted { + indexing_tasks = Self::start_indexing_tasks(&config); + tasks_are_aborted = false; + } + } else { + if !tasks_are_aborted { + for task in &indexing_tasks { + task.abort(); + } + tasks_are_aborted = true; + } + } + + ChaindexingRepo::keep_node_active(&mut conn, &node).await; + + interval.tick().await; + } + }); + Ok(()) } + async fn setup_for_nodes(client: &ChaindexingRepoRawQueryClient) { + ChaindexingRepo::migrate(client, ChaindexingRepo::create_nodes_migration().to_vec()).await; + } + async fn wait_for_tasks_of_nodes_to_abort() { + time::sleep(Duration::from_secs(Node::ELECTION_RATE_SECS)).await; + } + fn start_indexing_tasks( + config: &Config, + ) -> Vec> { + let event_ingester = EventsIngester::start(config); + let event_handlers = EventHandlers::start(config); - pub async fn setup(config: &Config) -> Result<(), ChaindexingError> { + vec![event_ingester, event_handlers] + } + pub async fn setup_for_indexing<'a, S: Sync + Send + Clone>( + config: &Config, + conn: &mut ChaindexingRepoConn<'a>, + client: &ChaindexingRepoRawQueryClient, + ) -> Result<(), ChaindexingError> { let Config { - repo, contracts, reset_count, reset_queries, .. } = config; - let client = repo.get_raw_query_client().await; - let pool = repo.get_pool(1).await; - let mut conn = ChaindexingRepo::get_conn(&pool).await; - Self::run_migrations_for_resets(&client).await; - Self::maybe_reset(reset_count, reset_queries, contracts, &client, &mut conn).await; + Self::maybe_reset(reset_count, reset_queries, contracts, &client, conn).await; Self::run_internal_migrations(&client).await; Self::run_migrations_for_contract_states(&client, contracts).await; let contract_addresses = contracts.clone().into_iter().flat_map(|c| c.addresses).collect(); - ChaindexingRepo::create_contract_addresses(&mut conn, &contract_addresses).await; + ChaindexingRepo::create_contract_addresses(conn, &contract_addresses).await; Ok(()) } - pub async fn maybe_reset<'a, S: Send + Sync + Clone>( reset_count: &u8, reset_queries: &Vec, diff --git a/chaindexing/src/nodes.rs b/chaindexing/src/nodes.rs new file mode 100644 index 0000000..e6ab40b --- /dev/null +++ b/chaindexing/src/nodes.rs @@ -0,0 +1,36 @@ +use diesel::{prelude::Insertable, Queryable}; +use serde::Deserialize; + +use crate::diesels::schema::chaindexing_nodes; + +#[derive(Debug, Deserialize, Clone, PartialEq, Eq, Insertable, Queryable)] +#[diesel(table_name = chaindexing_nodes)] +pub struct Node { + pub id: i32, + last_active_at: i64, + inserted_at: i64, +} + +impl Node { + pub const ELECTION_RATE_SECS: u64 = 5; + + pub fn get_min_active_at() -> i64 { + let now = chrono::Utc::now().timestamp(); + + // Not active if not kept active at least 2 elections away + now - (Node::ELECTION_RATE_SECS * 2) as i64 + } +} + +pub fn elect_leader(nodes: &Vec) -> Node { + let mut nodes_iter = nodes.iter(); + let mut leader: Option<&Node> = nodes_iter.next(); + + while let Some(node) = nodes_iter.next() { + if node.inserted_at > leader.unwrap().inserted_at { + leader = Some(node); + } + } + + leader.unwrap().clone() +} diff --git a/chaindexing/src/repos/postgres_repo.rs b/chaindexing/src/repos/postgres_repo.rs index 0aaf041..8a9ecdc 100644 --- a/chaindexing/src/repos/postgres_repo.rs +++ b/chaindexing/src/repos/postgres_repo.rs @@ -6,6 +6,7 @@ mod raw_queries; use crate::{ contracts::{ContractAddress, ContractAddressID, UnsavedContractAddress}, events::Event, + nodes::Node, ReorgedBlock, ResetCount, Streamable, UnsavedReorgedBlock, }; use diesel_async::RunQueryDsl; @@ -211,6 +212,37 @@ impl Repo for PostgresRepo { chaindexing_reset_counts.load(conn).await.unwrap() } + + async fn create_node<'a>(conn: &mut Self::Conn<'a>) -> Node { + use crate::diesels::schema::chaindexing_nodes::dsl::*; + + diesel::insert_into(chaindexing_nodes) + .default_values() + .get_result(conn) + .await + .unwrap() + } + async fn get_active_nodes<'a>(conn: &mut Self::Conn<'a>) -> Vec { + use crate::diesels::schema::chaindexing_nodes::dsl::*; + + chaindexing_nodes + .filter(last_active_at.gt(Node::get_min_active_at())) + .load(conn) + .await + .unwrap() + } + async fn keep_node_active<'a>(conn: &mut Self::Conn<'a>, node: &Node) { + use crate::diesels::schema::chaindexing_nodes::dsl::*; + + let now = chrono::offset::Utc::now().timestamp(); + + diesel::update(chaindexing_nodes) + .filter(id.eq(node.id)) + .set(last_active_at.eq(now)) + .execute(conn) + .await + .unwrap(); + } } impl Streamable for PostgresRepo { diff --git a/chaindexing/src/repos/postgres_repo/migrations.rs b/chaindexing/src/repos/postgres_repo/migrations.rs index 1fc050c..857dbab 100644 --- a/chaindexing/src/repos/postgres_repo/migrations.rs +++ b/chaindexing/src/repos/postgres_repo/migrations.rs @@ -1,6 +1,10 @@ use crate::{Migratable, PostgresRepo, RepoMigrations, SQLikeMigrations}; impl RepoMigrations for PostgresRepo { + fn create_nodes_migration() -> &'static [&'static str] { + SQLikeMigrations::create_nodes() + } + fn create_contract_addresses_migration() -> &'static [&'static str] { SQLikeMigrations::create_contract_addresses() } diff --git a/chaindexing/src/repos/repo.rs b/chaindexing/src/repos/repo.rs index b9dfea6..b2743df 100644 --- a/chaindexing/src/repos/repo.rs +++ b/chaindexing/src/repos/repo.rs @@ -9,6 +9,7 @@ use tokio::sync::Mutex; use crate::{ contracts::{ContractAddressID, UnsavedContractAddress}, events::{Event, PartialEvent}, + nodes::Node, ContractAddress, ReorgedBlock, ResetCount, UnsavedReorgedBlock, }; @@ -74,6 +75,10 @@ pub trait Repo: async fn create_reset_count<'a>(conn: &mut Self::Conn<'a>); async fn get_reset_counts<'a>(conn: &mut Self::Conn<'a>) -> Vec; + + async fn create_node<'a>(conn: &mut Self::Conn<'a>) -> Node; + async fn get_active_nodes<'a>(conn: &mut Self::Conn<'a>) -> Vec; + async fn keep_node_active<'a>(conn: &mut Self::Conn<'a>, node: &Node); } #[async_trait::async_trait] @@ -147,6 +152,7 @@ pub trait Streamable { } pub trait RepoMigrations: Migratable { + fn create_nodes_migration() -> &'static [&'static str]; fn create_contract_addresses_migration() -> &'static [&'static str]; fn drop_contract_addresses_migration() -> &'static [&'static str]; fn create_events_migration() -> &'static [&'static str]; @@ -189,6 +195,14 @@ pub trait Migratable: ExecutesWithRawQuery + Sync + Send { pub struct SQLikeMigrations; impl SQLikeMigrations { + pub fn create_nodes() -> &'static [&'static str] { + &["CREATE TABLE IF NOT EXISTS chaindexing_nodes ( + id SERIAL PRIMARY KEY, + last_active_at BIGINT DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)::BIGINT, + inserted_at BIGINT DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)::BIGINT + )"] + } + pub fn create_contract_addresses() -> &'static [&'static str] { &[ "CREATE TABLE IF NOT EXISTS chaindexing_contract_addresses (