Skip to content

Commit

Permalink
cleanup context management
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 24, 2023
1 parent dcb1628 commit 37bc387
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 33 deletions.
57 changes: 25 additions & 32 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ func CDCFlowWorkflowWithConfig(
return nil, fmt.Errorf("invalid connection configs")
}

ctx = workflow.WithValue(ctx, "flowName", cfg.FlowJobName)
w := NewCDCFlowWorkflowExecution(ctx)

if limits.TotalSyncFlows == 0 {
Expand All @@ -170,6 +171,10 @@ func CDCFlowWorkflowWithConfig(
return state, fmt.Errorf("failed to set `%s` query handler: %w", CDCFlowStatusQuery, err)
}

mirrorNameSearch := map[string]interface{}{
shared.MirrorNameSearchAttribute: cfg.FlowJobName,
}

// we cannot skip SetupFlow if SnapshotFlow did not complete in cases where Resync is enabled
// because Resync modifies TableMappings before Setup and also before Snapshot
// for safety, rely on the idempotency of SetupFlow instead
Expand All @@ -185,10 +190,6 @@ func CDCFlowWorkflowWithConfig(
}
}

mirrorNameSearch := map[string]interface{}{
shared.MirrorNameSearchAttribute: cfg.FlowJobName,
}

// start the SetupFlow workflow as a child workflow, and wait for it to complete
// it should return the table schema for the source peer
setupFlowID, err := GetChildWorkflowID(ctx, "setup-flow", cfg.FlowJobName)
Expand All @@ -204,7 +205,6 @@ func CDCFlowWorkflowWithConfig(
SearchAttributes: mirrorNameSearch,
}
setupFlowCtx := workflow.WithChildOptions(ctx, childSetupFlowOpts)
setupFlowCtx = workflow.WithValue(setupFlowCtx, "flowName", cfg.FlowJobName)
setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, cfg)
if err := setupFlowFuture.Get(setupFlowCtx, &cfg); err != nil {
return state, fmt.Errorf("failed to execute child workflow: %w", err)
Expand Down Expand Up @@ -232,7 +232,6 @@ func CDCFlowWorkflowWithConfig(
SearchAttributes: mirrorNameSearch,
}
snapshotFlowCtx := workflow.WithChildOptions(ctx, childSnapshotFlowOpts)
snapshotFlowCtx = workflow.WithValue(snapshotFlowCtx, "flowName", cfg.FlowJobName)
snapshotFlowFuture := workflow.ExecuteChildWorkflow(snapshotFlowCtx, SnapshotFlowWorkflow, cfg)
if err := snapshotFlowFuture.Get(snapshotFlowCtx, nil); err != nil {
return state, fmt.Errorf("failed to execute child workflow: %w", err)
Expand Down Expand Up @@ -266,7 +265,6 @@ func CDCFlowWorkflowWithConfig(
StartToCloseTimeout: 12 * time.Hour,
HeartbeatTimeout: 1 * time.Hour,
})
renameTablesCtx = workflow.WithValue(renameTablesCtx, "flowName", cfg.FlowJobName)
renameTablesFuture := workflow.ExecuteActivity(renameTablesCtx, flowable.RenameTables, renameOpts)
if err := renameTablesFuture.Get(renameTablesCtx, nil); err != nil {
return state, fmt.Errorf("failed to execute rename tables activity: %w", err)
Expand Down Expand Up @@ -347,9 +345,6 @@ func CDCFlowWorkflowWithConfig(
return state, err
}

mirrorNameSearch := map[string]interface{}{
shared.MirrorNameSearchAttribute: cfg.FlowJobName,
}
// execute the sync flow as a child workflow
childSyncFlowOpts := workflow.ChildWorkflowOptions{
WorkflowID: syncFlowID,
Expand All @@ -359,18 +354,17 @@ func CDCFlowWorkflowWithConfig(
},
SearchAttributes: mirrorNameSearch,
}
ctx = workflow.WithChildOptions(ctx, childSyncFlowOpts)
ctx = workflow.WithValue(ctx, "flowName", cfg.FlowJobName)
syncCtx := workflow.WithChildOptions(ctx, childSyncFlowOpts)
syncFlowOptions.RelationMessageMapping = *state.RelationMessageMapping
childSyncFlowFuture := workflow.ExecuteChildWorkflow(
ctx,
syncCtx,
SyncFlowWorkflow,
cfg,
syncFlowOptions,
)

var childSyncFlowRes *model.SyncResponse
if err := childSyncFlowFuture.Get(ctx, &childSyncFlowRes); err != nil {
if err := childSyncFlowFuture.Get(syncCtx, &childSyncFlowRes); err != nil {
w.logger.Error("failed to execute sync flow: ", err)
state.SyncFlowErrors = append(state.SyncFlowErrors, err.Error())
} else {
Expand All @@ -383,20 +377,6 @@ func CDCFlowWorkflowWithConfig(

w.logger.Info("Total records synced: ", totalRecordsSynced)

normalizeFlowID, err := GetChildWorkflowID(ctx, "normalize-flow", cfg.FlowJobName)
if err != nil {
return state, err
}

childNormalizeFlowOpts := workflow.ChildWorkflowOptions{
WorkflowID: normalizeFlowID,
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 20,
},
SearchAttributes: mirrorNameSearch,
}
ctx = workflow.WithChildOptions(ctx, childNormalizeFlowOpts)
var tableSchemaDeltas []*protos.TableSchemaDelta = nil
if childSyncFlowRes != nil {
tableSchemaDeltas = childSyncFlowRes.TableSchemaDeltas
Expand All @@ -415,7 +395,6 @@ func CDCFlowWorkflowWithConfig(
getModifiedSchemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
})
getModifiedSchemaCtx = workflow.WithValue(getModifiedSchemaCtx, "flowName", cfg.FlowJobName)
getModifiedSchemaFuture := workflow.ExecuteActivity(getModifiedSchemaCtx, flowable.GetTableSchema,
&protos.GetTableSchemaBatchInput{
PeerConnectionConfig: cfg.Source,
Expand All @@ -432,15 +411,29 @@ func CDCFlowWorkflowWithConfig(
}
}
}
ctx = workflow.WithValue(ctx, "flowName", cfg.FlowJobName)

normalizeFlowID, err := GetChildWorkflowID(ctx, "normalize-flow", cfg.FlowJobName)
if err != nil {
return state, err
}

childNormalizeFlowOpts := workflow.ChildWorkflowOptions{
WorkflowID: normalizeFlowID,
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 20,
},
SearchAttributes: mirrorNameSearch,
}
normCtx := workflow.WithChildOptions(ctx, childNormalizeFlowOpts)
childNormalizeFlowFuture := workflow.ExecuteChildWorkflow(
ctx,
normCtx,
NormalizeFlowWorkflow,
cfg,
)

var childNormalizeFlowRes *model.NormalizeResponse
if err := childNormalizeFlowFuture.Get(ctx, &childNormalizeFlowRes); err != nil {
if err := childNormalizeFlowFuture.Get(normCtx, &childNormalizeFlowRes); err != nil {
w.logger.Error("failed to execute normalize flow: ", err)
state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error())
} else {
Expand Down
1 change: 0 additions & 1 deletion flow/workflows/normalize_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ func (s *NormalizeFlowExecution) executeNormalizeFlow(
HeartbeatTimeout: 5 * time.Minute,
})

// execute StartFlow on the peers to start the flow
startNormalizeInput := &protos.StartNormalizeInput{
FlowConnectionConfigs: config,
}
Expand Down

0 comments on commit 37bc387

Please sign in to comment.