From 53235aa0004d460d165045ef7677b4835e18a997 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 5 Dec 2023 01:10:58 +0000 Subject: [PATCH] Xmin replication to account for wrap-around (#747) QRep based xmin replication has a fault: xmin isn't monotonic & has wraparound issues Wraparound workaround: grab all records with `0 < age(xmin) <= age(last snapshot xmin)` which'll include some records from before wraparound but so it goes Logic is copied from qrep code --------- Co-authored-by: phdub Co-authored-by: Amogh-Bharadwaj --- flow/activities/flowable.go | 100 +++++- flow/cmd/handler.go | 27 +- flow/cmd/worker.go | 1 + flow/connectors/core.go | 2 +- flow/connectors/postgres/qrep.go | 83 +++-- .../postgres/qrep_query_executor.go | 48 +++ flow/e2e/snowflake/qrep_flow_sf_test.go | 2 +- flow/workflows/xmin_flow.go | 288 ++++++++++++++++++ ui/app/mirrors/create/handlers.ts | 2 +- 9 files changed, 490 insertions(+), 63 deletions(-) create mode 100644 flow/workflows/xmin_flow.go diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index c57bd29f92..33c944f993 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -467,7 +467,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context, }, nil } -// ReplicateQRepPartition replicates a QRepPartition from the source to the destination. +// ReplicateQRepPartitions spawns multiple ReplicateQRepPartition func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context, config *protos.QRepConfig, partitions *protos.QRepPartitionBatch, @@ -537,11 +537,12 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, "flowName": config.FlowJobName, }).Errorf("failed to pull records: %v", err) goroutineErr = err - } - err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(ctx, runUUID, partition, numRecords) - if err != nil { - log.Errorf("%v", err) - goroutineErr = err + } else { + err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(ctx, runUUID, partition, numRecords) + if err != nil { + log.Errorf("%v", err) + goroutineErr = err + } } wg.Done() } @@ -845,3 +846,90 @@ func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *pr } return nil, fmt.Errorf("create tables from existing is only supported on snowflake and bigquery") } + +// ReplicateXminPartition replicates a XminPartition from the source to the destination. 
+func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, + config *protos.QRepConfig, + partition *protos.QRepPartition, + runUUID string, +) (int64, error) { + err := a.CatalogMirrorMonitor.UpdateStartTimeForPartition(ctx, runUUID, partition) + if err != nil { + return 0, fmt.Errorf("failed to update start time for partition: %w", err) + } + + srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer) + if err != nil { + return 0, fmt.Errorf("failed to get qrep source connector: %w", err) + } + defer connectors.CloseConnector(srcConn) + + dstConn, err := connectors.GetQRepSyncConnector(ctx, config.DestinationPeer) + if err != nil { + return 0, fmt.Errorf("failed to get qrep destination connector: %w", err) + } + defer connectors.CloseConnector(dstConn) + + log.WithFields(log.Fields{ + "flowName": config.FlowJobName, + }).Info("replicating xmin\n") + + bufferSize := shared.FetchAndChannelSize + errGroup, errCtx := errgroup.WithContext(ctx) + + stream := model.NewQRecordStream(bufferSize) + + var currentSnapshotXmin int64 + errGroup.Go(func() error { + pgConn := srcConn.(*connpostgres.PostgresConnector) + var pullErr error + var numRecords int + numRecords, currentSnapshotXmin, pullErr = pgConn.PullXminRecordStream(config, partition, stream) + if pullErr != nil { + log.WithFields(log.Fields{ + "flowName": config.FlowJobName, + }).Errorf("failed to pull records: %v", err) + return err + } + err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(errCtx, runUUID, partition, int64(numRecords)) + if err != nil { + log.Errorf("%v", err) + return err + } + return nil + }) + + shutdown := utils.HeartbeatRoutine(ctx, 5*time.Minute, func() string { + return "syncing xmin." + }) + + defer func() { + shutdown <- true + }() + + res, err := dstConn.SyncQRepRecords(config, partition, stream) + if err != nil { + return 0, fmt.Errorf("failed to sync records: %w", err) + } + + if res == 0 { + log.WithFields(log.Fields{ + "flowName": config.FlowJobName, + }).Info("no records to push for xmin\n") + } else { + err := errGroup.Wait() + if err != nil { + return 0, err + } + log.WithFields(log.Fields{ + "flowName": config.FlowJobName, + }).Infof("pushed %d records\n", res) + } + + err = a.CatalogMirrorMonitor.UpdateEndTimeForPartition(ctx, runUUID, partition) + if err != nil { + return 0, err + } + + return currentSnapshotXmin, nil +} diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 14a9603125..349501c306 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "strconv" "strings" "time" @@ -248,13 +249,25 @@ func (h *FlowRequestHandler) CreateQRepFlow( } state := peerflow.NewQRepFlowState() - _, err := h.temporalClient.ExecuteWorkflow( - ctx, // context - workflowOptions, // workflow start options - peerflow.QRepFlowWorkflow, // workflow function - cfg, // workflow input - state, - ) + preColon, postColon, hasColon := strings.Cut(cfg.WatermarkColumn, "::") + var workflowFn interface{} + if cfg.SourcePeer.Type == protos.DBType_POSTGRES && + preColon == "xmin" { + state.LastPartition.PartitionId = uuid.New().String() + if hasColon { + // hack to facilitate migrating from existing xmin sync + txid, err := strconv.ParseInt(postColon, 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid xmin txid for xmin rep: %w", err) + } + state.LastPartition.Range = &protos.PartitionRange{Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: txid}}} + } + + workflowFn = 
peerflow.XminFlowWorkflow + } else { + workflowFn = peerflow.QRepFlowWorkflow + } + _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn, cfg, state) if err != nil { return nil, fmt.Errorf("unable to start QRepFlow workflow: %w", err) } diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index adba89371e..f7431e52cc 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -130,6 +130,7 @@ func WorkerMain(opts *WorkerOptions) error { w.RegisterWorkflow(peerflow.NormalizeFlowWorkflow) w.RegisterWorkflow(peerflow.QRepFlowWorkflow) w.RegisterWorkflow(peerflow.QRepPartitionWorkflow) + w.RegisterWorkflow(peerflow.XminFlowWorkflow) w.RegisterWorkflow(peerflow.DropFlowWorkflow) w.RegisterWorkflow(peerflow.HeartbeatFlowWorkflow) w.RegisterActivity(&activities.FlowableActivity{ diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 6327ed4bb9..6c7fa6c9e8 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -101,7 +101,7 @@ type QRepPullConnector interface { // GetQRepPartitions returns the partitions for a given table that haven't been synced yet. GetQRepPartitions(config *protos.QRepConfig, last *protos.QRepPartition) ([]*protos.QRepPartition, error) - // GetQRepRecords returns the records for a given partition. + // PullQRepRecords returns the records for a given partition. PullQRepRecords(config *protos.QRepConfig, partition *protos.QRepPartition) (*model.QRecordBatch, error) } diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index 66b7e9aa69..5bde72370c 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -3,6 +3,7 @@ package connpostgres import ( "bytes" "fmt" + "strconv" "text/template" "time" @@ -84,50 +85,6 @@ func (c *PostgresConnector) getNumRowsPartitions( var err error numRowsPerPartition := int64(config.NumRowsPerPartition) quotedWatermarkColumn := fmt.Sprintf("\"%s\"", config.WatermarkColumn) - if config.WatermarkColumn == "xmin" { - minVal, maxVal, err := c.getMinMaxValues(tx, config, last) - if err != nil { - return nil, fmt.Errorf("failed to get min max values for xmin: %w", err) - } - - // we know these are int64s so we can just cast them - minValInt := minVal.(int64) - maxValInt := maxVal.(int64) - - // we will only return 1 partition for xmin: - // if there is no last partition, we will return a partition with the min and max values - // if there is a last partition, we will return a partition with the last partition's - // end value + 1 and the max value - if last != nil && last.Range != nil { - minValInt += 1 - } - - if minValInt > maxValInt { - // log and return an empty partition - log.WithFields(log.Fields{ - "flowName": config.FlowJobName, - }).Infof("xmin min value is greater than max value, returning empty partition") - return make([]*protos.QRepPartition, 0), nil - } - - log.WithFields(log.Fields{ - "flowName": config.FlowJobName, - }).Infof("single xmin partition range: %v - %v", minValInt, maxValInt) - - partition := &protos.QRepPartition{ - PartitionId: uuid.New().String(), - Range: &protos.PartitionRange{ - Range: &protos.PartitionRange_IntRange{ - IntRange: &protos.IntPartitionRange{ - Start: minValInt, - End: maxValInt, - }, - }, - }, - } - - return []*protos.QRepPartition{partition}, nil - } whereClause := "" if last != nil && last.Range != nil { @@ -244,9 +201,6 @@ func (c *PostgresConnector) getMinMaxValues( ) (interface{}, interface{}, error) { var minValue, maxValue interface{} quotedWatermarkColumn := fmt.Sprintf("\"%s\"", 
config.WatermarkColumn) - if config.WatermarkColumn == "xmin" { - quotedWatermarkColumn = fmt.Sprintf("%s::text::bigint", quotedWatermarkColumn) - } parsedWatermarkTable, err := utils.ParseSchemaTable(config.WatermarkTable) if err != nil { @@ -596,6 +550,41 @@ func (c *PostgresConnector) SetupQRepMetadataTables(config *protos.QRepConfig) e return nil } +func (c *PostgresConnector) PullXminRecordStream( + config *protos.QRepConfig, + partition *protos.QRepPartition, + stream *model.QRecordStream, +) (int, int64, error) { + var currentSnapshotXmin int64 + query := config.Query + oldxid := "" + if partition.Range != nil { + oldxid = strconv.FormatInt(partition.Range.Range.(*protos.PartitionRange_IntRange).IntRange.Start&0xffffffff, 10) + query += " WHERE age(xmin) > 0 AND age(xmin) <= age($1::xid)" + } + + executor, err := NewQRepQueryExecutorSnapshot(c.pool, c.ctx, c.config.TransactionSnapshot, + config.FlowJobName, partition.PartitionId) + if err != nil { + return 0, currentSnapshotXmin, err + } + + var numRecords int + if partition.Range != nil { + numRecords, currentSnapshotXmin, err = executor.ExecuteAndProcessQueryStreamGettingCurrentTxid(stream, query, oldxid) + } else { + numRecords, currentSnapshotXmin, err = executor.ExecuteAndProcessQueryStreamGettingCurrentTxid(stream, query) + } + if err != nil { + return 0, currentSnapshotXmin, err + } + + log.WithFields(log.Fields{ + "partition": partition.PartitionId, + }).Infof("pulled %d records for flow job %s", numRecords, config.FlowJobName) + return numRecords, currentSnapshotXmin, nil +} + func BuildQuery(query string, flowJobName string) (string, error) { tmpl, err := template.New("query").Parse(query) if err != nil { diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index c7d75c4500..81c7095c66 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -321,6 +321,51 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStream( return 0, fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err) } + totalRecordsFetched, err := qe.ExecuteAndProcessQueryStreamWithTx(tx, stream, query, args...) + return totalRecordsFetched, err +} + +func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamGettingCurrentTxid( + stream *model.QRecordStream, + query string, + args ...interface{}, +) (int, int64, error) { + var currentSnapshotXmin int64 + log.WithFields(log.Fields{ + "flowName": qe.flowJobName, + "partitionID": qe.partitionID, + }).Infof("Executing and processing query stream '%s'", query) + defer close(stream.Records) + + tx, err := qe.pool.BeginTx(qe.ctx, pgx.TxOptions{ + AccessMode: pgx.ReadOnly, + IsoLevel: pgx.RepeatableRead, + }) + if err != nil { + log.WithFields(log.Fields{ + "flowName": qe.flowJobName, + "partitionID": qe.partitionID, + }).Errorf("[pg_query_executor] failed to begin transaction: %v", err) + return 0, currentSnapshotXmin, fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err) + } + + err = tx.QueryRow(qe.ctx, "select txid_snapshot_xmin(txid_current_snapshot())").Scan(¤tSnapshotXmin) + if err != nil { + return 0, currentSnapshotXmin, err + } + + totalRecordsFetched, err := qe.ExecuteAndProcessQueryStreamWithTx(tx, stream, query, args...) 
+ return totalRecordsFetched, currentSnapshotXmin, err +} + +func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx( + tx pgx.Tx, + stream *model.QRecordStream, + query string, + args ...interface{}, +) (int, error) { + var err error + defer func() { err := tx.Rollback(qe.ctx) if err != nil && err != pgx.ErrTxClosed { @@ -397,6 +442,9 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStream( qe.recordHeartbeat("#%d fetched %d rows", numFetchOpsComplete, numRows) } + log.WithFields(log.Fields{ + "flowName": qe.flowJobName, + }).Info("Committing transaction") err = tx.Commit(qe.ctx) if err != nil { stream.Records <- &model.QRecordOrError{ diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index 9795412667..f39686d0bb 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -190,7 +190,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) - query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE xmin::text::bigint BETWEEN {{.start}} AND {{.end}}", + query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s", s.pgSuffix, tblName) qrepConfig, err := e2e.CreateQRepWorkflowConfig( diff --git a/flow/workflows/xmin_flow.go b/flow/workflows/xmin_flow.go new file mode 100644 index 0000000000..db09b7abc9 --- /dev/null +++ b/flow/workflows/xmin_flow.go @@ -0,0 +1,288 @@ +// This file corresponds to xmin based replication. +package peerflow + +import ( + "fmt" + "strings" + "time" + + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/shared" + util "github.com/PeerDB-io/peer-flow/utils" + "github.com/google/uuid" + "go.temporal.io/sdk/log" + "go.temporal.io/sdk/workflow" +) + +type XminFlowExecution struct { + config *protos.QRepConfig + flowExecutionID string + logger log.Logger + runUUID string + // being tracked for future workflow signalling + childPartitionWorkflows []workflow.ChildWorkflowFuture + // Current signalled state of the peer flow. + activeSignal shared.CDCFlowSignal +} + +// NewXminFlowExecution creates a new instance of XminFlowExecution. +func NewXminFlowExecution(ctx workflow.Context, config *protos.QRepConfig, runUUID string) *XminFlowExecution { + return &XminFlowExecution{ + config: config, + flowExecutionID: workflow.GetInfo(ctx).WorkflowExecution.ID, + logger: workflow.GetLogger(ctx), + runUUID: runUUID, + childPartitionWorkflows: nil, + activeSignal: shared.NoopSignal, + } +} + +// SetupMetadataTables creates the metadata tables for query based replication. 
+func (q *XminFlowExecution) SetupMetadataTables(ctx workflow.Context) error { + q.logger.Info("setting up metadata tables for xmin flow - ", q.config.FlowJobName) + + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Minute, + }) + + if err := workflow.ExecuteActivity(ctx, flowable.SetupQRepMetadataTables, q.config).Get(ctx, nil); err != nil { + return fmt.Errorf("failed to setup metadata tables: %w", err) + } + + q.logger.Info("metadata tables setup for xmin flow - ", q.config.FlowJobName) + return nil +} + +func (q *XminFlowExecution) SetupWatermarkTableOnDestination(ctx workflow.Context) error { + if q.config.SetupWatermarkTableOnDestination { + q.logger.Info("setting up watermark table on destination for xmin flow: ", q.config.FlowJobName) + + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Minute, + }) + + tableSchemaInput := &protos.GetTableSchemaBatchInput{ + PeerConnectionConfig: q.config.SourcePeer, + TableIdentifiers: []string{q.config.WatermarkTable}, + } + + future := workflow.ExecuteActivity(ctx, flowable.GetTableSchema, tableSchemaInput) + + var tblSchemaOutput *protos.GetTableSchemaBatchOutput + if err := future.Get(ctx, &tblSchemaOutput); err != nil { + q.logger.Error("failed to fetch schema for watermark table: ", err) + return fmt.Errorf("failed to fetch schema for watermark table %s: %w", q.config.WatermarkTable, err) + } + + // now setup the normalized tables on the destination peer + setupConfig := &protos.SetupNormalizedTableBatchInput{ + PeerConnectionConfig: q.config.DestinationPeer, + TableNameSchemaMapping: map[string]*protos.TableSchema{ + q.config.DestinationTableIdentifier: tblSchemaOutput.TableNameSchemaMapping[q.config.WatermarkTable], + }, + } + + future = workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTable, setupConfig) + var createNormalizedTablesOutput *protos.SetupNormalizedTableBatchOutput + if err := future.Get(ctx, &createNormalizedTablesOutput); err != nil { + q.logger.Error("failed to create watermark table: ", err) + return fmt.Errorf("failed to create watermark table: %w", err) + } + q.logger.Info("finished setting up watermark table for xmin flow: ", q.config.FlowJobName) + } + return nil +} + +func (q *XminFlowExecution) handleTableCreationForResync(ctx workflow.Context, state *protos.QRepFlowState) error { + if state.NeedsResync && q.config.DstTableFullResync { + renamedTableIdentifier := fmt.Sprintf("%s_peerdb_resync", q.config.DestinationTableIdentifier) + createTablesFromExistingCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 10 * time.Minute, + HeartbeatTimeout: 2 * time.Minute, + }) + createTablesFromExistingFuture := workflow.ExecuteActivity( + createTablesFromExistingCtx, flowable.CreateTablesFromExisting, &protos.CreateTablesFromExistingInput{ + FlowJobName: q.config.FlowJobName, + Peer: q.config.DestinationPeer, + NewToExistingTableMapping: map[string]string{ + renamedTableIdentifier: q.config.DestinationTableIdentifier, + }, + }) + if err := createTablesFromExistingFuture.Get(createTablesFromExistingCtx, nil); err != nil { + return fmt.Errorf("failed to create table for mirror resync: %w", err) + } + q.config.DestinationTableIdentifier = renamedTableIdentifier + } + return nil +} + +func (q *XminFlowExecution) handleTableRenameForResync(ctx workflow.Context, state *protos.QRepFlowState) error { + if state.NeedsResync && q.config.DstTableFullResync { + oldTableIdentifier := 
strings.TrimSuffix(q.config.DestinationTableIdentifier, "_peerdb_resync") + renameOpts := &protos.RenameTablesInput{} + renameOpts.FlowJobName = q.config.FlowJobName + renameOpts.Peer = q.config.DestinationPeer + renameOpts.RenameTableOptions = []*protos.RenameTableOption{ + { + CurrentName: q.config.DestinationTableIdentifier, + NewName: oldTableIdentifier, + }, + } + + renameTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 30 * time.Minute, + HeartbeatTimeout: 5 * time.Minute, + }) + renameTablesFuture := workflow.ExecuteActivity(renameTablesCtx, flowable.RenameTables, renameOpts) + if err := renameTablesFuture.Get(renameTablesCtx, nil); err != nil { + return fmt.Errorf("failed to execute rename tables activity: %w", err) + } + q.config.DestinationTableIdentifier = oldTableIdentifier + } + state.NeedsResync = false + return nil +} + +func (q *XminFlowExecution) receiveAndHandleSignalAsync(ctx workflow.Context) { + signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName) + + var signalVal shared.CDCFlowSignal + ok := signalChan.ReceiveAsync(&signalVal) + if ok { + q.activeSignal = util.FlowSignalHandler(q.activeSignal, signalVal, q.logger) + } +} + +// For some targets we need to consolidate all the partitions from stages before +// we proceed to next batch. +func (q *XminFlowExecution) consolidatePartitions(ctx workflow.Context) error { + q.logger.Info("consolidating partitions") + + // only an operation for Snowflake currently. + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 24 * time.Hour, + HeartbeatTimeout: 10 * time.Minute, + }) + + if err := workflow.ExecuteActivity(ctx, flowable.ConsolidateQRepPartitions, q.config, + q.runUUID).Get(ctx, nil); err != nil { + return fmt.Errorf("failed to consolidate partitions: %w", err) + } + + q.logger.Info("partitions consolidated") + + // clean up qrep flow as well + if err := workflow.ExecuteActivity(ctx, flowable.CleanupQRepFlow, q.config).Get(ctx, nil); err != nil { + return fmt.Errorf("failed to cleanup qrep flow: %w", err) + } + + q.logger.Info("qrep flow cleaned up") + + return nil +} + +func XminFlowWorkflow( + ctx workflow.Context, + config *protos.QRepConfig, + state *protos.QRepFlowState, +) error { + // register a query to get the number of partitions processed + err := workflow.SetQueryHandler(ctx, "num-partitions-processed", func() (uint64, error) { + return state.NumPartitionsProcessed, nil + }) + if err != nil { + return fmt.Errorf("failed to register query handler: %w", err) + } + + // get xmin run uuid via side-effect + runUUIDSideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} { + return uuid.New().String() + }) + + var runUUID string + if err := runUUIDSideEffect.Get(&runUUID); err != nil { + return fmt.Errorf("failed to get run uuid: %w", err) + } + + q := NewXminFlowExecution(ctx, config, runUUID) + + err = q.SetupWatermarkTableOnDestination(ctx) + if err != nil { + return fmt.Errorf("failed to setup watermark table: %w", err) + } + + err = q.SetupMetadataTables(ctx) + if err != nil { + return fmt.Errorf("failed to setup metadata tables: %w", err) + } + q.logger.Info("metadata tables setup for peer flow - ", config.FlowJobName) + + err = q.handleTableCreationForResync(ctx, state) + if err != nil { + return err + } + + var lastPartition int64 + replicateXminPartitionCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 24 * 5 * time.Hour, + HeartbeatTimeout: 5 * 
time.Minute, + }) + err = workflow.ExecuteActivity( + replicateXminPartitionCtx, + flowable.ReplicateXminPartition, + q.config, + state.LastPartition, + q.runUUID, + ).Get(ctx, &lastPartition) + if err != nil { + return fmt.Errorf("xmin replication failed: %w", err) + } + + state.LastPartition = &protos.QRepPartition{ + PartitionId: q.runUUID, + Range: &protos.PartitionRange{Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: lastPartition}}}, + } + + if config.InitialCopyOnly { + q.logger.Info("initial copy completed for peer flow - ", config.FlowJobName) + return nil + } + + err = q.handleTableRenameForResync(ctx, state) + if err != nil { + return err + } + + if err = q.consolidatePartitions(ctx); err != nil { + return err + } + + workflow.GetLogger(ctx).Info("Continuing as new workflow", + "Last Partition", state.LastPartition, + "Number of Partitions Processed", state.NumPartitionsProcessed) + + // here, we handle signals after the end of the flow because a new workflow does not inherit the signals + // and the chance of missing a signal is much higher if the check is before the time consuming parts run + q.receiveAndHandleSignalAsync(ctx) + if q.activeSignal == shared.PauseSignal { + startTime := time.Now() + signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName) + var signalVal shared.CDCFlowSignal + + for q.activeSignal == shared.PauseSignal { + q.logger.Info("mirror has been paused for ", time.Since(startTime)) + // only place we block on receive, so signal processing is immediate + ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal) + if ok { + q.activeSignal = util.FlowSignalHandler(q.activeSignal, signalVal, q.logger) + } + } + } + if q.activeSignal == shared.ShutdownSignal { + q.logger.Info("terminating workflow - ", config.FlowJobName) + return nil + } + + // Continue the workflow with new state + return workflow.NewContinueAsNewError(ctx, XminFlowWorkflow, config, state) +} diff --git a/ui/app/mirrors/create/handlers.ts b/ui/app/mirrors/create/handlers.ts index f65ecc8fd8..7c0b28ae07 100644 --- a/ui/app/mirrors/create/handlers.ts +++ b/ui/app/mirrors/create/handlers.ts @@ -221,7 +221,7 @@ export const handleCreateQRep = async ( config.watermarkColumn = 'xmin'; config.query = `SELECT * FROM ${quotedWatermarkTable( config.watermarkTable - )} WHERE xmin::text::bigint BETWEEN {{.start}} AND {{.end}}`; + )}`; query = config.query; config.initialCopyOnly = false; }
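
For readers following the predicate in the commit message, the sketch below exercises the same two building blocks in isolation: capturing `txid_snapshot_xmin(txid_current_snapshot())` inside a repeatable-read transaction as the next watermark, and filtering with `age(xmin) > 0 AND age(xmin) <= age($1::xid)` so the comparison survives xid wraparound. This is a minimal standalone sketch, not PeerDB code — the table `public.events`, its columns, and the `DATABASE_URL` connection string are assumptions made purely for illustration; only the two queries mirror what the patch itself runs.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"strconv"

	"github.com/jackc/pgx/v5"
)

// pullSinceXmin reads all rows whose xmin is newer than prevXmin and returns
// the xmin of this transaction's own snapshot, which becomes the watermark
// for the next run. prevXmin <= 0 means "first run": pull everything.
func pullSinceXmin(ctx context.Context, conn *pgx.Conn, prevXmin int64) (nextWatermark int64, rowCount int, err error) {
	tx, err := conn.BeginTx(ctx, pgx.TxOptions{
		IsoLevel:   pgx.RepeatableRead,
		AccessMode: pgx.ReadOnly,
	})
	if err != nil {
		return 0, 0, err
	}
	defer tx.Rollback(ctx) // read-only transaction; ending it with Rollback is fine

	// Capture the snapshot's xmin first. Any row this transaction cannot see
	// was written at or after this xid, so the next run's window will cover it.
	if err := tx.QueryRow(ctx,
		"SELECT txid_snapshot_xmin(txid_current_snapshot())").Scan(&nextWatermark); err != nil {
		return 0, 0, err
	}

	// Build the same shape of query the patch uses: no predicate on the first
	// run, otherwise the wraparound-safe comparison from the commit message,
	// 0 < age(xmin) <= age(last snapshot xmin). The previous watermark is
	// passed as text and cast with $1::xid, keeping only its low 32 bits.
	query := "SELECT id, payload FROM public.events"
	args := []any{}
	if prevXmin > 0 {
		query += " WHERE age(xmin) > 0 AND age(xmin) <= age($1::xid)"
		args = append(args, strconv.FormatInt(prevXmin&0xffffffff, 10))
	}

	rows, err := tx.Query(ctx, query, args...)
	if err != nil {
		return 0, 0, err
	}
	defer rows.Close()

	for rows.Next() {
		var id int64
		var payload string
		if err := rows.Scan(&id, &payload); err != nil {
			return 0, 0, err
		}
		rowCount++ // a real sync would stream the row to the destination here
	}
	return nextWatermark, rowCount, rows.Err()
}

func main() {
	ctx := context.Background()
	conn, err := pgx.Connect(ctx, os.Getenv("DATABASE_URL"))
	if err != nil {
		panic(err)
	}
	defer conn.Close(ctx)

	watermark, n, err := pullSinceXmin(ctx, conn, 0) // first run: no previous watermark
	if err != nil {
		panic(err)
	}
	fmt.Printf("pulled %d rows; next xmin watermark is %d\n", n, watermark)
}
```

Capturing the snapshot xmin inside the same repeatable-read transaction that reads the rows is what makes the watermark safe to carry forward: anything invisible to this read was written at or after that xid and therefore falls inside the next run's `age()` window. The trade-off, as the commit message notes, is that rows committed concurrently with a run — and, near wraparound, some much older rows — get pulled again, which the upsert-style destinations tolerate.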