Skip to content

Commit

Permalink
store: add a BlockStore::allocate_chain helper method
Browse files Browse the repository at this point in the history
  • Loading branch information
incrypto32 committed Mar 7, 2024
1 parent efe2123 commit cf5997d
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 45 deletions.
24 changes: 5 additions & 19 deletions node/src/manager/commands/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,10 @@ use graph_store_postgres::BlockStore;
use graph_store_postgres::ChainStatus;
use graph_store_postgres::ChainStore;
use graph_store_postgres::Shard;
use graph_store_postgres::Storage;
use graph_store_postgres::{
command_support::catalog::block_store, connection_pool::ConnectionPool,
};

#[derive(QueryableByName, Debug)]
struct ChainIdSeq {
#[sql_type = "diesel::sql_types::BigInt"]
last_value: i64,
}

pub async fn list(primary: ConnectionPool, store: Arc<BlockStore>) -> Result<(), Error> {
let mut chains = {
let conn = primary.get()?;
Expand Down Expand Up @@ -185,18 +178,9 @@ pub fn change_block_cache_shard(
let ident = chain_store.chain_identifier.clone();
let shard = Shard::new(shard.to_string())?;

// Fetch the current last_value from the sequence
let result = sql_query("SELECT last_value FROM chains_id_seq").get_result::<ChainIdSeq>(&conn)?;

let last_val = result.last_value;
let chain = BlockStore::allocate_chain(&conn, &chain_name, &shard, &ident)?;

let next_val = last_val + 1;
let namespace = format!("chain{}", next_val);
let storage= Storage::new(namespace.to_string()).map_err(|e| {
anyhow!("Failed to create storage: {}", e)
})?;

store.add_chain_store_inner(&chain_name, &shard, &ident, storage,ChainStatus::Ingestible, true)?;
store.add_chain_store(&chain,ChainStatus::Ingestible, true)?;

// Drop the foreign key constraint on deployment_schemas
sql_query(
Expand All @@ -206,7 +190,7 @@ pub fn change_block_cache_shard(

// Update the current chain name to chain-old
update_chain_name(&conn, &chain_name, &new_name)?;
chain_store.update_name(&new_name)?;


// Create a new chain with the name in the destination shard
let _= add_chain(&conn, &chain_name, &ident, &shard)?;
Expand All @@ -219,6 +203,8 @@ pub fn change_block_cache_shard(
Ok(())
})?;

chain_store.update_name(&new_name)?;

println!(
"Changed block cache shard for {} from {} to {}",
chain_name, old_shard, shard
Expand Down
76 changes: 50 additions & 26 deletions store/postgres/src/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ use std::{
time::Duration,
};

use anyhow::anyhow;
use diesel::{
r2d2::{ConnectionManager, PooledConnection},
sql_query, PgConnection, RunQueryDsl,
};
use graph::{
blockchain::ChainIdentifier,
components::store::{BlockStore as BlockStoreTrait, QueryPermit},
Expand All @@ -21,6 +26,8 @@ use crate::{
ChainStore, NotificationSender, Shard, PRIMARY_SHARD,
};

use self::primary::Chain;

#[cfg(debug_assertions)]
pub const FAKE_NETWORK_SHARED: &str = "fake_network_shared";

Expand Down Expand Up @@ -328,30 +335,63 @@ impl BlockStore {
.expect("the primary is never disabled")
}

pub fn add_chain_store_inner(
&self,
pub fn allocate_chain(
conn: &PooledConnection<ConnectionManager<PgConnection>>,
name: &String,
shard: &Shard,
ident: &ChainIdentifier,
storage: Storage,
) -> Result<Chain, StoreError> {
#[derive(QueryableByName, Debug)]
struct ChainIdSeq {
#[sql_type = "diesel::sql_types::BigInt"]
last_value: i64,
}

// Fetch the current last_value from the sequence
let result =
sql_query("SELECT last_value FROM chains_id_seq").get_result::<ChainIdSeq>(conn)?;

let last_val = result.last_value;

let next_val = last_val + 1;
let namespace = format!("chain{}", next_val);
let storage =
Storage::new(namespace.to_string()).map_err(|e| StoreError::Unknown(anyhow!(e)))?;

let chain = Chain {
id: next_val as i32,
name: name.clone(),
shard: shard.clone(),
net_version: ident.net_version.clone(),
genesis_block: ident.genesis_block_hash.hash_hex(),
storage: storage.clone(),
};

Ok(chain)
}

pub fn add_chain_store(
&self,
chain: &primary::Chain,
status: ChainStatus,
create: bool,
) -> Result<Arc<ChainStore>, StoreError> {
let pool = self
.pools
.get(shard)
.ok_or_else(|| constraint_violation!("there is no pool for shard {}", shard))?
.get(&chain.shard)
.ok_or_else(|| constraint_violation!("there is no pool for shard {}", chain.shard))?
.clone();
let sender = ChainHeadUpdateSender::new(
self.mirror.primary().clone(),
name.clone(),
chain.name.clone(),
self.sender.clone(),
);
let logger = self.logger.new(o!("network" => name.clone()));
let ident = chain.network_identifier()?;
let logger = self.logger.new(o!("network" => chain.name.clone()));
let store = ChainStore::new(
logger,
name.clone(),
storage.clone(),
chain.name.clone(),
chain.storage.clone(),
&ident,
status,
sender,
Expand All @@ -366,26 +406,10 @@ impl BlockStore {
self.stores
.write()
.unwrap()
.insert(name.clone(), store.clone());
.insert(chain.name.clone(), store.clone());
Ok(store)
}

pub fn add_chain_store(
&self,
chain: &primary::Chain,
status: ChainStatus,
create: bool,
) -> Result<Arc<ChainStore>, StoreError> {
self.add_chain_store_inner(
&chain.name,
&chain.shard,
&chain.network_identifier()?,
chain.storage.clone(),
status,
create,
)
}

/// Return a map from network name to the network's chain head pointer.
/// The information is cached briefly since this method is used heavily
/// by the indexing status API
Expand Down

0 comments on commit cf5997d

Please sign in to comment.