Skip to content

Commit

Permalink
[bridge indexer] - bridge indexer e2e test (#19880)
Browse files Browse the repository at this point in the history
## Description 

e2e test using bridge test cluster and local database

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
patrickkuo authored Oct 22, 2024
1 parent f6b1300 commit d82ec1f
Show file tree
Hide file tree
Showing 16 changed files with 752 additions and 486 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/bridge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,11 @@ jobs:
working-directory: bridge/evm
run: |
forge soldeer update
- name: Add postgres to PATH
run: echo "/usr/lib/postgresql/14/bin" >> $GITHUB_PATH
- name: cargo test
run: |
cargo nextest run --profile ci -E 'package(sui-bridge)'
cargo nextest run --profile ci -E 'package(sui-bridge) | package(sui-bridge-indexer)'
# Ensure there are no uncommitted changes in the repo after running tests
- run: scripts/changed-files.sh
shell: bash
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ jobs:
swap-size-gb: 256
- name: cargo test
run: |
cargo nextest run --profile ci -E '!package(sui-bridge)'
cargo nextest run --profile ci -E '!package(sui-bridge) and !package(sui-bridge-indexer)'
# Ensure there are no uncommitted changes in the repo after running tests
- run: scripts/changed-files.sh
shell: bash
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crates/sui-bridge-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ sui-types = { workspace = true, features = ["test-utils"] }
sui-test-transaction-builder.workspace = true
test-cluster.workspace = true
hex-literal = "0.3.4"
sui-indexer.workspace = true
diesel_migrations = "2.2.0"
sui-indexer-builder = { workspace = true, features = ["test-utils"] }
sui-bridge = { workspace = true, features = ["test-utils"] }

[[bin]]
name = "bridge-indexer"
Expand Down
174 changes: 169 additions & 5 deletions crates/sui-bridge-indexer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,35 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::fmt::{Display, Formatter};
use strum_macros::Display;

use sui_types::base_types::{SuiAddress, TransactionDigest};

use crate::config::IndexerConfig;
use crate::eth_bridge_indexer::{
EthDataMapper, EthFinalizedSyncDatasource, EthSubscriptionDatasource,
};
use crate::metrics::BridgeIndexerMetrics;
use crate::models::GovernanceAction as DBGovernanceAction;
use crate::models::TokenTransferData as DBTokenTransferData;
use crate::models::{SuiErrorTransactions, TokenTransfer as DBTokenTransfer};
use crate::postgres_manager::PgPool;
use crate::storage::PgBridgePersistent;
use crate::sui_bridge_indexer::SuiBridgeDataMapper;
use crate::sui_datasource::SuiCheckpointDatasource;
use ethers::providers::{Http, Provider};
use ethers::types::Address as EthAddress;
use std::fmt::{Display, Formatter};
use std::str::FromStr;
use std::sync::Arc;
use strum_macros::Display;
use sui_bridge::eth_client::EthClient;
use sui_bridge::metered_eth_provider::MeteredEthHttpProvier;
use sui_bridge::metrics::BridgeMetrics;
use sui_bridge::utils::get_eth_contract_addresses;
use sui_data_ingestion_core::DataIngestionMetrics;
use sui_indexer_builder::indexer_builder::{BackfillStrategy, Datasource, Indexer, IndexerBuilder};
use sui_indexer_builder::progress::{
OutOfOrderSaveAfterDurationPolicy, ProgressSavingPolicy, SaveAfterDurationPolicy,
};
use sui_sdk::SuiClientBuilder;
use sui_types::base_types::{SuiAddress, TransactionDigest};

pub mod config;
pub mod metrics;
Expand Down Expand Up @@ -179,3 +200,146 @@ impl Display for BridgeDataSource {
write!(f, "{str}")
}
}

pub async fn create_sui_indexer(
pool: PgPool,
metrics: BridgeIndexerMetrics,
ingestion_metrics: DataIngestionMetrics,
config: &IndexerConfig,
) -> anyhow::Result<
Indexer<PgBridgePersistent, SuiCheckpointDatasource, SuiBridgeDataMapper>,
anyhow::Error,
> {
let datastore_with_out_of_order_source = PgBridgePersistent::new(
pool,
ProgressSavingPolicy::OutOfOrderSaveAfterDuration(OutOfOrderSaveAfterDurationPolicy::new(
tokio::time::Duration::from_secs(30),
)),
);

let sui_client = Arc::new(
SuiClientBuilder::default()
.build(config.sui_rpc_url.clone())
.await?,
);

let sui_checkpoint_datasource = SuiCheckpointDatasource::new(
config.remote_store_url.clone(),
sui_client,
config.concurrency as usize,
config
.checkpoints_path
.clone()
.map(|p| p.into())
.unwrap_or(tempfile::tempdir()?.into_path()),
config.sui_bridge_genesis_checkpoint,
ingestion_metrics,
metrics.clone(),
);

Ok(IndexerBuilder::new(
"SuiBridgeIndexer",
sui_checkpoint_datasource,
SuiBridgeDataMapper { metrics },
datastore_with_out_of_order_source,
)
.build())
}

pub async fn create_eth_sync_indexer(
pool: PgPool,
metrics: BridgeIndexerMetrics,
bridge_metrics: Arc<BridgeMetrics>,
config: &IndexerConfig,
eth_client: Arc<EthClient<MeteredEthHttpProvier>>,
) -> Result<Indexer<PgBridgePersistent, EthFinalizedSyncDatasource, EthDataMapper>, anyhow::Error> {
let bridge_addresses = get_eth_bridge_contract_addresses(config).await?;
// Start the eth sync data source
let eth_sync_datasource = EthFinalizedSyncDatasource::new(
bridge_addresses,
eth_client.clone(),
config.eth_rpc_url.clone(),
metrics.clone(),
bridge_metrics.clone(),
config.eth_bridge_genesis_block,
)
.await?;
Ok(create_eth_indexer_builder(
pool,
metrics,
eth_sync_datasource,
"EthBridgeFinalizedSyncIndexer",
)
.await?
.with_backfill_strategy(BackfillStrategy::Partitioned { task_size: 1000 })
.build())
}

pub async fn create_eth_subscription_indexer(
pool: PgPool,
metrics: BridgeIndexerMetrics,
config: &IndexerConfig,
eth_client: Arc<EthClient<MeteredEthHttpProvier>>,
) -> Result<Indexer<PgBridgePersistent, EthSubscriptionDatasource, EthDataMapper>, anyhow::Error> {
// Start the eth subscription indexer
let bridge_addresses = get_eth_bridge_contract_addresses(config).await?;
// Start the eth subscription indexer
let eth_subscription_datasource = EthSubscriptionDatasource::new(
bridge_addresses.clone(),
eth_client.clone(),
config.eth_ws_url.clone(),
metrics.clone(),
config.eth_bridge_genesis_block,
)
.await?;

Ok(create_eth_indexer_builder(
pool,
metrics,
eth_subscription_datasource,
"EthBridgeSubscriptionIndexer",
)
.await?
.with_backfill_strategy(BackfillStrategy::Disabled)
.build())
}

async fn create_eth_indexer_builder<T: Send, D: Datasource<T>>(
pool: PgPool,
metrics: BridgeIndexerMetrics,
datasource: D,
indexer_name: &str,
) -> Result<IndexerBuilder<D, EthDataMapper, PgBridgePersistent>, anyhow::Error> {
let datastore = PgBridgePersistent::new(
pool,
ProgressSavingPolicy::SaveAfterDuration(SaveAfterDurationPolicy::new(
tokio::time::Duration::from_secs(30),
)),
);

// Start the eth subscription indexer
Ok(IndexerBuilder::new(
indexer_name,
datasource,
EthDataMapper { metrics },
datastore.clone(),
))
}

async fn get_eth_bridge_contract_addresses(
config: &IndexerConfig,
) -> Result<Vec<EthAddress>, anyhow::Error> {
let bridge_address = EthAddress::from_str(&config.eth_sui_bridge_contract_address)?;
let provider = Arc::new(
Provider::<Http>::try_from(&config.eth_rpc_url)?
.interval(std::time::Duration::from_millis(2000)),
);
let bridge_addresses = get_eth_contract_addresses(bridge_address, &provider).await?;
Ok(vec![
bridge_address,
bridge_addresses.0,
bridge_addresses.1,
bridge_addresses.2,
bridge_addresses.3,
])
}
109 changes: 13 additions & 96 deletions crates/sui-bridge-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

use anyhow::Result;
use clap::*;
use ethers::providers::{Http, Provider};
use ethers::types::Address as EthAddress;
use prometheus::Registry;
use std::collections::HashSet;
Expand All @@ -18,8 +17,6 @@ use sui_bridge::metered_eth_provider::{new_metered_eth_provider, MeteredEthHttpP
use sui_bridge::sui_bridge_watchdog::Observable;
use sui_bridge::sui_client::SuiBridgeClient;
use sui_bridge::utils::get_eth_contract_addresses;
use sui_bridge_indexer::eth_bridge_indexer::EthFinalizedSyncDatasource;
use sui_bridge_indexer::eth_bridge_indexer::EthSubscriptionDatasource;
use sui_config::Config;
use tokio::task::JoinHandle;
use tracing::info;
Expand All @@ -34,19 +31,14 @@ use sui_bridge::sui_bridge_watchdog::{
metrics::WatchdogMetrics, sui_bridge_status::SuiBridgeStatus, BridgeWatchDog,
};
use sui_bridge_indexer::config::IndexerConfig;
use sui_bridge_indexer::eth_bridge_indexer::EthDataMapper;
use sui_bridge_indexer::metrics::BridgeIndexerMetrics;
use sui_bridge_indexer::postgres_manager::{get_connection_pool, read_sui_progress_store};
use sui_bridge_indexer::storage::PgBridgePersistent;
use sui_bridge_indexer::sui_bridge_indexer::SuiBridgeDataMapper;
use sui_bridge_indexer::sui_datasource::SuiCheckpointDatasource;
use sui_bridge_indexer::sui_transaction_handler::handle_sui_transactions_loop;
use sui_bridge_indexer::sui_transaction_queries::start_sui_tx_polling_task;
use sui_data_ingestion_core::DataIngestionMetrics;
use sui_indexer_builder::indexer_builder::{BackfillStrategy, IndexerBuilder};
use sui_indexer_builder::progress::{
OutOfOrderSaveAfterDurationPolicy, ProgressSavingPolicy, SaveAfterDurationPolicy,
use sui_bridge_indexer::{
create_eth_subscription_indexer, create_eth_sync_indexer, create_sui_indexer,
};
use sui_data_ingestion_core::DataIngestionMetrics;
use sui_sdk::SuiClientBuilder;

#[derive(Parser, Clone, Debug)]
Expand Down Expand Up @@ -87,18 +79,7 @@ async fn main() -> Result<()> {
let bridge_metrics = Arc::new(BridgeMetrics::new(&registry));

let db_url = config.db_url.clone();
let datastore = PgBridgePersistent::new(
get_connection_pool(db_url.clone()).await,
ProgressSavingPolicy::SaveAfterDuration(SaveAfterDurationPolicy::new(
tokio::time::Duration::from_secs(30),
)),
);
let datastore_with_out_of_order_source = PgBridgePersistent::new(
get_connection_pool(db_url.clone()).await,
ProgressSavingPolicy::OutOfOrderSaveAfterDuration(OutOfOrderSaveAfterDurationPolicy::new(
tokio::time::Duration::from_secs(30),
)),
);
let pool = get_connection_pool(db_url.clone()).await;

let eth_client: Arc<EthClient<MeteredEthHttpProvier>> = Arc::new(
EthClient::<MeteredEthHttpProvier>::new(
Expand All @@ -111,93 +92,29 @@ async fn main() -> Result<()> {
let eth_bridge_proxy_address = EthAddress::from_str(&config.eth_sui_bridge_contract_address)?;
let mut tasks = vec![];
// Start the eth subscription indexer
let bridge_address = EthAddress::from_str(&config.eth_sui_bridge_contract_address)?;
let provider = Arc::new(
Provider::<Http>::try_from(&config.eth_rpc_url)?
.interval(std::time::Duration::from_millis(2000)),
);
let bridge_addresses = get_eth_contract_addresses(bridge_address, &provider).await?;
let bridge_addresses: Vec<EthAddress> = vec![
bridge_address,
bridge_addresses.0,
bridge_addresses.1,
bridge_addresses.2,
bridge_addresses.3,
];

// Start the eth subscription indexer
let eth_subscription_datasource = EthSubscriptionDatasource::new(
bridge_addresses.clone(),
eth_client.clone(),
config.eth_ws_url.clone(),
let eth_subscription_indexer = create_eth_subscription_indexer(
pool.clone(),
indexer_meterics.clone(),
config.eth_bridge_genesis_block,
&config,
eth_client.clone(),
)
.await?;
let eth_subscription_indexer = IndexerBuilder::new(
"EthBridgeSubscriptionIndexer",
eth_subscription_datasource,
EthDataMapper {
metrics: indexer_meterics.clone(),
},
datastore.clone(),
)
.with_backfill_strategy(BackfillStrategy::Disabled)
.build();
tasks.push(spawn_logged_monitored_task!(
eth_subscription_indexer.start()
));

// Start the eth sync data source
let eth_sync_datasource = EthFinalizedSyncDatasource::new(
bridge_addresses.clone(),
eth_client.clone(),
config.eth_rpc_url.clone(),
let eth_sync_indexer = create_eth_sync_indexer(
pool.clone(),
indexer_meterics.clone(),
bridge_metrics.clone(),
config.eth_bridge_genesis_block,
&config,
eth_client,
)
.await?;

let eth_sync_indexer = IndexerBuilder::new(
"EthBridgeFinalizedSyncIndexer",
eth_sync_datasource,
EthDataMapper {
metrics: indexer_meterics.clone(),
},
datastore,
)
.with_backfill_strategy(BackfillStrategy::Partitioned { task_size: 1000 })
.build();
tasks.push(spawn_logged_monitored_task!(eth_sync_indexer.start()));

let sui_client = Arc::new(
SuiClientBuilder::default()
.build(config.sui_rpc_url.clone())
.await?,
);
let sui_checkpoint_datasource = SuiCheckpointDatasource::new(
config.remote_store_url.clone(),
sui_client,
config.concurrency as usize,
config
.checkpoints_path
.clone()
.map(|p| p.into())
.unwrap_or(tempfile::tempdir()?.into_path()),
config.sui_bridge_genesis_checkpoint,
ingestion_metrics.clone(),
indexer_meterics.clone(),
);
let indexer = IndexerBuilder::new(
"SuiBridgeIndexer",
sui_checkpoint_datasource,
SuiBridgeDataMapper {
metrics: indexer_meterics.clone(),
},
datastore_with_out_of_order_source,
)
.build();
let indexer = create_sui_indexer(pool, indexer_meterics, ingestion_metrics, &config).await?;
tasks.push(spawn_logged_monitored_task!(indexer.start()));

let sui_bridge_client =
Expand Down
Loading

0 comments on commit d82ec1f

Please sign in to comment.