Speed up snapshotting (helius-labs#202)
* Speed up backfilling

* Speed up snapshotting
pmantica11 authored Sep 13, 2024
1 parent 351cb96 commit 0fa69d6
Showing 4 changed files with 65 additions and 25 deletions.
src/ingester/indexer/mod.rs (18 additions, 13 deletions)
@@ -16,7 +16,7 @@ use crate::{
 
 use super::typedefs::block_info::BlockInfo;
 const POST_BACKFILL_FREQUENCY: u64 = 100;
-const PRE_BACKFILL_FREQUENCY: u64 = 10;
+const PRE_BACKFILL_FREQUENCY: u64 = 1000;
 
 #[derive(FromQueryResult)]
 pub struct OptionalContextModel {
@@ -54,9 +54,10 @@ pub async fn index_block_stream(
     db: Arc<DatabaseConnection>,
     rpc_client: &RpcClient,
     last_indexed_slot_at_start: u64,
+    end_slot: Option<u64>,
 ) {
     pin_mut!(block_stream);
-    let current_slot = fetch_current_slot_with_infinite_retry(rpc_client).await;
+    let current_slot = end_slot.unwrap_or(fetch_current_slot_with_infinite_retry(rpc_client).await);
     let number_of_blocks_to_backfill = current_slot - last_indexed_slot_at_start;
     info!(
         "Backfilling historical blocks. Current number of blocks to backfill: {}",
@@ -65,25 +66,29 @@
     let mut last_indexed_slot = last_indexed_slot_at_start;
 
     let mut finished_backfill = false;
+    let mut last_num_blocks_indexed = 0;
 
     while let Some(blocks) = block_stream.next().await {
         let slot_indexed = blocks.last().unwrap().metadata.slot;
         index_block_batch_with_infinite_retries(db.as_ref(), blocks).await;
 
-
         if !finished_backfill {
-            let blocks_indexed = slot_indexed - last_indexed_slot_at_start;
-            if blocks_indexed <= number_of_blocks_to_backfill {
-                if blocks_indexed % PRE_BACKFILL_FREQUENCY == 0 {
-                    info!(
-                        "Backfilled {} / {} blocks",
-                        blocks_indexed, number_of_blocks_to_backfill
-                    );
+            let blocks_indexed_in_batch = slot_indexed - last_indexed_slot_at_start;
+            for blocks_indexed in last_num_blocks_indexed..blocks_indexed_in_batch {
+                if blocks_indexed <= number_of_blocks_to_backfill {
+                    if blocks_indexed % PRE_BACKFILL_FREQUENCY == 0 {
+                        info!(
+                            "Backfilled {} / {} blocks",
+                            blocks_indexed, number_of_blocks_to_backfill
+                        );
+                    }
+                } else {
+                    finished_backfill = true;
+                    info!("Finished backfilling historical blocks!");
                 }
-            } else {
-                finished_backfill = true;
-                info!("Finished backfilling historical blocks!");
             }
+
+            last_num_blocks_indexed = blocks_indexed_in_batch
         } else {
             for slot in last_indexed_slot..slot_indexed {
                 if slot % POST_BACKFILL_FREQUENCY == 0 {
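
Because a yielded batch can now cover many slots at once, the progress check walks every count in the batch instead of testing only the batch boundary. A minimal, self-contained sketch of that logic, with println! standing in for the crate's info! macro and an invented helper name and test values:

// Sketch only: PRE_BACKFILL_FREQUENCY mirrors the constant above; the helper
// name and the values in main() are made up for illustration.
const PRE_BACKFILL_FREQUENCY: u64 = 1000;

fn log_backfill_progress(
    last_num_blocks_indexed: u64,
    blocks_indexed_in_batch: u64,
    number_of_blocks_to_backfill: u64,
    finished_backfill: &mut bool,
) {
    // Walk every count covered by this batch so multiples of the logging
    // frequency are never skipped, even when a batch spans thousands of slots.
    for blocks_indexed in last_num_blocks_indexed..blocks_indexed_in_batch {
        if blocks_indexed <= number_of_blocks_to_backfill {
            if blocks_indexed % PRE_BACKFILL_FREQUENCY == 0 {
                println!(
                    "Backfilled {} / {} blocks",
                    blocks_indexed, number_of_blocks_to_backfill
                );
            }
        } else {
            *finished_backfill = true;
            println!("Finished backfilling historical blocks!");
        }
    }
}

fn main() {
    let mut finished_backfill = false;
    // One batch covering counts 0..2002 of a 2000-block backfill.
    log_backfill_progress(0, 2002, 2000, &mut finished_backfill);
    assert!(finished_backfill);
}

In the real loop, last_num_blocks_indexed is then advanced to blocks_indexed_in_batch so the next batch resumes the count where this one stopped.
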
src/ingester/persist/mod.rs (1 addition, 1 deletion)
@@ -34,7 +34,7 @@ pub mod persisted_state_tree;
 const COMPRESSED_TOKEN_PROGRAM: Pubkey = pubkey!("cTokenmWW8bLPjZEBAUgYy3zKxQZW6VKi7bqNFEVv3m");
 const TREE_HEIGHT: u32 = 27;
 // To avoid exceeding the 64k total parameter limit
-pub const MAX_SQL_INSERTS: usize = 5000;
+pub const MAX_SQL_INSERTS: usize = 500;
 
 pub async fn persist_state_update(
     txn: &DatabaseTransaction,
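
The smaller MAX_SQL_INSERTS keeps each batched INSERT below the 64k bind-parameter ceiling mentioned in the comment. A rough illustration of the chunking arithmetic, using a hypothetical per-row parameter count and plain integers in place of the SeaORM models the crate actually persists:

// Sketch only: PARAMS_PER_ROW and the u64 "rows" are stand-ins; the point is
// that rows are split into chunks of MAX_SQL_INSERTS so that
// rows_per_chunk * params_per_row stays below the total parameter ceiling.
pub const MAX_SQL_INSERTS: usize = 500;
const PARAMS_PER_ROW: usize = 13; // assumed column count for the example
const MAX_TOTAL_PARAMS: usize = 65_535;

fn insert_chunk(rows: &[u64]) {
    let params = rows.len() * PARAMS_PER_ROW;
    assert!(params <= MAX_TOTAL_PARAMS, "would exceed the parameter limit");
    println!("inserting {} rows ({} bind parameters)", rows.len(), params);
}

fn main() {
    let rows: Vec<u64> = (0..1_234).collect();
    // 1_234 rows -> chunks of 500, 500, and 234 rows.
    for chunk in rows.chunks(MAX_SQL_INSERTS) {
        insert_chunk(chunk);
    }
}

At the old value of 5000 rows per statement, anything beyond 13 parameters per row would overflow that ceiling, which is presumably why the constant came down.
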
src/main.rs (22 additions, 9 deletions)
@@ -1,8 +1,9 @@
 use std::fs::File;
 
 use async_std::stream::StreamExt;
+use async_stream::stream;
 use clap::Parser;
-use futures::{pin_mut, stream};
+use futures::pin_mut;
 use jsonrpsee::server::ServerHandle;
 use log::{error, info};
 use photon_indexer::api::{self, api::PhotonApi};
@@ -180,7 +181,14 @@ fn continously_index_new_blocks(
 ) -> tokio::task::JoinHandle<()> {
     tokio::spawn(async move {
         let block_stream = block_stream_config.load_block_stream();
-        index_block_stream(block_stream, db, &rpc_client.client, last_indexed_slot).await;
+        index_block_stream(
+            block_stream,
+            db,
+            &rpc_client.client,
+            last_indexed_slot,
+            None,
+        )
+        .await;
     })
 }

@@ -199,26 +207,31 @@ async fn main() {
 
     if let Some(snapshot_dir) = args.snapshot_dir {
         let directory_adapter = Arc::new(DirectoryAdapter::from_local_directory(snapshot_dir));
-        if !get_snapshot_files_with_metadata(&directory_adapter.clone())
+        let snapshot_files = get_snapshot_files_with_metadata(&directory_adapter)
             .await
-            .unwrap()
-            .is_empty()
-        {
+            .unwrap();
+        if !snapshot_files.is_empty() {
             info!("Detected snapshot files. Loading snapshot...");
+            let last_slot = snapshot_files.last().unwrap().end_slot;
             let block_stream =
                 load_block_stream_from_directory_adapter(directory_adapter.clone()).await;
             pin_mut!(block_stream);
             let first_blocks = block_stream.next().await.unwrap();
-            let slot = first_blocks.last().unwrap().metadata.slot;
             let last_indexed_slot = first_blocks.first().unwrap().metadata.parent_slot;
+            let block_stream = stream! {
+                yield first_blocks;
+                while let Some(blocks) = block_stream.next().await {
+                    yield blocks;
+                }
+            };
             index_block_stream(
-                stream::iter(vec![first_blocks].into_iter()),
+                block_stream,
                 db_conn.clone(),
                 &rpc_client.client,
                 last_indexed_slot,
+                Some(last_slot),
            )
             .await;
-            index_block_stream(block_stream, db_conn.clone(), &rpc_client.client, slot).await;
         }
     }
 
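
The snapshot path now peeks at the first batch to learn where indexing should resume, then uses async-stream's stream! macro to splice that batch back in front of the remainder so a single index_block_stream call, bounded by Some(last_slot), sees every batch exactly once. A small sketch of the peek-and-rechain pattern, assuming the futures and async-stream crates and using integer vectors in place of block batches:

// Sketch only: integer batches replace Vec<BlockInfo>, and block_on replaces
// the real async runtime.
use async_stream::stream;
use futures::{pin_mut, StreamExt};

fn main() {
    futures::executor::block_on(async {
        let block_stream = futures::stream::iter(vec![vec![1u64, 2, 3], vec![4, 5], vec![6]]);
        pin_mut!(block_stream);

        // Peek the first batch to derive the starting point (parent_slot in the
        // real code; here just "first element minus one").
        let first_blocks = block_stream.next().await.unwrap();
        let last_indexed_slot = first_blocks[0] - 1;
        println!("resuming after slot {}", last_indexed_slot);

        // Re-chain the peeked batch in front of the remaining batches so the
        // consumer sees every batch exactly once.
        let rebuilt = stream! {
            yield first_blocks;
            while let Some(blocks) = block_stream.next().await {
                yield blocks;
            }
        };
        pin_mut!(rebuilt);
        while let Some(batch) = rebuilt.next().await {
            println!("indexing batch {:?}", batch);
        }
    });
}

Passing Some(last_slot) means the backfill accounting uses the snapshot's final slot rather than asking the RPC node for the current slot, per the end_slot.unwrap_or(...) change above.
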
src/snapshot/mod.rs (24 additions, 2 deletions)
@@ -31,6 +31,8 @@ pub mod s3_utils;
 
 pub const MEGABYTE: usize = 1024 * 1024;
 pub const CHUNK_SIZE: usize = 100 * 1024 * 1024;
+// Up to 50 MB
+pub const TRANSACTIONS_TO_ACCUMULATE: usize = 5000;
 
 const SNAPSHOT_VERSION: u8 = 1;
 
@@ -583,25 +585,45 @@ pub async fn load_block_stream_from_directory_adapter(
 
         let mut reader = Vec::new();
         let mut index = 0;
+        let mut accumulated_blocks = Vec::new();
+        let mut accumulated_transactions = 0;
+
         while let Some(bytes) = byte_stream.next().await {
             let bytes = bytes.unwrap();
             reader.extend(&bytes);
             while reader.len() - index > CHUNK_SIZE {
                 let block: BlockInfo = bincode::deserialize(&reader[index..]).unwrap();
                 let size = bincode::serialized_size(&block).unwrap() as usize;
                 index += size;
-                yield vec![block];
+                accumulated_transactions += block.transactions.len();
+                accumulated_blocks.push(block);
+                if accumulated_transactions >= TRANSACTIONS_TO_ACCUMULATE {
+                    yield accumulated_blocks;
+                    accumulated_blocks = Vec::new();
+                    accumulated_transactions = 0;
+                }
             }
             if index > 0 {
                 reader.drain(..index);
                 index = 0;
             }
         }
+
         while index < reader.len() {
             let block: BlockInfo = bincode::deserialize(&reader[index..]).unwrap();
             let size = bincode::serialized_size(&block).unwrap() as usize;
             index += size;
-            yield vec![block];
+            accumulated_transactions += block.transactions.len();
+            accumulated_blocks.push(block);
+            if accumulated_transactions >= TRANSACTIONS_TO_ACCUMULATE {
+                yield accumulated_blocks;
+                accumulated_blocks = Vec::new();
+                accumulated_transactions = 0;
+            }
         }
+
+        if !accumulated_blocks.is_empty() {
+            yield accumulated_blocks;
+        }
     }
 }
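
Instead of yielding one block per batch, the stream now buffers blocks until the batch holds at least TRANSACTIONS_TO_ACCUMULATE transactions and flushes whatever remains at the end. A simplified, synchronous sketch of that batching rule, with a hypothetical Block type and no bincode or async streaming:

// Sketch only: Block and batch_blocks are invented for illustration; the real
// code does this inside an async stream! block while deserializing BlockInfo
// values with bincode.
pub const TRANSACTIONS_TO_ACCUMULATE: usize = 5000;

struct Block {
    transactions: Vec<u64>, // stand-in for transaction payloads
}

fn batch_blocks(blocks: impl IntoIterator<Item = Block>) -> Vec<Vec<Block>> {
    let mut batches = Vec::new();
    let mut accumulated_blocks = Vec::new();
    let mut accumulated_transactions = 0;

    for block in blocks {
        accumulated_transactions += block.transactions.len();
        accumulated_blocks.push(block);
        // Flush once the batch is "big enough" by transaction count.
        if accumulated_transactions >= TRANSACTIONS_TO_ACCUMULATE {
            batches.push(std::mem::take(&mut accumulated_blocks));
            accumulated_transactions = 0;
        }
    }
    // Flush whatever is left so no trailing blocks are dropped.
    if !accumulated_blocks.is_empty() {
        batches.push(accumulated_blocks);
    }
    batches
}

fn main() {
    // 7 blocks with 1500 transactions each: 4 blocks reach the 5000-tx
    // threshold, the remaining 3 are flushed at the end.
    let blocks = (0..7).map(|_| Block { transactions: vec![0; 1500] });
    let sizes: Vec<usize> = batch_blocks(blocks).iter().map(|b| b.len()).collect();
    assert_eq!(sizes, vec![4, 3]);
}

The "Up to 50 MB" comment next to the constant suggests the threshold is also meant to bound memory per batch, presumably assuming transactions on the order of 10 KB each.
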
