diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index fae47a2f5c..e9688a14f0 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -13,7 +13,6 @@ import ( connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" "github.com/PeerDB-io/peer-flow/connectors/utils" - catalog "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" "github.com/PeerDB-io/peer-flow/connectors/utils/monitoring" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" @@ -21,6 +20,7 @@ import ( "github.com/jackc/pglogrepl" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" + "github.com/jackc/pgx/v5/pgxpool" log "github.com/sirupsen/logrus" "go.temporal.io/sdk/activity" "golang.org/x/sync/errgroup" @@ -40,7 +40,7 @@ type SlotSnapshotSignal struct { } type FlowableActivity struct { - CatalogMirrorMonitor *monitoring.CatalogMirrorMonitor + CatalogPool *pgxpool.Pool } // CheckConnection implements CheckConnection. 
@@ -114,7 +114,7 @@ func (a *FlowableActivity) CreateRawTable( ctx context.Context, config *protos.CreateRawTableInput, ) (*protos.CreateRawTableOutput, error) { - ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogMirrorMonitor) + ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogPool) dstConn, err := connectors.GetCDCSyncConnector(ctx, config.PeerConnectionConfig) if err != nil { return nil, fmt.Errorf("failed to get connector: %w", err) @@ -125,7 +125,7 @@ func (a *FlowableActivity) CreateRawTable( if err != nil { return nil, err } - err = a.CatalogMirrorMonitor.InitializeCDCFlow(ctx, config.FlowJobName) + err = monitoring.InitializeCDCFlow(ctx, a.CatalogPool, config.FlowJobName) if err != nil { return nil, err } @@ -174,7 +174,7 @@ func (a *FlowableActivity) handleSlotInfo( } if len(slotInfo) != 0 { - return a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, peerName, slotInfo[0]) + return monitoring.AppendSlotSizeInfo(ctx, a.CatalogPool, peerName, slotInfo[0]) } return nil } @@ -209,7 +209,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, input *protos.StartFlowInput) (*model.SyncResponse, error) { activity.RecordHeartbeat(ctx, "starting flow...") conn := input.FlowConnectionConfigs - ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogMirrorMonitor) + ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogPool) dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination) if err != nil { return nil, fmt.Errorf("failed to get destination connector: %w", err) @@ -276,13 +276,13 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, "flowName": input.FlowConnectionConfigs.FlowJobName, }).Infof("the current sync flow has records: %v", hasRecords) - if a.CatalogMirrorMonitor.IsActive() && hasRecords { + if a.CatalogPool != nil && hasRecords { syncBatchID, err := dstConn.GetLastSyncBatchID(input.FlowConnectionConfigs.FlowJobName) if err != nil && conn.Destination.Type != 
protos.DBType_EVENTHUB { return nil, err } - err = a.CatalogMirrorMonitor.AddCDCBatchForFlow(ctx, input.FlowConnectionConfigs.FlowJobName, + err = monitoring.AddCDCBatchForFlow(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName, monitoring.CDCBatchInfo{ BatchID: syncBatchID + 1, RowsInBatch: 0, @@ -347,8 +347,9 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, return nil, fmt.Errorf("failed to get last checkpoint: %w", err) } - err = a.CatalogMirrorMonitor.UpdateNumRowsAndEndLSNForCDCBatch( + err = monitoring.UpdateNumRowsAndEndLSNForCDCBatch( ctx, + a.CatalogPool, input.FlowConnectionConfigs.FlowJobName, res.CurrentSyncBatchID, uint32(numRecords), @@ -358,13 +359,17 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, return nil, err } - err = a.CatalogMirrorMonitor. - UpdateLatestLSNAtTargetForCDCFlow(ctx, input.FlowConnectionConfigs.FlowJobName, pglogrepl.LSN(lastCheckpoint)) + err = monitoring.UpdateLatestLSNAtTargetForCDCFlow( + ctx, + a.CatalogPool, + input.FlowConnectionConfigs.FlowJobName, + pglogrepl.LSN(lastCheckpoint), + ) if err != nil { return nil, err } if res.TableNameRowsMapping != nil { - err = a.CatalogMirrorMonitor.AddCDCBatchTablesForFlow(ctx, input.FlowConnectionConfigs.FlowJobName, + err = monitoring.AddCDCBatchTablesForFlow(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName, res.CurrentSyncBatchID, res.TableNameRowsMapping) if err != nil { return nil, err @@ -401,7 +406,7 @@ func (a *FlowableActivity) StartNormalize( return nil, fmt.Errorf("failed to get last sync batch ID: %v", err) } - err = a.CatalogMirrorMonitor.UpdateEndTimeForCDCBatch(ctx, input.FlowConnectionConfigs.FlowJobName, + err = monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName, lastSyncBatchID) return nil, err } else if err != nil { @@ -434,8 +439,12 @@ func (a *FlowableActivity) StartNormalize( // normalize flow did not run due to no records, no need to update end time. 
if res.Done { - err = a.CatalogMirrorMonitor.UpdateEndTimeForCDCBatch(ctx, input.FlowConnectionConfigs.FlowJobName, - res.EndBatchID) + err = monitoring.UpdateEndTimeForCDCBatch( + ctx, + a.CatalogPool, + input.FlowConnectionConfigs.FlowJobName, + res.EndBatchID, + ) if err != nil { return nil, err } @@ -500,8 +509,9 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context, return nil, fmt.Errorf("failed to get partitions from source: %w", err) } if len(partitions) > 0 { - err = a.CatalogMirrorMonitor.InitializeQRepRun( + err = monitoring.InitializeQRepRun( ctx, + a.CatalogPool, config, runUUID, partitions, @@ -522,7 +532,7 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context, partitions *protos.QRepPartitionBatch, runUUID string, ) error { - err := a.CatalogMirrorMonitor.UpdateStartTimeForQRepRun(ctx, runUUID) + err := monitoring.UpdateStartTimeForQRepRun(ctx, a.CatalogPool, runUUID) if err != nil { return fmt.Errorf("failed to update start time for qrep run: %w", err) } @@ -549,7 +559,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, partition *protos.QRepPartition, runUUID string, ) error { - err := a.CatalogMirrorMonitor.UpdateStartTimeForPartition(ctx, runUUID, partition, time.Now()) + err := monitoring.UpdateStartTimeForPartition(ctx, a.CatalogPool, runUUID, partition, time.Now()) if err != nil { return fmt.Errorf("failed to update start time for partition: %w", err) } @@ -587,7 +597,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, }).Errorf("failed to pull records: %v", err) goroutineErr = err } else { - err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(ctx, runUUID, partition, numRecords) + err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, a.CatalogPool, runUUID, partition, numRecords) if err != nil { log.Errorf("%v", err) goroutineErr = err @@ -607,7 +617,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, "flowName": 
config.FlowJobName, }).Infof("pulled %d records\n", len(recordBatch.Records)) - err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(ctx, runUUID, partition, numRecords) + err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, a.CatalogPool, runUUID, partition, numRecords) if err != nil { return err } @@ -645,7 +655,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, }).Infof("pushed %d records\n", res) } - err = a.CatalogMirrorMonitor.UpdateEndTimeForPartition(ctx, runUUID, partition) + err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition) if err != nil { return err } @@ -657,7 +667,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config runUUID string) error { dstConn, err := connectors.GetQRepConsolidateConnector(ctx, config.DestinationPeer) if errors.Is(err, connectors.ErrUnsupportedFunctionality) { - return a.CatalogMirrorMonitor.UpdateEndTimeForQRepRun(ctx, runUUID) + return monitoring.UpdateEndTimeForQRepRun(ctx, a.CatalogPool, runUUID) } else if err != nil { return err } @@ -675,7 +685,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config return err } - return a.CatalogMirrorMonitor.UpdateEndTimeForQRepRun(ctx, runUUID) + return monitoring.UpdateEndTimeForQRepRun(ctx, a.CatalogPool, runUUID) } func (a *FlowableActivity) CleanupQRepFlow(ctx context.Context, config *protos.QRepConfig) error { @@ -713,12 +723,8 @@ func (a *FlowableActivity) DropFlow(ctx context.Context, config *protos.Shutdown return nil } -func getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) { - catalogPool, catalogErr := catalog.GetCatalogConnectionPoolFromEnv() - if catalogErr != nil { - return nil, fmt.Errorf("error getting catalog connection pool: %w", catalogErr) - } - defer catalogPool.Close() +func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) { + catalogPool := a.CatalogPool optionRows, err 
:= catalogPool.Query(ctx, ` SELECT DISTINCT p.name, p.options @@ -762,7 +768,7 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error { log.Info("context is done, exiting wal heartbeat send loop") return nil case <-ticker.C: - pgPeers, err := getPostgresPeerConfigs(ctx) + pgPeers, err := a.getPostgresPeerConfigs(ctx) if err != nil { log.Warn("[sendwalheartbeat]: warning: unable to fetch peers." + "Skipping walheartbeat send. error encountered: " + err.Error()) @@ -944,17 +950,17 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, }}, } } - updateErr := a.CatalogMirrorMonitor.InitializeQRepRun(ctx, config, runUUID, []*protos.QRepPartition{partitionForMetrics}) + updateErr := monitoring.InitializeQRepRun(ctx, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics}) if updateErr != nil { return updateErr } - err := a.CatalogMirrorMonitor.UpdateStartTimeForPartition(ctx, runUUID, partition, startTime) + err := monitoring.UpdateStartTimeForPartition(ctx, a.CatalogPool, runUUID, partition, startTime) if err != nil { return fmt.Errorf("failed to update start time for partition: %w", err) } - err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(errCtx, runUUID, partition, int64(numRecords)) + err = monitoring.UpdatePullEndTimeAndRowsForPartition(errCtx, a.CatalogPool, runUUID, partition, int64(numRecords)) if err != nil { log.Errorf("%v", err) return err @@ -990,7 +996,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, }).Infof("pushed %d records\n", res) } - err = a.CatalogMirrorMonitor.UpdateEndTimeForPartition(ctx, runUUID, partition) + err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition) if err != nil { return 0, err } diff --git a/flow/cmd/api.go b/flow/cmd/api.go index 6715ff3c55..3da1286cfe 100644 --- a/flow/cmd/api.go +++ b/flow/cmd/api.go @@ -125,7 +125,6 @@ func APIMain(args *APIServerParams) error { } flowHandler := 
NewFlowRequestHandler(tc, catalogConn, taskQueue) - defer flowHandler.Close() err = killExistingHeartbeatFlows(ctx, tc, args.TemporalNamespace, taskQueue) if err != nil { diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 40a8e9147f..d71f1e49a7 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -116,13 +116,6 @@ func (h *FlowRequestHandler) createQrepJobEntry(ctx context.Context, return nil } -// Close closes the connection pool -func (h *FlowRequestHandler) Close() { - if h.pool != nil { - h.pool.Close() - } -} - func (h *FlowRequestHandler) CreateCDCFlow( ctx context.Context, req *protos.CreateCDCFlowRequest) (*protos.CreateCDCFlowResponse, error) { cfg := req.ConnectionConfigs diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index f7431e52cc..a9e1281619 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -10,7 +10,6 @@ import ( "github.com/PeerDB-io/peer-flow/activities" utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" - "github.com/PeerDB-io/peer-flow/connectors/utils/monitoring" "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" @@ -108,8 +107,6 @@ func WorkerMain(opts *WorkerOptions) error { if err != nil { return fmt.Errorf("unable to create catalog connection pool: %w", err) } - catalogMirrorMonitor := monitoring.NewCatalogMirrorMonitor(conn) - defer catalogMirrorMonitor.Close() c, err := client.Dial(clientOptions) if err != nil { @@ -134,7 +131,7 @@ func WorkerMain(opts *WorkerOptions) error { w.RegisterWorkflow(peerflow.DropFlowWorkflow) w.RegisterWorkflow(peerflow.HeartbeatFlowWorkflow) w.RegisterActivity(&activities.FlowableActivity{ - CatalogMirrorMonitor: catalogMirrorMonitor, + CatalogPool: conn, }) err = w.Run(worker.InterruptCh()) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 52b6479993..f1ad468c5b 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -199,7 +199,6 @@ 
func (c *BigQueryConnector) Close() error { if c == nil || c.client == nil { return nil } - c.catalogPool.Close() return c.client.Close() } diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index 3d4f9baf99..637432b7fe 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -54,7 +54,7 @@ func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConf } func (p *PostgresMetadataStore) Close() error { - if p.pool != nil { + if p.config != nil && p.pool != nil { p.pool.Close() } diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 394e8a9d79..eb59f1ab8d 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -246,13 +246,13 @@ func (c *PostgresConnector) PullRecords(req *model.PullRecordsRequest) error { return err } - cdcMirrorMonitor, ok := c.ctx.Value(shared.CDCMirrorMonitorKey).(*monitoring.CatalogMirrorMonitor) + catalogPool, ok := c.ctx.Value(shared.CDCMirrorMonitorKey).(*pgxpool.Pool) if ok { latestLSN, err := c.getCurrentLSN() if err != nil { return fmt.Errorf("failed to get current LSN: %w", err) } - err = cdcMirrorMonitor.UpdateLatestLSNAtSourceForCDCFlow(c.ctx, req.FlowJobName, latestLSN) + err = monitoring.UpdateLatestLSNAtSourceForCDCFlow(c.ctx, catalogPool, req.FlowJobName, latestLSN) if err != nil { return fmt.Errorf("failed to update latest LSN at source for CDC flow: %w", err) } diff --git a/flow/connectors/utils/catalog/env.go b/flow/connectors/utils/catalog/env.go index e1bd33d25a..0563f9201b 100644 --- a/flow/connectors/utils/catalog/env.go +++ b/flow/connectors/utils/catalog/env.go @@ -5,29 +5,37 @@ import ( "fmt" "os" "strconv" + "sync" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/jackc/pgx/v5/pgxpool" ) +var poolMutex = &sync.Mutex{} +var pool *pgxpool.Pool + func 
GetCatalogConnectionPoolFromEnv() (*pgxpool.Pool, error) { - catalogConnectionString, err := genCatalogConnectionString() - if err != nil { - return nil, fmt.Errorf("unable to generate catalog connection string: %w", err) - } + poolMutex.Lock() + defer poolMutex.Unlock() + if pool == nil { + catalogConnectionString, err := genCatalogConnectionString() + if err != nil { + return nil, fmt.Errorf("unable to generate catalog connection string: %w", err) + } - catalogConn, err := pgxpool.New(context.Background(), catalogConnectionString) - if err != nil { - return nil, fmt.Errorf("unable to establish connection with catalog: %w", err) + pool, err = pgxpool.New(context.Background(), catalogConnectionString) + if err != nil { + return nil, fmt.Errorf("unable to establish connection with catalog: %w", err) + } } - err = catalogConn.Ping(context.Background()) + err := pool.Ping(context.Background()) if err != nil { - return nil, fmt.Errorf("unable to establish connection with catalog: %w", err) + return pool, fmt.Errorf("unable to establish connection with catalog: %w", err) } - return catalogConn, nil + return pool, nil } func genCatalogConnectionString() (string, error) { diff --git a/flow/connectors/utils/monitoring/monitoring.go b/flow/connectors/utils/monitoring/monitoring.go index 06a6b4f125..41c634b874 100644 --- a/flow/connectors/utils/monitoring/monitoring.go +++ b/flow/connectors/utils/monitoring/monitoring.go @@ -14,10 +14,6 @@ import ( "google.golang.org/protobuf/proto" ) -type CatalogMirrorMonitor struct { - catalogConn *pgxpool.Pool -} - type CDCBatchInfo struct { BatchID int64 RowsInBatch uint32 @@ -26,29 +22,12 @@ type CDCBatchInfo struct { StartTime time.Time } -func NewCatalogMirrorMonitor(catalogConn *pgxpool.Pool) *CatalogMirrorMonitor { - return &CatalogMirrorMonitor{ - catalogConn: catalogConn, - } -} - -func (c *CatalogMirrorMonitor) IsActive() bool { - return !(c == nil || c.catalogConn == nil) -} - -func (c *CatalogMirrorMonitor) Close() { - if c == 
nil || c.catalogConn == nil { - return - } - c.catalogConn.Close() -} - -func (c *CatalogMirrorMonitor) InitializeCDCFlow(ctx context.Context, flowJobName string) error { - if c == nil || c.catalogConn == nil { +func InitializeCDCFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string) error { + if pool == nil { return nil } - _, err := c.catalogConn.Exec(ctx, + _, err := pool.Exec(ctx, `INSERT INTO peerdb_stats.cdc_flows(flow_name,latest_lsn_at_source,latest_lsn_at_target) VALUES($1,0,0) ON CONFLICT DO NOTHING`, flowJobName) if err != nil { @@ -57,13 +36,13 @@ func (c *CatalogMirrorMonitor) InitializeCDCFlow(ctx context.Context, flowJobNam return nil } -func (c *CatalogMirrorMonitor) UpdateLatestLSNAtSourceForCDCFlow(ctx context.Context, flowJobName string, +func UpdateLatestLSNAtSourceForCDCFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string, latestLSNAtSource pglogrepl.LSN) error { - if c == nil || c.catalogConn == nil { + if pool == nil { return nil } - _, err := c.catalogConn.Exec(ctx, + _, err := pool.Exec(ctx, "UPDATE peerdb_stats.cdc_flows SET latest_lsn_at_source=$1 WHERE flow_name=$2", uint64(latestLSNAtSource), flowJobName) if err != nil { @@ -72,13 +51,13 @@ func (c *CatalogMirrorMonitor) UpdateLatestLSNAtSourceForCDCFlow(ctx context.Con return nil } -func (c *CatalogMirrorMonitor) UpdateLatestLSNAtTargetForCDCFlow(ctx context.Context, flowJobName string, +func UpdateLatestLSNAtTargetForCDCFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string, latestLSNAtTarget pglogrepl.LSN) error { - if c == nil || c.catalogConn == nil { + if pool == nil { return nil } - _, err := c.catalogConn.Exec(ctx, + _, err := pool.Exec(ctx, "UPDATE peerdb_stats.cdc_flows SET latest_lsn_at_target=$1 WHERE flow_name=$2", uint64(latestLSNAtTarget), flowJobName) if err != nil { @@ -87,13 +66,13 @@ func (c *CatalogMirrorMonitor) UpdateLatestLSNAtTargetForCDCFlow(ctx context.Con return nil } -func (c *CatalogMirrorMonitor) AddCDCBatchForFlow(ctx 
context.Context, flowJobName string, +func AddCDCBatchForFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string, batchInfo CDCBatchInfo) error { - if c == nil || c.catalogConn == nil { + if pool == nil { return nil } - _, err := c.catalogConn.Exec(ctx, + _, err := pool.Exec(ctx, `INSERT INTO peerdb_stats.cdc_batches(flow_name,batch_id,rows_in_batch,batch_start_lsn,batch_end_lsn, start_time) VALUES($1,$2,$3,$4,$5,$6) ON CONFLICT DO NOTHING`, flowJobName, batchInfo.BatchID, batchInfo.RowsInBatch, @@ -105,18 +84,19 @@ func (c *CatalogMirrorMonitor) AddCDCBatchForFlow(ctx context.Context, flowJobNa } // update num records and end-lsn for a cdc batch -func (c *CatalogMirrorMonitor) UpdateNumRowsAndEndLSNForCDCBatch( +func UpdateNumRowsAndEndLSNForCDCBatch( ctx context.Context, + pool *pgxpool.Pool, flowJobName string, batchID int64, numRows uint32, batchEndLSN pglogrepl.LSN, ) error { - if c == nil || c.catalogConn == nil { + if pool == nil { return nil } - _, err := c.catalogConn.Exec(ctx, + _, err := pool.Exec(ctx, "UPDATE peerdb_stats.cdc_batches SET rows_in_batch=$1,batch_end_lsn=$2 WHERE flow_name=$3 AND batch_id=$4", numRows, uint64(batchEndLSN), flowJobName, batchID) if err != nil { @@ -125,16 +105,17 @@ func (c *CatalogMirrorMonitor) UpdateNumRowsAndEndLSNForCDCBatch( return nil } -func (c *CatalogMirrorMonitor) UpdateEndTimeForCDCBatch( +func UpdateEndTimeForCDCBatch( ctx context.Context, + pool *pgxpool.Pool, flowJobName string, batchID int64, ) error { - if c == nil || c.catalogConn == nil { + if pool == nil { return nil } - _, err := c.catalogConn.Exec(ctx, + _, err := pool.Exec(ctx, "UPDATE peerdb_stats.cdc_batches SET end_time=$1 WHERE flow_name=$2 AND batch_id=$3", time.Now(), flowJobName, batchID) if err != nil { @@ -143,13 +124,13 @@ func (c *CatalogMirrorMonitor) UpdateEndTimeForCDCBatch( return nil } -func (c *CatalogMirrorMonitor) AddCDCBatchTablesForFlow(ctx context.Context, flowJobName string, +func AddCDCBatchTablesForFlow(ctx 
context.Context, pool *pgxpool.Pool, flowJobName string, batchID int64, tableNameRowsMapping map[string]uint32) error { - if c == nil || c.catalogConn == nil { + if pool == nil { return nil } - insertBatchTablesTx, err := c.catalogConn.Begin(ctx) + insertBatchTablesTx, err := pool.Begin(ctx) if err != nil { return fmt.Errorf("error while beginning transaction for inserting statistics into cdc_batch_table: %w", err) } @@ -177,18 +158,19 @@ func (c *CatalogMirrorMonitor) AddCDCBatchTablesForFlow(ctx context.Context, flo return nil } -func (c *CatalogMirrorMonitor) InitializeQRepRun( +func InitializeQRepRun( ctx context.Context, + pool *pgxpool.Pool, config *protos.QRepConfig, runUUID string, partitions []*protos.QRepPartition, ) error { - if c == nil || c.catalogConn == nil { + if pool == nil { return nil } flowJobName := config.GetFlowJobName() - _, err := c.catalogConn.Exec(ctx, + _, err := pool.Exec(ctx, "INSERT INTO peerdb_stats.qrep_runs(flow_name,run_uuid) VALUES($1,$2) ON CONFLICT DO NOTHING", flowJobName, runUUID) if err != nil { @@ -200,7 +182,7 @@ func (c *CatalogMirrorMonitor) InitializeQRepRun( return fmt.Errorf("unable to marshal flow config: %w", err) } - _, err = c.catalogConn.Exec(ctx, + _, err = pool.Exec(ctx, "UPDATE peerdb_stats.qrep_runs SET config_proto = $1 WHERE flow_name = $2", cfgBytes, flowJobName) if err != nil { @@ -208,7 +190,7 @@ func (c *CatalogMirrorMonitor) InitializeQRepRun( } for _, partition := range partitions { - if err := c.addPartitionToQRepRun(ctx, flowJobName, runUUID, partition); err != nil { + if err := addPartitionToQRepRun(ctx, pool, flowJobName, runUUID, partition); err != nil { return fmt.Errorf("unable to add partition to qrep run: %w", err) } } @@ -216,12 +198,12 @@ func (c *CatalogMirrorMonitor) InitializeQRepRun( return nil } -func (c *CatalogMirrorMonitor) UpdateStartTimeForQRepRun(ctx context.Context, runUUID string) error { - if c == nil || c.catalogConn == nil { +func UpdateStartTimeForQRepRun(ctx 
context.Context, pool *pgxpool.Pool, runUUID string) error { + if pool == nil { return nil } - _, err := c.catalogConn.Exec(ctx, + _, err := pool.Exec(ctx, "UPDATE peerdb_stats.qrep_runs SET start_time=$1 WHERE run_uuid=$2", time.Now(), runUUID) if err != nil { @@ -231,12 +213,12 @@ func (c *CatalogMirrorMonitor) UpdateStartTimeForQRepRun(ctx context.Context, ru return nil } -func (c *CatalogMirrorMonitor) UpdateEndTimeForQRepRun(ctx context.Context, runUUID string) error { - if c == nil || c.catalogConn == nil { +func UpdateEndTimeForQRepRun(ctx context.Context, pool *pgxpool.Pool, runUUID string) error { + if pool == nil { return nil } - _, err := c.catalogConn.Exec(ctx, + _, err := pool.Exec(ctx, "UPDATE peerdb_stats.qrep_runs SET end_time=$1 WHERE run_uuid=$2", time.Now(), runUUID) if err != nil { @@ -246,16 +228,17 @@ func (c *CatalogMirrorMonitor) UpdateEndTimeForQRepRun(ctx context.Context, runU return nil } -func (c *CatalogMirrorMonitor) AppendSlotSizeInfo( +func AppendSlotSizeInfo( ctx context.Context, + pool *pgxpool.Pool, peerName string, slotInfo *protos.SlotInfo, ) error { - if c == nil || c.catalogConn == nil || slotInfo == nil { + if pool == nil || slotInfo == nil { return nil } - _, err := c.catalogConn.Exec(ctx, + _, err := pool.Exec(ctx, "INSERT INTO peerdb_stats.peer_slot_size"+ "(peer_name, slot_name, restart_lsn, redo_lsn, confirmed_flush_lsn, slot_size, wal_status) "+ "VALUES($1,$2,$3,$4,$5,$6) ON CONFLICT DO NOTHING;", @@ -274,9 +257,9 @@ func (c *CatalogMirrorMonitor) AppendSlotSizeInfo( return nil } -func (c *CatalogMirrorMonitor) addPartitionToQRepRun(ctx context.Context, flowJobName string, +func addPartitionToQRepRun(ctx context.Context, pool *pgxpool.Pool, flowJobName string, runUUID string, partition *protos.QRepPartition) error { - if c == nil || c.catalogConn == nil { + if pool == nil { return nil } @@ -317,7 +300,7 @@ func (c *CatalogMirrorMonitor) addPartitionToQRepRun(ctx context.Context, flowJo return fmt.Errorf("unknown range 
type: %v", x) } - _, err := c.catalogConn.Exec(ctx, + _, err := pool.Exec(ctx, `INSERT INTO peerdb_stats.qrep_partitions (flow_name,run_uuid,partition_uuid,partition_start,partition_end,restart_count) VALUES($1,$2,$3,$4,$5,$6) ON CONFLICT(run_uuid,partition_uuid) DO UPDATE SET @@ -330,17 +313,18 @@ func (c *CatalogMirrorMonitor) addPartitionToQRepRun(ctx context.Context, flowJo return nil } -func (c *CatalogMirrorMonitor) UpdateStartTimeForPartition( +func UpdateStartTimeForPartition( ctx context.Context, + pool *pgxpool.Pool, runUUID string, partition *protos.QRepPartition, startTime time.Time, ) error { - if c == nil || c.catalogConn == nil { + if pool == nil { return nil } - _, err := c.catalogConn.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET start_time=$1 + _, err := pool.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET start_time=$1 WHERE run_uuid=$2 AND partition_uuid=$3`, startTime, runUUID, partition.PartitionId) if err != nil { return fmt.Errorf("error while updating qrep partition in qrep_partitions: %w", err) @@ -348,13 +332,13 @@ func (c *CatalogMirrorMonitor) UpdateStartTimeForPartition( return nil } -func (c *CatalogMirrorMonitor) UpdatePullEndTimeAndRowsForPartition(ctx context.Context, runUUID string, +func UpdatePullEndTimeAndRowsForPartition(ctx context.Context, pool *pgxpool.Pool, runUUID string, partition *protos.QRepPartition, rowsInPartition int64) error { - if c == nil || c.catalogConn == nil { + if pool == nil { return nil } - _, err := c.catalogConn.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET pull_end_time=$1,rows_in_partition=$2 + _, err := pool.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET pull_end_time=$1,rows_in_partition=$2 WHERE run_uuid=$3 AND partition_uuid=$4`, time.Now(), rowsInPartition, runUUID, partition.PartitionId) if err != nil { return fmt.Errorf("error while updating qrep partition in qrep_partitions: %w", err) @@ -362,13 +346,13 @@ func (c *CatalogMirrorMonitor) UpdatePullEndTimeAndRowsForPartition(ctx 
context. return nil } -func (c *CatalogMirrorMonitor) UpdateEndTimeForPartition(ctx context.Context, runUUID string, +func UpdateEndTimeForPartition(ctx context.Context, pool *pgxpool.Pool, runUUID string, partition *protos.QRepPartition) error { - if c == nil || c.catalogConn == nil { + if pool == nil { return nil } - _, err := c.catalogConn.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET end_time=$1 + _, err := pool.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET end_time=$1 WHERE run_uuid=$2 AND partition_uuid=$3`, time.Now(), runUUID, partition.PartitionId) if err != nil { return fmt.Errorf("error while updating qrep partition in qrep_partitions: %w", err) diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 559f03ba62..50ddfa43c4 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -117,7 +117,7 @@ func (s PeerFlowE2ETestSuiteBQ) tearDownSuite() { func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) // TODO (kaushikiska): ensure flow name can only be alpha numeric and underscores. 
limits := peerflow.CDCFlowLimits{ @@ -139,7 +139,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_no_data") dstTableName := "test_no_data" @@ -183,7 +183,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_char_coltype") dstTableName := "test_char_coltype" @@ -230,7 +230,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { // correctly synced to the destination table after sync flow completes. func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_simple_flow_bq") dstTableName := "test_simple_flow_bq" @@ -297,7 +297,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_toast_bq_1") dstTableName := "test_toast_bq_1" @@ -365,7 +365,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_toast_bq_2") dstTableName := "test_toast_bq_2" @@ -426,7 +426,7 @@ func (s 
PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_toast_bq_3") dstTableName := "test_toast_bq_3" @@ -500,7 +500,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_toast_bq_4") dstTableName := "test_toast_bq_4" @@ -567,7 +567,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_toast_bq_5") dstTableName := "test_toast_bq_5" @@ -634,7 +634,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_types_bq") dstTableName := "test_types_bq" @@ -712,7 +712,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTable1Name := s.attachSchemaSuffix("test1_bq") dstTable1Name := "test1_bq" @@ -774,7 +774,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { // TODO: not checking schema exactly, add later func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { env := 
e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") dstTableName := "test_simple_schema_changes" @@ -874,7 +874,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_simple_cpkey") dstTableName := "test_simple_cpkey" @@ -947,7 +947,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_cpkey_toast1") dstTableName := "test_cpkey_toast1" @@ -1024,7 +1024,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_cpkey_toast2") dstTableName := "test_cpkey_toast2" diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go index c12634cec8..f520014b04 100644 --- a/flow/e2e/bigquery/qrep_flow_bq_test.go +++ b/flow/e2e/bigquery/qrep_flow_bq_test.go @@ -48,7 +48,7 @@ func (s PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsStr func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) numRows := 10 diff --git a/flow/e2e/postgres/peer_flow_pg_test.go 
b/flow/e2e/postgres/peer_flow_pg_test.go index 3f5718bb72..2720891fb6 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -20,7 +20,7 @@ func (s *PeerFlowE2ETestSuitePG) attachSuffix(input string) string { func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.T()) srcTableName := s.attachSchemaSuffix("test_simple_flow") dstTableName := s.attachSchemaSuffix("test_simple_flow_dst") @@ -83,7 +83,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.T()) srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") dstTableName := s.attachSchemaSuffix("test_simple_schema_changes_dst") @@ -244,7 +244,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.T()) srcTableName := s.attachSchemaSuffix("test_simple_cpkey") dstTableName := s.attachSchemaSuffix("test_simple_cpkey_dst") @@ -319,7 +319,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.T()) srcTableName := s.attachSchemaSuffix("test_cpkey_toast1") dstTableName := s.attachSchemaSuffix("test_cpkey_toast1_dst") @@ -400,7 +400,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + 
e2e.RegisterWorkflowsAndActivities(env, s.T()) srcTableName := s.attachSchemaSuffix("test_cpkey_toast2") dstTableName := s.attachSchemaSuffix("test_cpkey_toast2_dst") diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index c9b82a4c6c..b5a098e004 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -138,7 +138,7 @@ func (s *PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQuali func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.T()) numRows := 10 diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go index 85993e2fe8..478877ef4b 100644 --- a/flow/e2e/s3/cdc_s3_test.go +++ b/flow/e2e/s3/cdc_s3_test.go @@ -20,7 +20,7 @@ func (s *PeerFlowE2ETestSuiteS3) attachSuffix(input string) string { func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.T()) srcTableName := s.attachSchemaSuffix("test_simple_flow_s3") dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_s3", "test_simple_flow_s3") @@ -88,7 +88,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.T()) setupErr := s.setupS3("gcs") if setupErr != nil { s.Fail("failed to setup S3", setupErr) diff --git a/flow/e2e/s3/qrep_flow_s3_test.go b/flow/e2e/s3/qrep_flow_s3_test.go index d807cf6335..e9f9f5795f 100644 --- a/flow/e2e/s3/qrep_flow_s3_test.go +++ b/flow/e2e/s3/qrep_flow_s3_test.go @@ -93,7 +93,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { } env := s.NewTestWorkflowEnvironment() -
e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.T()) jobName := "test_complete_flow_s3" schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s3Suffix, jobName) @@ -140,7 +140,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { } env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.T()) jobName := "test_complete_flow_s3_ctid" schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s3Suffix, jobName) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 759ab34b05..1960c03ae5 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -128,7 +128,7 @@ func (s PeerFlowE2ETestSuiteSF) tearDownSuite() { func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_simple_flow_sf") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_simple_flow_sf") @@ -203,7 +203,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_invalid_geo_sf_avro_cdc") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_invalid_geo_sf_avro_cdc") @@ -288,7 +288,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_toast_sf_1") dstTableName := fmt.Sprintf("%s.%s",
s.sfHelper.testSchemaName, "test_toast_sf_1") @@ -355,7 +355,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_toast_sf_2") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_2") @@ -424,7 +424,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_toast_sf_3") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_3") @@ -497,7 +497,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_toast_sf_4") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_4") @@ -563,7 +563,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_toast_sf_5") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_5") @@ -629,7 +629,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := 
s.attachSchemaSuffix("test_types_sf") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_types_sf") @@ -706,7 +706,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTable1Name := s.attachSchemaSuffix("test1_sf") srcTable2Name := s.attachSchemaSuffix("test2_sf") @@ -765,7 +765,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_simple_schema_changes") @@ -925,7 +925,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_simple_cpkey") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_simple_cpkey") @@ -998,7 +998,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_cpkey_toast1") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_cpkey_toast1") @@ -1074,7 +1074,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - 
e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_cpkey_toast2") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_cpkey_toast2") @@ -1146,7 +1146,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_exclude_sf") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_exclude_sf") @@ -1228,7 +1228,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) cmpTableName := s.attachSchemaSuffix("test_softdel") srcTableName := fmt.Sprintf("%s_src", cmpTableName) @@ -1314,7 +1314,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) cmpTableName := s.attachSchemaSuffix("test_softdel_iud") srcTableName := fmt.Sprintf("%s_src", cmpTableName) @@ -1396,7 +1396,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) cmpTableName := s.attachSchemaSuffix("test_softdel_ud") srcTableName := fmt.Sprintf("%s_src", cmpTableName) @@ -1482,7 +1482,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { env := 
e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_softdel_iad") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_softdel_iad") diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index 81884d2e3d..a541be9283 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -58,7 +58,7 @@ func (s PeerFlowE2ETestSuiteSF) compareTableContentsSF(tableName string, selecto func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) numRows := 10 @@ -97,7 +97,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) numRows := 10 @@ -140,7 +140,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) numRows := 10 @@ -180,7 +180,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) numRows := 10 @@ -224,7 +224,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() { env := 
e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.t) numRows := 10 diff --git a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go index 013685a364..2e1acbc9ef 100644 --- a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go +++ b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go @@ -137,7 +137,7 @@ func (s *PeerFlowE2ETestSuiteSQLServer) Test_Complete_QRep_Flow_SqlServer_Append } env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) + e2e.RegisterWorkflowsAndActivities(env, s.T()) numRows := 10 tblName := "test_qrep_flow_avro_ss_append" diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index c16040062a..b781107a23 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -7,9 +7,11 @@ import ( "io" "os" "strings" + "testing" "time" "github.com/PeerDB-io/peer-flow/activities" + utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" @@ -39,7 +41,13 @@ func ReadFileToBytes(path string) ([]byte, error) { return ret, nil } -func RegisterWorkflowsAndActivities(env *testsuite.TestWorkflowEnvironment) { +func RegisterWorkflowsAndActivities(env *testsuite.TestWorkflowEnvironment, t *testing.T) { + conn, err := utils.GetCatalogConnectionPoolFromEnv() + if err != nil { + t.Errorf("unable to create catalog connection pool: %v", err) + t.FailNow() + } + // set a 300 second timeout for the workflow to execute a few runs.
env.SetTestTimeout(300 * time.Second) @@ -51,7 +59,7 @@ func RegisterWorkflowsAndActivities(env *testsuite.TestWorkflowEnvironment) { env.RegisterWorkflow(peerflow.QRepFlowWorkflow) env.RegisterWorkflow(peerflow.XminFlowWorkflow) env.RegisterWorkflow(peerflow.QRepPartitionWorkflow) - env.RegisterActivity(&activities.FlowableActivity{}) + env.RegisterActivity(&activities.FlowableActivity{CatalogPool: conn}) env.RegisterActivity(&activities.SnapshotActivity{}) }