Skip to content

Commit

Permalink
boilerplate
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed May 20, 2024
1 parent f4d507c commit a951776
Showing 1 changed file with 23 additions and 11 deletions.
34 changes: 23 additions & 11 deletions flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ type QRepPartitionFlowExecution struct {
runUUID string
}

var InitialLastPartition = &protos.QRepPartition{
PartitionId: "not-applicable-partition",
Range: nil,
}

// returns a new empty QRepFlowState
func NewQRepFlowState() *protos.QRepFlowState {
return &protos.QRepFlowState{
LastPartition: &protos.QRepPartition{
PartitionId: "not-applicable-partition",
Range: nil,
},
LastPartition: InitialLastPartition,
NumPartitionsProcessed: 0,
NeedsResync: true,
CurrentFlowStatus: protos.FlowStatus_STATUS_RUNNING,
Expand Down Expand Up @@ -482,10 +484,11 @@ func QRepWaitForNewRowsWorkflow(ctx workflow.Context, config *protos.QRepConfig,
if err != nil {
return fmt.Errorf("error checking for new rows: %w", err)
}
hasNewRows := result.Found

hasNewRows := result.Found
optedForOverwrite := config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE
// If no new rows are found, continue as new
if !hasNewRows {
if !hasNewRows || optedForOverwrite {
waitBetweenBatches := 5 * time.Second
if config.WaitBetweenBatchesSeconds > 0 {
waitBetweenBatches = time.Duration(config.WaitBetweenBatchesSeconds) * time.Second
Expand All @@ -496,6 +499,9 @@ func QRepWaitForNewRowsWorkflow(ctx workflow.Context, config *protos.QRepConfig,
}

logger.Info("QRepWaitForNewRowsWorkflow: continuing the loop")
if optedForOverwrite {
return nil
}
return workflow.NewContinueAsNewError(ctx, QRepWaitForNewRowsWorkflow, config, lastPartition)
}

Expand Down Expand Up @@ -570,15 +576,21 @@ func QRepFlowWorkflow(
return err
}

if !config.InitialCopyOnly && state.LastPartition != nil {
if err := q.waitForNewRows(ctx, signalChan, state.LastPartition); err != nil {
optedForOverwrite := config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE
lastPartition := state.LastPartition
if optedForOverwrite {
lastPartition = InitialLastPartition
}

if !config.InitialCopyOnly && lastPartition != nil {
if err := q.waitForNewRows(ctx, signalChan, lastPartition); err != nil {
return err
}
}

if q.activeSignal != model.PauseSignal {
logger.Info("fetching partitions to replicate for peer flow")
partitions, err := q.GetPartitions(ctx, state.LastPartition)
partitions, err := q.GetPartitions(ctx, lastPartition)
if err != nil {
return fmt.Errorf("failed to get partitions: %w", err)
}
Expand Down Expand Up @@ -606,7 +618,7 @@ func QRepFlowWorkflow(
logger.Info(fmt.Sprintf("%d partitions processed", len(partitions.Partitions)))
state.NumPartitionsProcessed += uint64(len(partitions.Partitions))

if len(partitions.Partitions) > 0 {
if len(partitions.Partitions) > 0 && !optedForOverwrite {
state.LastPartition = partitions.Partitions[len(partitions.Partitions)-1]
}
}
Expand All @@ -621,7 +633,7 @@ func QRepFlowWorkflow(
}

logger.Info("Continuing as new workflow",
slog.Any("Last Partition", state.LastPartition),
slog.Any("Last Partition", lastPartition),
slog.Uint64("Number of Partitions Processed", state.NumPartitionsProcessed))

if q.activeSignal == model.PauseSignal {
Expand Down

0 comments on commit a951776

Please sign in to comment.