Skip to content

Commit

Permalink
Log PostgreSQL metastore error (#5530)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Oct 29, 2024
1 parent a91d2e7 commit 826f10f
Showing 1 changed file with 23 additions and 36 deletions.
59 changes: 23 additions & 36 deletions quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use futures::StreamExt;
use itertools::Itertools;
use quickwit_common::pretty::PrettySample;
use quickwit_common::uri::Uri;
use quickwit_common::{get_bool_from_env, ServiceStream};
use quickwit_common::{get_bool_from_env, rate_limited_error, ServiceStream};
use quickwit_config::{
validate_index_id_pattern, IndexTemplate, IndexTemplateId, PostgresMetastoreConfig,
};
Expand Down Expand Up @@ -304,17 +304,20 @@ async fn try_apply_delta_v2(
/// We still use this macro for them in order to make the code
/// "trivially correct".
macro_rules! run_with_tx {
($connection_pool:expr, $tx_refmut:ident, $x:block) => {{
($connection_pool:expr, $tx_refmut:ident, $label:literal, $x:block) => {{
let mut tx: Transaction<'_, Postgres> = $connection_pool.begin().await?;
let $tx_refmut = &mut tx;
let op_fut = move || async move { $x };
let op_result: MetastoreResult<_> = op_fut().await;
if op_result.is_ok() {
debug!("committing transaction");
tx.commit().await?;
} else {
warn!("rolling transaction back");
tx.rollback().await?;
match &op_result {
Ok(_) => {
debug!("committing transaction");
tx.commit().await?;
}
Err(error) => {
rate_limited_error!(limit_per_min = 60, error=%error, "failed to {}, rolling transaction back" , $label);
tx.rollback().await?;
}
}
op_result
}};
Expand All @@ -331,22 +334,17 @@ where
{
let index_id = &index_uid.index_id;
let mut index_metadata = index_metadata(tx, index_id, true).await?;

if index_metadata.index_uid != index_uid {
return Err(MetastoreError::NotFound(EntityKind::Index {
index_id: index_id.to_string(),
}));
}

if let MutationOccurred::No(()) = mutate_fn(&mut index_metadata)? {
return Ok(index_metadata);
}
let index_metadata_json = serde_utils::to_json_str(&index_metadata)?;

let index_metadata_json = serde_json::to_string(&index_metadata).map_err(|error| {
MetastoreError::JsonSerializeError {
struct_name: "IndexMetadata".to_string(),
message: error.to_string(),
}
})?;
let update_index_res = sqlx::query(
r#"
UPDATE indexes
Expand Down Expand Up @@ -426,7 +424,7 @@ impl MetastoreService for PostgresqlMetastore {
let doc_mapping = request.deserialize_doc_mapping()?;

let index_uid: IndexUid = request.index_uid().clone();
let updated_index_metadata = run_with_tx!(self.connection_pool, tx, {
let updated_index_metadata = run_with_tx!(self.connection_pool, tx, "update index", {
mutate_index_metadata::<MetastoreError, _>(tx, index_uid, |index_metadata| {
let mut mutation_occurred =
index_metadata.set_retention_policy(retention_policy_opt);
Expand Down Expand Up @@ -622,7 +620,7 @@ impl MetastoreService for PostgresqlMetastore {
tracing::Span::current().record("split_ids", format!("{split_ids:?}"));

// TODO: Remove transaction.
run_with_tx!(self.connection_pool, tx, {
run_with_tx!(self.connection_pool, tx, "stage splits", {
let upserted_split_ids: Vec<String> = sqlx::query_scalar(r#"
INSERT INTO splits
(split_id, time_range_start, time_range_end, tags, split_metadata_json, delete_opstamp, maturity_timestamp, split_state, index_uid, node_id)
Expand Down Expand Up @@ -698,7 +696,7 @@ impl MetastoreService for PostgresqlMetastore {
let staged_split_ids = request.staged_split_ids;
let replaced_split_ids = request.replaced_split_ids;

run_with_tx!(self.connection_pool, tx, {
run_with_tx!(self.connection_pool, tx, "publish splits", {
let mut index_metadata = index_metadata(tx, &index_uid.index_id, true).await?;
if index_metadata.index_uid != index_uid {
return Err(MetastoreError::NotFound(EntityKind::Index {
Expand Down Expand Up @@ -744,12 +742,7 @@ impl MetastoreService for PostgresqlMetastore {
})?;
}
}
let index_metadata_json = serde_json::to_string(&index_metadata).map_err(|error| {
MetastoreError::JsonSerializeError {
struct_name: "IndexMetadata".to_string(),
message: error.to_string(),
}
})?;
let index_metadata_json = serde_utils::to_json_str(&index_metadata)?;

const PUBLISH_SPLITS_QUERY: &str = r#"
-- Select the splits to update, regardless of their state.
Expand Down Expand Up @@ -854,8 +847,7 @@ impl MetastoreService for PostgresqlMetastore {
}
info!(
%index_uid,
"published {} splits and marked {} for deletion successfully",
num_published_splits, num_marked_splits
"published {num_published_splits} splits and marked {num_marked_splits} for deletion successfully"
);
Ok(EmptyResponse {})
})
Expand Down Expand Up @@ -1077,7 +1069,7 @@ impl MetastoreService for PostgresqlMetastore {
async fn add_source(&self, request: AddSourceRequest) -> MetastoreResult<EmptyResponse> {
let source_config = request.deserialize_source_config()?;
let index_uid: IndexUid = request.index_uid().clone();
run_with_tx!(self.connection_pool, tx, {
run_with_tx!(self.connection_pool, tx, "add source", {
mutate_index_metadata::<MetastoreError, _>(tx, index_uid, |index_metadata| {
index_metadata.add_source(source_config)?;
Ok(MutationOccurred::Yes(()))
Expand All @@ -1091,7 +1083,7 @@ impl MetastoreService for PostgresqlMetastore {
#[instrument(skip(self))]
async fn toggle_source(&self, request: ToggleSourceRequest) -> MetastoreResult<EmptyResponse> {
let index_uid: IndexUid = request.index_uid().clone();
run_with_tx!(self.connection_pool, tx, {
run_with_tx!(self.connection_pool, tx, "toggle source", {
mutate_index_metadata(tx, index_uid, |index_metadata| {
if index_metadata.toggle_source(&request.source_id, request.enable)? {
Ok::<_, MetastoreError>(MutationOccurred::Yes(()))
Expand All @@ -1109,7 +1101,7 @@ impl MetastoreService for PostgresqlMetastore {
async fn delete_source(&self, request: DeleteSourceRequest) -> MetastoreResult<EmptyResponse> {
let index_uid: IndexUid = request.index_uid().clone();
let source_id = request.source_id.clone();
run_with_tx!(self.connection_pool, tx, {
run_with_tx!(self.connection_pool, tx, "delete source", {
mutate_index_metadata(tx, index_uid.clone(), |index_metadata| {
index_metadata.delete_source(&source_id)?;
Ok::<_, MetastoreError>(MutationOccurred::Yes(()))
Expand Down Expand Up @@ -1138,7 +1130,7 @@ impl MetastoreService for PostgresqlMetastore {
request: ResetSourceCheckpointRequest,
) -> MetastoreResult<EmptyResponse> {
let index_uid: IndexUid = request.index_uid().clone();
run_with_tx!(self.connection_pool, tx, {
run_with_tx!(self.connection_pool, tx, "reset source checkpoint", {
mutate_index_metadata(tx, index_uid, |index_metadata| {
if index_metadata.checkpoint.reset_source(&request.source_id) {
Ok::<_, MetastoreError>(MutationOccurred::Yes(()))
Expand Down Expand Up @@ -1178,12 +1170,7 @@ impl MetastoreService for PostgresqlMetastore {
/// Creates a delete task from a delete query.
#[instrument(skip(self))]
async fn create_delete_task(&self, delete_query: DeleteQuery) -> MetastoreResult<DeleteTask> {
let delete_query_json = serde_json::to_string(&delete_query).map_err(|error| {
MetastoreError::JsonSerializeError {
struct_name: "DeleteQuery".to_string(),
message: error.to_string(),
}
})?;
let delete_query_json = serde_utils::to_json_str(&delete_query)?;
let (create_timestamp, opstamp): (sqlx::types::time::PrimitiveDateTime, i64) =
sqlx::query_as(
r#"
Expand Down

0 comments on commit 826f10f

Please sign in to comment.