Skip to content

Commit

Permalink
patch tables
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Feb 12, 2024
1 parent ec4a5f2 commit 95d1901
Showing 1 changed file with 41 additions and 52 deletions.
93 changes: 41 additions & 52 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ type CDCFlowWorkflowState struct {
TableNameSchemaMapping map[string]*protos.TableSchema
// flow config update request, set to nil after processed
FlowConfigUpdates []*protos.CDCFlowConfigUpdate
// options passed to all SyncFlows
SyncFlowOptions *protos.SyncFlowOptions
// options passed to all NormalizeFlows
NormalizeFlowOptions *protos.NormalizeFlowOptions
}

// returns a new empty PeerFlowState
Expand All @@ -75,6 +79,8 @@ func NewCDCFlowWorkflowState(numTables int) *CDCFlowWorkflowState {
SrcTableIdNameMapping: nil,
TableNameSchemaMapping: nil,
FlowConfigUpdates: nil,
SyncFlowOptions: nil,
NormalizeFlowOptions: nil,
}
}

Expand Down Expand Up @@ -356,44 +362,17 @@ func CDCFlowWorkflowWithConfig(
}
}

syncFlowOptions := &protos.SyncFlowOptions{
BatchSize: limits.MaxBatchSize,
IdleTimeoutSeconds: 0,
state.SyncFlowOptions = &protos.SyncFlowOptions{
BatchSize: limits.MaxBatchSize,
// this means the env variable assignment path is never hit
IdleTimeoutSeconds: cfg.IdleTimeoutSeconds,
SrcTableIdNameMapping: state.SrcTableIdNameMapping,
TableNameSchemaMapping: state.TableNameSchemaMapping,
}
normalizeFlowOptions := &protos.NormalizeFlowOptions{
state.NormalizeFlowOptions = &protos.NormalizeFlowOptions{
TableNameSchemaMapping: state.TableNameSchemaMapping,
}

// add a signal to change CDC properties
cdcPropertiesSignalChannel := workflow.GetSignalChannel(ctx, shared.CDCDynamicPropertiesSignalName)
cdcPropertiesSelector := workflow.NewSelector(ctx)
cdcPropertiesSelector.AddReceive(cdcPropertiesSignalChannel, func(c workflow.ReceiveChannel, more bool) {
var cdcConfigUpdate *protos.CDCFlowConfigUpdate
c.Receive(ctx, &cdcConfigUpdate)
// only modify for options since SyncFlow uses it
if cdcConfigUpdate.BatchSize > 0 {
syncFlowOptions.BatchSize = cdcConfigUpdate.BatchSize
}
if cdcConfigUpdate.IdleTimeout > 0 {
syncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout
}
if len(cdcConfigUpdate.AdditionalTables) > 0 {
state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcConfigUpdate)
}

slog.Info("CDC Signal received. Parameters on signal reception:",
slog.Int("BatchSize", int(syncFlowOptions.BatchSize)),
slog.Int("IdleTimeout", int(syncFlowOptions.IdleTimeoutSeconds)),
slog.Any("AdditionalTables", cdcConfigUpdate.AdditionalTables))
})

cdcPropertiesSelector.AddDefault(func() {
w.logger.Info("no batch size signal received, batch size remains: ",
syncFlowOptions.BatchSize)
})

currentSyncFlowNum := 0
totalRecordsSynced := int64(0)

Expand All @@ -416,7 +395,7 @@ func CDCFlowWorkflowWithConfig(
normCtx,
NormalizeFlowWorkflow,
cfg,
normalizeFlowOptions,
state.NormalizeFlowOptions,
)

var normWaitChan workflow.ReceiveChannel
Expand Down Expand Up @@ -454,6 +433,27 @@ func CDCFlowWorkflowWithConfig(
c.ReceiveAsync(&signalVal)
state.ActiveSignal = shared.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger)
})
// add a signal to change CDC properties
cdcPropertiesSignalChan := workflow.GetSignalChannel(ctx, shared.CDCDynamicPropertiesSignalName)
mainLoopSelector.AddReceive(cdcPropertiesSignalChan, func(c workflow.ReceiveChannel, more bool) {
var cdcConfigUpdate *protos.CDCFlowConfigUpdate
c.Receive(ctx, &cdcConfigUpdate)
// only modify for options since SyncFlow uses it
if cdcConfigUpdate.BatchSize > 0 {
state.SyncFlowOptions.BatchSize = cdcConfigUpdate.BatchSize
}
if cdcConfigUpdate.IdleTimeout > 0 {
state.SyncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout
}
if len(cdcConfigUpdate.AdditionalTables) > 0 {
state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcConfigUpdate)
}

slog.Info("CDC Signal received. Parameters on signal reception:",
slog.Int("BatchSize", int(state.SyncFlowOptions.BatchSize)),
slog.Int("IdleTimeout", int(state.SyncFlowOptions.IdleTimeoutSeconds)),
slog.Any("AdditionalTables", cdcConfigUpdate.AdditionalTables))
})

for {
for !canceled && mainLoopSelector.HasPending() {
Expand All @@ -466,31 +466,22 @@ func CDCFlowWorkflowWithConfig(
if state.ActiveSignal == shared.PauseSignal {
startTime := time.Now()
state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED
signalChan := workflow.GetSignalChannel(ctx, shared.FlowSignalName)
var signalVal shared.CDCFlowSignal

for state.ActiveSignal == shared.PauseSignal {
w.logger.Info("mirror has been paused for ", time.Since(startTime))
// only place we block on receive, so signal processing is immediate
ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal)
if ok {
state.ActiveSignal = shared.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger)
// only process config updates when going from STATUS_PAUSED to STATUS_RUNNING
if state.ActiveSignal == shared.NoopSignal {
err = w.processCDCFlowConfigUpdates(ctx, cfg, state, limits, mirrorNameSearch)
if err != nil {
return state, err
}
mainLoopSelector.Select(ctx)
if state.ActiveSignal == shared.NoopSignal {
err = w.processCDCFlowConfigUpdates(ctx, cfg, state, limits, mirrorNameSearch)
if err != nil {
return state, err
}
} else if err := ctx.Err(); err != nil {
return nil, err
}
}

w.logger.Info("mirror has been resumed after ", time.Since(startTime))
}

cdcPropertiesSelector.Select(ctx)
state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING

// check if total sync flows have been completed
Expand Down Expand Up @@ -526,12 +517,12 @@ func CDCFlowWorkflowWithConfig(
WaitForCancellation: true,
}
syncCtx := workflow.WithChildOptions(ctx, childSyncFlowOpts)
syncFlowOptions.RelationMessageMapping = state.RelationMessageMapping
state.SyncFlowOptions.RelationMessageMapping = state.RelationMessageMapping
childSyncFlowFuture := workflow.ExecuteChildWorkflow(
syncCtx,
SyncFlowWorkflow,
cfg,
syncFlowOptions,
state.SyncFlowOptions,
)

var syncDone bool
Expand All @@ -554,7 +545,6 @@ func CDCFlowWorkflowWithConfig(
if childSyncFlowRes != nil {
tableSchemaDeltasCount := len(childSyncFlowRes.TableSchemaDeltas)

var normalizeTableNameSchemaMapping map[string]*protos.TableSchema
// slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes.
if tableSchemaDeltasCount != 0 {
modifiedSrcTables := make([]string, 0, tableSchemaDeltasCount)
Expand Down Expand Up @@ -583,14 +573,13 @@ func CDCFlowWorkflowWithConfig(
dstTable := modifiedDstTables[i]
state.TableNameSchemaMapping[dstTable] = getModifiedSchemaRes.TableNameSchemaMapping[srcTable]
}
normalizeTableNameSchemaMapping = state.TableNameSchemaMapping
}
}

signalFuture := childNormalizeFlowFuture.SignalChildWorkflow(ctx, shared.NormalizeSyncSignalName, model.NormalizeSignal{
Done: false,
SyncBatchID: childSyncFlowRes.CurrentSyncBatchID,
TableNameSchemaMapping: normalizeTableNameSchemaMapping,
TableNameSchemaMapping: state.TableNameSchemaMapping,
})
normalizeSignalError = signalFuture.Get(ctx, nil)
} else {
Expand Down

0 comments on commit 95d1901

Please sign in to comment.