Test qrep pause/unpause (#1388)
As is apt to happen when adding tests, also fix pause/unpause for qrep/xmin

1. `waitForNewRows` would prevent pausing; use a selector so the workflow can react to pause signals and context cancelation while it waits on new rows (a minimal sketch of the pattern follows this message)
2. `CurrentFlowStatus` wasn't being set back to RUNNING when exiting from PAUSED (see the pause-loop sketch after the `qrep_flow.go` diff)

As an aside, it turns out every qrep/xmin test besides this new one runs with `InitialCopyOnly`

Also remove `DisableWaitForNewRows`, since it's unused and makes the tests diverge from reality
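
For context on fix 1, here is a minimal sketch of the selector-based wait, written against the Temporal Go SDK. It is not the PeerDB code verbatim: the activity name `WaitUntilNewRows`, the `signalName` parameter, and the plain-string `"pause"` payload are placeholders for this sketch; the real change (in `flow/workflows/qrep_flow.go` below) uses `flowable.QRepWaitUntilNewRows`, a named selector, and PeerDB's typed signal channel.

```go
package sketch

import (
	"time"

	"go.temporal.io/sdk/workflow"
)

// waitRespondingToSignals blocks on a long-running "wait for new rows"
// activity while still reacting to pause signals and workflow cancelation.
// Blocking on Future.Get alone would leave the workflow deaf to both.
func waitRespondingToSignals(ctx workflow.Context, signalName string) error {
	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: 24 * time.Hour,
		HeartbeatTimeout:    time.Minute,
	})

	var done, paused bool
	var doneErr error

	selector := workflow.NewSelector(ctx)

	// Fires when the long-running wait activity completes.
	fut := workflow.ExecuteActivity(ctx, "WaitUntilNewRows")
	selector.AddFuture(fut, func(f workflow.Future) {
		doneErr = f.Get(ctx, nil)
		done = true
	})

	// Fires on workflow cancelation so the loop below can observe ctx.Err().
	selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})

	// Fires when a pause/unpause signal arrives.
	signalChan := workflow.GetSignalChannel(ctx, signalName)
	selector.AddReceive(signalChan, func(c workflow.ReceiveChannel, _ bool) {
		var sig string
		c.Receive(ctx, &sig)
		paused = sig == "pause"
	})

	// Keep selecting until the activity finishes, a pause arrives, or the
	// workflow is canceled; drain any pending branches before returning.
	for ctx.Err() == nil && ((!done && !paused) || selector.HasPending()) {
		selector.Select(ctx)
	}
	if err := ctx.Err(); err != nil {
		return err
	}
	return doneErr
}
```

The key difference from the old code is that `Future.Get` no longer blocks the workflow outright: cancelation and signals each get their own selector branch, and `HasPending()` ensures nothing queued is dropped before returning.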
serprex authored Mar 5, 2024
1 parent 6612e76 commit 3e3e2ab
Showing 8 changed files with 127 additions and 75 deletions.
2 changes: 1 addition & 1 deletion docker-compose-dev.yml
@@ -56,7 +56,7 @@ services:
catalog:
condition: service_healthy
environment:
- DB=postgres12
- DB=postgresql
- DB_PORT=5432
- POSTGRES_USER=postgres
- POSTGRES_PWD=postgres
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -49,7 +49,7 @@ services:
catalog:
condition: service_healthy
environment:
- DB=postgres12
- DB=postgresql
- DB_PORT=5432
- POSTGRES_USER=postgres
- POSTGRES_PWD=postgres
34 changes: 14 additions & 20 deletions flow/e2e/congen.go
@@ -218,25 +218,19 @@ type QRepFlowConnectionGenerationConfig struct {
// GenerateQRepConfig generates a qrep config for testing.
func (c *QRepFlowConnectionGenerationConfig) GenerateQRepConfig(
query string, watermark string,
) (*protos.QRepConfig, error) {
ret := &protos.QRepConfig{}
ret.FlowJobName = c.FlowJobName
ret.WatermarkTable = c.WatermarkTable
ret.DestinationTableIdentifier = c.DestinationTableIdentifier

postgresPeer := GeneratePostgresPeer()
ret.SourcePeer = postgresPeer

ret.DestinationPeer = c.Destination

ret.Query = query
ret.WatermarkColumn = watermark

ret.StagingPath = c.StagingPath
ret.WriteMode = &protos.QRepWriteMode{
WriteType: protos.QRepWriteType_QREP_WRITE_MODE_APPEND,
) *protos.QRepConfig {
return &protos.QRepConfig{
FlowJobName: c.FlowJobName,
WatermarkTable: c.WatermarkTable,
DestinationTableIdentifier: c.DestinationTableIdentifier,
SourcePeer: GeneratePostgresPeer(),
DestinationPeer: c.Destination,
Query: query,
WatermarkColumn: watermark,
StagingPath: c.StagingPath,
WriteMode: &protos.QRepWriteMode{
WriteType: protos.QRepWriteType_QREP_WRITE_MODE_APPEND,
},
NumRowsPerPartition: 1000,
}
ret.NumRowsPerPartition = 1000

return ret, nil
}
63 changes: 63 additions & 0 deletions flow/e2e/postgres/qrep_flow_pg_test.go
@@ -17,6 +17,7 @@ import (
"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/e2eshared"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
)

@@ -324,3 +325,65 @@ func (s PeerFlowE2ETestSuitePG) Test_No_Rows_QRep_PG() {
e2e.EnvWaitForFinished(s.t, env, 3*time.Minute)
require.NoError(s.t, env.Error())
}

func (s PeerFlowE2ETestSuitePG) Test_Pause() {
numRows := 10

srcTable := "qrep_pause"
s.setupSourceTable(srcTable, numRows)

dstTable := "qrep_pause_dst"

srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable)
dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable)

query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}",
s.suffix, srcTable)

config, err := e2e.CreateQRepWorkflowConfig(
"test_qrep_pause_pg",
srcSchemaQualified,
dstSchemaQualified,
query,
e2e.GeneratePostgresPeer(),
"",
true,
"_PEERDB_SYNCED_AT",
)
require.NoError(s.t, err)
config.InitialCopyOnly = false

tc := e2e.NewTemporalClient(s.t)
env := e2e.RunQrepFlowWorkflow(tc, config)
e2e.SignalWorkflow(env, model.FlowSignal, model.PauseSignal)

e2e.EnvWaitFor(s.t, env, 3*time.Minute, "pausing", func() bool {
response, err := env.Query(shared.QRepFlowStateQuery)
if err != nil {
s.t.Log(err)
return false
}
var state *protos.QRepFlowState
err = response.Get(&state)
if err != nil {
s.t.Fatal("decode failed", err)
}
return state.CurrentFlowStatus == protos.FlowStatus_STATUS_PAUSED
})
e2e.SignalWorkflow(env, model.FlowSignal, model.NoopSignal)
e2e.EnvWaitFor(s.t, env, time.Minute, "unpausing", func() bool {
response, err := env.Query(shared.QRepFlowStateQuery)
if err != nil {
s.t.Fatal(err)
}
var state *protos.QRepFlowState
err = response.Get(&state)
if err != nil {
s.t.Fatal("decode failed", err)
}
return state.CurrentFlowStatus == protos.FlowStatus_STATUS_RUNNING
})

env.Cancel()
e2e.RequireEnvCanceled(s.t, env)
}
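
The new test drives pause/unpause through the Temporal test environment (`e2e.SignalWorkflow`, `env.Query`). The same status check can be made against a live workflow with an ordinary Temporal client; the sketch below is an illustration under assumptions — the client wiring and the hypothetical `workflowID` are not part of this change, though `shared.QRepFlowStateQuery` and the proto types are the ones the test uses.

```go
package sketch

import (
	"context"
	"fmt"

	"go.temporal.io/sdk/client"

	"github.com/PeerDB-io/peer-flow/generated/protos"
	"github.com/PeerDB-io/peer-flow/shared"
)

// isFlowPaused queries a running qrep workflow for its current status.
// The workflowID is hypothetical here; in reality it would come from
// however the deployment tracks its flows.
func isFlowPaused(ctx context.Context, c client.Client, workflowID string) (bool, error) {
	resp, err := c.QueryWorkflow(ctx, workflowID, "", shared.QRepFlowStateQuery)
	if err != nil {
		return false, fmt.Errorf("query failed: %w", err)
	}
	var state *protos.QRepFlowState
	if err := resp.Get(&state); err != nil {
		return false, fmt.Errorf("decode failed: %w", err)
	}
	return state.CurrentFlowStatus == protos.FlowStatus_STATUS_PAUSED, nil
}
```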
5 changes: 1 addition & 4 deletions flow/e2e/test_utils.go
@@ -396,10 +396,7 @@ func CreateQRepWorkflowConfig(

watermark := "updated_at"

qrepConfig, err := connectionGen.GenerateQRepConfig(query, watermark)
if err != nil {
return nil, err
}
qrepConfig := connectionGen.GenerateQRepConfig(query, watermark)
qrepConfig.InitialCopyOnly = true
qrepConfig.SyncedAtColName = syncedAtCol
qrepConfig.SetupWatermarkTableOnDestination = setupDst
76 changes: 33 additions & 43 deletions flow/workflows/qrep_flow.go
@@ -46,19 +46,6 @@ func NewQRepFlowState() *protos.QRepFlowState {
}
}

// returns a new empty QRepFlowState
func NewQRepFlowStateForTesting() *protos.QRepFlowState {
return &protos.QRepFlowState{
LastPartition: &protos.QRepPartition{
PartitionId: "not-applicable-partition",
Range: nil,
},
NumPartitionsProcessed: 0,
NeedsResync: true,
DisableWaitForNewRows: true,
}
}

// NewQRepFlowExecution creates a new instance of QRepFlowExecution.
func NewQRepFlowExecution(ctx workflow.Context, config *protos.QRepConfig, runUUID string) *QRepFlowExecution {
return &QRepFlowExecution{
@@ -99,7 +86,7 @@ func (q *QRepFlowExecution) SetupMetadataTables(ctx workflow.Context) error {
}

func (q *QRepFlowExecution) getTableSchema(ctx workflow.Context, tableName string) (*protos.TableSchema, error) {
q.logger.Info("fetching schema for table - ", tableName)
q.logger.Info("fetching schema for table", slog.String("table", tableName))

ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
@@ -291,20 +278,39 @@ func (q *QRepFlowExecution) consolidatePartitions(ctx workflow.Context) error {
return nil
}

func (q *QRepFlowExecution) waitForNewRows(ctx workflow.Context, lastPartition *protos.QRepPartition) error {
func (q *QRepFlowExecution) waitForNewRows(
ctx workflow.Context,
signalChan model.TypedReceiveChannel[model.CDCFlowSignal],
lastPartition *protos.QRepPartition,
) error {
q.logger.Info("idling until new rows are detected")

var done bool
var doneErr error
selector := workflow.NewNamedSelector(ctx, "WaitForNewRows")

ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 16 * 365 * 24 * time.Hour, // 16 years
HeartbeatTimeout: time.Minute,
})
fWait := workflow.ExecuteActivity(ctx, flowable.QRepWaitUntilNewRows, q.config, lastPartition)
selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
selector.AddFuture(fWait, func(f workflow.Future) {
doneErr = f.Get(ctx, nil)
done = true
})
signalChan.AddToSelector(selector, func(val model.CDCFlowSignal, _ bool) {
q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, q.logger)
})

if err := workflow.ExecuteActivity(ctx, flowable.QRepWaitUntilNewRows, q.config,
lastPartition).Get(ctx, nil); err != nil {
return fmt.Errorf("failed while idling for new rows: %w", err)
for ctx.Err() == nil && ((!done && q.activeSignal != model.PauseSignal) || selector.HasPending()) {
selector.Select(ctx)
}

return nil
if err := ctx.Err(); err != nil {
return err
}
return doneErr
}

func (q *QRepFlowExecution) handleTableCreationForResync(ctx workflow.Context, state *protos.QRepFlowState) error {
@@ -364,16 +370,6 @@ func (q *QRepFlowExecution) handleTableRenameForResync(ctx workflow.Context, sta
return nil
}

func (q *QRepFlowExecution) receiveAndHandleSignalAsync(signalChan model.TypedReceiveChannel[model.CDCFlowSignal]) {
for {
val, ok := signalChan.ReceiveAsync()
if !ok {
break
}
q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, q.logger)
}
}

func setWorkflowQueries(ctx workflow.Context, state *protos.QRepFlowState) error {
// Support an Update for the current status of the qrep flow.
err := workflow.SetUpdateHandler(ctx, shared.FlowStatusUpdate, func(status *protos.FlowStatus) error {
@@ -452,6 +448,7 @@ func QRepFlowWorkflow(
return err
}
}
state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING
}

maxParallelWorkers := 16
@@ -475,13 +472,13 @@
return err
}

logger.Info("fetching partitions to replicate for peer flow - ", config.FlowJobName)
logger.Info("fetching partitions to replicate for peer flow")
partitions, err := q.GetPartitions(ctx, state.LastPartition)
if err != nil {
return fmt.Errorf("failed to get partitions: %w", err)
}

logger.Info("partitions to replicate - ", len(partitions.Partitions))
logger.Info(fmt.Sprintf("%d partitions to replicate", len(partitions.Partitions)))
if err := q.processPartitions(ctx, maxParallelWorkers, partitions.Partitions); err != nil {
return err
}
@@ -501,29 +498,22 @@
return err
}

logger.Info("partitions processed - ", len(partitions.Partitions))
logger.Info(fmt.Sprintf("%d partitions processed", len(partitions.Partitions)))
state.NumPartitionsProcessed += uint64(len(partitions.Partitions))

if len(partitions.Partitions) > 0 {
state.LastPartition = partitions.Partitions[len(partitions.Partitions)-1]
}

if !state.DisableWaitForNewRows {
// sleep for a while and continue the workflow
err = q.waitForNewRows(ctx, state.LastPartition)
if err != nil {
return err
}
err = q.waitForNewRows(ctx, signalChan, state.LastPartition)
if err != nil {
return err
}

logger.Info("Continuing as new workflow",
slog.Any("Last Partition", state.LastPartition),
slog.Any("Number of Partitions Processed", state.NumPartitionsProcessed))
slog.Uint64("Number of Partitions Processed", state.NumPartitionsProcessed))

q.receiveAndHandleSignalAsync(signalChan)
if err := ctx.Err(); err != nil {
return err
}
if q.activeSignal == model.PauseSignal {
state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED
}
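
The one-line `state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING` addition above is the whole of fix 2. Below is a hedged reconstruction of the paused branch it patches; the loop shape and the plain-string signal payload are assumptions of this sketch (PeerDB actually routes `model.CDCFlowSignal` through a typed channel wrapper and `model.FlowSignalHandler`), but the status transitions match the diff.

```go
package sketch

import (
	"go.temporal.io/sdk/workflow"

	"github.com/PeerDB-io/peer-flow/generated/protos"
)

// pauseUntilResumed illustrates the paused branch: record PAUSED, wait for
// either an unpause signal or cancelation, then restore RUNNING so state
// queries stop reporting PAUSED (the missing assignment this commit adds).
func pauseUntilResumed(ctx workflow.Context, signalName string, state *protos.QRepFlowState) error {
	state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED

	ch := workflow.GetSignalChannel(ctx, signalName)
	paused := true

	selector := workflow.NewSelector(ctx)
	// Wakes the selector on workflow cancelation so the loop can exit.
	selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
	// Any signal other than another pause (e.g. a noop) resumes the flow.
	selector.AddReceive(ch, func(c workflow.ReceiveChannel, _ bool) {
		var sig string
		c.Receive(ctx, &sig)
		paused = sig == "pause"
	})

	for paused && ctx.Err() == nil {
		selector.Select(ctx)
	}
	if err := ctx.Err(); err != nil {
		return err
	}
	// Fix 2 from the commit message: restore RUNNING on resume.
	state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING
	return nil
}
```

Without that final assignment, a workflow that was paused and then resumed kept reporting `STATUS_PAUSED` to state queries, which is exactly what the new `Test_Pause` checks in its "unpausing" wait.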
18 changes: 13 additions & 5 deletions flow/workflows/xmin_flow.go
@@ -47,6 +47,7 @@ func XminFlowWorkflow(
return err
}
}
state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING
}

err = q.SetupWatermarkTableOnDestination(ctx)
@@ -100,14 +101,21 @@ func XminFlowWorkflow(
Range: &protos.PartitionRange{Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: lastPartition}}},
}

logger.Info("Continuing as new workflow",
slog.Any("Last Partition", state.LastPartition),
slog.Any("Number of Partitions Processed", state.NumPartitionsProcessed))

q.receiveAndHandleSignalAsync(signalChan)
if err := ctx.Err(); err != nil {
return err
}
for {
val, ok := signalChan.ReceiveAsync()
if !ok {
break
}
q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, q.logger)
}

logger.Info("Continuing as new workflow",
slog.Any("Last Partition", state.LastPartition),
slog.Uint64("Number of Partitions Processed", state.NumPartitionsProcessed))

if q.activeSignal == model.PauseSignal {
state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED
}
2 changes: 1 addition & 1 deletion protos/flow.proto
@@ -322,7 +322,7 @@ message QRepFlowState {
QRepPartition last_partition = 1;
uint64 num_partitions_processed = 2;
bool needs_resync = 3;
bool disable_wait_for_new_rows = 4;
bool disable_wait_for_new_rows = 4; // deprecated
FlowStatus current_flow_status = 5;
}

