diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml
index 8a8181ba6..210fc507d 100644
--- a/docker-compose-dev.yml
+++ b/docker-compose-dev.yml
@@ -56,7 +56,7 @@ services:
       catalog:
         condition: service_healthy
     environment:
-      - DB=postgres12
+      - DB=postgresql
       - DB_PORT=5432
       - POSTGRES_USER=postgres
       - POSTGRES_PWD=postgres
diff --git a/docker-compose.yml b/docker-compose.yml
index 7ac7e19bb..2e0a83be2 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -49,7 +49,7 @@ services:
       catalog:
         condition: service_healthy
     environment:
-      - DB=postgres12
+      - DB=postgresql
       - DB_PORT=5432
       - POSTGRES_USER=postgres
       - POSTGRES_PWD=postgres
diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go
index 603c6f0b7..82387c9ff 100644
--- a/flow/e2e/congen.go
+++ b/flow/e2e/congen.go
@@ -218,25 +218,19 @@ type QRepFlowConnectionGenerationConfig struct {
 // GenerateQRepConfig generates a qrep config for testing.
 func (c *QRepFlowConnectionGenerationConfig) GenerateQRepConfig(
     query string, watermark string,
-) (*protos.QRepConfig, error) {
-    ret := &protos.QRepConfig{}
-    ret.FlowJobName = c.FlowJobName
-    ret.WatermarkTable = c.WatermarkTable
-    ret.DestinationTableIdentifier = c.DestinationTableIdentifier
-
-    postgresPeer := GeneratePostgresPeer()
-    ret.SourcePeer = postgresPeer
-
-    ret.DestinationPeer = c.Destination
-
-    ret.Query = query
-    ret.WatermarkColumn = watermark
-
-    ret.StagingPath = c.StagingPath
-    ret.WriteMode = &protos.QRepWriteMode{
-        WriteType: protos.QRepWriteType_QREP_WRITE_MODE_APPEND,
+) *protos.QRepConfig {
+    return &protos.QRepConfig{
+        FlowJobName:                c.FlowJobName,
+        WatermarkTable:             c.WatermarkTable,
+        DestinationTableIdentifier: c.DestinationTableIdentifier,
+        SourcePeer:                 GeneratePostgresPeer(),
+        DestinationPeer:            c.Destination,
+        Query:                      query,
+        WatermarkColumn:            watermark,
+        StagingPath:                c.StagingPath,
+        WriteMode: &protos.QRepWriteMode{
+            WriteType: protos.QRepWriteType_QREP_WRITE_MODE_APPEND,
+        },
+        NumRowsPerPartition: 1000,
     }
-    ret.NumRowsPerPartition = 1000
-
-    return ret, nil
 }
diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go
index a7a367211..fb49d3d24 100644
--- a/flow/e2e/postgres/qrep_flow_pg_test.go
+++ b/flow/e2e/postgres/qrep_flow_pg_test.go
@@ -17,6 +17,7 @@ import (
     "github.com/PeerDB-io/peer-flow/e2e"
     "github.com/PeerDB-io/peer-flow/e2eshared"
     "github.com/PeerDB-io/peer-flow/generated/protos"
+    "github.com/PeerDB-io/peer-flow/model"
     "github.com/PeerDB-io/peer-flow/shared"
 )
 
@@ -324,3 +325,65 @@ func (s PeerFlowE2ETestSuitePG) Test_No_Rows_QRep_PG() {
     e2e.EnvWaitForFinished(s.t, env, 3*time.Minute)
     require.NoError(s.t, env.Error())
 }
+
+func (s PeerFlowE2ETestSuitePG) Test_Pause() {
+    numRows := 10
+
+    srcTable := "qrep_pause"
+    s.setupSourceTable(srcTable, numRows)
+
+    dstTable := "qrep_pause_dst"
+
+    srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable)
+    dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable)
+
+    query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}",
+        s.suffix, srcTable)
+
+    config, err := e2e.CreateQRepWorkflowConfig(
+        "test_qrep_pause_pg",
+        srcSchemaQualified,
+        dstSchemaQualified,
+        query,
+        e2e.GeneratePostgresPeer(),
+        "",
+        true,
+        "_PEERDB_SYNCED_AT",
+    )
+    require.NoError(s.t, err)
+    config.InitialCopyOnly = false
+
+    tc := e2e.NewTemporalClient(s.t)
+    env := e2e.RunQrepFlowWorkflow(tc, config)
+    e2e.SignalWorkflow(env, model.FlowSignal, model.PauseSignal)
+
+    e2e.EnvWaitFor(s.t, env, 3*time.Minute, "pausing", func() bool {
+        response, err := env.Query(shared.QRepFlowStateQuery)
+        if err != nil {
+            s.t.Log(err)
+            return false
+        }
+        var state *protos.QRepFlowState
+        err = response.Get(&state)
+        if err != nil {
+            s.t.Fatal("decode failed", err)
+        }
+        return state.CurrentFlowStatus == protos.FlowStatus_STATUS_PAUSED
+    })
+    e2e.SignalWorkflow(env, model.FlowSignal, model.NoopSignal)
+    e2e.EnvWaitFor(s.t, env, time.Minute, "unpausing", func() bool {
+        response, err := env.Query(shared.QRepFlowStateQuery)
+        if err != nil {
+            s.t.Fatal(err)
+        }
+        var state *protos.QRepFlowState
+        err = response.Get(&state)
+        if err != nil {
+            s.t.Fatal("decode failed", err)
+        }
+        return state.CurrentFlowStatus == protos.FlowStatus_STATUS_RUNNING
+    })
+
+    env.Cancel()
+    e2e.RequireEnvCanceled(s.t, env)
+}
diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go
index 92d04d1f3..8592eb406 100644
--- a/flow/e2e/test_utils.go
+++ b/flow/e2e/test_utils.go
@@ -396,10 +396,7 @@ func CreateQRepWorkflowConfig(
 
     watermark := "updated_at"
 
-    qrepConfig, err := connectionGen.GenerateQRepConfig(query, watermark)
-    if err != nil {
-        return nil, err
-    }
+    qrepConfig := connectionGen.GenerateQRepConfig(query, watermark)
     qrepConfig.InitialCopyOnly = true
     qrepConfig.SyncedAtColName = syncedAtCol
     qrepConfig.SetupWatermarkTableOnDestination = setupDst
diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go
index c80e76c01..839eab9dd 100644
--- a/flow/workflows/qrep_flow.go
+++ b/flow/workflows/qrep_flow.go
@@ -46,19 +46,6 @@ func NewQRepFlowState() *protos.QRepFlowState {
     }
 }
 
-// returns a new empty QRepFlowState
-func NewQRepFlowStateForTesting() *protos.QRepFlowState {
-    return &protos.QRepFlowState{
-        LastPartition: &protos.QRepPartition{
-            PartitionId: "not-applicable-partition",
-            Range:       nil,
-        },
-        NumPartitionsProcessed: 0,
-        NeedsResync:            true,
-        DisableWaitForNewRows:  true,
-    }
-}
-
 // NewQRepFlowExecution creates a new instance of QRepFlowExecution.
 func NewQRepFlowExecution(ctx workflow.Context, config *protos.QRepConfig, runUUID string) *QRepFlowExecution {
     return &QRepFlowExecution{
@@ -99,7 +86,7 @@ func (q *QRepFlowExecution) SetupMetadataTables(ctx workflow.Context) error {
 }
 
 func (q *QRepFlowExecution) getTableSchema(ctx workflow.Context, tableName string) (*protos.TableSchema, error) {
-    q.logger.Info("fetching schema for table - ", tableName)
+    q.logger.Info("fetching schema for table", slog.String("table", tableName))
 
     ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
         StartToCloseTimeout: 5 * time.Minute,
@@ -291,20 +278,39 @@ func (q *QRepFlowExecution) consolidatePartitions(ctx workflow.Context) error {
     return nil
 }
 
-func (q *QRepFlowExecution) waitForNewRows(ctx workflow.Context, lastPartition *protos.QRepPartition) error {
+func (q *QRepFlowExecution) waitForNewRows(
+    ctx workflow.Context,
+    signalChan model.TypedReceiveChannel[model.CDCFlowSignal],
+    lastPartition *protos.QRepPartition,
+) error {
     q.logger.Info("idling until new rows are detected")
 
+    var done bool
+    var doneErr error
+    selector := workflow.NewNamedSelector(ctx, "WaitForNewRows")
+
     ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
         StartToCloseTimeout: 16 * 365 * 24 * time.Hour, // 16 years
         HeartbeatTimeout:    time.Minute,
     })
+    fWait := workflow.ExecuteActivity(ctx, flowable.QRepWaitUntilNewRows, q.config, lastPartition)
 
+    selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
+    selector.AddFuture(fWait, func(f workflow.Future) {
+        doneErr = f.Get(ctx, nil)
+        done = true
+    })
+    signalChan.AddToSelector(selector, func(val model.CDCFlowSignal, _ bool) {
+        q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, q.logger)
+    })
-    if err := workflow.ExecuteActivity(ctx, flowable.QRepWaitUntilNewRows, q.config,
-        lastPartition).Get(ctx, nil); err != nil {
-        return fmt.Errorf("failed while idling for new rows: %w", err)
+    for ctx.Err() == nil && ((!done && q.activeSignal != model.PauseSignal) || selector.HasPending()) {
+        selector.Select(ctx)
     }
 
-    return nil
+    if err := ctx.Err(); err != nil {
+        return err
+    }
+    return doneErr
 }
 
 func (q *QRepFlowExecution) handleTableCreationForResync(ctx workflow.Context, state *protos.QRepFlowState) error {
@@ -364,16 +370,6 @@ func (q *QRepFlowExecution) handleTableRenameForResync(ctx workflow.Context, sta
     return nil
 }
 
-func (q *QRepFlowExecution) receiveAndHandleSignalAsync(signalChan model.TypedReceiveChannel[model.CDCFlowSignal]) {
-    for {
-        val, ok := signalChan.ReceiveAsync()
-        if !ok {
-            break
-        }
-        q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, q.logger)
-    }
-}
-
 func setWorkflowQueries(ctx workflow.Context, state *protos.QRepFlowState) error {
     // Support an Update for the current status of the qrep flow.
     err := workflow.SetUpdateHandler(ctx, shared.FlowStatusUpdate, func(status *protos.FlowStatus) error {
@@ -452,6 +448,7 @@ func QRepFlowWorkflow(
                 return err
             }
         }
+        state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING
     }
 
     maxParallelWorkers := 16
@@ -475,13 +472,13 @@ func QRepFlowWorkflow(
         return err
     }
 
-    logger.Info("fetching partitions to replicate for peer flow - ", config.FlowJobName)
+    logger.Info("fetching partitions to replicate for peer flow")
     partitions, err := q.GetPartitions(ctx, state.LastPartition)
     if err != nil {
         return fmt.Errorf("failed to get partitions: %w", err)
     }
 
-    logger.Info("partitions to replicate - ", len(partitions.Partitions))
+    logger.Info(fmt.Sprintf("%d partitions to replicate", len(partitions.Partitions)))
     if err := q.processPartitions(ctx, maxParallelWorkers, partitions.Partitions); err != nil {
         return err
     }
@@ -501,29 +498,22 @@ func QRepFlowWorkflow(
         return err
     }
 
-    logger.Info("partitions processed - ", len(partitions.Partitions))
+    logger.Info(fmt.Sprintf("%d partitions processed", len(partitions.Partitions)))
     state.NumPartitionsProcessed += uint64(len(partitions.Partitions))
 
     if len(partitions.Partitions) > 0 {
         state.LastPartition = partitions.Partitions[len(partitions.Partitions)-1]
     }
 
-    if !state.DisableWaitForNewRows {
-        // sleep for a while and continue the workflow
-        err = q.waitForNewRows(ctx, state.LastPartition)
-        if err != nil {
-            return err
-        }
+    err = q.waitForNewRows(ctx, signalChan, state.LastPartition)
+    if err != nil {
+        return err
     }
 
     logger.Info("Continuing as new workflow",
         slog.Any("Last Partition", state.LastPartition),
-        slog.Any("Number of Partitions Processed", state.NumPartitionsProcessed))
+        slog.Uint64("Number of Partitions Processed", state.NumPartitionsProcessed))
 
-    q.receiveAndHandleSignalAsync(signalChan)
-    if err := ctx.Err(); err != nil {
-        return err
-    }
     if q.activeSignal == model.PauseSignal {
         state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED
     }
diff --git a/flow/workflows/xmin_flow.go b/flow/workflows/xmin_flow.go
index c5334221c..777daba38 100644
--- a/flow/workflows/xmin_flow.go
+++ b/flow/workflows/xmin_flow.go
@@ -47,6 +47,7 @@ func XminFlowWorkflow(
                 return err
             }
         }
+        state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING
     }
 
     err = q.SetupWatermarkTableOnDestination(ctx)
@@ -100,14 +101,21 @@ func XminFlowWorkflow(
         Range: &protos.PartitionRange{Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: lastPartition}}},
     }
 
-    logger.Info("Continuing as new workflow",
-        slog.Any("Last Partition", state.LastPartition),
-        slog.Any("Number of Partitions Processed", state.NumPartitionsProcessed))
-
-    q.receiveAndHandleSignalAsync(signalChan)
     if err := ctx.Err(); err != nil {
         return err
     }
+    for {
+        val, ok := signalChan.ReceiveAsync()
+        if !ok {
+            break
+        }
+        q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, q.logger)
+    }
+
+    logger.Info("Continuing as new workflow",
+        slog.Any("Last Partition", state.LastPartition),
+        slog.Uint64("Number of Partitions Processed", state.NumPartitionsProcessed))
+
     if q.activeSignal == model.PauseSignal {
         state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED
     }
diff --git a/protos/flow.proto b/protos/flow.proto
index f5d804ae7..0ab9b94a3 100644
--- a/protos/flow.proto
+++ b/protos/flow.proto
@@ -322,7 +322,7 @@ message QRepFlowState {
   QRepPartition last_partition = 1;
   uint64 num_partitions_processed = 2;
   bool needs_resync = 3;
-  bool disable_wait_for_new_rows = 4;
+  bool disable_wait_for_new_rows = 4; // deprecated
   FlowStatus current_flow_status = 5;
 }