diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 242ba8d11dc..c2a3b263514 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -26,7 +26,7 @@ use futures::StreamExt; use itertools::Itertools; use quickwit_common::pretty::PrettySample; use quickwit_common::uri::Uri; -use quickwit_common::{get_bool_from_env, ServiceStream}; +use quickwit_common::{get_bool_from_env, rate_limited_error, ServiceStream}; use quickwit_config::{ validate_index_id_pattern, IndexTemplate, IndexTemplateId, PostgresMetastoreConfig, }; @@ -304,17 +304,20 @@ async fn try_apply_delta_v2( /// We still use this macro for them in order to make the code /// "trivially correct". macro_rules! run_with_tx { - ($connection_pool:expr, $tx_refmut:ident, $x:block) => {{ + ($connection_pool:expr, $tx_refmut:ident, $label:literal, $x:block) => {{ let mut tx: Transaction<'_, Postgres> = $connection_pool.begin().await?; let $tx_refmut = &mut tx; let op_fut = move || async move { $x }; let op_result: MetastoreResult<_> = op_fut().await; - if op_result.is_ok() { - debug!("committing transaction"); - tx.commit().await?; - } else { - warn!("rolling transaction back"); - tx.rollback().await?; + match &op_result { + Ok(_) => { + debug!("committing transaction"); + tx.commit().await?; + } + Err(error) => { + rate_limited_error!(limit_per_min = 60, error=%error, "failed to {}, rolling transaction back" , $label); + tx.rollback().await?; + } } op_result }}; @@ -331,22 +334,17 @@ where { let index_id = &index_uid.index_id; let mut index_metadata = index_metadata(tx, index_id, true).await?; + if index_metadata.index_uid != index_uid { return Err(MetastoreError::NotFound(EntityKind::Index { index_id: index_id.to_string(), })); } - if let MutationOccurred::No(()) = mutate_fn(&mut index_metadata)? { return Ok(index_metadata); } + let index_metadata_json = serde_utils::to_json_str(&index_metadata)?; - let index_metadata_json = serde_json::to_string(&index_metadata).map_err(|error| { - MetastoreError::JsonSerializeError { - struct_name: "IndexMetadata".to_string(), - message: error.to_string(), - } - })?; let update_index_res = sqlx::query( r#" UPDATE indexes @@ -426,7 +424,7 @@ impl MetastoreService for PostgresqlMetastore { let doc_mapping = request.deserialize_doc_mapping()?; let index_uid: IndexUid = request.index_uid().clone(); - let updated_index_metadata = run_with_tx!(self.connection_pool, tx, { + let updated_index_metadata = run_with_tx!(self.connection_pool, tx, "update index", { mutate_index_metadata::(tx, index_uid, |index_metadata| { let mut mutation_occurred = index_metadata.set_retention_policy(retention_policy_opt); @@ -622,7 +620,7 @@ impl MetastoreService for PostgresqlMetastore { tracing::Span::current().record("split_ids", format!("{split_ids:?}")); // TODO: Remove transaction. - run_with_tx!(self.connection_pool, tx, { + run_with_tx!(self.connection_pool, tx, "stage splits", { let upserted_split_ids: Vec = sqlx::query_scalar(r#" INSERT INTO splits (split_id, time_range_start, time_range_end, tags, split_metadata_json, delete_opstamp, maturity_timestamp, split_state, index_uid, node_id) @@ -698,7 +696,7 @@ impl MetastoreService for PostgresqlMetastore { let staged_split_ids = request.staged_split_ids; let replaced_split_ids = request.replaced_split_ids; - run_with_tx!(self.connection_pool, tx, { + run_with_tx!(self.connection_pool, tx, "publish splits", { let mut index_metadata = index_metadata(tx, &index_uid.index_id, true).await?; if index_metadata.index_uid != index_uid { return Err(MetastoreError::NotFound(EntityKind::Index { @@ -744,12 +742,7 @@ impl MetastoreService for PostgresqlMetastore { })?; } } - let index_metadata_json = serde_json::to_string(&index_metadata).map_err(|error| { - MetastoreError::JsonSerializeError { - struct_name: "IndexMetadata".to_string(), - message: error.to_string(), - } - })?; + let index_metadata_json = serde_utils::to_json_str(&index_metadata)?; const PUBLISH_SPLITS_QUERY: &str = r#" -- Select the splits to update, regardless of their state. @@ -854,8 +847,7 @@ impl MetastoreService for PostgresqlMetastore { } info!( %index_uid, - "published {} splits and marked {} for deletion successfully", - num_published_splits, num_marked_splits + "published {num_published_splits} splits and marked {num_marked_splits} for deletion successfully" ); Ok(EmptyResponse {}) }) @@ -1077,7 +1069,7 @@ impl MetastoreService for PostgresqlMetastore { async fn add_source(&self, request: AddSourceRequest) -> MetastoreResult { let source_config = request.deserialize_source_config()?; let index_uid: IndexUid = request.index_uid().clone(); - run_with_tx!(self.connection_pool, tx, { + run_with_tx!(self.connection_pool, tx, "add source", { mutate_index_metadata::(tx, index_uid, |index_metadata| { index_metadata.add_source(source_config)?; Ok(MutationOccurred::Yes(())) @@ -1091,7 +1083,7 @@ impl MetastoreService for PostgresqlMetastore { #[instrument(skip(self))] async fn toggle_source(&self, request: ToggleSourceRequest) -> MetastoreResult { let index_uid: IndexUid = request.index_uid().clone(); - run_with_tx!(self.connection_pool, tx, { + run_with_tx!(self.connection_pool, tx, "toggle source", { mutate_index_metadata(tx, index_uid, |index_metadata| { if index_metadata.toggle_source(&request.source_id, request.enable)? { Ok::<_, MetastoreError>(MutationOccurred::Yes(())) @@ -1109,7 +1101,7 @@ impl MetastoreService for PostgresqlMetastore { async fn delete_source(&self, request: DeleteSourceRequest) -> MetastoreResult { let index_uid: IndexUid = request.index_uid().clone(); let source_id = request.source_id.clone(); - run_with_tx!(self.connection_pool, tx, { + run_with_tx!(self.connection_pool, tx, "delete source", { mutate_index_metadata(tx, index_uid.clone(), |index_metadata| { index_metadata.delete_source(&source_id)?; Ok::<_, MetastoreError>(MutationOccurred::Yes(())) @@ -1138,7 +1130,7 @@ impl MetastoreService for PostgresqlMetastore { request: ResetSourceCheckpointRequest, ) -> MetastoreResult { let index_uid: IndexUid = request.index_uid().clone(); - run_with_tx!(self.connection_pool, tx, { + run_with_tx!(self.connection_pool, tx, "reset source checkpoint", { mutate_index_metadata(tx, index_uid, |index_metadata| { if index_metadata.checkpoint.reset_source(&request.source_id) { Ok::<_, MetastoreError>(MutationOccurred::Yes(())) @@ -1178,12 +1170,7 @@ impl MetastoreService for PostgresqlMetastore { /// Creates a delete task from a delete query. #[instrument(skip(self))] async fn create_delete_task(&self, delete_query: DeleteQuery) -> MetastoreResult { - let delete_query_json = serde_json::to_string(&delete_query).map_err(|error| { - MetastoreError::JsonSerializeError { - struct_name: "DeleteQuery".to_string(), - message: error.to_string(), - } - })?; + let delete_query_json = serde_utils::to_json_str(&delete_query)?; let (create_timestamp, opstamp): (sqlx::types::time::PrimitiveDateTime, i64) = sqlx::query_as( r#"