diff --git a/node/src/manager/commands/chain.rs b/node/src/manager/commands/chain.rs index 2ebe75e0778..41d2492a52a 100644 --- a/node/src/manager/commands/chain.rs +++ b/node/src/manager/commands/chain.rs @@ -21,17 +21,10 @@ use graph_store_postgres::BlockStore; use graph_store_postgres::ChainStatus; use graph_store_postgres::ChainStore; use graph_store_postgres::Shard; -use graph_store_postgres::Storage; use graph_store_postgres::{ command_support::catalog::block_store, connection_pool::ConnectionPool, }; -#[derive(QueryableByName, Debug)] -struct ChainIdSeq { - #[sql_type = "diesel::sql_types::BigInt"] - last_value: i64, -} - pub async fn list(primary: ConnectionPool, store: Arc) -> Result<(), Error> { let mut chains = { let conn = primary.get()?; @@ -185,18 +178,9 @@ pub fn change_block_cache_shard( let ident = chain_store.chain_identifier.clone(); let shard = Shard::new(shard.to_string())?; - // Fetch the current last_value from the sequence - let result = sql_query("SELECT last_value FROM chains_id_seq").get_result::(&conn)?; - - let last_val = result.last_value; + let chain = BlockStore::allocate_chain(&conn, &chain_name, &shard, &ident)?; - let next_val = last_val + 1; - let namespace = format!("chain{}", next_val); - let storage= Storage::new(namespace.to_string()).map_err(|e| { - anyhow!("Failed to create storage: {}", e) - })?; - - store.add_chain_store_inner(&chain_name, &shard, &ident, storage,ChainStatus::Ingestible, true)?; + store.add_chain_store(&chain,ChainStatus::Ingestible, true)?; // Drop the foreign key constraint on deployment_schemas sql_query( @@ -206,7 +190,7 @@ pub fn change_block_cache_shard( // Update the current chain name to chain-old update_chain_name(&conn, &chain_name, &new_name)?; - chain_store.update_name(&new_name)?; + // Create a new chain with the name in the destination shard let _= add_chain(&conn, &chain_name, &ident, &shard)?; @@ -219,6 +203,8 @@ pub fn change_block_cache_shard( Ok(()) })?; + chain_store.update_name(&new_name)?; + println!( "Changed block cache shard for {} from {} to {}", chain_name, old_shard, shard diff --git a/store/postgres/src/block_store.rs b/store/postgres/src/block_store.rs index ea74573dea6..d4ef451ff32 100644 --- a/store/postgres/src/block_store.rs +++ b/store/postgres/src/block_store.rs @@ -4,6 +4,11 @@ use std::{ time::Duration, }; +use anyhow::anyhow; +use diesel::{ + r2d2::{ConnectionManager, PooledConnection}, + sql_query, PgConnection, RunQueryDsl, +}; use graph::{ blockchain::ChainIdentifier, components::store::{BlockStore as BlockStoreTrait, QueryPermit}, @@ -21,6 +26,8 @@ use crate::{ ChainStore, NotificationSender, Shard, PRIMARY_SHARD, }; +use self::primary::Chain; + #[cfg(debug_assertions)] pub const FAKE_NETWORK_SHARED: &str = "fake_network_shared"; @@ -328,30 +335,63 @@ impl BlockStore { .expect("the primary is never disabled") } - pub fn add_chain_store_inner( - &self, + pub fn allocate_chain( + conn: &PooledConnection>, name: &String, shard: &Shard, ident: &ChainIdentifier, - storage: Storage, + ) -> Result { + #[derive(QueryableByName, Debug)] + struct ChainIdSeq { + #[sql_type = "diesel::sql_types::BigInt"] + last_value: i64, + } + + // Fetch the current last_value from the sequence + let result = + sql_query("SELECT last_value FROM chains_id_seq").get_result::(conn)?; + + let last_val = result.last_value; + + let next_val = last_val + 1; + let namespace = format!("chain{}", next_val); + let storage = + Storage::new(namespace.to_string()).map_err(|e| StoreError::Unknown(anyhow!(e)))?; + + let chain = Chain { + id: next_val as i32, + name: name.clone(), + shard: shard.clone(), + net_version: ident.net_version.clone(), + genesis_block: ident.genesis_block_hash.hash_hex(), + storage: storage.clone(), + }; + + Ok(chain) + } + + pub fn add_chain_store( + &self, + chain: &primary::Chain, status: ChainStatus, create: bool, ) -> Result, StoreError> { let pool = self .pools - .get(shard) - .ok_or_else(|| constraint_violation!("there is no pool for shard {}", shard))? + .get(&chain.shard) + .ok_or_else(|| constraint_violation!("there is no pool for shard {}", chain.shard))? .clone(); let sender = ChainHeadUpdateSender::new( self.mirror.primary().clone(), - name.clone(), + chain.name.clone(), self.sender.clone(), ); - let logger = self.logger.new(o!("network" => name.clone())); + let ident = chain.network_identifier()?; + let logger = self.logger.new(o!("network" => chain.name.clone())); let store = ChainStore::new( logger, - name.clone(), - storage.clone(), + chain.name.clone(), + chain.storage.clone(), &ident, status, sender, @@ -366,26 +406,10 @@ impl BlockStore { self.stores .write() .unwrap() - .insert(name.clone(), store.clone()); + .insert(chain.name.clone(), store.clone()); Ok(store) } - pub fn add_chain_store( - &self, - chain: &primary::Chain, - status: ChainStatus, - create: bool, - ) -> Result, StoreError> { - self.add_chain_store_inner( - &chain.name, - &chain.shard, - &chain.network_identifier()?, - chain.storage.clone(), - status, - create, - ) - } - /// Return a map from network name to the network's chain head pointer. /// The information is cached briefly since this method is used heavily /// by the indexing status API