From ddf691af9acdfa36b47c2653dcec0d5ad1920ba9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 25 Jan 2024 16:15:49 +0000 Subject: [PATCH 1/4] HeartbeatRoutine: use explicit ticker (#1157) `time.After(x)` is same as `time.NewTicker(x).C` --- flow/connectors/utils/heartbeat.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/flow/connectors/utils/heartbeat.go b/flow/connectors/utils/heartbeat.go index 2ff8e007de..c12a0dce6f 100644 --- a/flow/connectors/utils/heartbeat.go +++ b/flow/connectors/utils/heartbeat.go @@ -16,6 +16,9 @@ func HeartbeatRoutine( shutdown := make(chan struct{}) go func() { counter := 0 + ticker := time.NewTicker(15 * time.Second) + defer ticker.Stop() + for { counter += 1 msg := fmt.Sprintf("heartbeat #%d: %s", counter, message()) @@ -25,7 +28,8 @@ func HeartbeatRoutine( return case <-ctx.Done(): return - case <-time.After(15 * time.Second): + case <-ticker.C: + ticker.Reset(15 * time.Second) } } }() From c1a6562a8f2902202414f5042d5b74368ba6a437 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 25 Jan 2024 16:17:01 +0000 Subject: [PATCH 2/4] EventHub processBatch: handle `ctx.Done()` (#1155) Based on #1151, ticker.Reset doesn't need to be changed since ticker isn't shared between goroutines --- flow/activities/flowable.go | 27 +++++++++++++------------- flow/activities/slot.go | 2 +- flow/connectors/eventhub/eventhub.go | 5 ++++- flow/connectors/eventhub/hubmanager.go | 9 ++++++++- 4 files changed, 26 insertions(+), 17 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 1a6c026fff..2eaf9f1d3f 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -315,9 +315,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, numRecords := res.NumRecordsSynced syncDuration := time.Since(syncStartTime) - slog.InfoContext(ctx, fmt.Sprintf("pushed %d records in %d seconds\n", - numRecords, int(syncDuration.Seconds())), - ) + slog.InfoContext(ctx, fmt.Sprintf("pushed %d records in %d seconds", numRecords, int(syncDuration.Seconds()))) lastCheckpoint, err := recordBatch.GetLastCheckpoint() if err != nil { @@ -426,7 +424,7 @@ func (a *FlowableActivity) StartNormalize( // log the number of batches normalized if res != nil { - slog.InfoContext(ctx, fmt.Sprintf("normalized records from batch %d to batch %d\n", + slog.InfoContext(ctx, fmt.Sprintf("normalized records from batch %d to batch %d", res.StartBatchID, res.EndBatchID)) } @@ -505,11 +503,11 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context, numPartitions := len(partitions.Partitions) - slog.InfoContext(ctx, fmt.Sprintf("replicating partitions for batch %d - size: %d\n", + slog.InfoContext(ctx, fmt.Sprintf("replicating partitions for batch %d - size: %d", partitions.BatchId, numPartitions), ) for i, p := range partitions.Partitions { - slog.InfoContext(ctx, fmt.Sprintf("batch-%d - replicating partition - %s\n", partitions.BatchId, p.PartitionId)) + slog.InfoContext(ctx, fmt.Sprintf("batch-%d - replicating partition - %s", partitions.BatchId, p.PartitionId)) err := a.replicateQRepPartition(ctx, config, i+1, numPartitions, p, runUUID) if err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) @@ -551,7 +549,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, } defer connectors.CloseConnector(dstConn) - slog.InfoContext(ctx, fmt.Sprintf("replicating partition %s\n", partition.PartitionId)) + slog.InfoContext(ctx, fmt.Sprintf("replicating partition %s", partition.PartitionId)) shutdown := utils.HeartbeatRoutine(ctx, func() string { return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total) }) @@ -590,7 +588,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return fmt.Errorf("failed to pull qrep records: %w", err) } - slog.InfoContext(ctx, fmt.Sprintf("pulled %d records\n", len(recordBatch.Records))) + slog.InfoContext(ctx, fmt.Sprintf("pulled %d records", len(recordBatch.Records))) err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, a.CatalogPool, runUUID, partition, int64(len(recordBatch.Records))) if err != nil { @@ -611,7 +609,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, } if rowsSynced == 0 { - slog.InfoContext(ctx, fmt.Sprintf("no records to push for partition %s\n", partition.PartitionId)) + slog.InfoContext(ctx, fmt.Sprintf("no records to push for partition %s", partition.PartitionId)) pullCancel() } else { wg.Wait() @@ -625,7 +623,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, return err } - slog.InfoContext(ctx, fmt.Sprintf("pushed %d records\n", rowsSynced)) + slog.InfoContext(ctx, fmt.Sprintf("pushed %d records", rowsSynced)) } err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition) @@ -732,6 +730,7 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error { sendTimeout := 10 * time.Minute ticker := time.NewTicker(sendTimeout) defer ticker.Stop() + activity.RecordHeartbeat(ctx, "sending walheartbeat every 10 minutes") for { select { @@ -798,7 +797,7 @@ func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context, } defer connectors.CloseConnector(srcConn) pgSrcConn := srcConn.(*connpostgres.PostgresConnector) - slog.InfoContext(ctx, fmt.Sprintf("current last partition value is %v\n", last)) + slog.InfoContext(ctx, fmt.Sprintf("current last partition value is %v", last)) attemptCount := 1 for { activity.RecordHeartbeat(ctx, fmt.Sprintf("no new rows yet, attempt #%d", attemptCount)) @@ -914,7 +913,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, } defer connectors.CloseConnector(dstConn) - slog.InfoContext(ctx, "replicating xmin\n") + slog.InfoContext(ctx, "replicating xmin") bufferSize := shared.FetchAndChannelSize errGroup, errCtx := errgroup.WithContext(ctx) @@ -980,7 +979,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, } if rowsSynced == 0 { - slog.InfoContext(ctx, "no records to push for xmin\n") + slog.InfoContext(ctx, "no records to push for xmin") } else { err := errGroup.Wait() if err != nil { @@ -993,7 +992,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, return 0, err } - slog.InfoContext(ctx, fmt.Sprintf("pushed %d records\n", rowsSynced)) + slog.InfoContext(ctx, fmt.Sprintf("pushed %d records", rowsSynced)) } err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition) diff --git a/flow/activities/slot.go b/flow/activities/slot.go index 8e8bb9aea0..b748e67e45 100644 --- a/flow/activities/slot.go +++ b/flow/activities/slot.go @@ -77,8 +77,8 @@ func (a *FlowableActivity) recordSlotSizePeriodically( timeout := 5 * time.Minute ticker := time.NewTicker(timeout) - defer ticker.Stop() + for { select { case <-ticker.C: diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index f14ca4ac1c..66a9131676 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -202,6 +202,9 @@ func (c *EventHubConnector) processBatch( c.logger.Info("processBatch", slog.Int("number of records processed for sending", int(curNumRecords))) } + case <-c.ctx.Done(): + return 0, fmt.Errorf("[eventhub] context cancelled %w", c.ctx.Err()) + case <-ticker.C: err := batchPerTopic.flushAllBatches(ctx, flowJobName) if err != nil { @@ -213,7 +216,7 @@ func (c *EventHubConnector) processBatch( lastUpdatedOffset = lastSeenLSN c.logger.Info("processBatch", slog.Int64("updated last offset", lastSeenLSN)) if err != nil { - return 0, fmt.Errorf("failed to update last offset: %v", err) + return 0, fmt.Errorf("failed to update last offset: %w", err) } } diff --git a/flow/connectors/eventhub/hubmanager.go b/flow/connectors/eventhub/hubmanager.go index 7f6d206728..0caa6106fa 100644 --- a/flow/connectors/eventhub/hubmanager.go +++ b/flow/connectors/eventhub/hubmanager.go @@ -7,6 +7,7 @@ import ( "log/slog" "strings" "sync" + "sync/atomic" "time" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" @@ -103,8 +104,13 @@ func (m *EventHubManager) closeProducerClient(ctx context.Context, pc *azeventhu } func (m *EventHubManager) Close(ctx context.Context) error { - var allErrors error + numHubsClosed := atomic.Uint32{} + shutdown := utils.HeartbeatRoutine(ctx, func() string { + return fmt.Sprintf("closed %d eventhub clients", numHubsClosed.Load()) + }) + defer shutdown() + var allErrors error m.hubs.Range(func(key any, value any) bool { name := key.(ScopedEventhub) hub := value.(*azeventhubs.ProducerClient) @@ -113,6 +119,7 @@ func (m *EventHubManager) Close(ctx context.Context) error { slog.Error(fmt.Sprintf("failed to close eventhub client for %v", name), slog.Any("error", err)) allErrors = errors.Join(allErrors, err) } + numHubsClosed.Add(1) return true }) From aefdeaaa0b72b001cf3a3a141b88d7213df58ff0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 25 Jan 2024 16:21:49 +0000 Subject: [PATCH 3/4] nexus: impl QueryExecutor for Catalog (#1150) To share code, Catalog was storing a postgres query executor Unfortunately, that caused two connections for every nexus connection Remove is_connection_valid since it isn't used, & for pg was wrong (see deadpool-postgres for how they check if connection is valid) Alternative fix would've been to use more connection pools, that could be added in the future if necessary --- nexus/Cargo.lock | 57 ++++++++------------------ nexus/Cargo.toml | 1 + nexus/analyzer/Cargo.toml | 2 +- nexus/catalog/Cargo.toml | 5 ++- nexus/catalog/src/lib.rs | 36 ++++++++++------- nexus/parser/Cargo.toml | 2 +- nexus/peer-bigquery/Cargo.toml | 2 +- nexus/peer-bigquery/src/lib.rs | 15 +------ nexus/peer-cursor/Cargo.toml | 2 +- nexus/peer-cursor/src/lib.rs | 3 -- nexus/peer-postgres/Cargo.toml | 2 +- nexus/peer-postgres/src/lib.rs | 72 ++++++++++++++++----------------- nexus/peer-snowflake/Cargo.toml | 2 +- nexus/peer-snowflake/src/lib.rs | 9 ----- nexus/pt/Cargo.toml | 2 +- nexus/server/Cargo.toml | 2 +- nexus/server/src/main.rs | 36 ++++++++--------- 17 files changed, 105 insertions(+), 145 deletions(-) diff --git a/nexus/Cargo.lock b/nexus/Cargo.lock index 3685c4773e..2bd0816699 100644 --- a/nexus/Cargo.lock +++ b/nexus/Cargo.lock @@ -423,7 +423,7 @@ dependencies = [ "serde_json", "tar", "tempfile", - "toml 0.8.8", + "toml", "xz2", "zopfli", ] @@ -435,7 +435,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "802b755090e39835a4b0440fb0bbee0df7495a8b337f63db21e616f7821c7e8c" dependencies = [ "serde", - "toml 0.8.8", + "toml", ] [[package]] @@ -443,14 +443,17 @@ name = "catalog" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "chrono", "peer-cursor", "peer-postgres", + "pgwire", "postgres-connection", "prost", "pt", "refinery", "serde_json", + "sqlparser", "tokio", "tokio-postgres", "tracing", @@ -488,9 +491,9 @@ checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" [[package]] name = "chrono" -version = "0.4.31" +version = "0.4.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" +checksum = "41daef31d7a747c5c847246f36de49ced6f7403b4cdabc807a97b5cc184cda7a" dependencies = [ "android-tzdata", "iana-time-zone", @@ -498,7 +501,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-targets 0.48.5", + "windows-targets 0.52.0", ] [[package]] @@ -2107,7 +2110,7 @@ version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284" dependencies = [ - "toml_edit 0.21.0", + "toml_edit", ] [[package]] @@ -2314,9 +2317,9 @@ dependencies = [ [[package]] name = "refinery" -version = "0.8.11" +version = "0.8.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "529664dbccc0a296947615c997a857912d72d1c44be1fafb7bae54ecfa7a8c24" +checksum = "a2783724569d96af53464d0711dff635cab7a4934df5e22e9fbc9e181523b83e" dependencies = [ "refinery-core", "refinery-macros", @@ -2324,13 +2327,12 @@ dependencies = [ [[package]] name = "refinery-core" -version = "0.8.11" +version = "0.8.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e895cb870cf06e92318cbbeb701f274d022d5ca87a16fa8244e291cd035ef954" +checksum = "08d6c80329c0455510a8d42fce286ecb4b6bcd8c57e1816d9f2d6bd7379c2cc8" dependencies = [ "async-trait", "cfg-if", - "lazy_static", "log", "regex", "serde", @@ -2339,16 +2341,16 @@ dependencies = [ "time", "tokio", "tokio-postgres", - "toml 0.7.8", + "toml", "url", "walkdir", ] [[package]] name = "refinery-macros" -version = "0.8.11" +version = "0.8.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "123e8b80f8010c3ae38330c81e76938fc7adf6cdbfbaad20295bb8c22718b4f1" +checksum = "6ab6e31e166a49d55cb09b62639e5ab9ba2e73f2f124336b06f6c321dc602779" dependencies = [ "proc-macro2", "quote", @@ -3272,18 +3274,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "toml" -version = "0.7.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd79e69d3b627db300ff956027cc6c3798cef26d22526befdfcd12feeb6d2257" -dependencies = [ - "serde", - "serde_spanned", - "toml_datetime", - "toml_edit 0.19.15", -] - [[package]] name = "toml" version = "0.8.8" @@ -3293,7 +3283,7 @@ dependencies = [ "serde", "serde_spanned", "toml_datetime", - "toml_edit 0.21.0", + "toml_edit", ] [[package]] @@ -3305,19 +3295,6 @@ dependencies = [ "serde", ] -[[package]] -name = "toml_edit" -version = "0.19.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" -dependencies = [ - "indexmap 2.1.0", - "serde", - "serde_spanned", - "toml_datetime", - "winnow", -] - [[package]] name = "toml_edit" version = "0.21.0" diff --git a/nexus/Cargo.toml b/nexus/Cargo.toml index 10a697d065..3b2f1daacf 100644 --- a/nexus/Cargo.toml +++ b/nexus/Cargo.toml @@ -18,4 +18,5 @@ members = [ resolver = "2" [workspace.dependencies] +sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git" } pgwire = "0.19" diff --git a/nexus/analyzer/Cargo.toml b/nexus/analyzer/Cargo.toml index 6ce963eeb7..bc59ef7491 100644 --- a/nexus/analyzer/Cargo.toml +++ b/nexus/analyzer/Cargo.toml @@ -12,5 +12,5 @@ catalog = { path = "../catalog" } flow-rs = { path = "../flow-rs" } pem = "3.0" pt = { path = "../pt" } -sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git", features = ["visitor"] } +sqlparser.workspace = true serde_json = "1.0" diff --git a/nexus/catalog/Cargo.toml b/nexus/catalog/Cargo.toml index 4065833565..7884e5df06 100644 --- a/nexus/catalog/Cargo.toml +++ b/nexus/catalog/Cargo.toml @@ -7,12 +7,16 @@ edition = "2021" [dependencies] anyhow = "1" +async-trait = "0.1" chrono = { version = "0.4.22", default-features = false } prost = "0.12" peer-cursor = { path = "../peer-cursor" } peer-postgres = { path = "../peer-postgres" } +pgwire.workspace = true pt = { path = "../pt" } refinery = { version = "0.8", features = ["tokio-postgres"] } +serde_json = "1.0" +sqlparser.workspace = true tokio = { version = "1.13.0", features = ["full"] } tokio-postgres = { version = "0.7.6", features = [ "with-chrono-0_4", @@ -20,5 +24,4 @@ tokio-postgres = { version = "0.7.6", features = [ "with-uuid-1", ] } tracing = "0.1.29" -serde_json = "1.0" postgres-connection = { path = "../postgres-connection" } diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs index 9dedbb7a33..8b2e8591e5 100644 --- a/nexus/catalog/src/lib.rs +++ b/nexus/catalog/src/lib.rs @@ -1,8 +1,9 @@ -use std::{collections::HashMap, sync::Arc}; +use std::collections::HashMap; use anyhow::{anyhow, Context}; -use peer_cursor::QueryExecutor; -use peer_postgres::PostgresQueryExecutor; +use peer_cursor::{QueryExecutor, QueryOutput, Schema}; +use peer_postgres::{self, ast}; +use pgwire::error::{PgWireResult}; use postgres_connection::{connect_postgres, get_pg_connection_string}; use prost::Message; use pt::{ @@ -11,6 +12,7 @@ use pt::{ peerdb_peers::{peer::Config, DbType, Peer}, }; use serde_json::Value; +use sqlparser::ast::Statement; use tokio_postgres::{types, Client}; mod embedded { @@ -19,8 +21,7 @@ mod embedded { } pub struct Catalog { - pg: Box, - executor: Arc, + pg: Client, } async fn run_migrations(client: &mut Client) -> anyhow::Result<()> { @@ -77,22 +78,13 @@ impl<'a> CatalogConfig<'a> { impl Catalog { pub async fn new(pt_config: pt::peerdb_peers::PostgresConfig) -> anyhow::Result { let client = connect_postgres(&pt_config).await?; - let executor = PostgresQueryExecutor::new(None, &pt_config).await?; - - Ok(Self { - pg: Box::new(client), - executor: Arc::new(executor), - }) + Ok(Self { pg: client }) } pub async fn run_migrations(&mut self) -> anyhow::Result<()> { run_migrations(&mut self.pg).await } - pub fn get_executor(&self) -> &Arc { - &self.executor - } - pub async fn create_peer(&self, peer: &Peer) -> anyhow::Result { let config_blob = { let config = peer.config.clone().context("invalid peer config")?; @@ -602,3 +594,17 @@ impl Catalog { }) } } + +#[async_trait::async_trait] +impl QueryExecutor for Catalog { + #[tracing::instrument(skip(self, stmt), fields(stmt = %stmt))] + async fn execute(&self, stmt: &Statement) -> PgWireResult { + peer_postgres::pg_execute(&self.pg, ast::PostgresAst { + peername: None, + }, stmt).await + } + + async fn describe(&self, stmt: &Statement) -> PgWireResult> { + peer_postgres::pg_describe(&self.pg, stmt).await + } +} diff --git a/nexus/parser/Cargo.toml b/nexus/parser/Cargo.toml index 7cb79e5820..8c79ac4a18 100644 --- a/nexus/parser/Cargo.toml +++ b/nexus/parser/Cargo.toml @@ -13,6 +13,6 @@ futures = { version = "0.3.28", features = ["executor"] } pgwire.workspace = true pt = { path = "../pt" } rand = "0.8" -sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git" } +sqlparser.workspace = true tokio = { version = "1", features = ["full"] } tracing = "0.1" diff --git a/nexus/peer-bigquery/Cargo.toml b/nexus/peer-bigquery/Cargo.toml index 5e48c3a1ac..617b5f8acd 100644 --- a/nexus/peer-bigquery/Cargo.toml +++ b/nexus/peer-bigquery/Cargo.toml @@ -19,7 +19,7 @@ rust_decimal = { version = "1.30.0", features = [ "tokio-pg" ] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_bytes = "0.11" -sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git" } +sqlparser.workspace = true tracing = "0.1" tokio = { version = "1.0", features = ["full"] } gcp-bigquery-client = "0.18" diff --git a/nexus/peer-bigquery/src/lib.rs b/nexus/peer-bigquery/src/lib.rs index 880219661c..656760c2c8 100644 --- a/nexus/peer-bigquery/src/lib.rs +++ b/nexus/peer-bigquery/src/lib.rs @@ -78,11 +78,7 @@ impl BigQueryQueryExecutor { PgWireError::ApiError(err.into()) })?; - let result_set = self - .client - .job() - .query(&self.project_id, query_req) - .await; + let result_set = self.client.job().query(&self.project_id, query_req).await; token.end().await.map_err(|err| { tracing::error!("error closing tracking token: {}", err); @@ -237,13 +233,4 @@ impl QueryExecutor for BigQueryQueryExecutor { )))), } } - async fn is_connection_valid(&self) -> anyhow::Result { - let sql = "SELECT 1;"; - let _result_set = self - .client - .job() - .query(&self.project_id, QueryRequest::new(sql)) - .await?; - Ok(true) - } } diff --git a/nexus/peer-cursor/Cargo.toml b/nexus/peer-cursor/Cargo.toml index 5ce558e055..bead6c92b0 100644 --- a/nexus/peer-cursor/Cargo.toml +++ b/nexus/peer-cursor/Cargo.toml @@ -10,6 +10,6 @@ anyhow = "1.0" async-trait = "0.1" futures = "0.3" pgwire.workspace = true -sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git" } +sqlparser.workspace = true tokio = { version = "1.0", features = ["full"] } value = { path = "../value" } diff --git a/nexus/peer-cursor/src/lib.rs b/nexus/peer-cursor/src/lib.rs index 5c3b297080..08a7891f87 100644 --- a/nexus/peer-cursor/src/lib.rs +++ b/nexus/peer-cursor/src/lib.rs @@ -44,8 +44,5 @@ pub enum QueryOutput { #[async_trait::async_trait] pub trait QueryExecutor: Send + Sync { async fn execute(&self, stmt: &Statement) -> PgWireResult; - async fn describe(&self, stmt: &Statement) -> PgWireResult>; - - async fn is_connection_valid(&self) -> anyhow::Result; } diff --git a/nexus/peer-postgres/Cargo.toml b/nexus/peer-postgres/Cargo.toml index f1ae2c7aac..e72e059f25 100644 --- a/nexus/peer-postgres/Cargo.toml +++ b/nexus/peer-postgres/Cargo.toml @@ -21,7 +21,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_bytes = "0.11" postgres-inet = "0.19.0" -sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git" } +sqlparser.workspace = true tokio = { version = "1.0", features = ["full"] } tokio-postgres = { version = "0.7.6", features = [ "with-chrono-0_4", diff --git a/nexus/peer-postgres/src/lib.rs b/nexus/peer-postgres/src/lib.rs index e19b533a80..0ae519c7c8 100644 --- a/nexus/peer-postgres/src/lib.rs +++ b/nexus/peer-postgres/src/lib.rs @@ -9,50 +9,42 @@ use pt::peerdb_peers::PostgresConfig; use sqlparser::ast::Statement; use tokio_postgres::Client; -mod ast; -mod stream; +pub mod ast; +pub mod stream; // PostgresQueryExecutor is a QueryExecutor that uses a Postgres database as its // backing store. pub struct PostgresQueryExecutor { - config: PostgresConfig, - peername: Option, + peername: String, client: Box, } impl PostgresQueryExecutor { - pub async fn new(peername: Option, config: &PostgresConfig) -> anyhow::Result { + pub async fn new(peername: String, config: &PostgresConfig) -> anyhow::Result { let client = postgres_connection::connect_postgres(config).await?; Ok(Self { - config: config.clone(), peername, client: Box::new(client), }) } +} - pub async fn schema_from_query(&self, query: &str) -> anyhow::Result { - let prepared = self.client.prepare_typed(query, &[]).await?; +async fn schema_from_query(client: &Client, query: &str) -> anyhow::Result { + let prepared = client.prepare_typed(query, &[]).await?; - let fields: Vec = prepared - .columns() - .iter() - .map(|c| { - let name = c.name().to_string(); - FieldInfo::new(name, None, None, c.type_().clone(), FieldFormat::Text) - }) - .collect(); + let fields: Vec = prepared + .columns() + .iter() + .map(|c| { + let name = c.name().to_string(); + FieldInfo::new(name, None, None, c.type_().clone(), FieldFormat::Text) + }) + .collect(); - Ok(Arc::new(fields)) - } + Ok(Arc::new(fields)) } -#[async_trait::async_trait] -impl QueryExecutor for PostgresQueryExecutor { - #[tracing::instrument(skip(self, stmt), fields(stmt = %stmt))] - async fn execute(&self, stmt: &Statement) -> PgWireResult { - let ast = ast::PostgresAst { - peername: self.peername.clone(), - }; +pub async fn pg_execute(client: &Client, ast: ast::PostgresAst, stmt: &Statement) -> PgWireResult { // if the query is a select statement, we need to fetch the rows // and return them as a QueryOutput::Stream, else we return the // number of affected rows. @@ -65,8 +57,7 @@ impl QueryExecutor for PostgresQueryExecutor { // first fetch the schema as this connection will be // short lived, only then run the query as the query // could hold the pin on the connection for a long time. - let schema = self - .schema_from_query(&rewritten_query) + let schema = schema_from_query(client, &rewritten_query) .await .map_err(|e| { tracing::error!("error getting schema: {}", e); @@ -77,8 +68,7 @@ impl QueryExecutor for PostgresQueryExecutor { // given that there could be a lot of rows returned, we // need to use a cursor to stream the rows back to the // client. - let stream = self - .client + let stream = client .query_raw(&rewritten_query, std::iter::empty::<&str>()) .await .map_err(|e| { @@ -101,7 +91,7 @@ impl QueryExecutor for PostgresQueryExecutor { let rewritten_query = rewritten_stmt.to_string(); tracing::info!("[peer-postgres] rewritten statement: {}", rewritten_query); let rows_affected = - self.client + client .execute(&rewritten_query, &[]) .await .map_err(|e| { @@ -111,13 +101,13 @@ impl QueryExecutor for PostgresQueryExecutor { Ok(QueryOutput::AffectedRows(rows_affected as usize)) } } - } - async fn describe(&self, stmt: &Statement) -> PgWireResult> { +} + +pub async fn pg_describe(client: &Client, stmt: &Statement) -> PgWireResult> { match stmt { Statement::Query(_query) => { - let schema = self - .schema_from_query(&stmt.to_string()) + let schema = schema_from_query(client, &stmt.to_string()) .await .map_err(|e| { tracing::error!("error getting schema: {}", e); @@ -127,10 +117,18 @@ impl QueryExecutor for PostgresQueryExecutor { } _ => Ok(None), } +} + +#[async_trait::async_trait] +impl QueryExecutor for PostgresQueryExecutor { + #[tracing::instrument(skip(self, stmt), fields(stmt = %stmt))] + async fn execute(&self, stmt: &Statement) -> PgWireResult { + pg_execute(&self.client, ast::PostgresAst { + peername: Some(self.peername.clone()), + }, stmt).await } - async fn is_connection_valid(&self) -> anyhow::Result { - let _ = PostgresQueryExecutor::new(None, &self.config).await?; - Ok(true) + async fn describe(&self, stmt: &Statement) -> PgWireResult> { + pg_describe(&self.client, stmt).await } } diff --git a/nexus/peer-snowflake/Cargo.toml b/nexus/peer-snowflake/Cargo.toml index cbebba6427..97d4a6d50d 100644 --- a/nexus/peer-snowflake/Cargo.toml +++ b/nexus/peer-snowflake/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" [dependencies] catalog = { path = "../catalog" } peer-cursor = { path = "../peer-cursor" } -sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git" } +sqlparser.workspace = true value = { path = "../value" } tracing = "0.1" secrecy = { version = "0.8.0" } diff --git a/nexus/peer-snowflake/src/lib.rs b/nexus/peer-snowflake/src/lib.rs index c58ad68902..a940688b7e 100644 --- a/nexus/peer-snowflake/src/lib.rs +++ b/nexus/peer-snowflake/src/lib.rs @@ -3,8 +3,6 @@ use async_recursion::async_recursion; use cursor::SnowflakeCursorManager; use peer_cursor::{CursorModification, QueryExecutor, QueryOutput, Schema}; use pgwire::error::{ErrorInfo, PgWireError, PgWireResult}; -use sqlparser::dialect::GenericDialect; -use sqlparser::parser; use std::cmp::min; use std::time::Duration; use stream::SnowflakeDataType; @@ -423,11 +421,4 @@ impl QueryExecutor for SnowflakeQueryExecutor { )))), } } - - async fn is_connection_valid(&self) -> anyhow::Result { - let sql = "SELECT 1;"; - let test_stmt = parser::Parser::parse_sql(&GenericDialect {}, sql)?; - let _ = self.execute(&test_stmt[0]).await?; - Ok(true) - } } diff --git a/nexus/pt/Cargo.toml b/nexus/pt/Cargo.toml index fc9aaaeeaf..5ac1e0c0f2 100644 --- a/nexus/pt/Cargo.toml +++ b/nexus/pt/Cargo.toml @@ -11,7 +11,7 @@ prost = "0.12" prost-types = "0.12" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git" } +sqlparser.workspace = true tonic = "0.10" tonic-reflection = "0.10" pbjson = "0.6" diff --git a/nexus/server/Cargo.toml b/nexus/server/Cargo.toml index 9915852316..e9d484e975 100644 --- a/nexus/server/Cargo.toml +++ b/nexus/server/Cargo.toml @@ -50,7 +50,7 @@ peerdb-parser = { path = "../parser" } pgwire.workspace = true prost = "0.12" pt = { path = "../pt" } -sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git", features = ["visitor"] } +sqlparser = { workspace = true, features = ["visitor"] } serde_json = "1.0" rand = "0.8" time = "0.3" diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 24b03a4020..e237e48bf0 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -129,7 +129,9 @@ impl NexusBackend { let mut peer_cursors = self.peer_cursors.lock().await; match cm { peer_cursor::CursorModification::Created(cursor_name) => { - peer_cursors.add_cursor(cursor_name, peer_holder.unwrap()); + if let Some(peer_holder) = peer_holder { + peer_cursors.add_cursor(cursor_name, peer_holder); + } Ok(vec![Response::Execution(Tag::new("DECLARE CURSOR"))]) } peer_cursor::CursorModification::Closed(cursors) => { @@ -354,7 +356,6 @@ impl NexusBackend { &self, nexus_stmt: NexusStatement, ) -> PgWireResult>> { - let mut peer_holder: Option> = None; match nexus_stmt { NexusStatement::PeerDDL { stmt: _, ref ddl } => match ddl.as_ref() { PeerDDL::CreatePeer { @@ -738,19 +739,21 @@ impl NexusBackend { }, NexusStatement::PeerQuery { stmt, assoc } => { // get the query executor - let executor = match assoc { + let (peer_holder, executor): (Option<_>, Arc) = match assoc { QueryAssociation::Peer(peer) => { tracing::info!("handling peer[{}] query: {}", peer.name, stmt); - peer_holder = Some(peer.clone()); - self.get_peer_executor(&peer).await.map_err(|err| { - PgWireError::ApiError( - format!("unable to get peer executor: {:?}", err).into(), - ) - })? + ( + Some(peer.clone()), + self.get_peer_executor(&peer).await.map_err(|err| { + PgWireError::ApiError( + format!("unable to get peer executor: {:?}", err).into(), + ) + })?, + ) } QueryAssociation::Catalog => { tracing::info!("handling catalog query: {}", stmt); - Arc::clone(self.catalog.get_executor()) + (None, self.catalog.clone()) } }; @@ -773,7 +776,7 @@ impl NexusBackend { analyzer::CursorEvent::Close(c) => peer_cursors.get_peer(&c), }; match peer { - None => Arc::clone(self.catalog.get_executor()), + None => self.catalog.clone(), Some(peer) => self.get_peer_executor(peer).await.map_err(|err| { PgWireError::ApiError( format!("unable to get peer executor: {:?}", err).into(), @@ -782,8 +785,7 @@ impl NexusBackend { } }; - self.execute_statement(executor.as_ref(), &stmt, peer_holder) - .await + self.execute_statement(executor.as_ref(), &stmt, None).await } NexusStatement::Empty => Ok(vec![Response::EmptyQuery]), @@ -840,9 +842,8 @@ impl NexusBackend { Arc::new(executor) } Some(Config::PostgresConfig(ref c)) => { - let peername = Some(peer.name.clone()); let executor = - peer_postgres::PostgresQueryExecutor::new(peername, c).await?; + peer_postgres::PostgresQueryExecutor::new(peer.name.clone(), c).await?; Arc::new(executor) } Some(Config::SnowflakeConfig(ref c)) => { @@ -1024,7 +1025,7 @@ impl ExtendedQueryHandler for NexusBackend { } } } - QueryAssociation::Catalog => self.catalog.get_executor().describe(stmt).await?, + QueryAssociation::Catalog => self.catalog.describe(stmt).await?, }; if let Some(described_schema) = schema { if self.peerdb_fdw_mode { @@ -1178,8 +1179,7 @@ async fn run_migrations<'a>(config: &CatalogConfig<'a>) -> anyhow::Result<()> { // retry connecting to the catalog 3 times with 30 seconds delay // if it fails, return an error for _ in 0..3 { - let catalog = Catalog::new(config.to_postgres_config()).await; - match catalog { + match Catalog::new(config.to_postgres_config()).await { Ok(mut catalog) => { catalog.run_migrations().await?; return Ok(()); From 251eaad5b49075651a2f7e874d002a3cb064d014 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 25 Jan 2024 16:25:37 +0000 Subject: [PATCH 4/4] Avoid log.Fatalf in connector code, return error instead (#1138) We don't want temporal activities calling `os.Exit` Also prefer `fmt.Sprint(x)` to `fmt.Sprintf("%v", x)` --- flow/connectors/postgres/cdc.go | 2 +- flow/connectors/utils/avro/avro_writer.go | 33 +++++++++++++---------- flow/e2e/test_utils.go | 7 +---- flow/workflows/qrep_flow.go | 8 ++---- 4 files changed, 23 insertions(+), 27 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 8029b4ae60..c3fd393522 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -897,7 +897,7 @@ func (p *PostgresCDCSource) recToTablePKey(req *model.PullRecordsRequest, if err != nil { return nil, fmt.Errorf("error getting pkey column value: %w", err) } - pkeyColsMerged = append(pkeyColsMerged, []byte(fmt.Sprintf("%v", pkeyColVal.Value))...) + pkeyColsMerged = append(pkeyColsMerged, []byte(fmt.Sprint(pkeyColVal.Value))...) } return &model.TableWithPkey{ diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go index 6148ae2a6a..a8b5c41be8 100644 --- a/flow/connectors/utils/avro/avro_writer.go +++ b/flow/connectors/utils/avro/avro_writer.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "io" - "log" "log/slog" "os" "sync/atomic" @@ -185,23 +184,23 @@ func (p *peerDBOCFWriter) WriteOCF(w io.Writer) (int, error) { } func (p *peerDBOCFWriter) WriteRecordsToS3(bucketName, key string, s3Creds utils.S3PeerCredentials) (*AvroFile, error) { - r, w := io.Pipe() - numRowsWritten := make(chan int, 1) - go func() { - defer w.Close() - numRows, err := p.WriteOCF(w) - if err != nil { - log.Fatalf("%v", err) - } - numRowsWritten <- numRows - }() - s3svc, err := utils.CreateS3Client(s3Creds) if err != nil { slog.Error("failed to create S3 client: ", slog.Any("error", err)) return nil, fmt.Errorf("failed to create S3 client: %w", err) } + r, w := io.Pipe() + defer r.Close() + var writeOcfError error + var numRows int + var noPanic bool + go func() { + defer w.Close() + numRows, writeOcfError = p.WriteOCF(w) + noPanic = true + }() + _, err = manager.NewUploader(s3svc).Upload(p.ctx, &s3.PutObjectInput{ Bucket: aws.String(bucketName), Key: aws.String(key), @@ -212,11 +211,17 @@ func (p *peerDBOCFWriter) WriteRecordsToS3(bucketName, key string, s3Creds utils slog.Error("failed to upload file: ", slog.Any("error", err), slog.Any("s3_path", s3Path)) return nil, fmt.Errorf("failed to upload file to path %s: %w", s3Path, err) } - slog.Info("file uploaded to " + fmt.Sprintf("%s/%s", bucketName, key)) + if !noPanic { + return nil, fmt.Errorf("WriteOCF panicked while writing avro to S3 %s/%s", bucketName, key) + } + if writeOcfError != nil { + return nil, writeOcfError + } + return &AvroFile{ - NumRecords: <-numRowsWritten, + NumRecords: numRows, StorageLocation: AvroS3Storage, FilePath: key, }, nil diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 0638ecae25..772cdc78aa 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -518,12 +518,7 @@ func NewTStructuredLogger(logger slog.Logger) *TStructuredLogger { } func (l *TStructuredLogger) keyvalsToFields(keyvals []interface{}) slog.Attr { - var attrs []any - for i := 0; i < len(keyvals); i += 1 { - key := fmt.Sprintf("%v", keyvals[i]) - attrs = append(attrs, key) - } - return slog.Group("test-log", attrs...) + return slog.Group("test-log", keyvals...) } func (l *TStructuredLogger) Debug(msg string, keyvals ...interface{}) { diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index e2ddadd2ae..e7e87a41d5 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -235,13 +235,9 @@ func (q *QRepFlowExecution) processPartitions( chunkSize = 1 } - batches := make([][]*protos.QRepPartition, 0) + batches := make([][]*protos.QRepPartition, 0, len(partitions)/chunkSize+1) for i := 0; i < len(partitions); i += chunkSize { - end := i + chunkSize - if end > len(partitions) { - end = len(partitions) - } - + end := min(i+chunkSize, len(partitions)) batches = append(batches, partitions[i:end]) }