Skip to content

Commit

Permalink
Add database connection pool metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Mar 14, 2024
1 parent 6153d96 commit c955bea
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ use quickwit_proto::metastore::{
use quickwit_proto::types::{IndexId, IndexUid, Position, PublishToken, SourceId};
use sea_query::{Asterisk, PostgresQueryBuilder, Query};
use sea_query_binder::SqlxBinder;
use sqlx::{Executor, Pool, Postgres, Transaction};
use sqlx::{Acquire, Executor, Postgres, Transaction};
use tracing::{debug, info, instrument, warn};

use super::error::convert_sqlx_err;
use super::migrator::run_migrations;
use super::model::{PgDeleteTask, PgIndex, PgIndexTemplate, PgShard, PgSplit, Splits};
use super::pool::TrackedPool;
use super::split_stream::SplitStream;
use super::utils::{append_query_filters, establish_connection};
use crate::checkpoint::{
Expand All @@ -71,7 +72,7 @@ use crate::{
#[derive(Clone)]
pub struct PostgresqlMetastore {
uri: Uri,
connection_pool: Pool<Postgres>,
connection_pool: TrackedPool<Postgres>,
}

impl fmt::Debug for PostgresqlMetastore {
Expand Down Expand Up @@ -709,7 +710,7 @@ impl MetastoreService for PostgresqlMetastore {
let pg_split_stream = SplitStream::new(
self.connection_pool.clone(),
sql,
|connection_pool: &Pool<Postgres>, sql: &String| {
|connection_pool: &TrackedPool<Postgres>, sql: &String| {
sqlx::query_as_with::<_, PgSplit, _>(sql, values).fetch(connection_pool)
},
);
Expand Down
55 changes: 55 additions & 0 deletions quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (C) 2024 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use once_cell::sync::Lazy;
use quickwit_common::metrics::{new_gauge, IntGauge};

#[derive(Clone)]
pub(super) struct PostgresMetrics {
pub acquire_connections: IntGauge,
pub active_connections: IntGauge,
pub idle_connections: IntGauge,
}

impl Default for PostgresMetrics {
fn default() -> Self {
Self {
acquire_connections: new_gauge(
"acquire_connections",
"Number of connections being acquired.",
"metastore",
&[],
),
active_connections: new_gauge(
"active_connections",
"Number of active (used + idle) connections.",
"metastore",
&[],
),
idle_connections: new_gauge(
"idle_connections",
"Number of idle connections.",
"metastore",
&[],
),
}
}
}

pub(super) static POSTGRES_METRICS: Lazy<PostgresMetrics> = Lazy::new(PostgresMetrics::default);
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@

use quickwit_proto::metastore::{MetastoreError, MetastoreResult};
use sqlx::migrate::Migrator;
use sqlx::{Pool, Postgres};
use sqlx::{Acquire, Postgres};
use tracing::{error, instrument};

use super::pool::TrackedPool;

static MIGRATOR: Migrator = sqlx::migrate!("migrations/postgresql");

/// Initializes the database and runs the SQL migrations stored in the
/// `quickwit-metastore/migrations` directory.
#[instrument(skip_all)]
pub(super) async fn run_migrations(pool: &Pool<Postgres>) -> MetastoreResult<()> {
pub(super) async fn run_migrations(pool: &TrackedPool<Postgres>) -> MetastoreResult<()> {
let tx = pool.begin().await?;
let migrate_result = MIGRATOR.run(pool).await;

Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-metastore/src/metastore/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
mod error;
mod factory;
mod metastore;
mod metrics;
mod migrator;
mod model;
mod pool;
mod split_stream;
mod tags;
mod utils;
Expand Down
124 changes: 124 additions & 0 deletions quickwit/quickwit-metastore/src/metastore/postgres/pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright (C) 2024 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use futures::future::BoxFuture;
use futures::stream::BoxStream;
use quickwit_common::metrics::GaugeGuard;
use sqlx::database::HasStatement;
use sqlx::pool::maybe::MaybePoolConnection;
use sqlx::pool::PoolConnection;
use sqlx::{
Acquire, Database, Describe, Either, Error, Execute, Executor, Pool, Postgres, Transaction,
};

use super::metrics::POSTGRES_METRICS;

#[derive(Debug)]
pub(super) struct TrackedPool<DB: Database> {
inner_pool: Pool<DB>,
}

impl TrackedPool<Postgres> {
pub fn new(inner_pool: Pool<Postgres>) -> Self {
Self { inner_pool }
}
}

impl<DB: Database> Clone for TrackedPool<DB> {
fn clone(&self) -> Self {
Self {
inner_pool: self.inner_pool.clone(),
}
}
}

impl<'a, DB: Database> Acquire<'a> for &TrackedPool<DB> {
type Database = DB;

type Connection = PoolConnection<DB>;

fn acquire(self) -> BoxFuture<'static, Result<Self::Connection, Error>> {
let acquire_fut = self.inner_pool.acquire();

POSTGRES_METRICS
.active_connections
.set(self.inner_pool.size() as i64);
POSTGRES_METRICS
.idle_connections
.set(self.inner_pool.num_idle() as i64);

Box::pin(async move {
let mut gauge_guard = GaugeGuard::from_gauge(&POSTGRES_METRICS.acquire_connections);
gauge_guard.add(1);

let conn = acquire_fut.await?;
Ok(conn)
})
}

fn begin(self) -> BoxFuture<'static, Result<Transaction<'a, DB>, Error>> {
let conn = self.acquire();

Box::pin(async move {
Transaction::begin(MaybePoolConnection::PoolConnection(conn.await?)).await
})
}
}

impl<'p, DB: Database> Executor<'p> for &'_ TrackedPool<DB>
where for<'c> &'c mut DB::Connection: Executor<'c, Database = DB>
{
type Database = DB;

fn fetch_many<'e, 'q: 'e, E: 'q>(
self,
query: E,
) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
where
E: Execute<'q, Self::Database>,
{
self.inner_pool.fetch_many(query)
}

fn fetch_optional<'e, 'q: 'e, E: 'q>(
self,
query: E,
) -> BoxFuture<'e, Result<Option<DB::Row>, Error>>
where
E: Execute<'q, Self::Database>,
{
self.inner_pool.fetch_optional(query)
}

fn prepare_with<'e, 'q: 'e>(
self,
sql: &'q str,
parameters: &'e [<Self::Database as Database>::TypeInfo],
) -> BoxFuture<'e, Result<<Self::Database as HasStatement<'q>>::Statement, Error>> {
self.inner_pool.prepare_with(sql, parameters)
}

#[doc(hidden)]
fn describe<'e, 'q: 'e>(
self,
sql: &'q str,
) -> BoxFuture<'e, Result<Describe<Self::Database>, Error>> {
self.inner_pool.describe(sql)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ use std::task::{Context, Poll};

use futures::stream::BoxStream;
use ouroboros::self_referencing;
use sqlx::{Pool, Postgres};
use sqlx::Postgres;
use tokio_stream::Stream;

use super::pool::TrackedPool;

#[self_referencing(pub_extras)]
pub struct SplitStream<T> {
connection_pool: Pool<Postgres>,
connection_pool: TrackedPool<Postgres>,
sql: String,
#[borrows(connection_pool, sql)]
#[covariant]
Expand Down
11 changes: 7 additions & 4 deletions quickwit/quickwit-metastore/src/metastore/postgres/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ use quickwit_common::uri::Uri;
use quickwit_proto::metastore::{MetastoreError, MetastoreResult};
use sea_query::{any, Expr, Func, Order, SelectStatement};
use sqlx::postgres::{PgConnectOptions, PgPoolOptions};
use sqlx::{ConnectOptions, Pool, Postgres};
use sqlx::{ConnectOptions, Postgres};
use tracing::error;
use tracing::log::LevelFilter;

use super::model::{Splits, ToTimestampFunc};
use super::pool::TrackedPool;
use super::tags::generate_sql_condition;
use crate::metastore::FilterRange;
use crate::{ListSplitsQuery, SplitMaturity, SplitMetadata};
Expand All @@ -43,7 +44,7 @@ pub(super) async fn establish_connection(
acquire_timeout: Duration,
idle_timeout_opt: Option<Duration>,
max_lifetime_opt: Option<Duration>,
) -> MetastoreResult<Pool<Postgres>> {
) -> MetastoreResult<TrackedPool<Postgres>> {
let pool_options = PgPoolOptions::new()
.min_connections(min_connections as u32)
.max_connections(max_connections as u32)
Expand All @@ -53,15 +54,17 @@ pub(super) async fn establish_connection(
let connect_options: PgConnectOptions = PgConnectOptions::from_str(connection_uri.as_str())?
.application_name("quickwit-metastore")
.log_statements(LevelFilter::Info);
pool_options
let sqlx_pool = pool_options
.connect_with(connect_options)
.await
.map_err(|error| {
error!(connection_uri=%connection_uri, error=?error, "failed to establish connection to database");
MetastoreError::Connection {
message: error.to_string(),
}
})
})?;
let tracked_pool = TrackedPool::new(sqlx_pool);
Ok(tracked_pool)
}

/// Extends an existing SQL string with the generated filter range appended to the query.
Expand Down

0 comments on commit c955bea

Please sign in to comment.