-
Notifications
You must be signed in to change notification settings - Fork 421
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement Shard API for PostgreSQL metastore
- Loading branch information
Showing
25 changed files
with
914 additions
and
496 deletions.
There are no files selected for viewing
5 changes: 5 additions & 0 deletions
5
quickwit/quickwit-metastore/migrations/postgresql/14_update-shard-id.down.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
ALTER TABLE shards | ||
ALTER COLUMN shard_id DROP NOT NULL, | ||
ALTER COLUMN shard_id TYPE BIGSERIAL, | ||
ALTER COLUMN shard_state DROP DEFAULT, | ||
ALTER COLUMN publish_position_inclusive DROP DEFAULT; |
5 changes: 5 additions & 0 deletions
5
quickwit/quickwit-metastore/migrations/postgresql/14_update-shard-id.up.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
ALTER TABLE shards | ||
ALTER COLUMN shard_id TYPE VARCHAR(255), | ||
ALTER COLUMN shard_id SET NOT NULL, | ||
ALTER COLUMN shard_state SET DEFAULT 'open', | ||
ALTER COLUMN publish_position_inclusive SET DEFAULT ''; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
File renamed without changes.
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
70 changes: 70 additions & 0 deletions
70
quickwit/quickwit-metastore/src/metastore/postgres/error.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
// Copyright (C) 2024 Quickwit, Inc. | ||
// | ||
// Quickwit is offered under the AGPL v3.0 and as commercial software. | ||
// For commercial licensing, contact us at [email protected]. | ||
// | ||
// AGPL: | ||
// This program is free software: you can redistribute it and/or modify | ||
// it under the terms of the GNU Affero General Public License as | ||
// published by the Free Software Foundation, either version 3 of the | ||
// License, or (at your option) any later version. | ||
// | ||
// This program is distributed in the hope that it will be useful, | ||
// but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
// GNU Affero General Public License for more details. | ||
// | ||
// You should have received a copy of the GNU Affero General Public License | ||
// along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
|
||
use quickwit_proto::metastore::{EntityKind, MetastoreError}; | ||
use sqlx::postgres::PgDatabaseError; | ||
use tracing::error; | ||
|
||
// https://www.postgresql.org/docs/current/errcodes-appendix.html | ||
mod pg_error_codes { | ||
pub const FOREIGN_KEY_VIOLATION: &str = "23503"; | ||
pub const UNIQUE_VIOLATION: &str = "23505"; | ||
} | ||
|
||
pub(super) fn convert_sqlx_err(index_id: &str, sqlx_error: sqlx::Error) -> MetastoreError { | ||
match &sqlx_error { | ||
sqlx::Error::Database(boxed_db_error) => { | ||
let pg_db_error = boxed_db_error.downcast_ref::<PgDatabaseError>(); | ||
let pg_error_code = pg_db_error.code(); | ||
let pg_error_table = pg_db_error.table(); | ||
|
||
match (pg_error_code, pg_error_table) { | ||
(pg_error_codes::FOREIGN_KEY_VIOLATION, _) => { | ||
MetastoreError::NotFound(EntityKind::Index { | ||
index_id: index_id.to_string(), | ||
}) | ||
} | ||
(pg_error_codes::UNIQUE_VIOLATION, Some(table)) if table.starts_with("indexes") => { | ||
MetastoreError::AlreadyExists(EntityKind::Index { | ||
index_id: index_id.to_string(), | ||
}) | ||
} | ||
(pg_error_codes::UNIQUE_VIOLATION, _) => { | ||
error!(error=?boxed_db_error, "postgresql-error"); | ||
MetastoreError::Internal { | ||
message: "unique key violation".to_string(), | ||
cause: format!("DB error {boxed_db_error:?}"), | ||
} | ||
} | ||
_ => { | ||
error!(error=?boxed_db_error, "postgresql-error"); | ||
MetastoreError::Db { | ||
message: boxed_db_error.to_string(), | ||
} | ||
} | ||
} | ||
} | ||
_ => { | ||
error!(error=?sqlx_error, "an error has occurred in the database operation"); | ||
MetastoreError::Db { | ||
message: sqlx_error.to_string(), | ||
} | ||
} | ||
} | ||
} |
100 changes: 100 additions & 0 deletions
100
quickwit/quickwit-metastore/src/metastore/postgres/factory.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
// Copyright (C) 2024 Quickwit, Inc. | ||
// | ||
// Quickwit is offered under the AGPL v3.0 and as commercial software. | ||
// For commercial licensing, contact us at [email protected]. | ||
// | ||
// AGPL: | ||
// This program is free software: you can redistribute it and/or modify | ||
// it under the terms of the GNU Affero General Public License as | ||
// published by the Free Software Foundation, either version 3 of the | ||
// License, or (at your option) any later version. | ||
// | ||
// This program is distributed in the hope that it will be useful, | ||
// but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
// GNU Affero General Public License for more details. | ||
// | ||
// You should have received a copy of the GNU Affero General Public License | ||
// along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
|
||
use std::collections::HashMap; | ||
use std::sync::Arc; | ||
|
||
use async_trait::async_trait; | ||
use quickwit_common::uri::Uri; | ||
use quickwit_config::{MetastoreBackend, MetastoreConfig}; | ||
use quickwit_proto::metastore::MetastoreServiceClient; | ||
use tokio::sync::Mutex; | ||
use tracing::debug; | ||
|
||
use crate::metastore::instrument_metastore; | ||
use crate::{MetastoreFactory, MetastoreResolverError, PostgresqlMetastore}; | ||
|
||
#[derive(Clone, Default)] | ||
pub struct PostgresqlMetastoreFactory { | ||
// Under normal conditions of use, this cache will contain a single `Metastore`. | ||
// | ||
// In contrast to the file-backed metastore, we use a strong pointer here, so that the | ||
// `Metastore` doesn't get dropped. This is done in order to keep the underlying connection | ||
// pool to Postgres alive. | ||
cache: Arc<Mutex<HashMap<Uri, MetastoreServiceClient>>>, | ||
} | ||
|
||
impl PostgresqlMetastoreFactory { | ||
async fn get_from_cache(&self, uri: &Uri) -> Option<MetastoreServiceClient> { | ||
let cache_lock = self.cache.lock().await; | ||
cache_lock.get(uri).map(MetastoreServiceClient::clone) | ||
} | ||
|
||
/// If there is a valid entry in the cache to begin with, we trash the new | ||
/// one and return the old one. | ||
/// | ||
/// This way we make sure that we keep only one instance associated | ||
/// to the key `uri` outside of this struct. | ||
async fn cache_metastore( | ||
&self, | ||
uri: Uri, | ||
metastore: MetastoreServiceClient, | ||
) -> MetastoreServiceClient { | ||
let mut cache_lock = self.cache.lock().await; | ||
if let Some(metastore) = cache_lock.get(&uri) { | ||
return metastore.clone(); | ||
} | ||
cache_lock.insert(uri, metastore.clone()); | ||
metastore | ||
} | ||
} | ||
|
||
#[async_trait] | ||
impl MetastoreFactory for PostgresqlMetastoreFactory { | ||
fn backend(&self) -> MetastoreBackend { | ||
MetastoreBackend::PostgreSQL | ||
} | ||
|
||
async fn resolve( | ||
&self, | ||
metastore_config: &MetastoreConfig, | ||
uri: &Uri, | ||
) -> Result<MetastoreServiceClient, MetastoreResolverError> { | ||
if let Some(metastore) = self.get_from_cache(uri).await { | ||
debug!("using metastore from cache"); | ||
return Ok(metastore); | ||
} | ||
debug!("metastore not found in cache"); | ||
let postgresql_metastore_config = metastore_config.as_postgres().ok_or_else(|| { | ||
let message = format!( | ||
"expected PostgreSQL metastore config, got `{:?}`", | ||
metastore_config.backend() | ||
); | ||
MetastoreResolverError::InvalidConfig(message) | ||
})?; | ||
let postgresql_metastore = PostgresqlMetastore::new(postgresql_metastore_config, uri) | ||
.await | ||
.map_err(MetastoreResolverError::Initialization)?; | ||
let instrumented_metastore = instrument_metastore(postgresql_metastore); | ||
let unique_metastore_for_uri = self | ||
.cache_metastore(uri.clone(), instrumented_metastore) | ||
.await; | ||
Ok(unique_metastore_for_uri) | ||
} | ||
} |
45 changes: 45 additions & 0 deletions
45
quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
// Copyright (C) 2024 Quickwit, Inc. | ||
// | ||
// Quickwit is offered under the AGPL v3.0 and as commercial software. | ||
// For commercial licensing, contact us at [email protected]. | ||
// | ||
// AGPL: | ||
// This program is free software: you can redistribute it and/or modify | ||
// it under the terms of the GNU Affero General Public License as | ||
// published by the Free Software Foundation, either version 3 of the | ||
// License, or (at your option) any later version. | ||
// | ||
// This program is distributed in the hope that it will be useful, | ||
// but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
// GNU Affero General Public License for more details. | ||
// | ||
// You should have received a copy of the GNU Affero General Public License | ||
// along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
|
||
use quickwit_proto::metastore::{MetastoreError, MetastoreResult}; | ||
use sqlx::migrate::Migrator; | ||
use sqlx::{Pool, Postgres}; | ||
use tracing::{error, instrument}; | ||
|
||
static MIGRATOR: Migrator = sqlx::migrate!("migrations/postgresql"); | ||
|
||
/// Initializes the database and runs the SQL migrations stored in the | ||
/// `quickwit-metastore/migrations` directory. | ||
#[instrument(skip_all)] | ||
pub(super) async fn run_migrations(pool: &Pool<Postgres>) -> MetastoreResult<()> { | ||
let tx = pool.begin().await?; | ||
let migrate_result = MIGRATOR.run(pool).await; | ||
|
||
let Err(migrate_error) = migrate_result else { | ||
tx.commit().await?; | ||
return Ok(()); | ||
}; | ||
tx.rollback().await?; | ||
error!(error=%migrate_error, "failed to run PostgreSQL migrations"); | ||
|
||
Err(MetastoreError::Internal { | ||
message: "failed to run PostgreSQL migrations".to_string(), | ||
cause: migrate_error.to_string(), | ||
}) | ||
} |
Oops, something went wrong.