From c955beaf30e26b12bd0bdad044757d4d0640e6d8 Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Thu, 14 Mar 2024 14:35:02 -0400 Subject: [PATCH] Add database connection pool metrics --- .../src/metastore/postgres/metastore.rs | 7 +- .../src/metastore/postgres/metrics.rs | 55 ++++++++ .../src/metastore/postgres/migrator.rs | 6 +- .../src/metastore/postgres/mod.rs | 2 + .../src/metastore/postgres/pool.rs | 124 ++++++++++++++++++ .../src/metastore/postgres/split_stream.rs | 6 +- .../src/metastore/postgres/utils.rs | 11 +- 7 files changed, 200 insertions(+), 11 deletions(-) create mode 100644 quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs create mode 100644 quickwit/quickwit-metastore/src/metastore/postgres/pool.rs diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 8667ded3f1f..87515bfc372 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -48,12 +48,13 @@ use quickwit_proto::metastore::{ use quickwit_proto::types::{IndexId, IndexUid, Position, PublishToken, SourceId}; use sea_query::{Asterisk, PostgresQueryBuilder, Query}; use sea_query_binder::SqlxBinder; -use sqlx::{Executor, Pool, Postgres, Transaction}; +use sqlx::{Acquire, Executor, Postgres, Transaction}; use tracing::{debug, info, instrument, warn}; use super::error::convert_sqlx_err; use super::migrator::run_migrations; use super::model::{PgDeleteTask, PgIndex, PgIndexTemplate, PgShard, PgSplit, Splits}; +use super::pool::TrackedPool; use super::split_stream::SplitStream; use super::utils::{append_query_filters, establish_connection}; use crate::checkpoint::{ @@ -71,7 +72,7 @@ use crate::{ #[derive(Clone)] pub struct PostgresqlMetastore { uri: Uri, - connection_pool: Pool, + connection_pool: TrackedPool, } impl fmt::Debug for PostgresqlMetastore { @@ -709,7 +710,7 @@ impl MetastoreService for PostgresqlMetastore { let pg_split_stream = SplitStream::new( self.connection_pool.clone(), sql, - |connection_pool: &Pool, sql: &String| { + |connection_pool: &TrackedPool, sql: &String| { sqlx::query_as_with::<_, PgSplit, _>(sql, values).fetch(connection_pool) }, ); diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs new file mode 100644 index 00000000000..8e54a529ab9 --- /dev/null +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs @@ -0,0 +1,55 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use once_cell::sync::Lazy; +use quickwit_common::metrics::{new_gauge, IntGauge}; + +#[derive(Clone)] +pub(super) struct PostgresMetrics { + pub acquire_connections: IntGauge, + pub active_connections: IntGauge, + pub idle_connections: IntGauge, +} + +impl Default for PostgresMetrics { + fn default() -> Self { + Self { + acquire_connections: new_gauge( + "acquire_connections", + "Number of connections being acquired.", + "metastore", + &[], + ), + active_connections: new_gauge( + "active_connections", + "Number of active (used + idle) connections.", + "metastore", + &[], + ), + idle_connections: new_gauge( + "idle_connections", + "Number of idle connections.", + "metastore", + &[], + ), + } + } +} + +pub(super) static POSTGRES_METRICS: Lazy = Lazy::new(PostgresMetrics::default); diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs b/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs index cba3dc4c15e..d4640036edc 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs @@ -19,15 +19,17 @@ use quickwit_proto::metastore::{MetastoreError, MetastoreResult}; use sqlx::migrate::Migrator; -use sqlx::{Pool, Postgres}; +use sqlx::{Acquire, Postgres}; use tracing::{error, instrument}; +use super::pool::TrackedPool; + static MIGRATOR: Migrator = sqlx::migrate!("migrations/postgresql"); /// Initializes the database and runs the SQL migrations stored in the /// `quickwit-metastore/migrations` directory. #[instrument(skip_all)] -pub(super) async fn run_migrations(pool: &Pool) -> MetastoreResult<()> { +pub(super) async fn run_migrations(pool: &TrackedPool) -> MetastoreResult<()> { let tx = pool.begin().await?; let migrate_result = MIGRATOR.run(pool).await; diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/mod.rs b/quickwit/quickwit-metastore/src/metastore/postgres/mod.rs index 13f4d686da6..c89deb740e7 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/mod.rs @@ -20,8 +20,10 @@ mod error; mod factory; mod metastore; +mod metrics; mod migrator; mod model; +mod pool; mod split_stream; mod tags; mod utils; diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/pool.rs b/quickwit/quickwit-metastore/src/metastore/postgres/pool.rs new file mode 100644 index 00000000000..c7fa11e3eb2 --- /dev/null +++ b/quickwit/quickwit-metastore/src/metastore/postgres/pool.rs @@ -0,0 +1,124 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use futures::future::BoxFuture; +use futures::stream::BoxStream; +use quickwit_common::metrics::GaugeGuard; +use sqlx::database::HasStatement; +use sqlx::pool::maybe::MaybePoolConnection; +use sqlx::pool::PoolConnection; +use sqlx::{ + Acquire, Database, Describe, Either, Error, Execute, Executor, Pool, Postgres, Transaction, +}; + +use super::metrics::POSTGRES_METRICS; + +#[derive(Debug)] +pub(super) struct TrackedPool { + inner_pool: Pool, +} + +impl TrackedPool { + pub fn new(inner_pool: Pool) -> Self { + Self { inner_pool } + } +} + +impl Clone for TrackedPool { + fn clone(&self) -> Self { + Self { + inner_pool: self.inner_pool.clone(), + } + } +} + +impl<'a, DB: Database> Acquire<'a> for &TrackedPool { + type Database = DB; + + type Connection = PoolConnection; + + fn acquire(self) -> BoxFuture<'static, Result> { + let acquire_fut = self.inner_pool.acquire(); + + POSTGRES_METRICS + .active_connections + .set(self.inner_pool.size() as i64); + POSTGRES_METRICS + .idle_connections + .set(self.inner_pool.num_idle() as i64); + + Box::pin(async move { + let mut gauge_guard = GaugeGuard::from_gauge(&POSTGRES_METRICS.acquire_connections); + gauge_guard.add(1); + + let conn = acquire_fut.await?; + Ok(conn) + }) + } + + fn begin(self) -> BoxFuture<'static, Result, Error>> { + let conn = self.acquire(); + + Box::pin(async move { + Transaction::begin(MaybePoolConnection::PoolConnection(conn.await?)).await + }) + } +} + +impl<'p, DB: Database> Executor<'p> for &'_ TrackedPool +where for<'c> &'c mut DB::Connection: Executor<'c, Database = DB> +{ + type Database = DB; + + fn fetch_many<'e, 'q: 'e, E: 'q>( + self, + query: E, + ) -> BoxStream<'e, Result, Error>> + where + E: Execute<'q, Self::Database>, + { + self.inner_pool.fetch_many(query) + } + + fn fetch_optional<'e, 'q: 'e, E: 'q>( + self, + query: E, + ) -> BoxFuture<'e, Result, Error>> + where + E: Execute<'q, Self::Database>, + { + self.inner_pool.fetch_optional(query) + } + + fn prepare_with<'e, 'q: 'e>( + self, + sql: &'q str, + parameters: &'e [::TypeInfo], + ) -> BoxFuture<'e, Result<>::Statement, Error>> { + self.inner_pool.prepare_with(sql, parameters) + } + + #[doc(hidden)] + fn describe<'e, 'q: 'e>( + self, + sql: &'q str, + ) -> BoxFuture<'e, Result, Error>> { + self.inner_pool.describe(sql) + } +} diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/split_stream.rs b/quickwit/quickwit-metastore/src/metastore/postgres/split_stream.rs index 3891389416f..42138bd066c 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/split_stream.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/split_stream.rs @@ -22,12 +22,14 @@ use std::task::{Context, Poll}; use futures::stream::BoxStream; use ouroboros::self_referencing; -use sqlx::{Pool, Postgres}; +use sqlx::Postgres; use tokio_stream::Stream; +use super::pool::TrackedPool; + #[self_referencing(pub_extras)] pub struct SplitStream { - connection_pool: Pool, + connection_pool: TrackedPool, sql: String, #[borrows(connection_pool, sql)] #[covariant] diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs b/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs index 4fcc0319a6a..1f972749c46 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs @@ -26,11 +26,12 @@ use quickwit_common::uri::Uri; use quickwit_proto::metastore::{MetastoreError, MetastoreResult}; use sea_query::{any, Expr, Func, Order, SelectStatement}; use sqlx::postgres::{PgConnectOptions, PgPoolOptions}; -use sqlx::{ConnectOptions, Pool, Postgres}; +use sqlx::{ConnectOptions, Postgres}; use tracing::error; use tracing::log::LevelFilter; use super::model::{Splits, ToTimestampFunc}; +use super::pool::TrackedPool; use super::tags::generate_sql_condition; use crate::metastore::FilterRange; use crate::{ListSplitsQuery, SplitMaturity, SplitMetadata}; @@ -43,7 +44,7 @@ pub(super) async fn establish_connection( acquire_timeout: Duration, idle_timeout_opt: Option, max_lifetime_opt: Option, -) -> MetastoreResult> { +) -> MetastoreResult> { let pool_options = PgPoolOptions::new() .min_connections(min_connections as u32) .max_connections(max_connections as u32) @@ -53,7 +54,7 @@ pub(super) async fn establish_connection( let connect_options: PgConnectOptions = PgConnectOptions::from_str(connection_uri.as_str())? .application_name("quickwit-metastore") .log_statements(LevelFilter::Info); - pool_options + let sqlx_pool = pool_options .connect_with(connect_options) .await .map_err(|error| { @@ -61,7 +62,9 @@ pub(super) async fn establish_connection( MetastoreError::Connection { message: error.to_string(), } - }) + })?; + let tracked_pool = TrackedPool::new(sqlx_pool); + Ok(tracked_pool) } /// Extends an existing SQL string with the generated filter range appended to the query.