Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add database connection pool metrics #4742

Merged
merged 1 commit into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ use quickwit_proto::metastore::{
use quickwit_proto::types::{IndexId, IndexUid, Position, PublishToken, SourceId};
use sea_query::{Asterisk, PostgresQueryBuilder, Query};
use sea_query_binder::SqlxBinder;
use sqlx::{Executor, Pool, Postgres, Transaction};
use sqlx::{Acquire, Executor, Postgres, Transaction};
use tracing::{debug, info, instrument, warn};

use super::error::convert_sqlx_err;
use super::migrator::run_migrations;
use super::model::{PgDeleteTask, PgIndex, PgIndexTemplate, PgShard, PgSplit, Splits};
use super::pool::TrackedPool;
use super::split_stream::SplitStream;
use super::utils::{append_query_filters, establish_connection};
use crate::checkpoint::{
Expand All @@ -71,7 +72,7 @@ use crate::{
#[derive(Clone)]
pub struct PostgresqlMetastore {
uri: Uri,
connection_pool: Pool<Postgres>,
connection_pool: TrackedPool<Postgres>,
}

impl fmt::Debug for PostgresqlMetastore {
Expand Down Expand Up @@ -709,7 +710,7 @@ impl MetastoreService for PostgresqlMetastore {
let pg_split_stream = SplitStream::new(
self.connection_pool.clone(),
sql,
|connection_pool: &Pool<Postgres>, sql: &String| {
|connection_pool: &TrackedPool<Postgres>, sql: &String| {
sqlx::query_as_with::<_, PgSplit, _>(sql, values).fetch(connection_pool)
},
);
Expand Down
55 changes: 55 additions & 0 deletions quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (C) 2024 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use once_cell::sync::Lazy;
use quickwit_common::metrics::{new_gauge, IntGauge};

#[derive(Clone)]
pub(super) struct PostgresMetrics {
pub acquire_connections: IntGauge,
pub active_connections: IntGauge,
pub idle_connections: IntGauge,
}

impl Default for PostgresMetrics {
fn default() -> Self {
Self {
acquire_connections: new_gauge(
"acquire_connections",
"Number of connections being acquired.",
"metastore",
&[],
),
active_connections: new_gauge(
"active_connections",
"Number of active (used + idle) connections.",
"metastore",
&[],
),
idle_connections: new_gauge(
"idle_connections",
"Number of idle connections.",
"metastore",
&[],
),
}
}
}

pub(super) static POSTGRES_METRICS: Lazy<PostgresMetrics> = Lazy::new(PostgresMetrics::default);
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@

use quickwit_proto::metastore::{MetastoreError, MetastoreResult};
use sqlx::migrate::Migrator;
use sqlx::{Pool, Postgres};
use sqlx::{Acquire, Postgres};
use tracing::{error, instrument};

use super::pool::TrackedPool;

static MIGRATOR: Migrator = sqlx::migrate!("migrations/postgresql");

/// Initializes the database and runs the SQL migrations stored in the
/// `quickwit-metastore/migrations` directory.
#[instrument(skip_all)]
pub(super) async fn run_migrations(pool: &Pool<Postgres>) -> MetastoreResult<()> {
pub(super) async fn run_migrations(pool: &TrackedPool<Postgres>) -> MetastoreResult<()> {
let tx = pool.begin().await?;
let migrate_result = MIGRATOR.run(pool).await;

Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-metastore/src/metastore/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
mod error;
mod factory;
mod metastore;
mod metrics;
mod migrator;
mod model;
mod pool;
mod split_stream;
mod tags;
mod utils;
Expand Down
124 changes: 124 additions & 0 deletions quickwit/quickwit-metastore/src/metastore/postgres/pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright (C) 2024 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use futures::future::BoxFuture;
use futures::stream::BoxStream;
use quickwit_common::metrics::GaugeGuard;
use sqlx::database::HasStatement;
use sqlx::pool::maybe::MaybePoolConnection;
use sqlx::pool::PoolConnection;
use sqlx::{
Acquire, Database, Describe, Either, Error, Execute, Executor, Pool, Postgres, Transaction,
};

use super::metrics::POSTGRES_METRICS;

#[derive(Debug)]
pub(super) struct TrackedPool<DB: Database> {
inner_pool: Pool<DB>,
}

impl TrackedPool<Postgres> {
pub fn new(inner_pool: Pool<Postgres>) -> Self {
Self { inner_pool }
}
}

impl<DB: Database> Clone for TrackedPool<DB> {
fn clone(&self) -> Self {
Self {
inner_pool: self.inner_pool.clone(),
}
}
}

impl<'a, DB: Database> Acquire<'a> for &TrackedPool<DB> {
type Database = DB;

type Connection = PoolConnection<DB>;

fn acquire(self) -> BoxFuture<'static, Result<Self::Connection, Error>> {
let acquire_conn_fut = self.inner_pool.acquire();

POSTGRES_METRICS
.active_connections
.set(self.inner_pool.size() as i64);
POSTGRES_METRICS
.idle_connections
.set(self.inner_pool.num_idle() as i64);

Box::pin(async move {
let mut gauge_guard = GaugeGuard::from_gauge(&POSTGRES_METRICS.acquire_connections);
gauge_guard.add(1);

let conn = acquire_conn_fut.await?;
Ok(conn)
})
}

fn begin(self) -> BoxFuture<'static, Result<Transaction<'a, DB>, Error>> {
let acquire_conn_fut = self.acquire();

Box::pin(async move {
Transaction::begin(MaybePoolConnection::PoolConnection(acquire_conn_fut.await?)).await
})
}
}

impl<'p, DB: Database> Executor<'p> for &'_ TrackedPool<DB>
where for<'c> &'c mut DB::Connection: Executor<'c, Database = DB>
{
type Database = DB;

fn fetch_many<'e, 'q: 'e, E: 'q>(
self,
query: E,
) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
where
E: Execute<'q, Self::Database>,
{
self.inner_pool.fetch_many(query)
}

fn fetch_optional<'e, 'q: 'e, E: 'q>(
self,
query: E,
) -> BoxFuture<'e, Result<Option<DB::Row>, Error>>
where
E: Execute<'q, Self::Database>,
{
self.inner_pool.fetch_optional(query)
}

fn prepare_with<'e, 'q: 'e>(
self,
sql: &'q str,
parameters: &'e [<Self::Database as Database>::TypeInfo],
) -> BoxFuture<'e, Result<<Self::Database as HasStatement<'q>>::Statement, Error>> {
self.inner_pool.prepare_with(sql, parameters)
}

#[doc(hidden)]
fn describe<'e, 'q: 'e>(
self,
sql: &'q str,
) -> BoxFuture<'e, Result<Describe<Self::Database>, Error>> {
self.inner_pool.describe(sql)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ use std::task::{Context, Poll};

use futures::stream::BoxStream;
use ouroboros::self_referencing;
use sqlx::{Pool, Postgres};
use sqlx::Postgres;
use tokio_stream::Stream;

use super::pool::TrackedPool;

#[self_referencing(pub_extras)]
pub struct SplitStream<T> {
connection_pool: Pool<Postgres>,
connection_pool: TrackedPool<Postgres>,
sql: String,
#[borrows(connection_pool, sql)]
#[covariant]
Expand Down
11 changes: 7 additions & 4 deletions quickwit/quickwit-metastore/src/metastore/postgres/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ use quickwit_common::uri::Uri;
use quickwit_proto::metastore::{MetastoreError, MetastoreResult};
use sea_query::{any, Expr, Func, Order, SelectStatement};
use sqlx::postgres::{PgConnectOptions, PgPoolOptions};
use sqlx::{ConnectOptions, Pool, Postgres};
use sqlx::{ConnectOptions, Postgres};
use tracing::error;
use tracing::log::LevelFilter;

use super::model::{Splits, ToTimestampFunc};
use super::pool::TrackedPool;
use super::tags::generate_sql_condition;
use crate::metastore::FilterRange;
use crate::{ListSplitsQuery, SplitMaturity, SplitMetadata};
Expand All @@ -43,7 +44,7 @@ pub(super) async fn establish_connection(
acquire_timeout: Duration,
idle_timeout_opt: Option<Duration>,
max_lifetime_opt: Option<Duration>,
) -> MetastoreResult<Pool<Postgres>> {
) -> MetastoreResult<TrackedPool<Postgres>> {
let pool_options = PgPoolOptions::new()
.min_connections(min_connections as u32)
.max_connections(max_connections as u32)
Expand All @@ -53,15 +54,17 @@ pub(super) async fn establish_connection(
let connect_options: PgConnectOptions = PgConnectOptions::from_str(connection_uri.as_str())?
.application_name("quickwit-metastore")
.log_statements(LevelFilter::Info);
pool_options
let sqlx_pool = pool_options
.connect_with(connect_options)
.await
.map_err(|error| {
error!(connection_uri=%connection_uri, error=?error, "failed to establish connection to database");
MetastoreError::Connection {
message: error.to_string(),
}
})
})?;
let tracked_pool = TrackedPool::new(sqlx_pool);
Ok(tracked_pool)
}

/// Extends an existing SQL string with the generated filter range appended to the query.
Expand Down
Loading