From de903b985a5102bc5500a8c44dc28559bd410d5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 12 Dec 2023 14:48:43 +0000 Subject: [PATCH] Avoid having multiple catalog connection pools (#793) Connection pools are meant to be shared, so remove pool from monitoring Also go one step further: cache connection pool from env.go into global This requires never closing pool returned by GetCatalogConnectionPoolFromEnv Run migrations for CI because `monitoring` logic is now non optional Opens up reusing pool for #778 --- .github/workflows/flow.yml | 5 +- flow/activities/flowable.go | 80 ++++----- flow/cmd/api.go | 1 - flow/cmd/handler.go | 7 - flow/cmd/worker.go | 5 +- flow/connectors/bigquery/bigquery.go | 1 - flow/connectors/external_metadata/store.go | 2 +- flow/connectors/postgres/postgres.go | 4 +- flow/connectors/utils/catalog/env.go | 28 ++-- .../connectors/utils/monitoring/monitoring.go | 158 +++++------------- flow/e2e/bigquery/peer_flow_bq_test.go | 33 ++-- flow/e2e/bigquery/qrep_flow_bq_test.go | 2 +- flow/e2e/postgres/peer_flow_pg_test.go | 10 +- flow/e2e/postgres/qrep_flow_pg_test.go | 2 +- flow/e2e/s3/cdc_s3_test.go | 4 +- flow/e2e/s3/qrep_flow_s3_test.go | 4 +- flow/e2e/snowflake/peer_flow_sf_test.go | 36 ++-- flow/e2e/snowflake/qrep_flow_sf_test.go | 10 +- .../e2e/sqlserver/qrep_flow_sqlserver_test.go | 2 +- flow/e2e/test_utils.go | 11 +- 20 files changed, 168 insertions(+), 237 deletions(-) diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index 6d965122a3..2ddff33bd0 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -13,7 +13,7 @@ jobs: matrix: runner: [ubicloud-standard-8-ubuntu-2204-arm] runs-on: ${{ matrix.runner }} - timeout-minutes: 40 + timeout-minutes: 30 services: pg_cdc: image: imresamu/postgis:15-3.4-alpine @@ -80,13 +80,14 @@ jobs: name: "gcs_creds.json" json: ${{ secrets.GCS_CREDS }} - - name: create hstore extension and increase logical replication limits + - name: create hstore extension, increase logical replication limits, and setup catalog database run: > docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "CREATE EXTENSION hstore;" -c "ALTER SYSTEM SET wal_level=logical;" -c "ALTER SYSTEM SET max_replication_slots=192;" -c "ALTER SYSTEM SET max_wal_senders=256;" -c "ALTER SYSTEM SET max_connections=2048;" && + (cat ../nexus/catalog/migrations/V{?,??}__* | docker exec -i pg_cdc psql -h localhost -p 5432 -U postgres) && docker restart pg_cdc working-directory: ./flow env: diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 8f8b4f2b3b..2bb3136a1e 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -13,7 +13,6 @@ import ( connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" "github.com/PeerDB-io/peer-flow/connectors/utils" - catalog "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" "github.com/PeerDB-io/peer-flow/connectors/utils/monitoring" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" @@ -21,6 +20,7 @@ import ( "github.com/jackc/pglogrepl" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" + "github.com/jackc/pgx/v5/pgxpool" log "github.com/sirupsen/logrus" "go.temporal.io/sdk/activity" "golang.org/x/sync/errgroup" @@ -40,7 +40,7 @@ type SlotSnapshotSignal struct { } type FlowableActivity struct { - CatalogMirrorMonitor *monitoring.CatalogMirrorMonitor + CatalogPool *pgxpool.Pool } // CheckConnection 
implements CheckConnection. @@ -114,7 +114,7 @@ func (a *FlowableActivity) CreateRawTable( ctx context.Context, config *protos.CreateRawTableInput, ) (*protos.CreateRawTableOutput, error) { - ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogMirrorMonitor) + ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogPool) dstConn, err := connectors.GetCDCSyncConnector(ctx, config.PeerConnectionConfig) if err != nil { return nil, fmt.Errorf("failed to get connector: %w", err) @@ -125,7 +125,7 @@ func (a *FlowableActivity) CreateRawTable( if err != nil { return nil, err } - err = a.CatalogMirrorMonitor.InitializeCDCFlow(ctx, config.FlowJobName) + err = monitoring.InitializeCDCFlow(ctx, a.CatalogPool, config.FlowJobName) if err != nil { return nil, err } @@ -174,7 +174,7 @@ func (a *FlowableActivity) handleSlotInfo( } if len(slotInfo) != 0 { - return a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, peerName, slotInfo[0]) + return monitoring.AppendSlotSizeInfo(ctx, a.CatalogPool, peerName, slotInfo[0]) } return nil } @@ -209,7 +209,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, input *protos.StartFlowInput) (*model.SyncResponse, error) { activity.RecordHeartbeat(ctx, "starting flow...") conn := input.FlowConnectionConfigs - ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogMirrorMonitor) + ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogPool) dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination) if err != nil { return nil, fmt.Errorf("failed to get destination connector: %w", err) @@ -276,13 +276,13 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, "flowName": input.FlowConnectionConfigs.FlowJobName, }).Infof("the current sync flow has records: %v", hasRecords) - if a.CatalogMirrorMonitor.IsActive() && hasRecords { + if a.CatalogPool != nil && hasRecords { syncBatchID, err := dstConn.GetLastSyncBatchID(input.FlowConnectionConfigs.FlowJobName) if err != nil && conn.Destination.Type != protos.DBType_EVENTHUB { return nil, err } - err = a.CatalogMirrorMonitor.AddCDCBatchForFlow(ctx, input.FlowConnectionConfigs.FlowJobName, + err = monitoring.AddCDCBatchForFlow(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName, monitoring.CDCBatchInfo{ BatchID: syncBatchID + 1, RowsInBatch: 0, @@ -347,8 +347,9 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, return nil, fmt.Errorf("failed to get last checkpoint: %w", err) } - err = a.CatalogMirrorMonitor.UpdateNumRowsAndEndLSNForCDCBatch( + err = monitoring.UpdateNumRowsAndEndLSNForCDCBatch( ctx, + a.CatalogPool, input.FlowConnectionConfigs.FlowJobName, res.CurrentSyncBatchID, uint32(numRecords), @@ -358,13 +359,17 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, return nil, err } - err = a.CatalogMirrorMonitor. 
- UpdateLatestLSNAtTargetForCDCFlow(ctx, input.FlowConnectionConfigs.FlowJobName, pglogrepl.LSN(lastCheckpoint)) + err = monitoring.UpdateLatestLSNAtTargetForCDCFlow( + ctx, + a.CatalogPool, + input.FlowConnectionConfigs.FlowJobName, + pglogrepl.LSN(lastCheckpoint), + ) if err != nil { return nil, err } if res.TableNameRowsMapping != nil { - err = a.CatalogMirrorMonitor.AddCDCBatchTablesForFlow(ctx, input.FlowConnectionConfigs.FlowJobName, + err = monitoring.AddCDCBatchTablesForFlow(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName, res.CurrentSyncBatchID, res.TableNameRowsMapping) if err != nil { return nil, err @@ -401,7 +406,7 @@ func (a *FlowableActivity) StartNormalize( return nil, fmt.Errorf("failed to get last sync batch ID: %v", err) } - err = a.CatalogMirrorMonitor.UpdateEndTimeForCDCBatch(ctx, input.FlowConnectionConfigs.FlowJobName, + err = monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName, lastSyncBatchID) return nil, err } else if err != nil { @@ -434,8 +439,12 @@ func (a *FlowableActivity) StartNormalize( // normalize flow did not run due to no records, no need to update end time. if res.Done { - err = a.CatalogMirrorMonitor.UpdateEndTimeForCDCBatch(ctx, input.FlowConnectionConfigs.FlowJobName, - res.EndBatchID) + err = monitoring.UpdateEndTimeForCDCBatch( + ctx, + a.CatalogPool, + input.FlowConnectionConfigs.FlowJobName, + res.EndBatchID, + ) if err != nil { return nil, err } @@ -500,8 +509,9 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context, return nil, fmt.Errorf("failed to get partitions from source: %w", err) } if len(partitions) > 0 { - err = a.CatalogMirrorMonitor.InitializeQRepRun( + err = monitoring.InitializeQRepRun( ctx, + a.CatalogPool, config, runUUID, partitions, @@ -522,7 +532,7 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context, partitions *protos.QRepPartitionBatch, runUUID string, ) error { - err := a.CatalogMirrorMonitor.UpdateStartTimeForQRepRun(ctx, runUUID) + err := monitoring.UpdateStartTimeForQRepRun(ctx, a.CatalogPool, runUUID) if err != nil { return fmt.Errorf("failed to update start time for qrep run: %w", err) } @@ -549,7 +559,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, partition *protos.QRepPartition, runUUID string, ) error { - err := a.CatalogMirrorMonitor.UpdateStartTimeForPartition(ctx, runUUID, partition, time.Now()) + err := monitoring.UpdateStartTimeForPartition(ctx, a.CatalogPool, runUUID, partition, time.Now()) if err != nil { return fmt.Errorf("failed to update start time for partition: %w", err) } @@ -587,7 +597,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, }).Errorf("failed to pull records: %v", err) goroutineErr = err } else { - err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(ctx, runUUID, partition, numRecords) + err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, a.CatalogPool, runUUID, partition, numRecords) if err != nil { log.Errorf("%v", err) goroutineErr = err @@ -607,7 +617,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, "flowName": config.FlowJobName, }).Infof("pulled %d records\n", len(recordBatch.Records)) - err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(ctx, runUUID, partition, numRecords) + err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, a.CatalogPool, runUUID, partition, numRecords) if err != nil { return err } @@ -641,7 +651,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx 
context.Context, return goroutineErr } - err := a.CatalogMirrorMonitor.UpdateRowsSyncedForPartition(ctx, rowsSynced, runUUID, partition) + err := monitoring.UpdateRowsSyncedForPartition(ctx, a.CatalogPool, rowsSynced, runUUID, partition) if err != nil { return err } @@ -651,7 +661,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, }).Infof("pushed %d records\n", rowsSynced) } - err = a.CatalogMirrorMonitor.UpdateEndTimeForPartition(ctx, runUUID, partition) + err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition) if err != nil { return err } @@ -663,7 +673,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config runUUID string) error { dstConn, err := connectors.GetQRepConsolidateConnector(ctx, config.DestinationPeer) if errors.Is(err, connectors.ErrUnsupportedFunctionality) { - return a.CatalogMirrorMonitor.UpdateEndTimeForQRepRun(ctx, runUUID) + return monitoring.UpdateEndTimeForQRepRun(ctx, a.CatalogPool, runUUID) } else if err != nil { return err } @@ -681,7 +691,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config return err } - return a.CatalogMirrorMonitor.UpdateEndTimeForQRepRun(ctx, runUUID) + return monitoring.UpdateEndTimeForQRepRun(ctx, a.CatalogPool, runUUID) } func (a *FlowableActivity) CleanupQRepFlow(ctx context.Context, config *protos.QRepConfig) error { @@ -719,14 +729,8 @@ func (a *FlowableActivity) DropFlow(ctx context.Context, config *protos.Shutdown return nil } -func getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) { - catalogPool, catalogErr := catalog.GetCatalogConnectionPoolFromEnv() - if catalogErr != nil { - return nil, fmt.Errorf("error getting catalog connection pool: %w", catalogErr) - } - defer catalogPool.Close() - - optionRows, err := catalogPool.Query(ctx, ` +func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) { + optionRows, err := a.CatalogPool.Query(ctx, ` SELECT DISTINCT p.name, p.options FROM peers p JOIN flows f ON p.id = f.source_peer @@ -768,7 +772,7 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error { log.Info("context is done, exiting wal heartbeat send loop") return nil case <-ticker.C: - pgPeers, err := getPostgresPeerConfigs(ctx) + pgPeers, err := a.getPostgresPeerConfigs(ctx) if err != nil { log.Warn("[sendwalheartbeat]: warning: unable to fetch peers." + "Skipping walheartbeat send. 
error encountered: " + err.Error()) @@ -950,17 +954,17 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, }}, } } - updateErr := a.CatalogMirrorMonitor.InitializeQRepRun(ctx, config, runUUID, []*protos.QRepPartition{partitionForMetrics}) + updateErr := monitoring.InitializeQRepRun(ctx, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics}) if updateErr != nil { return updateErr } - err := a.CatalogMirrorMonitor.UpdateStartTimeForPartition(ctx, runUUID, partition, startTime) + err := monitoring.UpdateStartTimeForPartition(ctx, a.CatalogPool, runUUID, partition, startTime) if err != nil { return fmt.Errorf("failed to update start time for partition: %w", err) } - err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(errCtx, runUUID, partition, int64(numRecords)) + err = monitoring.UpdatePullEndTimeAndRowsForPartition(errCtx, a.CatalogPool, runUUID, partition, int64(numRecords)) if err != nil { log.Errorf("%v", err) return err @@ -992,7 +996,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, return 0, err } - err = a.CatalogMirrorMonitor.UpdateRowsSyncedForPartition(ctx, rowsSynced, runUUID, partition) + err = monitoring.UpdateRowsSyncedForPartition(ctx, a.CatalogPool, rowsSynced, runUUID, partition) if err != nil { return 0, err } @@ -1002,7 +1006,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, }).Infof("pushed %d records\n", rowsSynced) } - err = a.CatalogMirrorMonitor.UpdateEndTimeForPartition(ctx, runUUID, partition) + err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition) if err != nil { return 0, err } diff --git a/flow/cmd/api.go b/flow/cmd/api.go index 6715ff3c55..3da1286cfe 100644 --- a/flow/cmd/api.go +++ b/flow/cmd/api.go @@ -125,7 +125,6 @@ func APIMain(args *APIServerParams) error { } flowHandler := NewFlowRequestHandler(tc, catalogConn, taskQueue) - defer flowHandler.Close() err = killExistingHeartbeatFlows(ctx, tc, args.TemporalNamespace, taskQueue) if err != nil { diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 40a8e9147f..d71f1e49a7 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -116,13 +116,6 @@ func (h *FlowRequestHandler) createQrepJobEntry(ctx context.Context, return nil } -// Close closes the connection pool -func (h *FlowRequestHandler) Close() { - if h.pool != nil { - h.pool.Close() - } -} - func (h *FlowRequestHandler) CreateCDCFlow( ctx context.Context, req *protos.CreateCDCFlowRequest) (*protos.CreateCDCFlowResponse, error) { cfg := req.ConnectionConfigs diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index f7431e52cc..a9e1281619 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -10,7 +10,6 @@ import ( "github.com/PeerDB-io/peer-flow/activities" utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" - "github.com/PeerDB-io/peer-flow/connectors/utils/monitoring" "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" @@ -108,8 +107,6 @@ func WorkerMain(opts *WorkerOptions) error { if err != nil { return fmt.Errorf("unable to create catalog connection pool: %w", err) } - catalogMirrorMonitor := monitoring.NewCatalogMirrorMonitor(conn) - defer catalogMirrorMonitor.Close() c, err := client.Dial(clientOptions) if err != nil { @@ -134,7 +131,7 @@ func WorkerMain(opts *WorkerOptions) error { w.RegisterWorkflow(peerflow.DropFlowWorkflow) w.RegisterWorkflow(peerflow.HeartbeatFlowWorkflow) w.RegisterActivity(&activities.FlowableActivity{ - 
CatalogMirrorMonitor: catalogMirrorMonitor, + CatalogPool: conn, }) err = w.Run(worker.InterruptCh()) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 52b6479993..f1ad468c5b 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -199,7 +199,6 @@ func (c *BigQueryConnector) Close() error { if c == nil || c.client == nil { return nil } - c.catalogPool.Close() return c.client.Close() } diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index 3d4f9baf99..637432b7fe 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -54,7 +54,7 @@ func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConf } func (p *PostgresMetadataStore) Close() error { - if p.pool != nil { + if p.config != nil && p.pool != nil { p.pool.Close() } diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 394e8a9d79..eb59f1ab8d 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -246,13 +246,13 @@ func (c *PostgresConnector) PullRecords(req *model.PullRecordsRequest) error { return err } - cdcMirrorMonitor, ok := c.ctx.Value(shared.CDCMirrorMonitorKey).(*monitoring.CatalogMirrorMonitor) + catalogPool, ok := c.ctx.Value(shared.CDCMirrorMonitorKey).(*pgxpool.Pool) if ok { latestLSN, err := c.getCurrentLSN() if err != nil { return fmt.Errorf("failed to get current LSN: %w", err) } - err = cdcMirrorMonitor.UpdateLatestLSNAtSourceForCDCFlow(c.ctx, req.FlowJobName, latestLSN) + err = monitoring.UpdateLatestLSNAtSourceForCDCFlow(c.ctx, catalogPool, req.FlowJobName, latestLSN) if err != nil { return fmt.Errorf("failed to update latest LSN at source for CDC flow: %w", err) } diff --git a/flow/connectors/utils/catalog/env.go b/flow/connectors/utils/catalog/env.go index e1bd33d25a..0563f9201b 100644 --- a/flow/connectors/utils/catalog/env.go +++ b/flow/connectors/utils/catalog/env.go @@ -5,29 +5,37 @@ import ( "fmt" "os" "strconv" + "sync" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/jackc/pgx/v5/pgxpool" ) +var poolMutex = &sync.Mutex{} +var pool *pgxpool.Pool + func GetCatalogConnectionPoolFromEnv() (*pgxpool.Pool, error) { - catalogConnectionString, err := genCatalogConnectionString() - if err != nil { - return nil, fmt.Errorf("unable to generate catalog connection string: %w", err) - } + poolMutex.Lock() + defer poolMutex.Unlock() + if pool == nil { + catalogConnectionString, err := genCatalogConnectionString() + if err != nil { + return nil, fmt.Errorf("unable to generate catalog connection string: %w", err) + } - catalogConn, err := pgxpool.New(context.Background(), catalogConnectionString) - if err != nil { - return nil, fmt.Errorf("unable to establish connection with catalog: %w", err) + pool, err = pgxpool.New(context.Background(), catalogConnectionString) + if err != nil { + return nil, fmt.Errorf("unable to establish connection with catalog: %w", err) + } } - err = catalogConn.Ping(context.Background()) + err := pool.Ping(context.Background()) if err != nil { - return nil, fmt.Errorf("unable to establish connection with catalog: %w", err) + return pool, fmt.Errorf("unable to establish connection with catalog: %w", err) } - return catalogConn, nil + return pool, nil } func genCatalogConnectionString() (string, error) { diff --git 
a/flow/connectors/utils/monitoring/monitoring.go b/flow/connectors/utils/monitoring/monitoring.go index 99415cd6da..59d8d5a82a 100644 --- a/flow/connectors/utils/monitoring/monitoring.go +++ b/flow/connectors/utils/monitoring/monitoring.go @@ -14,10 +14,6 @@ import ( "google.golang.org/protobuf/proto" ) -type CatalogMirrorMonitor struct { - catalogConn *pgxpool.Pool -} - type CDCBatchInfo struct { BatchID int64 RowsInBatch uint32 @@ -26,29 +22,8 @@ type CDCBatchInfo struct { StartTime time.Time } -func NewCatalogMirrorMonitor(catalogConn *pgxpool.Pool) *CatalogMirrorMonitor { - return &CatalogMirrorMonitor{ - catalogConn: catalogConn, - } -} - -func (c *CatalogMirrorMonitor) IsActive() bool { - return !(c == nil || c.catalogConn == nil) -} - -func (c *CatalogMirrorMonitor) Close() { - if c == nil || c.catalogConn == nil { - return - } - c.catalogConn.Close() -} - -func (c *CatalogMirrorMonitor) InitializeCDCFlow(ctx context.Context, flowJobName string) error { - if c == nil || c.catalogConn == nil { - return nil - } - - _, err := c.catalogConn.Exec(ctx, +func InitializeCDCFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string) error { + _, err := pool.Exec(ctx, `INSERT INTO peerdb_stats.cdc_flows(flow_name,latest_lsn_at_source,latest_lsn_at_target) VALUES($1,0,0) ON CONFLICT DO NOTHING`, flowJobName) if err != nil { @@ -57,13 +32,9 @@ func (c *CatalogMirrorMonitor) InitializeCDCFlow(ctx context.Context, flowJobNam return nil } -func (c *CatalogMirrorMonitor) UpdateLatestLSNAtSourceForCDCFlow(ctx context.Context, flowJobName string, +func UpdateLatestLSNAtSourceForCDCFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string, latestLSNAtSource pglogrepl.LSN) error { - if c == nil || c.catalogConn == nil { - return nil - } - - _, err := c.catalogConn.Exec(ctx, + _, err := pool.Exec(ctx, "UPDATE peerdb_stats.cdc_flows SET latest_lsn_at_source=$1 WHERE flow_name=$2", uint64(latestLSNAtSource), flowJobName) if err != nil { @@ -72,13 +43,9 @@ func (c *CatalogMirrorMonitor) UpdateLatestLSNAtSourceForCDCFlow(ctx context.Con return nil } -func (c *CatalogMirrorMonitor) UpdateLatestLSNAtTargetForCDCFlow(ctx context.Context, flowJobName string, +func UpdateLatestLSNAtTargetForCDCFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string, latestLSNAtTarget pglogrepl.LSN) error { - if c == nil || c.catalogConn == nil { - return nil - } - - _, err := c.catalogConn.Exec(ctx, + _, err := pool.Exec(ctx, "UPDATE peerdb_stats.cdc_flows SET latest_lsn_at_target=$1 WHERE flow_name=$2", uint64(latestLSNAtTarget), flowJobName) if err != nil { @@ -87,13 +54,9 @@ func (c *CatalogMirrorMonitor) UpdateLatestLSNAtTargetForCDCFlow(ctx context.Con return nil } -func (c *CatalogMirrorMonitor) AddCDCBatchForFlow(ctx context.Context, flowJobName string, +func AddCDCBatchForFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string, batchInfo CDCBatchInfo) error { - if c == nil || c.catalogConn == nil { - return nil - } - - _, err := c.catalogConn.Exec(ctx, + _, err := pool.Exec(ctx, `INSERT INTO peerdb_stats.cdc_batches(flow_name,batch_id,rows_in_batch,batch_start_lsn,batch_end_lsn, start_time) VALUES($1,$2,$3,$4,$5,$6) ON CONFLICT DO NOTHING`, flowJobName, batchInfo.BatchID, batchInfo.RowsInBatch, @@ -105,18 +68,15 @@ func (c *CatalogMirrorMonitor) AddCDCBatchForFlow(ctx context.Context, flowJobNa } // update num records and end-lsn for a cdc batch -func (c *CatalogMirrorMonitor) UpdateNumRowsAndEndLSNForCDCBatch( +func UpdateNumRowsAndEndLSNForCDCBatch( ctx context.Context, + pool 
*pgxpool.Pool, flowJobName string, batchID int64, numRows uint32, batchEndLSN pglogrepl.LSN, ) error { - if c == nil || c.catalogConn == nil { - return nil - } - - _, err := c.catalogConn.Exec(ctx, + _, err := pool.Exec(ctx, "UPDATE peerdb_stats.cdc_batches SET rows_in_batch=$1,batch_end_lsn=$2 WHERE flow_name=$3 AND batch_id=$4", numRows, uint64(batchEndLSN), flowJobName, batchID) if err != nil { @@ -125,16 +85,13 @@ func (c *CatalogMirrorMonitor) UpdateNumRowsAndEndLSNForCDCBatch( return nil } -func (c *CatalogMirrorMonitor) UpdateEndTimeForCDCBatch( +func UpdateEndTimeForCDCBatch( ctx context.Context, + pool *pgxpool.Pool, flowJobName string, batchID int64, ) error { - if c == nil || c.catalogConn == nil { - return nil - } - - _, err := c.catalogConn.Exec(ctx, + _, err := pool.Exec(ctx, "UPDATE peerdb_stats.cdc_batches SET end_time=$1 WHERE flow_name=$2 AND batch_id=$3", time.Now(), flowJobName, batchID) if err != nil { @@ -143,13 +100,9 @@ func (c *CatalogMirrorMonitor) UpdateEndTimeForCDCBatch( return nil } -func (c *CatalogMirrorMonitor) AddCDCBatchTablesForFlow(ctx context.Context, flowJobName string, +func AddCDCBatchTablesForFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string, batchID int64, tableNameRowsMapping map[string]uint32) error { - if c == nil || c.catalogConn == nil { - return nil - } - - insertBatchTablesTx, err := c.catalogConn.Begin(ctx) + insertBatchTablesTx, err := pool.Begin(ctx) if err != nil { return fmt.Errorf("error while beginning transaction for inserting statistics into cdc_batch_table: %w", err) } @@ -177,18 +130,15 @@ func (c *CatalogMirrorMonitor) AddCDCBatchTablesForFlow(ctx context.Context, flo return nil } -func (c *CatalogMirrorMonitor) InitializeQRepRun( +func InitializeQRepRun( ctx context.Context, + pool *pgxpool.Pool, config *protos.QRepConfig, runUUID string, partitions []*protos.QRepPartition, ) error { - if c == nil || c.catalogConn == nil { - return nil - } - flowJobName := config.GetFlowJobName() - _, err := c.catalogConn.Exec(ctx, + _, err := pool.Exec(ctx, "INSERT INTO peerdb_stats.qrep_runs(flow_name,run_uuid) VALUES($1,$2) ON CONFLICT DO NOTHING", flowJobName, runUUID) if err != nil { @@ -200,7 +150,7 @@ func (c *CatalogMirrorMonitor) InitializeQRepRun( return fmt.Errorf("unable to marshal flow config: %w", err) } - _, err = c.catalogConn.Exec(ctx, + _, err = pool.Exec(ctx, "UPDATE peerdb_stats.qrep_runs SET config_proto = $1 WHERE flow_name = $2", cfgBytes, flowJobName) if err != nil { @@ -208,7 +158,7 @@ func (c *CatalogMirrorMonitor) InitializeQRepRun( } for _, partition := range partitions { - if err := c.addPartitionToQRepRun(ctx, flowJobName, runUUID, partition); err != nil { + if err := addPartitionToQRepRun(ctx, pool, flowJobName, runUUID, partition); err != nil { return fmt.Errorf("unable to add partition to qrep run: %w", err) } } @@ -216,12 +166,8 @@ func (c *CatalogMirrorMonitor) InitializeQRepRun( return nil } -func (c *CatalogMirrorMonitor) UpdateStartTimeForQRepRun(ctx context.Context, runUUID string) error { - if c == nil || c.catalogConn == nil { - return nil - } - - _, err := c.catalogConn.Exec(ctx, +func UpdateStartTimeForQRepRun(ctx context.Context, pool *pgxpool.Pool, runUUID string) error { + _, err := pool.Exec(ctx, "UPDATE peerdb_stats.qrep_runs SET start_time=$1 WHERE run_uuid=$2", time.Now(), runUUID) if err != nil { @@ -231,12 +177,8 @@ func (c *CatalogMirrorMonitor) UpdateStartTimeForQRepRun(ctx context.Context, ru return nil } -func (c *CatalogMirrorMonitor) UpdateEndTimeForQRepRun(ctx 
context.Context, runUUID string) error { - if c == nil || c.catalogConn == nil { - return nil - } - - _, err := c.catalogConn.Exec(ctx, +func UpdateEndTimeForQRepRun(ctx context.Context, pool *pgxpool.Pool, runUUID string) error { + _, err := pool.Exec(ctx, "UPDATE peerdb_stats.qrep_runs SET end_time=$1 WHERE run_uuid=$2", time.Now(), runUUID) if err != nil { @@ -246,16 +188,13 @@ func (c *CatalogMirrorMonitor) UpdateEndTimeForQRepRun(ctx context.Context, runU return nil } -func (c *CatalogMirrorMonitor) AppendSlotSizeInfo( +func AppendSlotSizeInfo( ctx context.Context, + pool *pgxpool.Pool, peerName string, slotInfo *protos.SlotInfo, ) error { - if c == nil || c.catalogConn == nil || slotInfo == nil { - return nil - } - - _, err := c.catalogConn.Exec(ctx, + _, err := pool.Exec(ctx, "INSERT INTO peerdb_stats.peer_slot_size"+ "(peer_name, slot_name, restart_lsn, redo_lsn, confirmed_flush_lsn, slot_size, wal_status) "+ "VALUES($1,$2,$3,$4,$5,$6) ON CONFLICT DO NOTHING;", @@ -274,12 +213,8 @@ func (c *CatalogMirrorMonitor) AppendSlotSizeInfo( return nil } -func (c *CatalogMirrorMonitor) addPartitionToQRepRun(ctx context.Context, flowJobName string, +func addPartitionToQRepRun(ctx context.Context, pool *pgxpool.Pool, flowJobName string, runUUID string, partition *protos.QRepPartition) error { - if c == nil || c.catalogConn == nil { - return nil - } - if partition.Range == nil && partition.FullTablePartition { log.Infof("partition %s is a full table partition. Metrics logging is skipped.", partition.PartitionId) return nil @@ -317,7 +252,7 @@ func (c *CatalogMirrorMonitor) addPartitionToQRepRun(ctx context.Context, flowJo return fmt.Errorf("unknown range type: %v", x) } - _, err := c.catalogConn.Exec(ctx, + _, err := pool.Exec(ctx, `INSERT INTO peerdb_stats.qrep_partitions (flow_name,run_uuid,partition_uuid,partition_start,partition_end,restart_count) VALUES($1,$2,$3,$4,$5,$6) ON CONFLICT(run_uuid,partition_uuid) DO UPDATE SET @@ -330,17 +265,14 @@ func (c *CatalogMirrorMonitor) addPartitionToQRepRun(ctx context.Context, flowJo return nil } -func (c *CatalogMirrorMonitor) UpdateStartTimeForPartition( +func UpdateStartTimeForPartition( ctx context.Context, + pool *pgxpool.Pool, runUUID string, partition *protos.QRepPartition, startTime time.Time, ) error { - if c == nil || c.catalogConn == nil { - return nil - } - - _, err := c.catalogConn.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET start_time=$1 + _, err := pool.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET start_time=$1 WHERE run_uuid=$2 AND partition_uuid=$3`, startTime, runUUID, partition.PartitionId) if err != nil { return fmt.Errorf("error while updating qrep partition in qrep_partitions: %w", err) @@ -348,13 +280,9 @@ func (c *CatalogMirrorMonitor) UpdateStartTimeForPartition( return nil } -func (c *CatalogMirrorMonitor) UpdatePullEndTimeAndRowsForPartition(ctx context.Context, runUUID string, +func UpdatePullEndTimeAndRowsForPartition(ctx context.Context, pool *pgxpool.Pool, runUUID string, partition *protos.QRepPartition, rowsInPartition int64) error { - if c == nil || c.catalogConn == nil { - return nil - } - - _, err := c.catalogConn.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET pull_end_time=$1,rows_in_partition=$2 + _, err := pool.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET pull_end_time=$1,rows_in_partition=$2 WHERE run_uuid=$3 AND partition_uuid=$4`, time.Now(), rowsInPartition, runUUID, partition.PartitionId) if err != nil { return fmt.Errorf("error while updating qrep partition in qrep_partitions: %w", 
err) @@ -362,13 +290,9 @@ func (c *CatalogMirrorMonitor) UpdatePullEndTimeAndRowsForPartition(ctx context. return nil } -func (c *CatalogMirrorMonitor) UpdateEndTimeForPartition(ctx context.Context, runUUID string, +func UpdateEndTimeForPartition(ctx context.Context, pool *pgxpool.Pool, runUUID string, partition *protos.QRepPartition) error { - if c == nil || c.catalogConn == nil { - return nil - } - - _, err := c.catalogConn.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET end_time=$1 + _, err := pool.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET end_time=$1 WHERE run_uuid=$2 AND partition_uuid=$3`, time.Now(), runUUID, partition.PartitionId) if err != nil { return fmt.Errorf("error while updating qrep partition in qrep_partitions: %w", err) @@ -376,13 +300,9 @@ func (c *CatalogMirrorMonitor) UpdateEndTimeForPartition(ctx context.Context, ru return nil } -func (c *CatalogMirrorMonitor) UpdateRowsSyncedForPartition(ctx context.Context, rowsSynced int, runUUID string, +func UpdateRowsSyncedForPartition(ctx context.Context, pool *pgxpool.Pool, rowsSynced int, runUUID string, partition *protos.QRepPartition) error { - if c == nil || c.catalogConn == nil { - return nil - } - - _, err := c.catalogConn.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET rows_synced=$1 + _, err := pool.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET rows_synced=$1 WHERE run_uuid=$2 AND partition_uuid=$3`, rowsSynced, runUUID, partition.PartitionId) if err != nil { return fmt.Errorf("error while updating rows_synced in qrep_partitions: %w", err) diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 559f03ba62..d642d8eed1 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -117,7 +117,7 @@ func (s PeerFlowE2ETestSuiteBQ) tearDownSuite() { func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) // TODO (kaushikiska): ensure flow name can only be alpha numeric and underscores. limits := peerflow.CDCFlowLimits{ @@ -139,7 +139,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_no_data") dstTableName := "test_no_data" @@ -183,7 +183,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_char_coltype") dstTableName := "test_char_coltype" @@ -230,7 +230,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { // correctly synced to the destination table after sync flow completes. 
func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_simple_flow_bq") dstTableName := "test_simple_flow_bq" @@ -297,7 +297,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_toast_bq_1") dstTableName := "test_toast_bq_1" @@ -365,7 +365,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_toast_bq_2") dstTableName := "test_toast_bq_2" @@ -398,6 +398,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns + done := make(chan struct{}) go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* transaction updating no rows */ @@ -409,6 +410,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { `, srcTableName, srcTableName)) require.NoError(s.t, err) fmt.Println("Executed a transaction touching toast columns") + done <- struct{}{} }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -422,11 +424,12 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { s.compareTableContentsBQ(dstTableName, "id,t1,t2,k") env.AssertExpectations(s.t) + <-done } func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_toast_bq_3") dstTableName := "test_toast_bq_3" @@ -500,7 +503,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_toast_bq_4") dstTableName := "test_toast_bq_4" @@ -567,7 +570,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_toast_bq_5") dstTableName := "test_toast_bq_5" @@ -634,7 +637,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_types_bq") dstTableName := "test_types_bq" @@ -712,7 +715,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTable1Name := s.attachSchemaSuffix("test1_bq") dstTable1Name := "test1_bq" @@ 
-774,7 +777,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { // TODO: not checking schema exactly, add later func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") dstTableName := "test_simple_schema_changes" @@ -874,7 +877,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_simple_cpkey") dstTableName := "test_simple_cpkey" @@ -947,7 +950,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_cpkey_toast1") dstTableName := "test_cpkey_toast1" @@ -1024,7 +1027,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_cpkey_toast2") dstTableName := "test_cpkey_toast2" diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go index c12634cec8..f520014b04 100644 --- a/flow/e2e/bigquery/qrep_flow_bq_test.go +++ b/flow/e2e/bigquery/qrep_flow_bq_test.go @@ -48,7 +48,7 @@ func (s PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsStr func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) numRows := 10 diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 3f5718bb72..2720891fb6 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -20,7 +20,7 @@ func (s *PeerFlowE2ETestSuitePG) attachSuffix(input string) string { func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.T()) srcTableName := s.attachSchemaSuffix("test_simple_flow") dstTableName := s.attachSchemaSuffix("test_simple_flow_dst") @@ -83,7 +83,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.T()) srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") dstTableName := s.attachSchemaSuffix("test_simple_schema_changes_dst") @@ -244,7 +244,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.T()) srcTableName := s.attachSchemaSuffix("test_simple_cpkey") dstTableName := s.attachSchemaSuffix("test_simple_cpkey_dst") @@ -319,7 +319,7 @@ func (s 
*PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.T()) srcTableName := s.attachSchemaSuffix("test_cpkey_toast1") dstTableName := s.attachSchemaSuffix("test_cpkey_toast1_dst") @@ -400,7 +400,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.T()) srcTableName := s.attachSchemaSuffix("test_cpkey_toast2") dstTableName := s.attachSchemaSuffix("test_cpkey_toast2_dst") diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index c9b82a4c6c..9ca4db2a79 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -138,7 +138,7 @@ func (s *PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQuali func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.T()) numRows := 10 diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go index 85993e2fe8..478877ef4b 100644 --- a/flow/e2e/s3/cdc_s3_test.go +++ b/flow/e2e/s3/cdc_s3_test.go @@ -20,7 +20,7 @@ func (s *PeerFlowE2ETestSuiteS3) attachSuffix(input string) string { func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.T()) srcTableName := s.attachSchemaSuffix("test_simple_flow_s3") dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_s3", "test_simple_flow_s3") @@ -88,7 +88,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.T()) setupErr := s.setupS3("gcs") if setupErr != nil { s.Fail("failed to setup S3", setupErr) diff --git a/flow/e2e/s3/qrep_flow_s3_test.go b/flow/e2e/s3/qrep_flow_s3_test.go index d807cf6335..810b25aa42 100644 --- a/flow/e2e/s3/qrep_flow_s3_test.go +++ b/flow/e2e/s3/qrep_flow_s3_test.go @@ -93,7 +93,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { } env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.T()) jobName := "test_complete_flow_s3" schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s3Suffix, jobName) @@ -140,7 +140,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { } env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.T()) jobName := "test_complete_flow_s3_ctid" schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s3Suffix, jobName) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 759ab34b05..1960c03ae5 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -128,7 +128,7 @@ func (s PeerFlowE2ETestSuiteSF) tearDownSuite() { func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - 
e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_simple_flow_sf") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_simple_flow_sf") @@ -203,7 +203,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_invalid_geo_sf_avro_cdc") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_invalid_geo_sf_avro_cdc") @@ -288,7 +288,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_toast_sf_1") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_1") @@ -355,7 +355,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_toast_sf_2") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_2") @@ -424,7 +424,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_toast_sf_3") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_3") @@ -497,7 +497,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_toast_sf_4") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_4") @@ -563,7 +563,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_toast_sf_5") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_5") @@ -629,7 +629,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_types_sf") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_types_sf") @@ -706,7 +706,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTable1Name := s.attachSchemaSuffix("test1_sf") srcTable2Name := s.attachSchemaSuffix("test2_sf") @@ -765,7 +765,7 @@ func (s 
PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_simple_schema_changes") @@ -925,7 +925,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_simple_cpkey") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_simple_cpkey") @@ -998,7 +998,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_cpkey_toast1") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_cpkey_toast1") @@ -1074,7 +1074,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_cpkey_toast2") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_cpkey_toast2") @@ -1146,7 +1146,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_exclude_sf") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_exclude_sf") @@ -1228,7 +1228,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) cmpTableName := s.attachSchemaSuffix("test_softdel") srcTableName := fmt.Sprintf("%s_src", cmpTableName) @@ -1314,7 +1314,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) cmpTableName := s.attachSchemaSuffix("test_softdel_iud") srcTableName := fmt.Sprintf("%s_src", cmpTableName) @@ -1396,7 +1396,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) cmpTableName := s.attachSchemaSuffix("test_softdel_ud") srcTableName := fmt.Sprintf("%s_src", cmpTableName) @@ -1482,7 +1482,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + 
e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_softdel_iad") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_softdel_iad") diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index 81884d2e3d..a541be9283 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -58,7 +58,7 @@ func (s PeerFlowE2ETestSuiteSF) compareTableContentsSF(tableName string, selecto func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) numRows := 10 @@ -97,7 +97,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) numRows := 10 @@ -140,7 +140,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) numRows := 10 @@ -180,7 +180,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) numRows := 10 @@ -224,7 +224,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) numRows := 10 diff --git a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go index 013685a364..8ea5370ee1 100644 --- a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go +++ b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go @@ -137,7 +137,7 @@ func (s *PeerFlowE2ETestSuiteSQLServer) Test_Complete_QRep_Flow_SqlServer_Append } env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.T()) numRows := 10 tblName := "test_qrep_flow_avro_ss_append" diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index c16040062a..e4652b4854 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -7,9 +7,11 @@ import ( "io" "os" "strings" + "testing" "time" "github.com/PeerDB-io/peer-flow/activities" + utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" @@ -39,7 +41,12 @@ func ReadFileToBytes(path string) ([]byte, error) { return ret, nil } -func RegisterWorkflowsAndActivities(env *testsuite.TestWorkflowEnvironment) { +func RegisterWorkflowsAndActivities(env *testsuite.TestWorkflowEnvironment, t *testing.T) { + conn, err := utils.GetCatalogConnectionPoolFromEnv() + if err != nil { + t.Fatalf("unable to create catalog connection pool: %v", err) + } + // set a 300 second timeout for the workflow to execute a few runs. 
env.SetTestTimeout(300 * time.Second) @@ -51,7 +58,7 @@ func RegisterWorkflowsAndActivities(env *testsuite.TestWorkflowEnvironment) { env.RegisterWorkflow(peerflow.QRepFlowWorkflow) env.RegisterWorkflow(peerflow.XminFlowWorkflow) env.RegisterWorkflow(peerflow.QRepPartitionWorkflow) - env.RegisterActivity(&activities.FlowableActivity{}) + env.RegisterActivity(&activities.FlowableActivity{CatalogPool: conn}) env.RegisterActivity(&activities.SnapshotActivity{}) }
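
Usage note (not part of the patch): after this change, callers obtain the shared catalog pool from GetCatalogConnectionPoolFromEnv and must never close it, since the pool is cached globally; the monitoring helpers are now package-level functions that take the pool explicitly after ctx. The snippet below is an illustrative sketch under those assumptions — the standalone main function and the "example_flow" name are hypothetical and only show the calling pattern.

// Sketch only: consuming the shared catalog pool after this change.
package main

import (
	"context"
	"log"

	utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
	"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
)

func main() {
	ctx := context.Background()

	// Repeated calls return the same cached *pgxpool.Pool.
	pool, err := utils.GetCatalogConnectionPoolFromEnv()
	if err != nil {
		log.Fatalf("unable to create catalog connection pool: %v", err)
	}
	// Do NOT call pool.Close(): the pool is shared process-wide.

	// Monitoring helpers are package-level functions taking the pool
	// instead of methods on the removed CatalogMirrorMonitor.
	// "example_flow" is a hypothetical flow name for illustration.
	if err := monitoring.InitializeCDCFlow(ctx, pool, "example_flow"); err != nil {
		log.Fatalf("failed to initialize CDC flow stats: %v", err)
	}
}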