Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a couple database startup issues #5901

Merged
merged 7 commits into from
Mar 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 40 additions & 16 deletions node/src/manager/commands/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use diesel::{ExpressionMethods, JoinOnDsl, OptionalExtension, QueryDsl, RunQuery
use std::{collections::HashMap, sync::Arc, time::SystemTime};

use graph::{
components::store::{BlockStore as _, DeploymentId},
components::store::{BlockStore as _, DeploymentId, DeploymentLocator},
data::query::QueryTarget,
prelude::{
anyhow::{anyhow, bail, Error},
Expand Down Expand Up @@ -84,10 +84,9 @@ impl CopyState {
}
}

pub async fn create(
async fn create_inner(
store: Arc<Store>,
primary: ConnectionPool,
src: DeploymentSearch,
src: &DeploymentLocator,
shard: String,
shards: Vec<String>,
node: String,
Expand All @@ -104,7 +103,6 @@ pub async fn create(
};

let subgraph_store = store.subgraph_store();
let src = src.locate_unique(&primary)?;
let query_store = store
.query_store(QueryTarget::Deployment(
src.hash.clone(),
Expand Down Expand Up @@ -154,6 +152,32 @@ pub async fn create(
Ok(())
}

/// Resolve `src` to a unique deployment and delegate the actual copy
/// setup to `create_inner`.
///
/// Any error from `create_inner` is wrapped so the message names the
/// deployment we tried to copy.
pub async fn create(
    store: Arc<Store>,
    primary: ConnectionPool,
    src: DeploymentSearch,
    shard: String,
    shards: Vec<String>,
    node: String,
    block_offset: u32,
    activate: bool,
    replace: bool,
) -> Result<(), Error> {
    // Fail early if the search does not identify exactly one deployment
    let src = src.locate_unique(&primary)?;
    let result = create_inner(
        store, &src, shard, shards, node, block_offset, activate, replace,
    )
    .await;
    // Attach the deployment locator to the error for easier diagnosis
    result.map_err(|e| anyhow!("cannot copy {src}: {e}"))
}

pub fn activate(store: Arc<SubgraphStore>, deployment: String, shard: String) -> Result<(), Error> {
let shard = Shard::new(shard)?;
let deployment =
Expand Down Expand Up @@ -231,13 +255,11 @@ pub fn list(pools: HashMap<Shard, ConnectionPool>) -> Result<(), Error> {
}

pub fn status(pools: HashMap<Shard, ConnectionPool>, dst: &DeploymentSearch) -> Result<(), Error> {
const CHECK: &str = "✓";

use catalog::active_copies as ac;
use catalog::deployment_schemas as ds;

fn done(ts: &Option<UtcDateTime>) -> String {
ts.map(|_| "✓").unwrap_or(".").to_string()
}

fn duration(start: &UtcDateTime, end: &Option<UtcDateTime>) -> String {
let start = *start;
let end = *end;
Expand Down Expand Up @@ -290,7 +312,7 @@ pub fn status(pools: HashMap<Shard, ConnectionPool>, dst: &DeploymentSearch) ->
};

let progress = match &state.finished_at {
Some(_) => done(&state.finished_at),
Some(_) => CHECK.to_string(),
None => {
let target: i64 = tables.iter().map(|table| table.target_vid).sum();
let next: i64 = tables.iter().map(|table| table.next_vid).sum();
Expand Down Expand Up @@ -339,13 +361,15 @@ pub fn status(pools: HashMap<Shard, ConnectionPool>, dst: &DeploymentSearch) ->
);
println!("{:-<74}", "-");
for table in tables {
let status = if table.next_vid > 0 && table.next_vid < table.target_vid {
">".to_string()
} else if table.target_vid < 0 {
let status = match &table.finished_at {
// table finished
Some(_) => CHECK,
// empty source table
"✓".to_string()
} else {
done(&table.finished_at)
None if table.target_vid < 0 => CHECK,
// copying in progress
None if table.duration_ms > 0 => ">",
// not started
None => ".",
};
println!(
"{} {:<28} | {:>8} | {:>8} | {:>8} | {:>8}",
Expand Down
10 changes: 10 additions & 0 deletions store/postgres/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,16 @@ pub fn drop_foreign_schema(conn: &mut PgConnection, src: &Site) -> Result<(), St
Ok(())
}

/// Return the names of all foreign tables that live in the schema `nsp`.
pub fn foreign_tables(conn: &mut PgConnection, nsp: &str) -> Result<Vec<String>, StoreError> {
    use foreign_tables as ft;

    // Look the tables up in `information_schema.foreign_tables`
    let query = ft::table
        .filter(ft::foreign_table_schema.eq(nsp))
        .select(ft::foreign_table_name);
    query
        .get_results::<String>(conn)
        .map_err(StoreError::from)
}

/// Drop the schema `nsp` and all its contents if it exists, and create it
/// again so that `nsp` is an empty schema
pub fn recreate_schema(conn: &mut PgConnection, nsp: &str) -> Result<(), StoreError> {
Expand Down
83 changes: 63 additions & 20 deletions store/postgres/src/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ use crate::primary::{self, NAMESPACE_PUBLIC};
use crate::{advisory_lock, catalog};
use crate::{Shard, PRIMARY_SHARD};

/// Tables that we map from the primary into `primary_public` in each shard
const PRIMARY_TABLES: [&str; 3] = ["deployment_schemas", "chains", "active_copies"];

/// Tables that we map from each shard into each other shard into the
/// `shard_<name>_subgraphs` namespace
const SHARDED_TABLES: [(&str, &[&str]); 2] = [
("public", &["ethereum_networks"]),
(
Expand All @@ -47,7 +52,6 @@ const SHARDED_TABLES: [(&str, &[&str]); 2] = [
"dynamic_ethereum_contract_data_source",
"subgraph_deployment",
"subgraph_error",
"subgraph_features",
"subgraph_manifest",
"table_stats",
],
Expand Down Expand Up @@ -185,7 +189,7 @@ impl ForeignServer {
alter server \"{name}\"
options (set host '{remote_host}', \
{set_port} port '{remote_port}', \
set dbname '{remote_db}, \
set dbname '{remote_db}', \
{set_fetch_size} fetch_size '{fetch_size}');
alter user mapping
for current_user server \"{name}\"
Expand All @@ -209,7 +213,7 @@ impl ForeignServer {
catalog::recreate_schema(conn, Self::PRIMARY_PUBLIC)?;

let mut query = String::new();
for table_name in ["deployment_schemas", "chains", "active_copies"] {
for table_name in PRIMARY_TABLES {
let create_stmt = if shard == &*PRIMARY_SHARD {
format!(
"create view {nsp}.{table_name} as select * from public.{table_name};",
Expand Down Expand Up @@ -246,6 +250,33 @@ impl ForeignServer {
}
Ok(conn.batch_execute(&query)?)
}

/// Check whether the foreign tables that are currently mapped for this
/// server differ from the tables the code expects to be mapped
/// (`PRIMARY_TABLES` for the primary's `primary_public` schema, and
/// `SHARDED_TABLES` for the shard's metadata schema). If they differ,
/// the mappings need to be recreated.
fn needs_remap(&self, conn: &mut PgConnection) -> Result<bool, StoreError> {
    // `true` iff the two lists do not contain the same names,
    // regardless of order
    fn differs(mut have: Vec<String>, mut want: Vec<String>) -> bool {
        have.sort();
        want.sort();
        have != want
    }

    // For the primary, also check the tables mapped into `primary_public`
    if self.shard == *PRIMARY_SHARD {
        let have = catalog::foreign_tables(conn, Self::PRIMARY_PUBLIC)?;
        let want: Vec<String> = PRIMARY_TABLES.iter().map(|t| t.to_string()).collect();
        if differs(have, want) {
            return Ok(true);
        }
    }

    let have = catalog::foreign_tables(conn, &Self::metadata_schema(&self.shard))?;
    let want: Vec<String> = SHARDED_TABLES
        .iter()
        .flat_map(|(_, tables)| tables.iter())
        .map(|t| t.to_string())
        .collect();
    Ok(differs(have, want))
}
}

/// How long to keep connections in the `fdw_pool` around before closing
Expand Down Expand Up @@ -1037,16 +1068,14 @@ impl PoolInner {
let result = pool
.configure_fdw(coord.servers.as_ref())
.and_then(|()| pool.drop_cross_shard_views())
.and_then(|()| migrate_schema(&pool.logger, &mut conn))
.and_then(|count| {
pool.create_cross_shard_views(coord.servers.as_ref())
.map(|()| count)
});
.and_then(|()| migrate_schema(&pool.logger, &mut conn));
debug!(&pool.logger, "Release migration lock");
advisory_lock::unlock_migration(&mut conn).unwrap_or_else(|err| {
die(&pool.logger, "failed to release migration lock", &err);
});
let result = result.and_then(|count| coord.propagate(&pool, count));
let result = result
.and_then(|count| coord.propagate(&pool, count))
.and_then(|()| pool.create_cross_shard_views(coord.servers.as_ref()));
result.unwrap_or_else(|err| die(&pool.logger, "migrations failed", &err));

// Locale check
Expand Down Expand Up @@ -1178,9 +1207,9 @@ impl PoolInner {
.await
}

// The foreign server `server` had schema changes, and we therefore need
// to remap anything that we are importing via fdw to make sure we are
// using this updated schema
/// The foreign server `server` had schema changes, and we therefore
/// need to remap anything that we are importing via fdw to make sure we
/// are using this updated schema
pub fn remap(&self, server: &ForeignServer) -> Result<(), StoreError> {
if &server.shard == &*PRIMARY_SHARD {
info!(&self.logger, "Mapping primary");
Expand All @@ -1198,6 +1227,15 @@ impl PoolInner {
}
Ok(())
}

/// Check whether the tables mapped from `server` into this pool are out
/// of date and need to be recreated. A shard never imports from itself,
/// so this is always `false` when `server` is this pool's own shard.
pub fn needs_remap(&self, server: &ForeignServer) -> Result<bool, StoreError> {
    if server.shard == self.shard {
        Ok(false)
    } else {
        server.needs_remap(&mut self.get()?)
    }
}
}

pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
Expand All @@ -1211,10 +1249,6 @@ impl MigrationCount {
fn had_migrations(&self) -> bool {
self.old != self.new
}

fn is_new(&self) -> bool {
self.old == 0
}
}

/// Run all schema migrations.
Expand Down Expand Up @@ -1334,13 +1368,22 @@ impl PoolCoordinator {
/// code that does _not_ hold the migration lock as it will otherwise
/// deadlock
fn propagate(&self, pool: &PoolInner, count: MigrationCount) -> Result<(), StoreError> {
// pool is a new shard, map all other shards into it
if count.is_new() {
for server in self.servers.iter() {
// We need to remap all these servers into `pool` if the list of
// tables that are mapped have changed from the code of the previous
// version. Since dropping and recreating the foreign table
// definitions can slow the startup of other nodes down because of
// locking, we try to only do this when it is actually needed
for server in self.servers.iter() {
if pool.needs_remap(server)? {
pool.remap(server)?;
}
}
// pool had schema changes, refresh the import from pool into all other shards

// pool had schema changes, refresh the import from pool into all
// other shards. This makes sure that schema changes to
// already-mapped tables are propagated to all other shards. Since
// we run `propagate` after migrations have been applied to `pool`,
// we can be sure that these mappings use the correct schema
if count.had_migrations() {
let server = self.server(&pool.shard)?;
for pool in self.pools.lock().unwrap().values() {
Expand Down
8 changes: 6 additions & 2 deletions store/postgres/src/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ impl Connection {
&table.src.name.to_string(),
&table.dst,
true,
true,
false,
)?;

for (_, sql) in arr {
Expand All @@ -748,7 +748,11 @@ impl Connection {
.iter()
.map(|c| c.name.to_string())
.collect_vec();
for sql in table.dst.create_postponed_indexes(orig_colums).into_iter() {
for sql in table
.dst
.create_postponed_indexes(orig_colums, false)
.into_iter()
{
let query = sql_query(sql);
query.execute(conn)?;
}
Expand Down
12 changes: 7 additions & 5 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1553,6 +1553,12 @@ impl DeploymentStore {

catalog::copy_account_like(conn, &src.site, &dst.site)?;

// Analyze all tables for this deployment
info!(logger, "Analyzing all {} tables", dst.tables.len());
for entity_name in dst.tables.keys() {
self.analyze_with_conn(site.cheap_clone(), entity_name.as_str(), conn)?;
}

// Rewind the subgraph so that entity versions that are
// clamped in the future (beyond `block`) become valid for
// all blocks after `block`. `revert_block` gets rid of
Expand All @@ -1563,6 +1569,7 @@ impl DeploymentStore {
.number
.checked_add(1)
.expect("block numbers fit into an i32");
info!(logger, "Rewinding to block {}", block.number);
let count = dst.revert_block(conn, block_to_revert)?;
deployment::update_entity_count(conn, &dst.site, count)?;

Expand All @@ -1575,11 +1582,6 @@ impl DeploymentStore {
src_deployment.manifest.history_blocks,
)?;

// Analyze all tables for this deployment
for entity_name in dst.tables.keys() {
self.analyze_with_conn(site.cheap_clone(), entity_name.as_str(), conn)?;
}

// The `earliest_block` for `src` might have changed while
// we did the copy if `src` was pruned while we copied;
// adjusting it very late in the copy process ensures that
Expand Down
9 changes: 7 additions & 2 deletions store/postgres/src/relational/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,11 @@ impl Table {
(method, index_expr)
}

pub(crate) fn create_postponed_indexes(&self, skip_colums: Vec<String>) -> Vec<String> {
pub(crate) fn create_postponed_indexes(
&self,
skip_colums: Vec<String>,
concurrently: bool,
) -> Vec<String> {
let mut indexing_queries = vec![];
let columns = self.columns_to_index();

Expand All @@ -281,8 +285,9 @@ impl Table {
&& column.name.as_str() != "id"
&& !skip_colums.contains(&column.name.to_string())
{
let conc = if concurrently { "concurrently " } else { "" };
let sql = format!(
"create index concurrently if not exists attr_{table_index}_{column_index}_{table_name}_{column_name}\n on {qname} using {method}({index_expr});\n",
"create index {conc}if not exists attr_{table_index}_{column_index}_{table_name}_{column_name}\n on {qname} using {method}({index_expr});\n",
table_index = self.position,
table_name = self.name,
column_name = column.name,
Expand Down
2 changes: 1 addition & 1 deletion store/postgres/src/relational/ddl_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ fn generate_postponed_indexes() {
let layout = test_layout(THING_GQL);
let table = layout.table(&SqlName::from("Scalar")).unwrap();
let skip_colums = vec!["id".to_string()];
let query_vec = table.create_postponed_indexes(skip_colums);
let query_vec = table.create_postponed_indexes(skip_colums, true);
assert!(query_vec.len() == 7);
let queries = query_vec.join(" ");
check_eqv(THING_POSTPONED_INDEXES, &queries)
Expand Down
Loading