Skip to content

Commit

Permalink
Split normalize changes from split-cdc-sync (#1381)
Browse files Browse the repository at this point in the history
Normalize results and errors are now sent to the parent CDC workflow via signals instead of being buffered and sent at the end.
  • Loading branch information
serprex authored Feb 27, 2024
1 parent 702d050 commit 6623ae5
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 121 deletions.
14 changes: 5 additions & 9 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
s.t.Logf("Inserted %d rows into the source table", numRows)
}

getWorkFlowState := func() peerflow.CDCFlowWorkflowState {
getWorkflowState := func() peerflow.CDCFlowWorkflowState {
var workflowState peerflow.CDCFlowWorkflowState
val, err := env.QueryWorkflow(shared.CDCFlowStateQuery)
e2e.EnvNoError(s.t, env, err)
Expand Down Expand Up @@ -1243,7 +1243,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
})

workflowState := getWorkFlowState()
workflowState := getWorkflowState()
assert.EqualValues(s.t, 7, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
assert.EqualValues(s.t, 6, workflowState.SyncFlowOptions.BatchSize)
assert.Len(s.t, workflowState.SyncFlowOptions.TableMappings, 1)
Expand Down Expand Up @@ -1271,12 +1271,10 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent updates signal", func() bool {
return sentUpdate
})
}

// add rows to both tables before resuming - should handle
addRows(18)
// add rows to both tables before resuming - should handle
addRows(18)

if !s.t.Failed() {
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool {
return getFlowStatus() == protos.FlowStatus_STATUS_RUNNING
})
Expand All @@ -1286,10 +1284,8 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "initial load + normalize 18 records - second table", func() bool {
return s.comparePGTables(srcTable2Name, dstTable2Name, "id,t") == nil
})
}

if !s.t.Failed() {
workflowState = getWorkFlowState()
workflowState = getWorkflowState()
assert.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
assert.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize)
assert.Len(s.t, workflowState.SyncFlowOptions.TableMappings, 2)
Expand Down
7 changes: 1 addition & 6 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ type SyncResponse struct {
CurrentSyncBatchID int64
// TableNameRowsMapping tells how many records need to be synced to each destination table.
TableNameRowsMapping map[string]uint32
// to be carried to parent WorkFlow
// to be carried to parent workflow
TableSchemaDeltas []*protos.TableSchemaDelta
// to be stored in state for future PullFlows
RelationMessageMapping RelationMessageMapping
Expand All @@ -197,11 +197,6 @@ type NormalizePayload struct {
TableNameSchemaMapping map[string]*protos.TableSchema
}

type NormalizeFlowResponse struct {
Results []NormalizeResponse
Errors []string
}

type NormalizeResponse struct {
// Flag to depict if normalization is done
Done bool
Expand Down
40 changes: 24 additions & 16 deletions flow/model/signals.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,22 +102,6 @@ const (
PauseSignal
)

var FlowSignal = TypedSignal[CDCFlowSignal]{
Name: "peer-flow-signal",
}

var CDCDynamicPropertiesSignal = TypedSignal[*protos.CDCFlowConfigUpdate]{
Name: "cdc-dynamic-properties",
}

var NormalizeSyncSignal = TypedSignal[NormalizePayload]{
Name: "normalize-sync",
}

var NormalizeSyncDoneSignal = TypedSignal[struct{}]{
Name: "normalize-sync-done",
}

func FlowSignalHandler(activeSignal CDCFlowSignal,
v CDCFlowSignal, logger log.Logger,
) CDCFlowSignal {
Expand All @@ -137,3 +121,27 @@ func FlowSignalHandler(activeSignal CDCFlowSignal,
}
return activeSignal
}

// FlowSignal is the typed signal, named "peer-flow-signal", that carries a
// CDCFlowSignal value. CDCFlowWorkflow receives it in its main loop and passes
// each value to FlowSignalHandler to update the workflow's active signal.
var FlowSignal = TypedSignal[CDCFlowSignal]{
Name: "peer-flow-signal",
}

// CDCDynamicPropertiesSignal is the typed signal, named
// "cdc-dynamic-properties", that carries a *protos.CDCFlowConfigUpdate.
// CDCFlowWorkflow listens for it in its main loop to change CDC properties
// of a mirror.
var CDCDynamicPropertiesSignal = TypedSignal[*protos.CDCFlowConfigUpdate]{
Name: "cdc-dynamic-properties",
}

// NormalizeSignal is the typed signal, named "normalize", that carries a
// NormalizePayload. CDCFlowWorkflow sends it to the normalize child workflow:
// with Done=false and the current sync batch ID after a sync, and with
// Done=true and SyncBatchID=-1 when finishing normalization.
var NormalizeSignal = TypedSignal[NormalizePayload]{
Name: "normalize",
}

// NormalizeErrorSignal is the typed signal, named "normalize-error", that
// carries an error message as a string. CDCFlowWorkflow receives it in its
// main loop and appends the message to its recorded normalize flow errors.
// NOTE(review): presumably sent by the normalize flow — confirm against sender.
var NormalizeErrorSignal = TypedSignal[string]{
Name: "normalize-error",
}

// NormalizeResultSignal is the typed signal, named "normalize-result", that
// carries a NormalizeResponse. CDCFlowWorkflow receives it in its main loop
// and appends the response to its recorded normalize flow statuses.
// NOTE(review): presumably sent by the normalize flow — confirm against sender.
var NormalizeResultSignal = TypedSignal[NormalizeResponse]{
Name: "normalize-result",
}

// NormalizeDoneSignal is the typed signal, named "normalize-done", with an
// empty payload. When parallel sync/normalize is disabled, CDCFlowWorkflow
// waits on this signal (or on cancellation) after triggering normalize.
var NormalizeDoneSignal = TypedSignal[struct{}]{
Name: "normalize-done",
}
83 changes: 45 additions & 38 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func CDCFlowWorkflow(
setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, cfg)
var setupFlowOutput *protos.SetupFlowOutput
if err := setupFlowFuture.Get(setupFlowCtx, &setupFlowOutput); err != nil {
return state, fmt.Errorf("failed to execute child workflow: %w", err)
return state, fmt.Errorf("failed to execute setup workflow: %w", err)
}
state.SyncFlowOptions.SrcTableIdNameMapping = setupFlowOutput.SrcTableIdNameMapping
state.SyncFlowOptions.TableNameSchemaMapping = setupFlowOutput.TableNameSchemaMapping
Expand Down Expand Up @@ -310,7 +310,7 @@ func CDCFlowWorkflow(
snapshotFlowFuture := workflow.ExecuteChildWorkflow(snapshotFlowCtx, SnapshotFlowWorkflow, cfg)
if err := snapshotFlowFuture.Get(snapshotFlowCtx, nil); err != nil {
w.logger.Error("snapshot flow failed", slog.Any("error", err))
return state, fmt.Errorf("failed to execute child workflow: %w", err)
return state, fmt.Errorf("failed to execute snapshot workflow: %w", err)
}

if cfg.Resync {
Expand Down Expand Up @@ -383,8 +383,14 @@ func CDCFlowWorkflow(
currentSyncFlowNum := 0
totalRecordsSynced := int64(0)

var canceled bool
mainLoopSelector := workflow.NewNamedSelector(ctx, "MainLoop")
mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {
canceled = true
})

normalizeFlowID := GetChildWorkflowID("normalize-flow", cfg.FlowJobName, originalRunID)
childNormalizeFlowOpts := workflow.ChildWorkflowOptions{
normalizeFlowOpts := workflow.ChildWorkflowOptions{
WorkflowID: normalizeFlowID,
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
RetryPolicy: &temporal.RetryPolicy{
Expand All @@ -393,57 +399,60 @@ func CDCFlowWorkflow(
SearchAttributes: mirrorNameSearch,
WaitForCancellation: true,
}
normCtx := workflow.WithChildOptions(ctx, childNormalizeFlowOpts)

childNormalizeFlowFuture := workflow.ExecuteChildWorkflow(
normCtx,
NormalizeFlowWorkflow,
cfg,
)
normCtx := workflow.WithChildOptions(ctx, normalizeFlowOpts)
normalizeFlowFuture := workflow.ExecuteChildWorkflow(normCtx, NormalizeFlowWorkflow, cfg, nil)

var normWaitChan model.TypedReceiveChannel[struct{}]
var waitSelector workflow.Selector
parallel := GetSideEffect(ctx, func(_ workflow.Context) bool {
return peerdbenv.PeerDBEnableParallelSyncNormalize()
})
if !parallel {
normWaitChan = model.NormalizeSyncDoneSignal.GetSignalChannel(ctx)
waitSelector = workflow.NewNamedSelector(ctx, "NormalizeWait")
waitSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {
canceled = true
})
waitChan := model.NormalizeDoneSignal.GetSignalChannel(ctx)
waitChan.AddToSelector(waitSelector, func(_ struct{}, _ bool) {})
}

finishNormalize := func() {
model.NormalizeSyncSignal.SignalChildWorkflow(ctx, childNormalizeFlowFuture, model.NormalizePayload{
model.NormalizeSignal.SignalChildWorkflow(ctx, normalizeFlowFuture, model.NormalizePayload{
Done: true,
SyncBatchID: -1,
})
var childNormalizeFlowRes *model.NormalizeFlowResponse
if err := childNormalizeFlowFuture.Get(ctx, &childNormalizeFlowRes); err != nil {
w.logger.Error("failed to execute normalize flow: ", err)
if err := normalizeFlowFuture.Get(ctx, nil); err != nil {
w.logger.Error("failed to execute normalize flow", slog.Any("error", err))
var panicErr *temporal.PanicError
if errors.As(err, &panicErr) {
w.logger.Error("PANIC", panicErr.Error(), panicErr.StackTrace())
}
state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error())
} else {
state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, childNormalizeFlowRes.Errors...)
state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, childNormalizeFlowRes.Results...)
}
}

var canceled bool
flowSignalChan := model.FlowSignal.GetSignalChannel(ctx)
mainLoopSelector := workflow.NewNamedSelector(ctx, "Main Loop")
mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {
canceled = true
})
mainLoopSelector.AddFuture(fMaintain, func(f workflow.Future) {
err := f.Get(ctx, nil)
if err != nil {
w.logger.Error("MaintainPull failed", slog.Any("error", err))
canceled = true
}
})

flowSignalChan := model.FlowSignal.GetSignalChannel(ctx)
flowSignalChan.AddToSelector(mainLoopSelector, func(val model.CDCFlowSignal, _ bool) {
state.ActiveSignal = model.FlowSignalHandler(state.ActiveSignal, val, w.logger)
})

normErrorChan := model.NormalizeErrorSignal.GetSignalChannel(ctx)
normErrorChan.AddToSelector(mainLoopSelector, func(err string, _ bool) {
state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err)
})

normResultChan := model.NormalizeResultSignal.GetSignalChannel(ctx)
normResultChan.AddToSelector(mainLoopSelector, func(result model.NormalizeResponse, _ bool) {
state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, result)
})

// add a signal to change CDC properties
cdcPropertiesSignalChan := model.CDCDynamicPropertiesSignal.GetSignalChannel(ctx)
cdcPropertiesSignalChan.AddToSelector(mainLoopSelector, func(cdcConfigUpdate *protos.CDCFlowConfigUpdate, more bool) {
Expand Down Expand Up @@ -515,8 +524,7 @@ func CDCFlowWorkflow(
syncFlowFuture := workflow.ExecuteActivity(syncFlowCtx, flowable.SyncFlow, cfg, state.SyncFlowOptions, sessionInfo.SessionID)

var syncDone, syncErr bool
var normalizeSignalError error
normDone := normWaitChan.Chan == nil
mustWait := waitSelector != nil
mainLoopSelector.AddFuture(syncFlowFuture, func(f workflow.Future) {
syncDone = true

Expand All @@ -525,6 +533,7 @@ func CDCFlowWorkflow(
w.logger.Error("failed to execute sync flow", slog.Any("error", err))
state.SyncFlowErrors = append(state.SyncFlowErrors, err.Error())
syncErr = true
mustWait = false
} else if childSyncFlowRes != nil {
state.SyncFlowStatuses = append(state.SyncFlowStatuses, childSyncFlowRes)
state.SyncFlowOptions.RelationMessageMapping = childSyncFlowRes.RelationMessageMapping
Expand Down Expand Up @@ -565,14 +574,17 @@ func CDCFlowWorkflow(
}
}

signalFuture := model.NormalizeSyncSignal.SignalChildWorkflow(ctx, childNormalizeFlowFuture, model.NormalizePayload{
err := model.NormalizeSignal.SignalChildWorkflow(ctx, normalizeFlowFuture, model.NormalizePayload{
Done: false,
SyncBatchID: childSyncFlowRes.CurrentSyncBatchID,
TableNameSchemaMapping: state.SyncFlowOptions.TableNameSchemaMapping,
})
normalizeSignalError = signalFuture.Get(ctx, nil)
}).Get(ctx, nil)
if err != nil {
w.logger.Error("failed to trigger normalize, so skip wait", slog.Any("error", err))
mustWait = false
}
} else {
normDone = true
mustWait = false
}
})

Expand All @@ -586,13 +598,8 @@ func CDCFlowWorkflow(
state.TruncateProgress(w.logger)
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state)
}
if normalizeSignalError != nil {
state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, normalizeSignalError.Error())
state.TruncateProgress(w.logger)
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state)
}
if !normDone {
normWaitChan.Receive(ctx)
if mustWait {
waitSelector.Select(ctx)
}
}

Expand Down
Loading

0 comments on commit 6623ae5

Please sign in to comment.