Skip to content

Commit

Permalink
Normalize loop runs concurrently to sync loop
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 24, 2023
1 parent cc5d54a commit e785a96
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 38 deletions.
65 changes: 37 additions & 28 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type CDCFlowWorkflowState struct {
// Accumulates status for sync flows spawned.
SyncFlowStatuses []*model.SyncResponse
// Accumulates status for sync flows spawned.
NormalizeFlowStatuses []*model.NormalizeResponse
NormalizeFlowStatuses []model.NormalizeResponse
// Current signalled state of the peer flow.
ActiveSignal shared.CDCFlowSignal
// SetupComplete indicates whether the peer flow setup has completed.
Expand Down Expand Up @@ -300,6 +300,30 @@ func CDCFlowWorkflowWithConfig(
currentSyncFlowNum := 0
totalRecordsSynced := 0

normalizeFlowID, err := GetChildWorkflowID(ctx, "normalize-flow", cfg.FlowJobName)
if err != nil {
return state, err
}

childNormalizeFlowOpts := workflow.ChildWorkflowOptions{
WorkflowID: normalizeFlowID,
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 20,
},
SearchAttributes: mirrorNameSearch,
}
normCtx := workflow.WithChildOptions(ctx, childNormalizeFlowOpts)
childNormalizeFlowFuture := workflow.ExecuteChildWorkflow(
normCtx,
NormalizeFlowWorkflow,
cfg,
)
var normExecution workflow.Execution
if err := childNormalizeFlowFuture.GetChildWorkflowExecution().Get(ctx, &normExecution); err != nil {
return state, fmt.Errorf("normalize workflow failed to start: %w", err)
}

for {
// check and act on signals before a fresh flow starts.
w.receiveAndHandleSignalAsync(ctx, state)
Expand All @@ -320,6 +344,7 @@ func CDCFlowWorkflowWithConfig(
}
// check if the peer flow has been shutdown
if state.ActiveSignal == shared.ShutdownSignal {
workflow.SignalExternalWorkflow(ctx, normExecution.ID, normExecution.RunID, "Sync", true)
w.logger.Info("peer flow has been shutdown")
return state, nil
}
Expand All @@ -342,6 +367,7 @@ func CDCFlowWorkflowWithConfig(

syncFlowID, err := GetChildWorkflowID(ctx, "sync-flow", cfg.FlowJobName)
if err != nil {
workflow.SignalExternalWorkflow(ctx, normExecution.ID, normExecution.RunID, "Sync", true)
return state, err
}

Expand Down Expand Up @@ -376,6 +402,7 @@ func CDCFlowWorkflowWithConfig(
}

w.logger.Info("Total records synced: ", totalRecordsSynced)
workflow.SignalExternalWorkflow(ctx, normExecution.ID, normExecution.RunID, "Sync", false)

var tableSchemaDeltas []*protos.TableSchemaDelta = nil
if childSyncFlowRes != nil {
Expand Down Expand Up @@ -412,36 +439,18 @@ func CDCFlowWorkflowWithConfig(
}
}

normalizeFlowID, err := GetChildWorkflowID(ctx, "normalize-flow", cfg.FlowJobName)
if err != nil {
return state, err
}

childNormalizeFlowOpts := workflow.ChildWorkflowOptions{
WorkflowID: normalizeFlowID,
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 20,
},
SearchAttributes: mirrorNameSearch,
}
normCtx := workflow.WithChildOptions(ctx, childNormalizeFlowOpts)
childNormalizeFlowFuture := workflow.ExecuteChildWorkflow(
normCtx,
NormalizeFlowWorkflow,
cfg,
)

var childNormalizeFlowRes *model.NormalizeResponse
if err := childNormalizeFlowFuture.Get(normCtx, &childNormalizeFlowRes); err != nil {
w.logger.Error("failed to execute normalize flow: ", err)
state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error())
} else {
state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, childNormalizeFlowRes)
}
batchSizeSelector.Select(ctx)
}

workflow.SignalExternalWorkflow(ctx, normExecution.ID, normExecution.RunID, "Sync", true)
var childNormalizeFlowRes []model.NormalizeResponse
if err := childNormalizeFlowFuture.Get(ctx, &childNormalizeFlowRes); err != nil {
w.logger.Error("failed to execute normalize flow: ", err)
state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error())
} else {
state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, childNormalizeFlowRes...)
}

state.TruncateProgress(w.logger)
return nil, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, cfg, limits, state)
}
39 changes: 29 additions & 10 deletions flow/workflows/normalize_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func NewNormalizeFlowExecution(ctx workflow.Context, state *NormalizeFlowState)

func NormalizeFlowWorkflow(ctx workflow.Context,
config *protos.FlowConnectionConfigs,
) (*model.NormalizeResponse, error) {
) ([]model.NormalizeResponse, error) {
s := NewNormalizeFlowExecution(ctx, &NormalizeFlowState{
CDCFlowName: config.FlowJobName,
Progress: []string{},
Expand All @@ -43,23 +43,42 @@ func NormalizeFlowWorkflow(ctx workflow.Context,
func (s *NormalizeFlowExecution) executeNormalizeFlow(
ctx workflow.Context,
config *protos.FlowConnectionConfigs,
) (*model.NormalizeResponse, error) {
) ([]model.NormalizeResponse, error) {
s.logger.Info("executing normalize flow - ", s.CDCFlowName)

normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 7 * 24 * time.Hour,
HeartbeatTimeout: 5 * time.Minute,
})

startNormalizeInput := &protos.StartNormalizeInput{
FlowConnectionConfigs: config,
}
fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, startNormalizeInput)
result := make([]model.NormalizeResponse, 0)
syncChan := workflow.GetSignalChannel(normalizeFlowCtx, "Sync")

stopLoop := false
for stopLoop {
var stopLoopVal bool
var anyFalse bool
syncChan.Receive(normalizeFlowCtx, &stopLoopVal)
stopLoop = stopLoop || stopLoopVal
anyFalse = anyFalse || !stopLoopVal
for syncChan.ReceiveAsync(&stopLoopVal) {
stopLoop = stopLoop || stopLoopVal
anyFalse = anyFalse || !stopLoopVal
}

if anyFalse {
startNormalizeInput := &protos.StartNormalizeInput{
FlowConnectionConfigs: config,
}
fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, startNormalizeInput)

var normalizeResponse *model.NormalizeResponse
if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil {
return nil, fmt.Errorf("failed to flow: %w", err)
var normalizeResponse *model.NormalizeResponse
if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil {
return result, fmt.Errorf("failed to flow: %w", err)
}
result = append(result, *normalizeResponse)
}
}

return normalizeResponse, nil
return result, nil
}

0 comments on commit e785a96

Please sign in to comment.