Skip to content

Commit

Permalink
qrep pause: move signal processing into waiting
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Mar 4, 2024
1 parent c64baba commit bc0a7cc
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 43 deletions.
66 changes: 28 additions & 38 deletions flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,6 @@ func NewQRepFlowState() *protos.QRepFlowState {
}
}

// returns a new empty QRepFlowState
func NewQRepFlowStateForTesting() *protos.QRepFlowState {
return &protos.QRepFlowState{
LastPartition: &protos.QRepPartition{
PartitionId: "not-applicable-partition",
Range: nil,
},
NumPartitionsProcessed: 0,
NeedsResync: true,
DisableWaitForNewRows: true,
}
}

// NewQRepFlowExecution creates a new instance of QRepFlowExecution.
func NewQRepFlowExecution(ctx workflow.Context, config *protos.QRepConfig, runUUID string) *QRepFlowExecution {
return &QRepFlowExecution{
Expand Down Expand Up @@ -291,20 +278,39 @@ func (q *QRepFlowExecution) consolidatePartitions(ctx workflow.Context) error {
return nil
}

func (q *QRepFlowExecution) waitForNewRows(ctx workflow.Context, lastPartition *protos.QRepPartition) error {
func (q *QRepFlowExecution) waitForNewRows(
ctx workflow.Context,
signalChan model.TypedReceiveChannel[model.CDCFlowSignal],
lastPartition *protos.QRepPartition,
) error {
q.logger.Info("idling until new rows are detected")

var done bool
var doneErr error
selector := workflow.NewNamedSelector(ctx, "WaitForNewRows")

ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 16 * 365 * 24 * time.Hour, // 16 years
HeartbeatTimeout: time.Minute,
})
fWait := workflow.ExecuteActivity(ctx, flowable.QRepWaitUntilNewRows, q.config, lastPartition)
selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
selector.AddFuture(fWait, func(f workflow.Future) {
doneErr = f.Get(ctx, nil)
done = true
})
signalChan.AddToSelector(selector, func(val model.CDCFlowSignal, _ bool) {
q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, q.logger)
})

if err := workflow.ExecuteActivity(ctx, flowable.QRepWaitUntilNewRows, q.config,
lastPartition).Get(ctx, nil); err != nil {
return fmt.Errorf("failed while idling for new rows: %w", err)
for ctx.Err() == nil && ((!done && q.activeSignal != model.PauseSignal) || selector.HasPending()) {
selector.Select(ctx)
}

return nil
if err := ctx.Err(); err != nil {
return err
}
return doneErr
}

func (q *QRepFlowExecution) handleTableCreationForResync(ctx workflow.Context, state *protos.QRepFlowState) error {
Expand Down Expand Up @@ -364,16 +370,6 @@ func (q *QRepFlowExecution) handleTableRenameForResync(ctx workflow.Context, sta
return nil
}

func (q *QRepFlowExecution) receiveAndHandleSignalAsync(signalChan model.TypedReceiveChannel[model.CDCFlowSignal]) {
for {
val, ok := signalChan.ReceiveAsync()
if !ok {
break
}
q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, q.logger)
}
}

func setWorkflowQueries(ctx workflow.Context, state *protos.QRepFlowState) error {
// Support an Update for the current status of the qrep flow.
err := workflow.SetUpdateHandler(ctx, shared.FlowStatusUpdate, func(status *protos.FlowStatus) error {
Expand Down Expand Up @@ -508,22 +504,16 @@ func QRepFlowWorkflow(
state.LastPartition = partitions.Partitions[len(partitions.Partitions)-1]
}

if !state.DisableWaitForNewRows {
// sleep for a while and continue the workflow
err = q.waitForNewRows(ctx, state.LastPartition)
if err != nil {
return err
}
// sleep for a while and continue the workflow
err = q.waitForNewRows(ctx, signalChan, state.LastPartition)
if err != nil {
return err
}

logger.Info("Continuing as new workflow",
slog.Any("Last Partition", state.LastPartition),
slog.Any("Number of Partitions Processed", state.NumPartitionsProcessed))

q.receiveAndHandleSignalAsync(signalChan)
if err := ctx.Err(); err != nil {
return err
}
if q.activeSignal == model.PauseSignal {
state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED
}
Expand Down
10 changes: 6 additions & 4 deletions flow/workflows/xmin_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,16 @@ func XminFlowWorkflow(
Range: &protos.PartitionRange{Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: lastPartition}}},
}

// sleep for a while and continue the workflow
err = q.waitForNewRows(ctx, signalChan, state.LastPartition)
if err != nil {
return err
}

logger.Info("Continuing as new workflow",
slog.Any("Last Partition", state.LastPartition),
slog.Any("Number of Partitions Processed", state.NumPartitionsProcessed))

q.receiveAndHandleSignalAsync(signalChan)
if err := ctx.Err(); err != nil {
return err
}
if q.activeSignal == model.PauseSignal {
state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED
}
Expand Down
2 changes: 1 addition & 1 deletion protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ message QRepFlowState {
QRepPartition last_partition = 1;
uint64 num_partitions_processed = 2;
bool needs_resync = 3;
bool disable_wait_for_new_rows = 4;
bool disable_wait_for_new_rows = 4; // deprecated
FlowStatus current_flow_status = 5;
}

Expand Down

0 comments on commit bc0a7cc

Please sign in to comment.