Skip to content

Commit

Permalink
Normalize loop runs concurrently to sync loop
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 24, 2023
1 parent 943f292 commit abf99dd
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 46 deletions.
79 changes: 44 additions & 35 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type CDCFlowWorkflowState struct {
// Accumulates status for sync flows spawned.
SyncFlowStatuses []*model.SyncResponse
// Accumulates status for sync flows spawned.
NormalizeFlowStatuses []*model.NormalizeResponse
NormalizeFlowStatuses []model.NormalizeResponse
// Current signalled state of the peer flow.
ActiveSignal shared.CDCFlowSignal
// SetupComplete indicates whether the peer flow setup has completed.
Expand Down Expand Up @@ -156,6 +156,7 @@ func CDCFlowWorkflowWithConfig(
return nil, fmt.Errorf("invalid connection configs")
}

ctx = workflow.WithValue(ctx, "flowName", cfg.FlowJobName)
w := NewCDCFlowWorkflowExecution(ctx)

if limits.TotalSyncFlows == 0 {
Expand Down Expand Up @@ -302,6 +303,34 @@ func CDCFlowWorkflowWithConfig(
currentSyncFlowNum := 0
totalRecordsSynced := 0

normalizeFlowID, err := GetChildWorkflowID(ctx, "normalize-flow", cfg.FlowJobName)
if err != nil {
return state, err
}

mirrorNameSearch := map[string]interface{}{
shared.MirrorNameSearchAttribute: cfg.FlowJobName,
}

childNormalizeFlowOpts := workflow.ChildWorkflowOptions{
WorkflowID: normalizeFlowID,
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 20,
},
SearchAttributes: mirrorNameSearch,
}
normCtx := workflow.WithChildOptions(ctx, childNormalizeFlowOpts)
childNormalizeFlowFuture := workflow.ExecuteChildWorkflow(
normCtx,
NormalizeFlowWorkflow,
cfg,
)
var normExecution workflow.Execution
if err := childNormalizeFlowFuture.GetChildWorkflowExecution().Get(ctx, &normExecution); err != nil {
return state, fmt.Errorf("normalize workflow failed to start: %w", err)
}

for {
// check and act on signals before a fresh flow starts.
w.receiveAndHandleSignalAsync(ctx, state)
Expand All @@ -322,6 +351,7 @@ func CDCFlowWorkflowWithConfig(
}
// check if the peer flow has been shutdown
if state.ActiveSignal == shared.ShutdownSignal {
workflow.SignalExternalWorkflow(ctx, normExecution.ID, normExecution.RunID, "Sync", true)
w.logger.Info("peer flow has been shutdown")
return state, nil
}
Expand All @@ -344,12 +374,10 @@ func CDCFlowWorkflowWithConfig(

syncFlowID, err := GetChildWorkflowID(ctx, "sync-flow", cfg.FlowJobName)
if err != nil {
workflow.SignalExternalWorkflow(ctx, normExecution.ID, normExecution.RunID, "Sync", true)
return state, err
}

mirrorNameSearch := map[string]interface{}{
shared.MirrorNameSearchAttribute: cfg.FlowJobName,
}
// execute the sync flow as a child workflow
childSyncFlowOpts := workflow.ChildWorkflowOptions{
WorkflowID: syncFlowID,
Expand All @@ -359,11 +387,10 @@ func CDCFlowWorkflowWithConfig(
},
SearchAttributes: mirrorNameSearch,
}
ctx = workflow.WithChildOptions(ctx, childSyncFlowOpts)
ctx = workflow.WithValue(ctx, "flowName", cfg.FlowJobName)
syncCtx := workflow.WithChildOptions(ctx, childSyncFlowOpts)
syncFlowOptions.RelationMessageMapping = *state.RelationMessageMapping
childSyncFlowFuture := workflow.ExecuteChildWorkflow(
ctx,
syncCtx,
SyncFlowWorkflow,
cfg,
syncFlowOptions,
Expand All @@ -382,21 +409,8 @@ func CDCFlowWorkflowWithConfig(
}

w.logger.Info("Total records synced: ", totalRecordsSynced)
workflow.SignalExternalWorkflow(ctx, normExecution.ID, normExecution.RunID, "Sync", false)

normalizeFlowID, err := GetChildWorkflowID(ctx, "normalize-flow", cfg.FlowJobName)
if err != nil {
return state, err
}

childNormalizeFlowOpts := workflow.ChildWorkflowOptions{
WorkflowID: normalizeFlowID,
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 20,
},
SearchAttributes: mirrorNameSearch,
}
ctx = workflow.WithChildOptions(ctx, childNormalizeFlowOpts)
var tableSchemaDeltas []*protos.TableSchemaDelta = nil
if childSyncFlowRes != nil {
tableSchemaDeltas = childSyncFlowRes.TableSchemaDeltas
Expand All @@ -415,7 +429,6 @@ func CDCFlowWorkflowWithConfig(
getModifiedSchemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
})
getModifiedSchemaCtx = workflow.WithValue(getModifiedSchemaCtx, "flowName", cfg.FlowJobName)
getModifiedSchemaFuture := workflow.ExecuteActivity(getModifiedSchemaCtx, flowable.GetTableSchema,
&protos.GetTableSchemaBatchInput{
PeerConnectionConfig: cfg.Source,
Expand All @@ -432,23 +445,19 @@ func CDCFlowWorkflowWithConfig(
}
}
}
ctx = workflow.WithValue(ctx, "flowName", cfg.FlowJobName)
childNormalizeFlowFuture := workflow.ExecuteChildWorkflow(
ctx,
NormalizeFlowWorkflow,
cfg,
)

var childNormalizeFlowRes *model.NormalizeResponse
if err := childNormalizeFlowFuture.Get(ctx, &childNormalizeFlowRes); err != nil {
w.logger.Error("failed to execute normalize flow: ", err)
state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error())
} else {
state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, childNormalizeFlowRes)
}
batchSizeSelector.Select(ctx)
}

workflow.SignalExternalWorkflow(ctx, normExecution.ID, normExecution.RunID, "Sync", true)
var childNormalizeFlowRes []model.NormalizeResponse
if err := childNormalizeFlowFuture.Get(ctx, &childNormalizeFlowRes); err != nil {
w.logger.Error("failed to execute normalize flow: ", err)
state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error())
} else {
state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, childNormalizeFlowRes...)
}

state.TruncateProgress(w.logger)
return nil, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, cfg, limits, state)
}
40 changes: 29 additions & 11 deletions flow/workflows/normalize_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func NewNormalizeFlowExecution(ctx workflow.Context, state *NormalizeFlowState)

func NormalizeFlowWorkflow(ctx workflow.Context,
config *protos.FlowConnectionConfigs,
) (*model.NormalizeResponse, error) {
) ([]model.NormalizeResponse, error) {
s := NewNormalizeFlowExecution(ctx, &NormalizeFlowState{
CDCFlowName: config.FlowJobName,
Progress: []string{},
Expand All @@ -43,24 +43,42 @@ func NormalizeFlowWorkflow(ctx workflow.Context,
func (s *NormalizeFlowExecution) executeNormalizeFlow(
ctx workflow.Context,
config *protos.FlowConnectionConfigs,
) (*model.NormalizeResponse, error) {
) ([]model.NormalizeResponse, error) {
s.logger.Info("executing normalize flow - ", s.CDCFlowName)

normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 7 * 24 * time.Hour,
HeartbeatTimeout: 5 * time.Minute,
})

// execute StartFlow on the peers to start the flow
startNormalizeInput := &protos.StartNormalizeInput{
FlowConnectionConfigs: config,
}
fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, startNormalizeInput)
result := make([]model.NormalizeResponse, 0)
syncChan := workflow.GetSignalChannel(normalizeFlowCtx, "Sync")

stopLoop := false
for stopLoop {
var stopLoopVal bool
var anyFalse bool
syncChan.Receive(normalizeFlowCtx, &stopLoopVal)
stopLoop = stopLoop || stopLoopVal
anyFalse = anyFalse || !stopLoopVal
for syncChan.ReceiveAsync(&stopLoopVal) {
stopLoop = stopLoop || stopLoopVal
anyFalse = anyFalse || !stopLoopVal
}

if anyFalse {
startNormalizeInput := &protos.StartNormalizeInput{
FlowConnectionConfigs: config,
}
fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, startNormalizeInput)

var normalizeResponse *model.NormalizeResponse
if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil {
return nil, fmt.Errorf("failed to flow: %w", err)
var normalizeResponse *model.NormalizeResponse
if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil {
return result, fmt.Errorf("failed to flow: %w", err)
}
result = append(result, *normalizeResponse)
}
}

return normalizeResponse, nil
return result, nil
}

0 comments on commit abf99dd

Please sign in to comment.