Skip to content

Commit

Permalink
Merge pull request #58 from chaindexing/introduce-node-aware-indexing
Browse files Browse the repository at this point in the history
Introduce node aware indexing
  • Loading branch information
Jurshsmith authored Feb 15, 2024
2 parents c10d655 + 7f7326a commit e454b7e
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 18 deletions.
8 changes: 8 additions & 0 deletions chaindexing/src/diesels/schema.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
// @generated automatically by Diesel CLI.

diesel::table! {
chaindexing_nodes(id) {
id -> Int4,
last_active_at -> Int8,
inserted_at -> Int8,
}
}

diesel::table! {
chaindexing_contract_addresses (id) {
id -> Int4,
Expand Down
8 changes: 5 additions & 3 deletions chaindexing/src/event_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{sync::Arc, time::Duration};
mod handle_events;
mod maybe_handle_chain_reorg;

use tokio::{sync::Mutex, time::interval};
use tokio::{sync::Mutex, task, time::interval};

use crate::{contracts::Contracts, events::Event, ChaindexingRepo, Config, Repo};
use crate::{ChaindexingRepoRawQueryTxnClient, HasRawQueryClient};
Expand Down Expand Up @@ -47,7 +47,9 @@ pub trait EventHandler: Send + Sync {
pub struct EventHandlers;

impl EventHandlers {
pub fn start<S: Send + Sync + Clone + Debug + 'static>(config: &Config<S>) {
pub fn start<S: Send + Sync + Clone + Debug + 'static>(
config: &Config<S>,
) -> task::JoinHandle<()> {
let config = config.clone();
tokio::spawn(async move {
let pool = config.repo.get_pool(1).await;
Expand Down Expand Up @@ -79,6 +81,6 @@ impl EventHandlers {
)
.await;
}
});
})
}
}
6 changes: 3 additions & 3 deletions chaindexing/src/events_ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use ethers::types::{Address, Filter as EthersFilter, Log};
use futures_util::future::try_join_all;
use futures_util::StreamExt;
use std::cmp::min;
use tokio::sync::Mutex;
use tokio::time::{interval, sleep};
use tokio::{sync::Mutex, task};

use crate::chain_reorg::Execution;
use crate::contracts::Contract;
Expand Down Expand Up @@ -94,7 +94,7 @@ impl From<RepoError> for EventsIngesterError {
pub struct EventsIngester;

impl EventsIngester {
pub fn start<S: Sync + Send + Clone + 'static>(config: &Config<S>) {
pub fn start<S: Sync + Send + Clone + 'static>(config: &Config<S>) -> task::JoinHandle<()> {
let config = config.clone();
tokio::spawn(async move {
let pool = config.repo.get_pool(1).await;
Expand Down Expand Up @@ -125,7 +125,7 @@ impl EventsIngester {
.unwrap();
}
}
});
})
}

pub async fn ingest<'a, S: Send + Sync + Clone>(
Expand Down
84 changes: 72 additions & 12 deletions chaindexing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ mod diesels;
mod event_handlers;
mod events;
mod events_ingester;
mod nodes;
mod repos;
mod reset_counts;

use std::fmt::Debug;
use std::time::Duration;

pub use chain_reorg::{MinConfirmationCount, ReorgedBlock, ReorgedBlocks, UnsavedReorgedBlock};
pub use chains::Chains;
Expand All @@ -22,6 +24,7 @@ pub use ethers::prelude::Chain;
pub use event_handlers::{EventHandler, EventHandlerContext as EventContext, EventHandlers};
pub use events::{Event, Events};
pub use events_ingester::{EventsIngester, EventsIngesterJsonRpc};
use nodes::Node;
pub use repos::*;
pub use reset_counts::ResetCount;

Expand All @@ -45,6 +48,7 @@ pub type ChaindexingRepoRawQueryTxnClient<'a> = PostgresRepoRawQueryTxnClient<'a

#[cfg(feature = "postgres")]
pub use repos::PostgresRepoAsyncConnection as ChaindexingRepoAsyncConnection;
use tokio::{task, time};

pub enum ChaindexingError {
Config(ConfigError),
Expand Down Expand Up @@ -73,36 +77,92 @@ impl Chaindexing {
config: &Config<S>,
) -> Result<(), ChaindexingError> {
config.validate()?;
Self::setup(config).await?;
EventsIngester::start(config);
EventHandlers::start(config);

let Config { repo, .. } = config;
let query_client = repo.get_raw_query_client().await;
let pool = repo.get_pool(1).await;
let mut conn = ChaindexingRepo::get_conn(&pool).await;

Self::setup_for_nodes(&query_client).await;

let node = ChaindexingRepo::create_node(&mut conn).await;

Self::wait_for_tasks_of_nodes_to_abort().await;

Self::setup_for_indexing(config, &mut conn, &query_client).await?;

let mut indexing_tasks = Self::start_indexing_tasks(&config);
let mut tasks_are_aborted = false;

let config = config.clone();
tokio::spawn(async move {
let mut interval = time::interval(Duration::from_secs(Node::ELECTION_RATE_SECS));

let pool = config.repo.get_pool(1).await;
let mut conn = ChaindexingRepo::get_conn(&pool).await;

loop {
let active_nodes = ChaindexingRepo::get_active_nodes(&mut conn).await;
let leader_node = nodes::elect_leader(&active_nodes);

if node.id == leader_node.id {
if tasks_are_aborted {
indexing_tasks = Self::start_indexing_tasks(&config);
tasks_are_aborted = false;
}
} else {
if !tasks_are_aborted {
for task in &indexing_tasks {
task.abort();
}
tasks_are_aborted = true;
}
}

ChaindexingRepo::keep_node_active(&mut conn, &node).await;

interval.tick().await;
}
});

Ok(())
}
async fn setup_for_nodes(client: &ChaindexingRepoRawQueryClient) {
ChaindexingRepo::migrate(client, ChaindexingRepo::create_nodes_migration().to_vec()).await;
}
async fn wait_for_tasks_of_nodes_to_abort() {
time::sleep(Duration::from_secs(Node::ELECTION_RATE_SECS)).await;
}
fn start_indexing_tasks<S: Send + Sync + Clone + Debug + 'static>(
config: &Config<S>,
) -> Vec<task::JoinHandle<()>> {
let event_ingester = EventsIngester::start(config);
let event_handlers = EventHandlers::start(config);

pub async fn setup<S: Sync + Send + Clone>(config: &Config<S>) -> Result<(), ChaindexingError> {
vec![event_ingester, event_handlers]
}
pub async fn setup_for_indexing<'a, S: Sync + Send + Clone>(
config: &Config<S>,
conn: &mut ChaindexingRepoConn<'a>,
client: &ChaindexingRepoRawQueryClient,
) -> Result<(), ChaindexingError> {
let Config {
repo,
contracts,
reset_count,
reset_queries,
..
} = config;

let client = repo.get_raw_query_client().await;
let pool = repo.get_pool(1).await;
let mut conn = ChaindexingRepo::get_conn(&pool).await;

Self::run_migrations_for_resets(&client).await;
Self::maybe_reset(reset_count, reset_queries, contracts, &client, &mut conn).await;
Self::maybe_reset(reset_count, reset_queries, contracts, &client, conn).await;
Self::run_internal_migrations(&client).await;
Self::run_migrations_for_contract_states(&client, contracts).await;

let contract_addresses = contracts.clone().into_iter().flat_map(|c| c.addresses).collect();
ChaindexingRepo::create_contract_addresses(&mut conn, &contract_addresses).await;
ChaindexingRepo::create_contract_addresses(conn, &contract_addresses).await;

Ok(())
}

pub async fn maybe_reset<'a, S: Send + Sync + Clone>(
reset_count: &u8,
reset_queries: &Vec<String>,
Expand Down
36 changes: 36 additions & 0 deletions chaindexing/src/nodes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use diesel::{prelude::Insertable, Queryable};
use serde::Deserialize;

use crate::diesels::schema::chaindexing_nodes;

#[derive(Debug, Deserialize, Clone, PartialEq, Eq, Insertable, Queryable)]
#[diesel(table_name = chaindexing_nodes)]
pub struct Node {
pub id: i32,
last_active_at: i64,
inserted_at: i64,
}

impl Node {
pub const ELECTION_RATE_SECS: u64 = 5;

pub fn get_min_active_at() -> i64 {
let now = chrono::Utc::now().timestamp();

// Not active if not kept active at least 2 elections away
now - (Node::ELECTION_RATE_SECS * 2) as i64
}
}

pub fn elect_leader(nodes: &Vec<Node>) -> Node {
let mut nodes_iter = nodes.iter();
let mut leader: Option<&Node> = nodes_iter.next();

while let Some(node) = nodes_iter.next() {
if node.inserted_at > leader.unwrap().inserted_at {
leader = Some(node);
}
}

leader.unwrap().clone()
}
32 changes: 32 additions & 0 deletions chaindexing/src/repos/postgres_repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod raw_queries;
use crate::{
contracts::{ContractAddress, ContractAddressID, UnsavedContractAddress},
events::Event,
nodes::Node,
ReorgedBlock, ResetCount, Streamable, UnsavedReorgedBlock,
};
use diesel_async::RunQueryDsl;
Expand Down Expand Up @@ -211,6 +212,37 @@ impl Repo for PostgresRepo {

chaindexing_reset_counts.load(conn).await.unwrap()
}

async fn create_node<'a>(conn: &mut Self::Conn<'a>) -> Node {
use crate::diesels::schema::chaindexing_nodes::dsl::*;

diesel::insert_into(chaindexing_nodes)
.default_values()
.get_result(conn)
.await
.unwrap()
}
async fn get_active_nodes<'a>(conn: &mut Self::Conn<'a>) -> Vec<Node> {
use crate::diesels::schema::chaindexing_nodes::dsl::*;

chaindexing_nodes
.filter(last_active_at.gt(Node::get_min_active_at()))
.load(conn)
.await
.unwrap()
}
async fn keep_node_active<'a>(conn: &mut Self::Conn<'a>, node: &Node) {
use crate::diesels::schema::chaindexing_nodes::dsl::*;

let now = chrono::offset::Utc::now().timestamp();

diesel::update(chaindexing_nodes)
.filter(id.eq(node.id))
.set(last_active_at.eq(now))
.execute(conn)
.await
.unwrap();
}
}

impl Streamable for PostgresRepo {
Expand Down
4 changes: 4 additions & 0 deletions chaindexing/src/repos/postgres_repo/migrations.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use crate::{Migratable, PostgresRepo, RepoMigrations, SQLikeMigrations};

impl RepoMigrations for PostgresRepo {
fn create_nodes_migration() -> &'static [&'static str] {
SQLikeMigrations::create_nodes()
}

fn create_contract_addresses_migration() -> &'static [&'static str] {
SQLikeMigrations::create_contract_addresses()
}
Expand Down
14 changes: 14 additions & 0 deletions chaindexing/src/repos/repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use tokio::sync::Mutex;
use crate::{
contracts::{ContractAddressID, UnsavedContractAddress},
events::{Event, PartialEvent},
nodes::Node,
ContractAddress, ReorgedBlock, ResetCount, UnsavedReorgedBlock,
};

Expand Down Expand Up @@ -74,6 +75,10 @@ pub trait Repo:

async fn create_reset_count<'a>(conn: &mut Self::Conn<'a>);
async fn get_reset_counts<'a>(conn: &mut Self::Conn<'a>) -> Vec<ResetCount>;

async fn create_node<'a>(conn: &mut Self::Conn<'a>) -> Node;
async fn get_active_nodes<'a>(conn: &mut Self::Conn<'a>) -> Vec<Node>;
async fn keep_node_active<'a>(conn: &mut Self::Conn<'a>, node: &Node);
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -147,6 +152,7 @@ pub trait Streamable {
}

pub trait RepoMigrations: Migratable {
fn create_nodes_migration() -> &'static [&'static str];
fn create_contract_addresses_migration() -> &'static [&'static str];
fn drop_contract_addresses_migration() -> &'static [&'static str];
fn create_events_migration() -> &'static [&'static str];
Expand Down Expand Up @@ -189,6 +195,14 @@ pub trait Migratable: ExecutesWithRawQuery + Sync + Send {
pub struct SQLikeMigrations;

impl SQLikeMigrations {
pub fn create_nodes() -> &'static [&'static str] {
&["CREATE TABLE IF NOT EXISTS chaindexing_nodes (
id SERIAL PRIMARY KEY,
last_active_at BIGINT DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)::BIGINT,
inserted_at BIGINT DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)::BIGINT
)"]
}

pub fn create_contract_addresses() -> &'static [&'static str] {
&[
"CREATE TABLE IF NOT EXISTS chaindexing_contract_addresses (
Expand Down

0 comments on commit e454b7e

Please sign in to comment.