From 53f21254abef68c632ecbc87c48de197ae5e40a5 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Mon, 19 Aug 2024 12:15:20 +0200 Subject: [PATCH] use FOR UPDATE only when needed (#5326) --- .../src/metastore/postgres/metastore.rs | 42 ++++++++++++------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 786e4f1a4af..7d916fcd29f 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -129,16 +129,23 @@ impl PostgresqlMetastore { } /// Returns an Index object given an index_id or None if it does not exist. -async fn index_opt<'a, E>(executor: E, index_id: &str) -> MetastoreResult> -where E: sqlx::Executor<'a, Database = Postgres> { - let index_opt: Option = sqlx::query_as::<_, PgIndex>( +async fn index_opt<'a, E>( + executor: E, + index_id: &str, + lock: bool, +) -> MetastoreResult> +where + E: sqlx::Executor<'a, Database = Postgres>, +{ + let index_opt: Option = sqlx::query_as::<_, PgIndex>(&format!( r#" SELECT * FROM indexes WHERE index_id = $1 - FOR UPDATE + {} "#, - ) + if lock { "FOR UPDATE" } else { "" } + )) .bind(index_id) .fetch_optional(executor) .await?; @@ -149,18 +156,20 @@ where E: sqlx::Executor<'a, Database = Postgres> { async fn index_opt_for_uid<'a, E>( executor: E, index_uid: IndexUid, + lock: bool, ) -> MetastoreResult> where E: sqlx::Executor<'a, Database = Postgres>, { - let index_opt: Option = sqlx::query_as::<_, PgIndex>( + let index_opt: Option = sqlx::query_as::<_, PgIndex>(&format!( r#" SELECT * FROM indexes WHERE index_uid = $1 - FOR UPDATE + {} "#, - ) + if lock { "FOR UPDATE" } else { "" } + )) .bind(&index_uid) .fetch_optional(executor) .await?; @@ -170,8 +179,9 @@ where async fn index_metadata( tx: &mut Transaction<'_, Postgres>, index_id: &str, + lock: bool, ) -> MetastoreResult { - index_opt(tx.as_mut(), index_id) + index_opt(tx.as_mut(), index_id, lock) .await? .ok_or_else(|| { MetastoreError::NotFound(EntityKind::Index { @@ -305,7 +315,7 @@ where M: FnOnce(&mut IndexMetadata) -> Result, E>, { let index_id = &index_uid.index_id; - let mut index_metadata = index_metadata(tx, index_id).await?; + let mut index_metadata = index_metadata(tx, index_id, true).await?; if index_metadata.index_uid != index_uid { return Err(MetastoreError::NotFound(EntityKind::Index { index_id: index_id.to_string(), @@ -421,9 +431,9 @@ impl MetastoreService for PostgresqlMetastore { request: IndexMetadataRequest, ) -> MetastoreResult { let pg_index_opt = if let Some(index_uid) = &request.index_uid { - index_opt_for_uid(&self.connection_pool, index_uid.clone()).await? + index_opt_for_uid(&self.connection_pool, index_uid.clone(), false).await? } else if let Some(index_id) = &request.index_id { - index_opt(&self.connection_pool, index_id).await? + index_opt(&self.connection_pool, index_id, false).await? } else { let message = "invalid request: neither `index_id` nor `index_uid` is set".to_string(); return Err(MetastoreError::Internal { @@ -675,7 +685,7 @@ impl MetastoreService for PostgresqlMetastore { let replaced_split_ids = request.replaced_split_ids; run_with_tx!(self.connection_pool, tx, { - let mut index_metadata = index_metadata(tx, &index_uid.index_id).await?; + let mut index_metadata = index_metadata(tx, &index_uid.index_id, true).await?; if index_metadata.index_uid != index_uid { return Err(MetastoreError::NotFound(EntityKind::Index { index_id: index_uid.index_id, @@ -938,7 +948,7 @@ impl MetastoreService for PostgresqlMetastore { .map_err(|sqlx_error| convert_sqlx_err(&index_uid.index_id, sqlx_error))?; if num_found_splits == 0 - && index_opt(&self.connection_pool, &index_uid.index_id) + && index_opt(&self.connection_pool, &index_uid.index_id, false) .await? .is_none() { @@ -1018,7 +1028,7 @@ impl MetastoreService for PostgresqlMetastore { .map_err(|sqlx_error| convert_sqlx_err(&index_uid.index_id, sqlx_error))?; if num_found_splits == 0 - && index_opt_for_uid(&self.connection_pool, index_uid.clone()) + && index_opt_for_uid(&self.connection_pool, index_uid.clone(), false) .await? .is_none() { @@ -1214,7 +1224,7 @@ impl MetastoreService for PostgresqlMetastore { // If no splits were updated, maybe the index does not exist in the first place? if update_result.rows_affected() == 0 - && index_opt_for_uid(&self.connection_pool, index_uid.clone()) + && index_opt_for_uid(&self.connection_pool, index_uid.clone(), false) .await? .is_none() {