Skip to content

Commit

Permalink
xmin should not waitForNewRows
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Mar 5, 2024
1 parent d659494 commit e3f9ff9
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions flow/workflows/xmin_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,16 @@ func XminFlowWorkflow(
Range: &protos.PartitionRange{Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: lastPartition}}},
}

err = q.waitForNewRows(ctx, signalChan, state.LastPartition)
if err != nil {
if err := ctx.Err(); err != nil {
return err
}
for {
val, ok := signalChan.ReceiveAsync()
if !ok {
break
}
q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, q.logger)
}

logger.Info("Continuing as new workflow",
slog.Any("Last Partition", state.LastPartition),
Expand Down

0 comments on commit e3f9ff9

Please sign in to comment.