From 05900b5c7f7abe07962ddd516a59113f8f362b0e Mon Sep 17 00:00:00 2001
From: Phil
Date: Wed, 21 Feb 2024 16:39:16 -0500
Subject: [PATCH] agent: defer handling of publications and evolutions that
 require blocking locks

There's an issue related to locking that's been plaguing publications
handling. We sometimes see deadlocks due to multiple publications that
affect the same `live_specs` rows.

To start from the beginning: for each individual `live_specs` row, we
need to guarantee a sensible order of operations. For example, say we
have two different captures into two separate collections, and a
single materialization of both collections. Auto-discovers run for the
captures and result in two separate publications, one for each of the
materialized collections. Each of those publications will result in an
update to the `built_spec` of the materialization, and it's critical
that the modifications from one publication don't clobber those of the
other.

We solve this today by implicitly locking the "expanded" `live_specs`
rows when we update the `built_spec`s. This is why we occasionally see
deadlocks that cause the agent to crash. We'd like another way to
ensure a sensible ordering of operations, one that doesn't result in
deadlocks and agent crashes.

It's tempting to think of an approach where we first determine the set
of specs that would be affected by a given publication, and use that
knowledge to process publications in an order that avoids simultaneous
processing of publications affecting the same `live_specs` rows. But
the set of specs affected by a given publication depends on the
current state of the drafted and live specs, and is subject to change.
We need to do a decent amount of parsing and validation before we can
even identify which "expanded" rows might be affected by a
publication. As long as that's the case, we can really only answer the
question: "can we continue working on this publication right now?"

So the basic idea is to do just that. If we're unable to continue
working on a publication, we defer it and try a different job in the
meantime. To identify publications that affect overlapping
`live_specs` rows, we can rely on postgres row-level locks, just as we
do today. We only need to add `for update of live_specs nowait` to the
SQL that fetches `live_specs` rows. If the rows can't be locked
immediately, the query fails with a specific error code (SQLSTATE
`55P03`, `lock_not_available`), which we can handle in the agent. When
we see an error with that code, we handle it specially and leave the
job's `job_status` as `{"type": "queued"}`. The agent will attempt to
invoke another handler that has work to do, and will re-try the
publication later.
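To make that concrete, here's an illustrative sketch of the postgres
behavior we're relying on. This is hand-written psql, not the agent's
actual query, and the catalog name is invented:

    -- Session A: lock a live_specs row and keep its transaction open.
    begin;
    select id from live_specs
        where catalog_name = 'acmeCo/anvils'
        for update of live_specs;

    -- Session B: request the same row lock with `nowait`. Rather than
    -- blocking on session A (and possibly deadlocking), this fails
    -- immediately with SQLSTATE 55P03 (lock_not_available).
    begin;
    select id from live_specs
        where catalog_name = 'acmeCo/anvils'
        for update of live_specs nowait;

The agent maps that error code to a deferral: it rolls back to a
savepoint, leaves the job queued, and moves on to other work.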
The end result is that the agent should now defer publications that
previously might have deadlocked, and try to work on something else in
the meantime.

The change to `evolutions` started out as accidental, due to the
evolutions handler using the same `resolve_expanded_rows` function.
But it turns out to be really helpful for preventing background
publication failures. Without it, `evolutions` have tended to create
`publications` having stale `expect_pub_id`s in scenarios where there
are multiple captures feeding into one materialization. Those stale
`expect_pub_id`s mean that the publication will fail, and we'll have
to wait for the next auto-discover in order to try again. But
deferring evolutions that affect the same `live_specs` as an
in-progress publication allows the evolution to result in a
publication that's much more likely to succeed the first time.
---
 Cargo.lock                             |  1 +
 crates/agent-sql/src/publications.rs   |  3 +-
 crates/agent/Cargo.toml                |  3 +-
 crates/agent/src/evolution.rs          | 21 ++++++++++----
 crates/agent/src/handlers.rs           | 33 ++++++++++++----------
 crates/agent/src/lib.rs                |  6 ++++
 crates/agent/src/publications.rs       | 19 ++++++++++---
 crates/agent/src/publications/specs.rs | 39 +++++++++++++++----------
 8 files changed, 83 insertions(+), 42 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 224c8fa3700..50b36add8bb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -49,6 +49,7 @@ dependencies = [
  "derivative",
  "doc",
  "futures",
+ "humantime-serde",
  "insta",
  "itertools 0.10.5",
  "json",
diff --git a/crates/agent-sql/src/publications.rs b/crates/agent-sql/src/publications.rs
index c265c761e41..62cda51d042 100644
--- a/crates/agent-sql/src/publications.rs
+++ b/crates/agent-sql/src/publications.rs
@@ -282,7 +282,7 @@ pub async fn resolve_spec_rows(
         on draft_specs.catalog_name = live_specs.catalog_name
         where draft_specs.draft_id = $1
         order by draft_specs.catalog_name asc
-        for update of draft_specs, live_specs;
+        for update of draft_specs, live_specs nowait;
         "#,
         draft_id as Id,
         user_id,
@@ -447,6 +447,7 @@ pub async fn resolve_expanded_rows(
         -- Strip deleted specs which are still reach-able through a dataflow edge,
         -- and strip rows already part of the seed set.
         where l.spec is not null and l.id not in (select id from seeds)
+        for update of l nowait
         "#,
         seed_ids as Vec<Id>,
         user_id,
diff --git a/crates/agent/Cargo.toml b/crates/agent/Cargo.toml
index a7598886512..33953e46e41 100644
--- a/crates/agent/Cargo.toml
+++ b/crates/agent/Cargo.toml
@@ -29,9 +29,10 @@ base64 = { workspace = true }
 bytes = { workspace = true }
 chrono = { workspace = true }
 clap = { workspace = true }
-colored_json = { workspace = true } # Used to render ops::Logs for UI.
+colored_json = { workspace = true }   # Used to render ops::Logs for UI.
 derivative = { workspace = true }
 futures = { workspace = true }
+humantime-serde = { workspace = true }
 itertools = { workspace = true }
 lazy_static = { workspace = true }
 regex = { workspace = true }
diff --git a/crates/agent/src/evolution.rs b/crates/agent/src/evolution.rs
index 01c21f3ebd5..06f4ad37f67 100644
--- a/crates/agent/src/evolution.rs
+++ b/crates/agent/src/evolution.rs
@@ -41,6 +41,7 @@ pub enum JobStatus {
         evolved_collections: Vec<EvolvedCollection>,
         publication_id: Option<Id>,
     },
+    Queued,
 }
 
 #[derive(Serialize, Deserialize, PartialEq, Debug)]
@@ -72,10 +73,14 @@ impl Handler for EvolutionHandler {
             return Ok(HandleResult::NoJobs);
         };
 
+        // Savepoint is necessary because we must execute a `rollback` after failing to
+        // lock rows, and we still want to update the `evolutions` job row to bump `updated_at`.
+        agent_sql::publications::savepoint_noop(&mut txn).await?;
+
         let time_queued = chrono::Utc::now().signed_duration_since(row.updated_at);
         let id: Id = row.id;
-        let status = process_row(row, &mut txn).await?;
-        let status = serde_json::to_value(status)?;
+        let job_status = process_row(row, &mut txn).await?;
+        let status = serde_json::to_value(job_status)?;
         tracing::info!(%id, %time_queued, %status, "evolution finished");
         agent_sql::evolutions::resolve(id, &status, &mut txn).await?;
 
@@ -134,9 +139,15 @@ async fn process_row(
         .collect::<Vec<_>>();
 
     // Fetch all of the live_specs that directly read from or write to any of these collections.
-    let expanded_rows = agent_sql::publications::resolve_expanded_rows(user_id, seed_ids, txn)
-        .await
-        .context("expanding specifications")?;
+    let expanded_rows =
+        match agent_sql::publications::resolve_expanded_rows(user_id, seed_ids, txn).await {
+            Ok(rows) => rows,
+            Err(err) if crate::is_acquire_lock_error(&err) => {
+                agent_sql::publications::rollback_noop(txn).await?;
+                return Ok(JobStatus::Queued);
+            }
+            Err(other_err) => return Err(other_err).context("expanding specifications"),
+        };
 
     // Build up catalog of all the possibly affected entities, for easy lookups.
     // Note that we put `expanded_rows` first so that `spec_rows` will overwrite
diff --git a/crates/agent/src/handlers.rs b/crates/agent/src/handlers.rs
index fede64d4d88..dde79bbc967 100644
--- a/crates/agent/src/handlers.rs
+++ b/crates/agent/src/handlers.rs
@@ -121,21 +121,24 @@ struct WrappedHandler {
 impl WrappedHandler {
     async fn handle_next_job(&mut self, pg_pool: &sqlx::PgPool) -> anyhow::Result<()> {
         let allow_background = self.status != Status::PollInteractive;
-        match self.handler.handle(pg_pool, allow_background).await {
-            Ok(HandleResult::HadJob) => Ok(()),
-            Ok(HandleResult::NoJobs) if self.status == Status::PollInteractive => {
-                tracing::debug!(handler = %self.handler.name(), "handler completed all interactive jobs");
-                self.status = Status::PollBackground;
-                Ok(())
-            }
-            Ok(HandleResult::NoJobs) => {
-                tracing::debug!(handler = %self.handler.name(), "handler completed all background jobs");
-                self.status = Status::Idle;
-                Ok(())
-            }
-            Err(err) => {
-                tracing::error!(handler = %self.handler.name(), error = ?err, "Error invoking handler");
-                Err(err)
+
+        loop {
+            match self.handler.handle(pg_pool, allow_background).await {
+                Ok(HandleResult::HadJob) => return Ok(()),
+                Ok(HandleResult::NoJobs) if self.status == Status::PollInteractive => {
+                    tracing::debug!(handler = %self.handler.name(), "handler completed all interactive jobs");
+                    self.status = Status::PollBackground;
+                    return Ok(());
+                }
+                Ok(HandleResult::NoJobs) => {
+                    tracing::debug!(handler = %self.handler.name(), "handler completed all background jobs");
+                    self.status = Status::Idle;
+                    return Ok(());
+                }
+                Err(err) => {
+                    tracing::error!(handler = %self.handler.name(), error = ?err, "Error invoking handler");
+                    return Err(err);
+                }
             }
         }
     }
diff --git a/crates/agent/src/lib.rs b/crates/agent/src/lib.rs
index a5753aaceba..9fee95a85ec 100644
--- a/crates/agent/src/lib.rs
+++ b/crates/agent/src/lib.rs
@@ -27,6 +27,12 @@ lazy_static! {
     static ref NAME_VERSION_RE: Regex = Regex::new(r#".*[_-][vV](\d+)$"#).unwrap();
 }
 
+fn is_acquire_lock_error(err: &sqlx::Error) -> bool {
+    err.as_database_error()
+        .filter(|e| e.code().as_ref().map(|c| c.as_ref()) == Some("55P03"))
+        .is_some()
+}
+
 /// Takes an existing name and returns a new name with an incremeted version suffix.
 /// The name `foo` will become `foo_v2`, and `foo_v2` will become `foo_v3` and so on.
 pub fn next_name(current_name: &str) -> String {
diff --git a/crates/agent/src/publications.rs b/crates/agent/src/publications.rs
index 7dd68a483ea..2deea6c08d4 100644
--- a/crates/agent/src/publications.rs
+++ b/crates/agent/src/publications.rs
@@ -109,6 +109,7 @@ impl Handler for PublishHandler {
             None => return Ok(HandleResult::NoJobs),
             Some(row) => row,
         };
+        let background = row.background;
 
         let delete_draft_id = if !row.dry_run {
             Some(row.draft_id)
@@ -119,7 +120,7 @@ impl Handler for PublishHandler {
         let time_queued = chrono::Utc::now().signed_duration_since(row.updated_at);
 
         let (id, status) = self.process(row, &mut txn, false).await?;
-        info!(%id, %time_queued, ?status, "finished");
+        info!(%id, %time_queued, ?status, %background, "finished");
 
         agent_sql::publications::resolve(id, &status, &mut txn).await?;
         txn.commit().await?;
@@ -167,8 +168,12 @@ impl PublishHandler {
             .await
             .context("creating savepoint")?;
 
-        let spec_rows =
-            specs::resolve_specifications(row.draft_id, row.pub_id, row.user_id, txn).await?;
+        let Ok(spec_rows) =
+            specs::resolve_specifications(row.draft_id, row.pub_id, row.user_id, txn).await?
+        else {
+            // If unable to lock the spec rows, then bail and leave the job queued so we can try again.
+            return stop_with_errors(Vec::new(), JobStatus::Queued, row, txn).await;
+        };
 
         tracing::debug!(specs = %spec_rows.len(), "resolved specifications");
         // Keep track of which collections are being deleted so that we can account for them
@@ -284,7 +289,13 @@ impl PublishHandler {
             return stop_with_errors(errors, JobStatus::build_failed(Vec::new()), row, txn).await;
         }
 
-        let expanded_rows = specs::expanded_specifications(row.user_id, &spec_rows, txn).await?;
+        let Ok(expanded_rows) =
+            specs::expanded_specifications(row.user_id, &spec_rows, txn).await?
+        else {
+            // If unable to lock the spec rows, then bail and leave the job queued so we can try again.
+            return stop_with_errors(Vec::new(), JobStatus::Queued, row, txn).await;
+        };
+
         tracing::debug!(specs = %expanded_rows.len(), "resolved expanded specifications");
 
         // Touch all expanded specifications to update their build ID.
diff --git a/crates/agent/src/publications/specs.rs b/crates/agent/src/publications/specs.rs
index 9c9ad3cb21b..985ffc530a9 100644
--- a/crates/agent/src/publications/specs.rs
+++ b/crates/agent/src/publications/specs.rs
@@ -10,13 +10,14 @@ use std::collections::{BTreeMap, HashMap, HashSet};
 // resolve_specifications returns the definitive set of specifications which
 // are changing in this publication. It obtains sufficient locks to ensure
 // that raced publications to returned specifications are serialized with
-// this publication.
+// this publication. Returns `Ok(Err(CannotAcquireLock))` if any locks could
+// not be immediately acquired, so that the publication can be re-tried later.
 pub async fn resolve_specifications(
     draft_id: Id,
     pub_id: Id,
     user_id: Uuid,
     txn: &mut sqlx::Transaction<'_, sqlx::Postgres>,
-) -> anyhow::Result<Vec<SpecRow>> {
+) -> anyhow::Result<Result<Vec<SpecRow>, CannotAcquireLock>> {
     // Attempt to create a row in live_specs for each of our draft_specs.
     // This allows us next inner-join over draft and live spec rows.
     // Inner join (vs a left-join) is required for "for update" semantics.
@@ -47,9 +48,12 @@ pub async fn resolve_specifications(
     // of what's "in" this publication, and what's not. Anything we don't pick up here will
     // be left behind as a draft_spec, and this is the reason we don't delete the draft
     // itself within this transaction.
-    let mut spec_rows = agent_sql::publications::resolve_spec_rows(draft_id, user_id, txn)
-        .await
-        .context("selecting joined draft & live specs")?;
+    let mut spec_rows =
+        match agent_sql::publications::resolve_spec_rows(draft_id, user_id, txn).await {
+            Ok(rows) => rows,
+            Err(err) if crate::is_acquire_lock_error(&err) => return Ok(Err(CannotAcquireLock)),
+            Err(other_err) => return Err(other_err).context("selecting joined draft & live specs"),
+        };
 
     // The query may return live specifications that the user is not
     // authorized to know anything about. Tweak such rows to appear
@@ -65,19 +69,22 @@ pub async fn resolve_specifications(
         }
     }
 
-    Ok(spec_rows)
+    Ok(Ok(spec_rows))
 }
 
+#[derive(Debug)]
+pub struct CannotAcquireLock;
+
 // expanded_specifications returns additional specifications which should be
-// included in this publication's build. These specifications are not changed
-// by the publication and are read with read-committed transaction semantics,
-// but (if not a dry-run) we do re-activate each specification within the
-// data-plane with the outcome of this publication's build.
+// included in this publication's build. Attempts to acquire a lock on each expanded `live_specs`
+// row, with the assumption that we will be updating the `built_spec` and `last_build_id`.
+// Returns `Ok(Err(CannotAcquireLock))` if any locks could not be immediately acquired, so that the
+// publication can be re-tried later.
 pub async fn expanded_specifications(
     user_id: Uuid,
     spec_rows: &[SpecRow],
     txn: &mut sqlx::Transaction<'_, sqlx::Postgres>,
-) -> anyhow::Result<Vec<ExpandedRow>> {
+) -> anyhow::Result<Result<Vec<ExpandedRow>, CannotAcquireLock>> {
     // We seed expansion with the set of live specifications
     // (that the user must be authorized to administer).
     let seed_ids: Vec<Id> = spec_rows
@@ -88,11 +95,11 @@ pub async fn expanded_specifications(
         })
         .collect();
 
-    let expanded_rows = agent_sql::publications::resolve_expanded_rows(user_id, seed_ids, txn)
-        .await
-        .context("selecting expanded specs")?;
-
-    Ok(expanded_rows)
+    match agent_sql::publications::resolve_expanded_rows(user_id, seed_ids, txn).await {
+        Ok(rows) => Ok(Ok(rows)),
+        Err(err) if crate::is_acquire_lock_error(&err) => Ok(Err(CannotAcquireLock)),
+        Err(other) => Err(anyhow::Error::from(other)).context("selecting expanded specs"),
+    }
 }
 
 pub fn validate_transition(