diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index c80e76c01..1d72b5b1c 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -46,19 +46,6 @@ func NewQRepFlowState() *protos.QRepFlowState { } } -// returns a new empty QRepFlowState -func NewQRepFlowStateForTesting() *protos.QRepFlowState { - return &protos.QRepFlowState{ - LastPartition: &protos.QRepPartition{ - PartitionId: "not-applicable-partition", - Range: nil, - }, - NumPartitionsProcessed: 0, - NeedsResync: true, - DisableWaitForNewRows: true, - } -} - // NewQRepFlowExecution creates a new instance of QRepFlowExecution. func NewQRepFlowExecution(ctx workflow.Context, config *protos.QRepConfig, runUUID string) *QRepFlowExecution { return &QRepFlowExecution{ @@ -291,20 +278,39 @@ func (q *QRepFlowExecution) consolidatePartitions(ctx workflow.Context) error { return nil } -func (q *QRepFlowExecution) waitForNewRows(ctx workflow.Context, lastPartition *protos.QRepPartition) error { +func (q *QRepFlowExecution) waitForNewRows( + ctx workflow.Context, + signalChan model.TypedReceiveChannel[model.CDCFlowSignal], + lastPartition *protos.QRepPartition, +) error { q.logger.Info("idling until new rows are detected") + var done bool + var doneErr error + selector := workflow.NewNamedSelector(ctx, "WaitForNewRows") + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 16 * 365 * 24 * time.Hour, // 16 years HeartbeatTimeout: time.Minute, }) + fWait := workflow.ExecuteActivity(ctx, flowable.QRepWaitUntilNewRows, q.config, lastPartition) + selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {}) + selector.AddFuture(fWait, func(f workflow.Future) { + doneErr = f.Get(ctx, nil) + done = true + }) + signalChan.AddToSelector(selector, func(val model.CDCFlowSignal, _ bool) { + q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, q.logger) + }) - if err := workflow.ExecuteActivity(ctx, flowable.QRepWaitUntilNewRows, q.config, - lastPartition).Get(ctx, nil); err != nil { - return fmt.Errorf("failed while idling for new rows: %w", err) + for ctx.Err() == nil && ((!done && q.activeSignal != model.PauseSignal) || selector.HasPending()) { + selector.Select(ctx) } - return nil + if err := ctx.Err(); err != nil { + return err + } + return doneErr } func (q *QRepFlowExecution) handleTableCreationForResync(ctx workflow.Context, state *protos.QRepFlowState) error { @@ -364,16 +370,6 @@ func (q *QRepFlowExecution) handleTableRenameForResync(ctx workflow.Context, sta return nil } -func (q *QRepFlowExecution) receiveAndHandleSignalAsync(signalChan model.TypedReceiveChannel[model.CDCFlowSignal]) { - for { - val, ok := signalChan.ReceiveAsync() - if !ok { - break - } - q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, q.logger) - } -} - func setWorkflowQueries(ctx workflow.Context, state *protos.QRepFlowState) error { // Support an Update for the current status of the qrep flow. err := workflow.SetUpdateHandler(ctx, shared.FlowStatusUpdate, func(status *protos.FlowStatus) error { @@ -508,22 +504,16 @@ func QRepFlowWorkflow( state.LastPartition = partitions.Partitions[len(partitions.Partitions)-1] } - if !state.DisableWaitForNewRows { - // sleep for a while and continue the workflow - err = q.waitForNewRows(ctx, state.LastPartition) - if err != nil { - return err - } + // sleep for a while and continue the workflow + err = q.waitForNewRows(ctx, signalChan, state.LastPartition) + if err != nil { + return err } logger.Info("Continuing as new workflow", slog.Any("Last Partition", state.LastPartition), slog.Any("Number of Partitions Processed", state.NumPartitionsProcessed)) - q.receiveAndHandleSignalAsync(signalChan) - if err := ctx.Err(); err != nil { - return err - } if q.activeSignal == model.PauseSignal { state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED } diff --git a/flow/workflows/xmin_flow.go b/flow/workflows/xmin_flow.go index c5334221c..5d617f4af 100644 --- a/flow/workflows/xmin_flow.go +++ b/flow/workflows/xmin_flow.go @@ -100,14 +100,16 @@ func XminFlowWorkflow( Range: &protos.PartitionRange{Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: lastPartition}}}, } + // sleep for a while and continue the workflow + err = q.waitForNewRows(ctx, signalChan, state.LastPartition) + if err != nil { + return err + } + logger.Info("Continuing as new workflow", slog.Any("Last Partition", state.LastPartition), slog.Any("Number of Partitions Processed", state.NumPartitionsProcessed)) - q.receiveAndHandleSignalAsync(signalChan) - if err := ctx.Err(); err != nil { - return err - } if q.activeSignal == model.PauseSignal { state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED } diff --git a/protos/flow.proto b/protos/flow.proto index f5d804ae7..0ab9b94a3 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -322,7 +322,7 @@ message QRepFlowState { QRepPartition last_partition = 1; uint64 num_partitions_processed = 2; bool needs_resync = 3; - bool disable_wait_for_new_rows = 4; + bool disable_wait_for_new_rows = 4; // deprecated FlowStatus current_flow_status = 5; }