Skip to content

Commit

Permalink
use FOR UPDATE only when needed (#5326)
Browse files Browse the repository at this point in the history
  • Loading branch information
trinity-1686a authored Aug 19, 2024
1 parent 70c8bbf commit 53f2125
Showing 1 changed file with 26 additions and 16 deletions.
42 changes: 26 additions & 16 deletions quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,23 @@ impl PostgresqlMetastore {
}

/// Returns an Index object given an index_id or None if it does not exist.
async fn index_opt<'a, E>(executor: E, index_id: &str) -> MetastoreResult<Option<PgIndex>>
where E: sqlx::Executor<'a, Database = Postgres> {
let index_opt: Option<PgIndex> = sqlx::query_as::<_, PgIndex>(
async fn index_opt<'a, E>(
executor: E,
index_id: &str,
lock: bool,
) -> MetastoreResult<Option<PgIndex>>
where
E: sqlx::Executor<'a, Database = Postgres>,
{
let index_opt: Option<PgIndex> = sqlx::query_as::<_, PgIndex>(&format!(
r#"
SELECT *
FROM indexes
WHERE index_id = $1
FOR UPDATE
{}
"#,
)
if lock { "FOR UPDATE" } else { "" }
))
.bind(index_id)
.fetch_optional(executor)
.await?;
Expand All @@ -149,18 +156,20 @@ where E: sqlx::Executor<'a, Database = Postgres> {
async fn index_opt_for_uid<'a, E>(
executor: E,
index_uid: IndexUid,
lock: bool,
) -> MetastoreResult<Option<PgIndex>>
where
E: sqlx::Executor<'a, Database = Postgres>,
{
let index_opt: Option<PgIndex> = sqlx::query_as::<_, PgIndex>(
let index_opt: Option<PgIndex> = sqlx::query_as::<_, PgIndex>(&format!(
r#"
SELECT *
FROM indexes
WHERE index_uid = $1
FOR UPDATE
{}
"#,
)
if lock { "FOR UPDATE" } else { "" }
))
.bind(&index_uid)
.fetch_optional(executor)
.await?;
Expand All @@ -170,8 +179,9 @@ where
async fn index_metadata(
tx: &mut Transaction<'_, Postgres>,
index_id: &str,
lock: bool,
) -> MetastoreResult<IndexMetadata> {
index_opt(tx.as_mut(), index_id)
index_opt(tx.as_mut(), index_id, lock)
.await?
.ok_or_else(|| {
MetastoreError::NotFound(EntityKind::Index {
Expand Down Expand Up @@ -305,7 +315,7 @@ where
M: FnOnce(&mut IndexMetadata) -> Result<MutationOccurred<()>, E>,
{
let index_id = &index_uid.index_id;
let mut index_metadata = index_metadata(tx, index_id).await?;
let mut index_metadata = index_metadata(tx, index_id, true).await?;
if index_metadata.index_uid != index_uid {
return Err(MetastoreError::NotFound(EntityKind::Index {
index_id: index_id.to_string(),
Expand Down Expand Up @@ -421,9 +431,9 @@ impl MetastoreService for PostgresqlMetastore {
request: IndexMetadataRequest,
) -> MetastoreResult<IndexMetadataResponse> {
let pg_index_opt = if let Some(index_uid) = &request.index_uid {
index_opt_for_uid(&self.connection_pool, index_uid.clone()).await?
index_opt_for_uid(&self.connection_pool, index_uid.clone(), false).await?
} else if let Some(index_id) = &request.index_id {
index_opt(&self.connection_pool, index_id).await?
index_opt(&self.connection_pool, index_id, false).await?
} else {
let message = "invalid request: neither `index_id` nor `index_uid` is set".to_string();
return Err(MetastoreError::Internal {
Expand Down Expand Up @@ -675,7 +685,7 @@ impl MetastoreService for PostgresqlMetastore {
let replaced_split_ids = request.replaced_split_ids;

run_with_tx!(self.connection_pool, tx, {
let mut index_metadata = index_metadata(tx, &index_uid.index_id).await?;
let mut index_metadata = index_metadata(tx, &index_uid.index_id, true).await?;
if index_metadata.index_uid != index_uid {
return Err(MetastoreError::NotFound(EntityKind::Index {
index_id: index_uid.index_id,
Expand Down Expand Up @@ -938,7 +948,7 @@ impl MetastoreService for PostgresqlMetastore {
.map_err(|sqlx_error| convert_sqlx_err(&index_uid.index_id, sqlx_error))?;

if num_found_splits == 0
&& index_opt(&self.connection_pool, &index_uid.index_id)
&& index_opt(&self.connection_pool, &index_uid.index_id, false)
.await?
.is_none()
{
Expand Down Expand Up @@ -1018,7 +1028,7 @@ impl MetastoreService for PostgresqlMetastore {
.map_err(|sqlx_error| convert_sqlx_err(&index_uid.index_id, sqlx_error))?;

if num_found_splits == 0
&& index_opt_for_uid(&self.connection_pool, index_uid.clone())
&& index_opt_for_uid(&self.connection_pool, index_uid.clone(), false)
.await?
.is_none()
{
Expand Down Expand Up @@ -1214,7 +1224,7 @@ impl MetastoreService for PostgresqlMetastore {

// If no splits were updated, maybe the index does not exist in the first place?
if update_result.rows_affected() == 0
&& index_opt_for_uid(&self.connection_pool, index_uid.clone())
&& index_opt_for_uid(&self.connection_pool, index_uid.clone(), false)
.await?
.is_none()
{
Expand Down

0 comments on commit 53f2125

Please sign in to comment.