From 5471d7b405b02430476cac6f6a85994566aade46 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Philip=20Dub=C3=A9?=
Date: Mon, 11 Dec 2023 16:43:49 +0000
Subject: [PATCH] Avoid having multiple catalog connection pools

Connection pools are meant to be shared, so remove pool from monitoring

Also go one step further: cache connection pool from env.go into global
This requires never closing pool returned by GetCatalogConnectionPoolFromEnv
---
 flow/activities/flowable.go                   |  74 ++++++-----
 flow/cmd/api.go                               |   1 -
 flow/cmd/handler.go                           |   7 -
 flow/cmd/worker.go                            |   5 +-
 flow/connectors/bigquery/bigquery.go          |   1 -
 flow/connectors/external_metadata/store.go    |   2 +-
 flow/connectors/postgres/postgres.go          |   4 +-
 flow/connectors/utils/catalog/env.go          |  28 ++--
 .../connectors/utils/monitoring/monitoring.go | 120 ++++++++----------
 9 files changed, 114 insertions(+), 128 deletions(-)

diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index fae47a2f5c..e9688a14f0 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -13,7 +13,6 @@ import (
 	connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
 	connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
-	catalog "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
 	"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/model"
@@ -21,6 +20,7 @@ import (
 	"github.com/jackc/pglogrepl"
 	"github.com/jackc/pgx/v5"
 	"github.com/jackc/pgx/v5/pgtype"
+	"github.com/jackc/pgx/v5/pgxpool"
 	log "github.com/sirupsen/logrus"
 	"go.temporal.io/sdk/activity"
 	"golang.org/x/sync/errgroup"
@@ -40,7 +40,7 @@ type SlotSnapshotSignal struct {
 }
 
 type FlowableActivity struct {
-	CatalogMirrorMonitor *monitoring.CatalogMirrorMonitor
+	CatalogPool *pgxpool.Pool
 }
 
 // CheckConnection implements CheckConnection.
@@ -114,7 +114,7 @@ func (a *FlowableActivity) CreateRawTable(
 	ctx context.Context,
 	config *protos.CreateRawTableInput,
 ) (*protos.CreateRawTableOutput, error) {
-	ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogMirrorMonitor)
+	ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogPool)
 	dstConn, err := connectors.GetCDCSyncConnector(ctx, config.PeerConnectionConfig)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get connector: %w", err)
@@ -125,7 +125,7 @@ func (a *FlowableActivity) CreateRawTable(
 	if err != nil {
 		return nil, err
 	}
-	err = a.CatalogMirrorMonitor.InitializeCDCFlow(ctx, config.FlowJobName)
+	err = monitoring.InitializeCDCFlow(ctx, a.CatalogPool, config.FlowJobName)
 	if err != nil {
 		return nil, err
 	}
@@ -174,7 +174,7 @@ func (a *FlowableActivity) handleSlotInfo(
 	}
 
 	if len(slotInfo) != 0 {
-		return a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, peerName, slotInfo[0])
+		return monitoring.AppendSlotSizeInfo(ctx, a.CatalogPool, peerName, slotInfo[0])
 	}
 	return nil
 }
@@ -209,7 +209,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 	input *protos.StartFlowInput) (*model.SyncResponse, error) {
 	activity.RecordHeartbeat(ctx, "starting flow...")
 	conn := input.FlowConnectionConfigs
-	ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogMirrorMonitor)
+	ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogPool)
 	dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get destination connector: %w", err)
@@ -276,13 +276,13 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 		"flowName": input.FlowConnectionConfigs.FlowJobName,
 	}).Infof("the current sync flow has records: %v", hasRecords)
 
-	if a.CatalogMirrorMonitor.IsActive() && hasRecords {
+	if a.CatalogPool != nil && hasRecords {
 		syncBatchID, err := dstConn.GetLastSyncBatchID(input.FlowConnectionConfigs.FlowJobName)
 		if err != nil && conn.Destination.Type != protos.DBType_EVENTHUB {
 			return nil, err
 		}
 
-		err = a.CatalogMirrorMonitor.AddCDCBatchForFlow(ctx, input.FlowConnectionConfigs.FlowJobName,
+		err = monitoring.AddCDCBatchForFlow(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName,
 			monitoring.CDCBatchInfo{
 				BatchID:     syncBatchID + 1,
 				RowsInBatch: 0,
@@ -347,8 +347,9 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 		return nil, fmt.Errorf("failed to get last checkpoint: %w", err)
 	}
 
-	err = a.CatalogMirrorMonitor.UpdateNumRowsAndEndLSNForCDCBatch(
+	err = monitoring.UpdateNumRowsAndEndLSNForCDCBatch(
 		ctx,
+		a.CatalogPool,
 		input.FlowConnectionConfigs.FlowJobName,
 		res.CurrentSyncBatchID,
 		uint32(numRecords),
@@ -358,13 +359,17 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 		return nil, err
 	}
 
-	err = a.CatalogMirrorMonitor.
-		UpdateLatestLSNAtTargetForCDCFlow(ctx, input.FlowConnectionConfigs.FlowJobName, pglogrepl.LSN(lastCheckpoint))
+	err = monitoring.UpdateLatestLSNAtTargetForCDCFlow(
+		ctx,
+		a.CatalogPool,
+		input.FlowConnectionConfigs.FlowJobName,
+		pglogrepl.LSN(lastCheckpoint),
+	)
 	if err != nil {
 		return nil, err
 	}
 	if res.TableNameRowsMapping != nil {
-		err = a.CatalogMirrorMonitor.AddCDCBatchTablesForFlow(ctx, input.FlowConnectionConfigs.FlowJobName,
+		err = monitoring.AddCDCBatchTablesForFlow(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName,
 			res.CurrentSyncBatchID, res.TableNameRowsMapping)
 		if err != nil {
 			return nil, err
@@ -401,7 +406,7 @@ func (a *FlowableActivity) StartNormalize(
 			return nil, fmt.Errorf("failed to get last sync batch ID: %v", err)
 		}
 
-		err = a.CatalogMirrorMonitor.UpdateEndTimeForCDCBatch(ctx, input.FlowConnectionConfigs.FlowJobName,
+		err = monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName,
 			lastSyncBatchID)
 		return nil, err
 	} else if err != nil {
@@ -434,8 +439,12 @@ func (a *FlowableActivity) StartNormalize(
 
 	// normalize flow did not run due to no records, no need to update end time.
 	if res.Done {
-		err = a.CatalogMirrorMonitor.UpdateEndTimeForCDCBatch(ctx, input.FlowConnectionConfigs.FlowJobName,
-			res.EndBatchID)
+		err = monitoring.UpdateEndTimeForCDCBatch(
+			ctx,
+			a.CatalogPool,
+			input.FlowConnectionConfigs.FlowJobName,
+			res.EndBatchID,
+		)
 		if err != nil {
 			return nil, err
 		}
@@ -500,8 +509,9 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
 		return nil, fmt.Errorf("failed to get partitions from source: %w", err)
 	}
 	if len(partitions) > 0 {
-		err = a.CatalogMirrorMonitor.InitializeQRepRun(
+		err = monitoring.InitializeQRepRun(
 			ctx,
+			a.CatalogPool,
 			config,
 			runUUID,
 			partitions,
@@ -522,7 +532,7 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context,
 	partitions *protos.QRepPartitionBatch,
 	runUUID string,
 ) error {
-	err := a.CatalogMirrorMonitor.UpdateStartTimeForQRepRun(ctx, runUUID)
+	err := monitoring.UpdateStartTimeForQRepRun(ctx, a.CatalogPool, runUUID)
 	if err != nil {
 		return fmt.Errorf("failed to update start time for qrep run: %w", err)
 	}
@@ -549,7 +559,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
 	partition *protos.QRepPartition,
 	runUUID string,
 ) error {
-	err := a.CatalogMirrorMonitor.UpdateStartTimeForPartition(ctx, runUUID, partition, time.Now())
+	err := monitoring.UpdateStartTimeForPartition(ctx, a.CatalogPool, runUUID, partition, time.Now())
 	if err != nil {
 		return fmt.Errorf("failed to update start time for partition: %w", err)
 	}
@@ -587,7 +597,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
 			}).Errorf("failed to pull records: %v", err)
 			goroutineErr = err
 		} else {
-			err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(ctx, runUUID, partition, numRecords)
+			err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, a.CatalogPool, runUUID, partition, numRecords)
 			if err != nil {
 				log.Errorf("%v", err)
 				goroutineErr = err
@@ -607,7 +617,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
 		"flowName": config.FlowJobName,
 	}).Infof("pulled %d records\n", len(recordBatch.Records))
 
-	err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(ctx, runUUID, partition, numRecords)
+	err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, a.CatalogPool, runUUID, partition, numRecords)
 	if err != nil {
 		return err
 	}
@@ -645,7 +655,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
 		}).Infof("pushed %d records\n", res)
 	}
 
-	err = a.CatalogMirrorMonitor.UpdateEndTimeForPartition(ctx, runUUID, partition)
+	err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition)
 	if err != nil {
 		return err
 	}
@@ -657,7 +667,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
 	runUUID string) error {
 	dstConn, err := connectors.GetQRepConsolidateConnector(ctx, config.DestinationPeer)
 	if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
-		return a.CatalogMirrorMonitor.UpdateEndTimeForQRepRun(ctx, runUUID)
+		return monitoring.UpdateEndTimeForQRepRun(ctx, a.CatalogPool, runUUID)
 	} else if err != nil {
 		return err
 	}
@@ -675,7 +685,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
 		return err
 	}
 
-	return a.CatalogMirrorMonitor.UpdateEndTimeForQRepRun(ctx, runUUID)
+	return monitoring.UpdateEndTimeForQRepRun(ctx, a.CatalogPool, runUUID)
 }
 
 func (a *FlowableActivity) CleanupQRepFlow(ctx context.Context, config *protos.QRepConfig) error {
@@ -713,12 +723,8 @@ func (a *FlowableActivity) DropFlow(ctx context.Context, config *protos.Shutdown
 	return nil
 }
 
-func getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) {
-	catalogPool, catalogErr := catalog.GetCatalogConnectionPoolFromEnv()
-	if catalogErr != nil {
-		return nil, fmt.Errorf("error getting catalog connection pool: %w", catalogErr)
-	}
-	defer catalogPool.Close()
+func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) {
+	catalogPool := a.CatalogPool
 
 	optionRows, err := catalogPool.Query(ctx, `
 		SELECT DISTINCT p.name, p.options
@@ -762,7 +768,7 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
 			log.Info("context is done, exiting wal heartbeat send loop")
 			return nil
 		case <-ticker.C:
-			pgPeers, err := getPostgresPeerConfigs(ctx)
+			pgPeers, err := a.getPostgresPeerConfigs(ctx)
 			if err != nil {
 				log.Warn("[sendwalheartbeat]: warning: unable to fetch peers." +
 					"Skipping walheartbeat send. error encountered: " + err.Error())
@@ -944,17 +950,17 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
 				}},
 			}
 		}
-		updateErr := a.CatalogMirrorMonitor.InitializeQRepRun(ctx, config, runUUID, []*protos.QRepPartition{partitionForMetrics})
+		updateErr := monitoring.InitializeQRepRun(ctx, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics})
 		if updateErr != nil {
 			return updateErr
 		}
 
-		err := a.CatalogMirrorMonitor.UpdateStartTimeForPartition(ctx, runUUID, partition, startTime)
+		err := monitoring.UpdateStartTimeForPartition(ctx, a.CatalogPool, runUUID, partition, startTime)
 		if err != nil {
 			return fmt.Errorf("failed to update start time for partition: %w", err)
 		}
 
-		err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(errCtx, runUUID, partition, int64(numRecords))
+		err = monitoring.UpdatePullEndTimeAndRowsForPartition(errCtx, a.CatalogPool, runUUID, partition, int64(numRecords))
 		if err != nil {
 			log.Errorf("%v", err)
 			return err
@@ -990,7 +996,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
 		}).Infof("pushed %d records\n", res)
 	}
 
-	err = a.CatalogMirrorMonitor.UpdateEndTimeForPartition(ctx, runUUID, partition)
+	err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition)
 	if err != nil {
 		return 0, err
 	}
diff --git a/flow/cmd/api.go b/flow/cmd/api.go
index 6715ff3c55..3da1286cfe 100644
--- a/flow/cmd/api.go
+++ b/flow/cmd/api.go
@@ -125,7 +125,6 @@ func APIMain(args *APIServerParams) error {
 	}
 
 	flowHandler := NewFlowRequestHandler(tc, catalogConn, taskQueue)
-	defer flowHandler.Close()
 
 	err = killExistingHeartbeatFlows(ctx, tc, args.TemporalNamespace, taskQueue)
 	if err != nil {
diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go
index 40a8e9147f..d71f1e49a7 100644
--- a/flow/cmd/handler.go
+++ b/flow/cmd/handler.go
@@ -116,13 +116,6 @@ func (h *FlowRequestHandler) createQrepJobEntry(ctx context.Context,
 	return nil
 }
 
-// Close closes the connection pool
-func (h *FlowRequestHandler) Close() {
-	if h.pool != nil {
-		h.pool.Close()
-	}
-}
-
 func (h *FlowRequestHandler) CreateCDCFlow(
 	ctx context.Context, req *protos.CreateCDCFlowRequest) (*protos.CreateCDCFlowResponse, error) {
 	cfg := req.ConnectionConfigs
diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go
index f7431e52cc..a9e1281619 100644
--- a/flow/cmd/worker.go
+++ b/flow/cmd/worker.go
@@ -10,7 +10,6 @@ import (
 
 	"github.com/PeerDB-io/peer-flow/activities"
 	utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
-	"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
 	"github.com/PeerDB-io/peer-flow/shared"
 	peerflow "github.com/PeerDB-io/peer-flow/workflows"
 
@@ -108,8 +107,6 @@ func WorkerMain(opts *WorkerOptions) error {
 	if err != nil {
 		return fmt.Errorf("unable to create catalog connection pool: %w", err)
 	}
-	catalogMirrorMonitor := monitoring.NewCatalogMirrorMonitor(conn)
-	defer catalogMirrorMonitor.Close()
 
 	c, err := client.Dial(clientOptions)
 	if err != nil {
@@ -134,7 +131,7 @@ func WorkerMain(opts *WorkerOptions) error {
 	w.RegisterWorkflow(peerflow.DropFlowWorkflow)
 	w.RegisterWorkflow(peerflow.HeartbeatFlowWorkflow)
 	w.RegisterActivity(&activities.FlowableActivity{
-		CatalogMirrorMonitor: catalogMirrorMonitor,
+		CatalogPool: conn,
 	})
 
 	err = w.Run(worker.InterruptCh())
diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go
index 52b6479993..f1ad468c5b 100644
--- a/flow/connectors/bigquery/bigquery.go
+++ b/flow/connectors/bigquery/bigquery.go
@@ -199,7 +199,6 @@ func (c *BigQueryConnector) Close() error {
 	if c == nil || c.client == nil {
 		return nil
 	}
-	c.catalogPool.Close()
 	return c.client.Close()
 }
 
diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go
index 3d4f9baf99..637432b7fe 100644
--- a/flow/connectors/external_metadata/store.go
+++ b/flow/connectors/external_metadata/store.go
@@ -54,7 +54,7 @@ func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConf
 }
 
 func (p *PostgresMetadataStore) Close() error {
-	if p.pool != nil {
+	if p.config != nil && p.pool != nil {
 		p.pool.Close()
 	}
 
diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go
index 394e8a9d79..eb59f1ab8d 100644
--- a/flow/connectors/postgres/postgres.go
+++ b/flow/connectors/postgres/postgres.go
@@ -246,13 +246,13 @@ func (c *PostgresConnector) PullRecords(req *model.PullRecordsRequest) error {
 		return err
 	}
 
-	cdcMirrorMonitor, ok := c.ctx.Value(shared.CDCMirrorMonitorKey).(*monitoring.CatalogMirrorMonitor)
+	catalogPool, ok := c.ctx.Value(shared.CDCMirrorMonitorKey).(*pgxpool.Pool)
 	if ok {
 		latestLSN, err := c.getCurrentLSN()
 		if err != nil {
 			return fmt.Errorf("failed to get current LSN: %w", err)
 		}
-		err = cdcMirrorMonitor.UpdateLatestLSNAtSourceForCDCFlow(c.ctx, req.FlowJobName, latestLSN)
+		err = monitoring.UpdateLatestLSNAtSourceForCDCFlow(c.ctx, catalogPool, req.FlowJobName, latestLSN)
 		if err != nil {
 			return fmt.Errorf("failed to update latest LSN at source for CDC flow: %w", err)
 		}
diff --git a/flow/connectors/utils/catalog/env.go b/flow/connectors/utils/catalog/env.go
index e1bd33d25a..078921dcc4 100644
--- a/flow/connectors/utils/catalog/env.go
+++ b/flow/connectors/utils/catalog/env.go
@@ -5,29 +5,37 @@ import (
 	"fmt"
 	"os"
 	"strconv"
+	"sync"
 
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/jackc/pgx/v5/pgxpool"
 )
 
+var poolMutex = &sync.Mutex{}
+var pool *pgxpool.Pool
+
 func GetCatalogConnectionPoolFromEnv() (*pgxpool.Pool, error) {
-	catalogConnectionString, err := genCatalogConnectionString()
-	if err != nil {
-		return nil, fmt.Errorf("unable to generate catalog connection string: %w", err)
-	}
+	poolMutex.Lock()
+	defer poolMutex.Unlock()
+	if pool == nil {
+		catalogConnectionString, err := genCatalogConnectionString()
+		if err != nil {
+			return nil, fmt.Errorf("unable to generate catalog connection string: %w", err)
+		}
 
-	catalogConn, err := pgxpool.New(context.Background(), catalogConnectionString)
-	if err != nil {
-		return nil, fmt.Errorf("unable to establish connection with catalog: %w", err)
+		pool, err = pgxpool.New(context.Background(), catalogConnectionString)
+		if err != nil {
+			return nil, fmt.Errorf("unable to establish connection with catalog: %w", err)
+		}
 	}
 
-	err = catalogConn.Ping(context.Background())
+	err := pool.Ping(context.Background())
 	if err != nil {
-		return nil, fmt.Errorf("unable to establish connection with catalog: %w", err)
+		return pool, fmt.Errorf("unable to establish connection with catalog: %w", err)
 	}
 
-	return catalogConn, nil
+	return pool, nil
 }
 
 func genCatalogConnectionString() (string, error) {
diff --git a/flow/connectors/utils/monitoring/monitoring.go b/flow/connectors/utils/monitoring/monitoring.go
index 06a6b4f125..41c634b874 100644
--- a/flow/connectors/utils/monitoring/monitoring.go
+++ b/flow/connectors/utils/monitoring/monitoring.go
@@ -14,10 +14,6 @@ import (
 	"google.golang.org/protobuf/proto"
 )
 
-type CatalogMirrorMonitor struct {
-	catalogConn *pgxpool.Pool
-}
-
 type CDCBatchInfo struct {
 	BatchID     int64
 	RowsInBatch uint32
@@ -26,29 +22,12 @@ type CDCBatchInfo struct {
 	StartTime   time.Time
 }
 
-func NewCatalogMirrorMonitor(catalogConn *pgxpool.Pool) *CatalogMirrorMonitor {
-	return &CatalogMirrorMonitor{
-		catalogConn: catalogConn,
-	}
-}
-
-func (c *CatalogMirrorMonitor) IsActive() bool {
-	return !(c == nil || c.catalogConn == nil)
-}
-
-func (c *CatalogMirrorMonitor) Close() {
-	if c == nil || c.catalogConn == nil {
-		return
-	}
-	c.catalogConn.Close()
-}
-
-func (c *CatalogMirrorMonitor) InitializeCDCFlow(ctx context.Context, flowJobName string) error {
-	if c == nil || c.catalogConn == nil {
+func InitializeCDCFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string) error {
+	if pool == nil {
 		return nil
 	}
 
-	_, err := c.catalogConn.Exec(ctx,
+	_, err := pool.Exec(ctx,
 		`INSERT INTO peerdb_stats.cdc_flows(flow_name,latest_lsn_at_source,latest_lsn_at_target) VALUES($1,0,0)
 		ON CONFLICT DO NOTHING`, flowJobName)
 	if err != nil {
@@ -57,13 +36,13 @@ func (c *CatalogMirrorMonitor) InitializeCDCFlow(ctx context.Context, flowJobNam
 	return nil
 }
 
-func (c *CatalogMirrorMonitor) UpdateLatestLSNAtSourceForCDCFlow(ctx context.Context, flowJobName string,
+func UpdateLatestLSNAtSourceForCDCFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string,
 	latestLSNAtSource pglogrepl.LSN) error {
-	if c == nil || c.catalogConn == nil {
+	if pool == nil {
 		return nil
 	}
 
-	_, err := c.catalogConn.Exec(ctx,
+	_, err := pool.Exec(ctx,
 		"UPDATE peerdb_stats.cdc_flows SET latest_lsn_at_source=$1 WHERE flow_name=$2",
 		uint64(latestLSNAtSource), flowJobName)
 	if err != nil {
@@ -72,13 +51,13 @@ func (c *CatalogMirrorMonitor) UpdateLatestLSNAtSourceForCDCFlow(ctx context.Con
 	return nil
 }
 
-func (c *CatalogMirrorMonitor) UpdateLatestLSNAtTargetForCDCFlow(ctx context.Context, flowJobName string,
+func UpdateLatestLSNAtTargetForCDCFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string,
 	latestLSNAtTarget pglogrepl.LSN) error {
-	if c == nil || c.catalogConn == nil {
+	if pool == nil {
 		return nil
 	}
 
-	_, err := c.catalogConn.Exec(ctx,
+	_, err := pool.Exec(ctx,
 		"UPDATE peerdb_stats.cdc_flows SET latest_lsn_at_target=$1 WHERE flow_name=$2",
 		uint64(latestLSNAtTarget), flowJobName)
 	if err != nil {
@@ -87,13 +66,13 @@ func (c *CatalogMirrorMonitor) UpdateLatestLSNAtTargetForCDCFlow(ctx context.Con
 	return nil
 }
 
-func (c *CatalogMirrorMonitor) AddCDCBatchForFlow(ctx context.Context, flowJobName string,
+func AddCDCBatchForFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string,
 	batchInfo CDCBatchInfo) error {
-	if c == nil || c.catalogConn == nil {
+	if pool == nil {
 		return nil
 	}
 
-	_, err := c.catalogConn.Exec(ctx,
+	_, err := pool.Exec(ctx,
 		`INSERT INTO peerdb_stats.cdc_batches(flow_name,batch_id,rows_in_batch,batch_start_lsn,batch_end_lsn,
 		start_time) VALUES($1,$2,$3,$4,$5,$6) ON CONFLICT DO NOTHING`,
 		flowJobName, batchInfo.BatchID, batchInfo.RowsInBatch,
@@ -105,18 +84,19 @@ func (c *CatalogMirrorMonitor) AddCDCBatchForFlow(ctx context.Context, flowJobNa
 }
 
 // update num records and end-lsn for a cdc batch
-func (c *CatalogMirrorMonitor) UpdateNumRowsAndEndLSNForCDCBatch(
+func UpdateNumRowsAndEndLSNForCDCBatch(
 	ctx context.Context,
+	pool *pgxpool.Pool,
 	flowJobName string,
 	batchID int64,
 	numRows uint32,
 	batchEndLSN pglogrepl.LSN,
 ) error {
-	if c == nil || c.catalogConn == nil {
+	if pool == nil {
 		return nil
 	}
 
-	_, err := c.catalogConn.Exec(ctx,
+	_, err := pool.Exec(ctx,
 		"UPDATE peerdb_stats.cdc_batches SET rows_in_batch=$1,batch_end_lsn=$2 WHERE flow_name=$3 AND batch_id=$4",
 		numRows, uint64(batchEndLSN), flowJobName, batchID)
 	if err != nil {
@@ -125,16 +105,17 @@ func (c *CatalogMirrorMonitor) UpdateNumRowsAndEndLSNForCDCBatch(
 	return nil
 }
 
-func (c *CatalogMirrorMonitor) UpdateEndTimeForCDCBatch(
+func UpdateEndTimeForCDCBatch(
 	ctx context.Context,
+	pool *pgxpool.Pool,
 	flowJobName string,
 	batchID int64,
 ) error {
-	if c == nil || c.catalogConn == nil {
+	if pool == nil {
 		return nil
 	}
 
-	_, err := c.catalogConn.Exec(ctx,
+	_, err := pool.Exec(ctx,
 		"UPDATE peerdb_stats.cdc_batches SET end_time=$1 WHERE flow_name=$2 AND batch_id=$3",
 		time.Now(), flowJobName, batchID)
 	if err != nil {
@@ -143,13 +124,13 @@ func (c *CatalogMirrorMonitor) UpdateEndTimeForCDCBatch(
 	return nil
 }
 
-func (c *CatalogMirrorMonitor) AddCDCBatchTablesForFlow(ctx context.Context, flowJobName string,
+func AddCDCBatchTablesForFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string,
 	batchID int64, tableNameRowsMapping map[string]uint32) error {
-	if c == nil || c.catalogConn == nil {
+	if pool == nil {
 		return nil
 	}
 
-	insertBatchTablesTx, err := c.catalogConn.Begin(ctx)
+	insertBatchTablesTx, err := pool.Begin(ctx)
 	if err != nil {
 		return fmt.Errorf("error while beginning transaction for inserting statistics into cdc_batch_table: %w", err)
 	}
@@ -177,18 +158,19 @@ func (c *CatalogMirrorMonitor) AddCDCBatchTablesForFlow(ctx context.Context, flo
 	return nil
 }
 
-func (c *CatalogMirrorMonitor) InitializeQRepRun(
+func InitializeQRepRun(
 	ctx context.Context,
+	pool *pgxpool.Pool,
 	config *protos.QRepConfig,
 	runUUID string,
 	partitions []*protos.QRepPartition,
 ) error {
-	if c == nil || c.catalogConn == nil {
+	if pool == nil {
 		return nil
 	}
 
 	flowJobName := config.GetFlowJobName()
-	_, err := c.catalogConn.Exec(ctx,
+	_, err := pool.Exec(ctx,
 		"INSERT INTO peerdb_stats.qrep_runs(flow_name,run_uuid) VALUES($1,$2) ON CONFLICT DO NOTHING",
 		flowJobName, runUUID)
 	if err != nil {
@@ -200,7 +182,7 @@ func (c *CatalogMirrorMonitor) InitializeQRepRun(
 		return fmt.Errorf("unable to marshal flow config: %w", err)
 	}
 
-	_, err = c.catalogConn.Exec(ctx,
+	_, err = pool.Exec(ctx,
 		"UPDATE peerdb_stats.qrep_runs SET config_proto = $1 WHERE flow_name = $2",
 		cfgBytes, flowJobName)
 	if err != nil {
@@ -208,7 +190,7 @@ func (c *CatalogMirrorMonitor) InitializeQRepRun(
 	}
 
 	for _, partition := range partitions {
-		if err := c.addPartitionToQRepRun(ctx, flowJobName, runUUID, partition); err != nil {
+		if err := addPartitionToQRepRun(ctx, pool, flowJobName, runUUID, partition); err != nil {
 			return fmt.Errorf("unable to add partition to qrep run: %w", err)
 		}
 	}
@@ -216,12 +198,12 @@ func (c *CatalogMirrorMonitor) InitializeQRepRun(
 	return nil
 }
 
-func (c *CatalogMirrorMonitor) UpdateStartTimeForQRepRun(ctx context.Context, runUUID string) error {
-	if c == nil || c.catalogConn == nil {
+func UpdateStartTimeForQRepRun(ctx context.Context, pool *pgxpool.Pool, runUUID string) error {
+	if pool == nil {
 		return nil
 	}
 
-	_, err := c.catalogConn.Exec(ctx,
+	_, err := pool.Exec(ctx,
 		"UPDATE peerdb_stats.qrep_runs SET start_time=$1 WHERE run_uuid=$2",
 		time.Now(), runUUID)
 	if err != nil {
@@ -231,12 +213,12 @@ func (c *CatalogMirrorMonitor) UpdateStartTimeForQRepRun(ctx context.Context, ru
 	return nil
 }
 
-func (c *CatalogMirrorMonitor) UpdateEndTimeForQRepRun(ctx context.Context, runUUID string) error {
-	if c == nil || c.catalogConn == nil {
+func UpdateEndTimeForQRepRun(ctx context.Context, pool *pgxpool.Pool, runUUID string) error {
+	if pool == nil {
 		return nil
 	}
 
-	_, err := c.catalogConn.Exec(ctx,
+	_, err := pool.Exec(ctx,
 		"UPDATE peerdb_stats.qrep_runs SET end_time=$1 WHERE run_uuid=$2",
 		time.Now(), runUUID)
 	if err != nil {
@@ -246,16 +228,17 @@ func (c *CatalogMirrorMonitor) UpdateEndTimeForQRepRun(ctx context.Context, runU
 	return nil
 }
 
-func (c *CatalogMirrorMonitor) AppendSlotSizeInfo(
+func AppendSlotSizeInfo(
 	ctx context.Context,
+	pool *pgxpool.Pool,
 	peerName string,
 	slotInfo *protos.SlotInfo,
 ) error {
-	if c == nil || c.catalogConn == nil || slotInfo == nil {
+	if pool == nil || slotInfo == nil {
 		return nil
 	}
 
-	_, err := c.catalogConn.Exec(ctx,
+	_, err := pool.Exec(ctx,
 		"INSERT INTO peerdb_stats.peer_slot_size"+
 			"(peer_name, slot_name, restart_lsn, redo_lsn, confirmed_flush_lsn, slot_size, wal_status) "+
 			"VALUES($1,$2,$3,$4,$5,$6) ON CONFLICT DO NOTHING;",
@@ -274,9 +257,9 @@ func (c *CatalogMirrorMonitor) AppendSlotSizeInfo(
 	return nil
 }
 
-func (c *CatalogMirrorMonitor) addPartitionToQRepRun(ctx context.Context, flowJobName string,
+func addPartitionToQRepRun(ctx context.Context, pool *pgxpool.Pool, flowJobName string,
 	runUUID string, partition *protos.QRepPartition) error {
-	if c == nil || c.catalogConn == nil {
+	if pool == nil {
 		return nil
 	}
 
@@ -317,7 +300,7 @@ func (c *CatalogMirrorMonitor) addPartitionToQRepRun(ctx context.Context, flowJo
 		return fmt.Errorf("unknown range type: %v", x)
 	}
 
-	_, err := c.catalogConn.Exec(ctx,
+	_, err := pool.Exec(ctx,
 		`INSERT INTO peerdb_stats.qrep_partitions
 			(flow_name,run_uuid,partition_uuid,partition_start,partition_end,restart_count)
 			VALUES($1,$2,$3,$4,$5,$6) ON CONFLICT(run_uuid,partition_uuid) DO UPDATE SET
@@ -330,17 +313,18 @@ func (c *CatalogMirrorMonitor) addPartitionToQRepRun(ctx context.Context, flowJo
 	return nil
 }
 
-func (c *CatalogMirrorMonitor) UpdateStartTimeForPartition(
+func UpdateStartTimeForPartition(
 	ctx context.Context,
+	pool *pgxpool.Pool,
 	runUUID string,
 	partition *protos.QRepPartition,
 	startTime time.Time,
 ) error {
-	if c == nil || c.catalogConn == nil {
+	if pool == nil {
 		return nil
 	}
 
-	_, err := c.catalogConn.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET start_time=$1
+	_, err := pool.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET start_time=$1
 		WHERE run_uuid=$2 AND partition_uuid=$3`, startTime, runUUID, partition.PartitionId)
 	if err != nil {
 		return fmt.Errorf("error while updating qrep partition in qrep_partitions: %w", err)
@@ -348,13 +332,13 @@ func (c *CatalogMirrorMonitor) UpdateStartTimeForPartition(
 	return nil
 }
 
-func (c *CatalogMirrorMonitor) UpdatePullEndTimeAndRowsForPartition(ctx context.Context, runUUID string,
+func UpdatePullEndTimeAndRowsForPartition(ctx context.Context, pool *pgxpool.Pool, runUUID string,
 	partition *protos.QRepPartition, rowsInPartition int64) error {
-	if c == nil || c.catalogConn == nil {
+	if pool == nil {
 		return nil
 	}
 
-	_, err := c.catalogConn.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET pull_end_time=$1,rows_in_partition=$2
+	_, err := pool.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET pull_end_time=$1,rows_in_partition=$2
 		WHERE run_uuid=$3 AND partition_uuid=$4`, time.Now(), rowsInPartition, runUUID, partition.PartitionId)
 	if err != nil {
 		return fmt.Errorf("error while updating qrep partition in qrep_partitions: %w", err)
@@ -362,13 +346,13 @@ func (c *CatalogMirrorMonitor) UpdatePullEndTimeAndRowsForPartition(ctx context.
 	return nil
 }
 
-func (c *CatalogMirrorMonitor) UpdateEndTimeForPartition(ctx context.Context, runUUID string,
+func UpdateEndTimeForPartition(ctx context.Context, pool *pgxpool.Pool, runUUID string,
 	partition *protos.QRepPartition) error {
-	if c == nil || c.catalogConn == nil {
+	if pool == nil {
 		return nil
 	}
 
-	_, err := c.catalogConn.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET end_time=$1
+	_, err := pool.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET end_time=$1
 		WHERE run_uuid=$2 AND partition_uuid=$3`, time.Now(), runUUID, partition.PartitionId)
 	if err != nil {
 		return fmt.Errorf("error while updating qrep partition in qrep_partitions: %w", err)
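
The heart of the change in flow/connectors/utils/catalog/env.go is a lazily initialized, mutex-guarded package-level pool that every caller shares and never closes. A minimal standalone sketch of that pattern follows, assuming pgx v5; the package name, GetPool, and connString below are illustrative and not part of the patch:

package catalog

import (
	"context"
	"fmt"
	"sync"

	"github.com/jackc/pgx/v5/pgxpool"
)

var (
	poolMutex sync.Mutex
	pool      *pgxpool.Pool
)

// GetPool lazily creates a single process-wide pgx pool and returns the same
// instance on every subsequent call. connString is assumed to be built
// elsewhere (the patch derives it from environment variables).
func GetPool(ctx context.Context, connString string) (*pgxpool.Pool, error) {
	poolMutex.Lock()
	defer poolMutex.Unlock()

	if pool == nil {
		p, err := pgxpool.New(ctx, connString)
		if err != nil {
			return nil, fmt.Errorf("unable to establish connection with catalog: %w", err)
		}
		pool = p
	}

	// Verify the cached pool is still usable before handing it out.
	if err := pool.Ping(ctx); err != nil {
		return pool, err
	}
	return pool, nil
}

Because the pool's lifetime is the process lifetime, call sites that previously deferred Close() (the mirror monitor, the request handler, getPostgresPeerConfigs) simply drop those calls, which is what the diff above does.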