From c1011eeaaadb17e64687c69a759c6b0f4ee1a407 Mon Sep 17 00:00:00 2001
From: David Lutterkort
Date: Fri, 21 Mar 2025 08:30:14 -0700
Subject: [PATCH 1/7] store: Fix syntax error in 'alter server'

---
 store/postgres/src/connection_pool.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/store/postgres/src/connection_pool.rs b/store/postgres/src/connection_pool.rs
index 374a1adc5ab..5ad9a60c5e1 100644
--- a/store/postgres/src/connection_pool.rs
+++ b/store/postgres/src/connection_pool.rs
@@ -185,7 +185,7 @@ impl ForeignServer {
         alter server \"{name}\"
                options (set host '{remote_host}', \
                         {set_port} port '{remote_port}', \
-                        set dbname '{remote_db}, \
+                        set dbname '{remote_db}', \
                         {set_fetch_size} fetch_size '{fetch_size}');
         alter user mapping
                for current_user server \"{name}\"

From 3bc203201f7aee2f48a05aed44623dd7fc884b87 Mon Sep 17 00:00:00 2001
From: David Lutterkort
Date: Sat, 22 Mar 2025 09:48:53 -0700
Subject: [PATCH 2/7] store: Analyze tables earlier during copying

Analyzing earlier ensures that Postgres already has statistics for the
copied tables when we rewind the subgraph.
---
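The analyze pass added below calls `analyze_with_conn` once per table of
the destination deployment; that presumably boils down to running a plain
Postgres `analyze` over each copied table. A rough sketch, with a made-up
deployment schema and table name:

    -- illustrative only: sgd42 and swap are hypothetical names
    analyze sgd42.swap;

Freshly copied tables have no planner statistics yet, so gathering them
before `revert_block` issues its rewind queries helps Postgres choose
reasonable plans for them.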
 store/postgres/src/deployment_store.rs | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs
index b196cd3c539..01f705158d3 100644
--- a/store/postgres/src/deployment_store.rs
+++ b/store/postgres/src/deployment_store.rs
@@ -1553,6 +1553,12 @@ impl DeploymentStore {
 
         catalog::copy_account_like(conn, &src.site, &dst.site)?;
 
+        // Analyze all tables for this deployment
+        info!(logger, "Analyzing all {} tables", dst.tables.len());
+        for entity_name in dst.tables.keys() {
+            self.analyze_with_conn(site.cheap_clone(), entity_name.as_str(), conn)?;
+        }
+
         // Rewind the subgraph so that entity versions that are
         // clamped in the future (beyond `block`) become valid for
         // all blocks after `block`. `revert_block` gets rid of
@@ -1563,6 +1569,7 @@ impl DeploymentStore {
             .number
             .checked_add(1)
             .expect("block numbers fit into an i32");
+        info!(logger, "Rewinding to block {}", block.number);
         let count = dst.revert_block(conn, block_to_revert)?;
         deployment::update_entity_count(conn, &dst.site, count)?;
 
@@ -1575,11 +1582,6 @@ impl DeploymentStore {
             src_deployment.manifest.history_blocks,
         )?;
 
-        // Analyze all tables for this deployment
-        for entity_name in dst.tables.keys() {
-            self.analyze_with_conn(site.cheap_clone(), entity_name.as_str(), conn)?;
-        }
-
         // The `earliest_block` for `src` might have changed while
         // we did the copy if `src` was pruned while we copied;
         // adjusting it very late in the copy process ensures that

From c6f02fc7ef2d022746c70bc75493593b1e58c9c0 Mon Sep 17 00:00:00 2001
From: David Lutterkort
Date: Fri, 21 Mar 2025 09:22:50 -0700
Subject: [PATCH 3/7] store: Make sure we always map the right set of foreign tables

---
 store/postgres/src/catalog.rs         | 10 ++++
 store/postgres/src/connection_pool.rs | 80 +++++++++++++++++++++------
 2 files changed, 72 insertions(+), 18 deletions(-)

diff --git a/store/postgres/src/catalog.rs b/store/postgres/src/catalog.rs
index 1524a768acc..ba532dd53ff 100644
--- a/store/postgres/src/catalog.rs
+++ b/store/postgres/src/catalog.rs
@@ -398,6 +398,16 @@ pub fn drop_foreign_schema(conn: &mut PgConnection, src: &Site) -> Result<(), St
     Ok(())
 }
 
+pub fn foreign_tables(conn: &mut PgConnection, nsp: &str) -> Result<Vec<String>, StoreError> {
+    use foreign_tables as ft;
+
+    ft::table
+        .filter(ft::foreign_table_schema.eq(nsp))
+        .select(ft::foreign_table_name)
+        .get_results::<String>(conn)
+        .map_err(StoreError::from)
+}
+
 /// Drop the schema `nsp` and all its contents if it exists, and create it
 /// again so that `nsp` is an empty schema
 pub fn recreate_schema(conn: &mut PgConnection, nsp: &str) -> Result<(), StoreError> {
diff --git a/store/postgres/src/connection_pool.rs b/store/postgres/src/connection_pool.rs
index 5ad9a60c5e1..c4ba365cfd7 100644
--- a/store/postgres/src/connection_pool.rs
+++ b/store/postgres/src/connection_pool.rs
@@ -37,6 +37,11 @@ use crate::primary::{self, NAMESPACE_PUBLIC};
 use crate::{advisory_lock, catalog};
 use crate::{Shard, PRIMARY_SHARD};
 
+/// Tables that we map from the primary into `primary_public` in each shard
+const PRIMARY_TABLES: [&str; 3] = ["deployment_schemas", "chains", "active_copies"];
+
+/// Tables that we map from each shard into each other shard into the
+/// `shard_<shard>_subgraphs` namespace
 const SHARDED_TABLES: [(&str, &[&str]); 2] = [
     ("public", &["ethereum_networks"]),
     (
@@ -209,7 +214,7 @@ impl ForeignServer {
         catalog::recreate_schema(conn, Self::PRIMARY_PUBLIC)?;
 
         let mut query = String::new();
-        for table_name in ["deployment_schemas", "chains", "active_copies"] {
+        for table_name in PRIMARY_TABLES {
             let create_stmt = if shard == &*PRIMARY_SHARD {
                 format!(
                     "create view {nsp}.{table_name} as select * from public.{table_name};",
@@ -246,6 +251,33 @@ impl ForeignServer {
         }
         Ok(conn.batch_execute(&query)?)
     }
+
+    fn needs_remap(&self, conn: &mut PgConnection) -> Result<bool, StoreError> {
+        fn different(mut existing: Vec<String>, mut needed: Vec<String>) -> bool {
+            existing.sort();
+            needed.sort();
+            existing != needed
+        }
+
+        if &self.shard == &*PRIMARY_SHARD {
+            let existing = catalog::foreign_tables(conn, Self::PRIMARY_PUBLIC)?;
+            let needed = PRIMARY_TABLES
+                .into_iter()
+                .map(String::from)
+                .collect::<Vec<_>>();
+            if different(existing, needed) {
+                return Ok(true);
+            }
+        }
+
+        let existing = catalog::foreign_tables(conn, &Self::metadata_schema(&self.shard))?;
+        let needed = SHARDED_TABLES
+            .iter()
+            .flat_map(|(_, tables)| *tables)
+            .map(|table| table.to_string())
+            .collect::<Vec<_>>();
+        Ok(different(existing, needed))
+    }
 }
 
 /// How long to keep connections in the `fdw_pool` around before closing
@@ -1037,16 +1069,14 @@ impl PoolInner {
         let result = pool
             .configure_fdw(coord.servers.as_ref())
             .and_then(|()| pool.drop_cross_shard_views())
-            .and_then(|()| migrate_schema(&pool.logger, &mut conn))
-            .and_then(|count| {
-                pool.create_cross_shard_views(coord.servers.as_ref())
-                    .map(|()| count)
-            });
+            .and_then(|()| migrate_schema(&pool.logger, &mut conn));
         debug!(&pool.logger, "Release migration lock");
         advisory_lock::unlock_migration(&mut conn).unwrap_or_else(|err| {
             die(&pool.logger, "failed to release migration lock", &err);
         });
-        let result = result.and_then(|count| coord.propagate(&pool, count));
+        let result = result
+            .and_then(|count| coord.propagate(&pool, count))
+            .and_then(|()| pool.create_cross_shard_views(coord.servers.as_ref()));
         result.unwrap_or_else(|err| die(&pool.logger, "migrations failed", &err));
 
         // Locale check
@@ -1178,9 +1208,9 @@ impl PoolInner {
             .await
     }
 
-    // The foreign server `server` had schema changes, and we therefore need
-    // to remap anything that we are importing via fdw to make sure we are
-    // using this updated schema
+    /// The foreign server `server` had schema changes, and we therefore
+    /// need to remap anything that we are importing via fdw to make sure we
+    /// are using this updated schema
     pub fn remap(&self, server: &ForeignServer) -> Result<(), StoreError> {
         if &server.shard == &*PRIMARY_SHARD {
             info!(&self.logger, "Mapping primary");
@@ -1198,6 +1228,15 @@ impl PoolInner {
         }
         Ok(())
     }
+
+    pub fn needs_remap(&self, server: &ForeignServer) -> Result<bool, StoreError> {
+        if &server.shard == &self.shard {
+            return Ok(false);
+        }
+
+        let mut conn = self.get()?;
+        server.needs_remap(&mut conn)
+    }
 }
 
 pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
@@ -1211,10 +1250,6 @@ impl MigrationCount {
     fn had_migrations(&self) -> bool {
         self.old != self.new
     }
-
-    fn is_new(&self) -> bool {
-        self.old == 0
-    }
 }
 
 /// Run all schema migrations.
@@ -1334,13 +1369,22 @@ impl PoolCoordinator {
     /// code that does _not_ hold the migration lock as it will otherwise
     /// deadlock
     fn propagate(&self, pool: &PoolInner, count: MigrationCount) -> Result<(), StoreError> {
-        // pool is a new shard, map all other shards into it
-        if count.is_new() {
-            for server in self.servers.iter() {
+        // We need to remap all these servers into `pool` if the list of
+        // tables that are mapped has changed from the code of the previous
+        // version. Since dropping and recreating the foreign table
+        // definitions can slow the startup of other nodes down because of
+        // locking, we try to only do this when it is actually needed
+        for server in self.servers.iter() {
+            if pool.needs_remap(server)? {
                 pool.remap(server)?;
             }
         }
-        // pool had schema changes, refresh the import from pool into all other shards
+
+        // pool had schema changes, refresh the import from pool into all
+        // other shards. This makes sure that schema changes to
+        // already-mapped tables are propagated to all other shards. Since
+        // we run `propagate` after migrations have been applied to `pool`,
+        // we can be sure that these mappings use the correct schema
         if count.had_migrations() {
             let server = self.server(&pool.shard)?;
             for pool in self.pools.lock().unwrap().values() {
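A note on the remap check introduced above: `needs_remap` compares the
foreign tables that actually exist in a shard's import schemas against the
lists in `PRIMARY_TABLES` and `SHARDED_TABLES`, using the new
`catalog::foreign_tables` helper. Judging by the column names, that helper
is presumably backed by the standard `information_schema.foreign_tables`
view, roughly:

    -- roughly the query behind catalog::foreign_tables for the primary mappings
    select foreign_table_name
      from information_schema.foreign_tables
     where foreign_table_schema = 'primary_public';

Only when the two sets differ does `propagate` drop and recreate the
mappings; otherwise the lock-heavy remap is skipped.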

From 5f2ecb72352038ddf60ebe63c8536e7199fad5ad Mon Sep 17 00:00:00 2001
From: David Lutterkort
Date: Sat, 22 Mar 2025 10:44:25 -0700
Subject: [PATCH 4/7] store: Do not map the subgraph_features table

It's only maintained in the primary, and there's no point in mapping it
across shards
---
 store/postgres/src/connection_pool.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/store/postgres/src/connection_pool.rs b/store/postgres/src/connection_pool.rs
index c4ba365cfd7..6267a41628a 100644
--- a/store/postgres/src/connection_pool.rs
+++ b/store/postgres/src/connection_pool.rs
@@ -52,7 +52,6 @@ const SHARDED_TABLES: [(&str, &[&str]); 2] = [
             "dynamic_ethereum_contract_data_source",
             "subgraph_deployment",
             "subgraph_error",
-            "subgraph_features",
             "subgraph_manifest",
             "table_stats",
         ],

From 701f77d2d39decfef2ec2d91aa5df0cf5abb7c69 Mon Sep 17 00:00:00 2001
From: David Lutterkort
Date: Sun, 23 Mar 2025 10:46:42 -0700
Subject: [PATCH 5/7] graphman: Annotate failures in 'copy create' with source

When creating many copies with a shell script, it is useful to have the
deployment we are trying to copy in the error message
---
 node/src/manager/commands/copy.rs | 34 ++++++++++++++++++++++++++-----
 1 file changed, 29 insertions(+), 5 deletions(-)

diff --git a/node/src/manager/commands/copy.rs b/node/src/manager/commands/copy.rs
index ab007ea319d..d3280823e76 100644
--- a/node/src/manager/commands/copy.rs
+++ b/node/src/manager/commands/copy.rs
@@ -2,7 +2,7 @@ use diesel::{ExpressionMethods, JoinOnDsl, OptionalExtension, QueryDsl, RunQuery
 use std::{collections::HashMap, sync::Arc, time::SystemTime};
 
 use graph::{
-    components::store::{BlockStore as _, DeploymentId},
+    components::store::{BlockStore as _, DeploymentId, DeploymentLocator},
     data::query::QueryTarget,
     prelude::{
         anyhow::{anyhow, bail, Error},
@@ -84,10 +84,9 @@ impl CopyState {
     }
 }
 
-pub async fn create(
+async fn create_inner(
     store: Arc<Store>,
-    primary: ConnectionPool,
-    src: DeploymentSearch,
+    src: &DeploymentLocator,
     shard: String,
     shards: Vec<String>,
     node: String,
@@ -104,7 +103,6 @@
     };
 
     let subgraph_store = store.subgraph_store();
-    let src = src.locate_unique(&primary)?;
     let query_store = store
         .query_store(QueryTarget::Deployment(
             src.hash.clone(),
@@ -154,6 +152,32 @@
     Ok(())
 }
 
+pub async fn create(
+    store: Arc<Store>,
+    primary: ConnectionPool,
+    src: DeploymentSearch,
+    shard: String,
+    shards: Vec<String>,
+    node: String,
+    block_offset: u32,
+    activate: bool,
+    replace: bool,
+) -> Result<(), Error> {
+    let src = src.locate_unique(&primary)?;
+    create_inner(
+        store,
+        &src,
+        shard,
+        shards,
+        node,
+        block_offset,
+        activate,
+        replace,
+    )
+    .await
+    .map_err(|e| anyhow!("cannot copy {src}: {e}"))
+}
+
 pub fn activate(store: Arc<SubgraphStore>, deployment: String, shard: String) -> Result<(), Error> {
     let shard = Shard::new(shard)?;
     let deployment =

From ccd65e777ca322816172862eaa198fc478b792b1 Mon Sep 17 00:00:00 2001
From: David Lutterkort
Date: Sun, 23 Mar 2025 13:14:54 -0700
Subject: [PATCH 6/7] graphman: Fix table status indicator for 'copy status'

With recent changes, the status shown was '>' (in progress) for all tables
that hadn't finished yet, not just the one being worked on
---
 node/src/manager/commands/copy.rs | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/node/src/manager/commands/copy.rs b/node/src/manager/commands/copy.rs
index d3280823e76..9ca80bc9b20 100644
--- a/node/src/manager/commands/copy.rs
+++ b/node/src/manager/commands/copy.rs
@@ -255,13 +255,11 @@ pub fn list(pools: HashMap<Shard, ConnectionPool>) -> Result<(), Error> {
 }
 
 pub fn status(pools: HashMap<Shard, ConnectionPool>, dst: &DeploymentSearch) -> Result<(), Error> {
+    const CHECK: &str = "✓";
+
     use catalog::active_copies as ac;
     use catalog::deployment_schemas as ds;
 
-    fn done(ts: &Option<UtcDateTime>) -> String {
-        ts.map(|_| "✓").unwrap_or(".").to_string()
-    }
-
     fn duration(start: &UtcDateTime, end: &Option<UtcDateTime>) -> String {
         let start = *start;
         let end = *end;
@@ -314,7 +312,7 @@ pub fn status(pools: HashMap<Shard, ConnectionPool>, dst: &DeploymentSearch) ->
     };
 
     let progress = match &state.finished_at {
-        Some(_) => done(&state.finished_at),
+        Some(_) => CHECK.to_string(),
         None => {
             let target: i64 = tables.iter().map(|table| table.target_vid).sum();
             let next: i64 = tables.iter().map(|table| table.next_vid).sum();
@@ -363,13 +361,15 @@ pub fn status(pools: HashMap<Shard, ConnectionPool>, dst: &DeploymentSearch) ->
     );
     println!("{:-<74}", "-");
     for table in tables {
-        let status = if table.next_vid > 0 && table.next_vid < table.target_vid {
-            ">".to_string()
-        } else if table.target_vid < 0 {
+        let status = match &table.finished_at {
+            // table finished
+            Some(_) => CHECK,
             // empty source table
-            "✓".to_string()
-        } else {
-            done(&table.finished_at)
+            None if table.target_vid < 0 => CHECK,
+            // copying in progress
+            None if table.duration_ms > 0 => ">",
+            // not started
+            None => ".",
         };
         println!(
             "{} {:<28} | {:>8} | {:>8} | {:>8} | {:>8}",

From dde111cf6cd61d44b61716e583d029b6295fb451 Mon Sep 17 00:00:00 2001
From: David Lutterkort
Date: Sun, 23 Mar 2025 17:10:34 -0700
Subject: [PATCH 7/7] store: Create postponed indexes non-concurrently for copy/graft

At the point where we create the postponed indexes during copying, nothing
else is writing to the subgraph, so a plain 'create index' cannot block a
writer. Concurrent index creation, on the other hand, has to wait for all
previous transactions in the database to finish, which can significantly
slow down index creation and therefore the copy as a whole.
---
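To make the effect on the generated DDL concrete: with the new
`concurrently` flag set to false the copy path emits a plain
`create index`, while passing true keeps the previous behaviour. Roughly,
following the format string in ddl.rs below, with made-up table, column,
and index names:

    -- what the copy/graft path now runs (plain form)
    create index if not exists attr_1_0_swap_amount
        on sgd42.swap using btree ("amount");

    -- what it ran before, and what concurrently = true still produces
    create index concurrently if not exists attr_1_0_swap_amount
        on sgd42.swap using btree ("amount");

The plain form blocks writers for the duration of the build but does not
wait for older transactions elsewhere in the database to finish, which is
fine here because nothing writes to the destination while the copy runs.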
 store/postgres/src/copy.rs                 |  8 ++++++--
 store/postgres/src/relational/ddl.rs       |  9 +++++++--
 store/postgres/src/relational/ddl_tests.rs |  2 +-
 3 files changed, 14 insertions(+), 5 deletions(-)

diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs
index 5a31acfb959..d82bc33e4a8 100644
--- a/store/postgres/src/copy.rs
+++ b/store/postgres/src/copy.rs
@@ -730,7 +730,7 @@ impl Connection {
                 &table.src.name.to_string(),
                 &table.dst,
                 true,
-                true,
+                false,
             )?;
 
             for (_, sql) in arr {
@@ -748,7 +748,11 @@ impl Connection {
                 .iter()
                 .map(|c| c.name.to_string())
                 .collect_vec();
-            for sql in table.dst.create_postponed_indexes(orig_colums).into_iter() {
+            for sql in table
+                .dst
+                .create_postponed_indexes(orig_colums, false)
+                .into_iter()
+            {
                 let query = sql_query(sql);
                 query.execute(conn)?;
             }
diff --git a/store/postgres/src/relational/ddl.rs b/store/postgres/src/relational/ddl.rs
index 55e116272d1..980bca2b9fd 100644
--- a/store/postgres/src/relational/ddl.rs
+++ b/store/postgres/src/relational/ddl.rs
@@ -269,7 +269,11 @@ impl Table {
         (method, index_expr)
     }
 
-    pub(crate) fn create_postponed_indexes(&self, skip_colums: Vec<String>) -> Vec<String> {
+    pub(crate) fn create_postponed_indexes(
+        &self,
+        skip_colums: Vec<String>,
+        concurrently: bool,
+    ) -> Vec<String> {
         let mut indexing_queries = vec![];
         let columns = self.columns_to_index();
 
@@ -281,8 +285,9 @@ impl Table {
                 && column.name.as_str() != "id"
                 && !skip_colums.contains(&column.name.to_string())
             {
+                let conc = if concurrently { "concurrently " } else { "" };
                 let sql = format!(
-                    "create index concurrently if not exists attr_{table_index}_{column_index}_{table_name}_{column_name}\n on {qname} using {method}({index_expr});\n",
+                    "create index {conc}if not exists attr_{table_index}_{column_index}_{table_name}_{column_name}\n on {qname} using {method}({index_expr});\n",
                     table_index = self.position,
                     table_name = self.name,
                     column_name = column.name,
diff --git a/store/postgres/src/relational/ddl_tests.rs b/store/postgres/src/relational/ddl_tests.rs
index 86e9f232d49..bb1dcc67f46 100644
--- a/store/postgres/src/relational/ddl_tests.rs
+++ b/store/postgres/src/relational/ddl_tests.rs
@@ -158,7 +158,7 @@ fn generate_postponed_indexes() {
     let layout = test_layout(THING_GQL);
     let table = layout.table(&SqlName::from("Scalar")).unwrap();
     let skip_colums = vec!["id".to_string()];
-    let query_vec = table.create_postponed_indexes(skip_colums);
+    let query_vec = table.create_postponed_indexes(skip_colums, true);
     assert!(query_vec.len() == 7);
     let queries = query_vec.join(" ");
     check_eqv(THING_POSTPONED_INDEXES, &queries)