From 95d19011bd90bd70e0484aed7dff4f03650ad7d0 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Mon, 12 Feb 2024 12:13:40 +0530 Subject: [PATCH] patch tables --- flow/workflows/cdc_flow.go | 93 +++++++++++++++++--------------------- 1 file changed, 41 insertions(+), 52 deletions(-) diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index d0f825dfaf..60a085f33b 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -53,6 +53,10 @@ type CDCFlowWorkflowState struct { TableNameSchemaMapping map[string]*protos.TableSchema // flow config update request, set to nil after processed FlowConfigUpdates []*protos.CDCFlowConfigUpdate + // options passed to all SyncFlows + SyncFlowOptions *protos.SyncFlowOptions + // options passed to all NormalizeFlows + NormalizeFlowOptions *protos.NormalizeFlowOptions } // returns a new empty PeerFlowState @@ -75,6 +79,8 @@ func NewCDCFlowWorkflowState(numTables int) *CDCFlowWorkflowState { SrcTableIdNameMapping: nil, TableNameSchemaMapping: nil, FlowConfigUpdates: nil, + SyncFlowOptions: nil, + NormalizeFlowOptions: nil, } } @@ -356,44 +362,17 @@ func CDCFlowWorkflowWithConfig( } } - syncFlowOptions := &protos.SyncFlowOptions{ - BatchSize: limits.MaxBatchSize, - IdleTimeoutSeconds: 0, + state.SyncFlowOptions = &protos.SyncFlowOptions{ + BatchSize: limits.MaxBatchSize, + // this means the env variable assignment path is never hit + IdleTimeoutSeconds: cfg.IdleTimeoutSeconds, SrcTableIdNameMapping: state.SrcTableIdNameMapping, TableNameSchemaMapping: state.TableNameSchemaMapping, } - normalizeFlowOptions := &protos.NormalizeFlowOptions{ + state.NormalizeFlowOptions = &protos.NormalizeFlowOptions{ TableNameSchemaMapping: state.TableNameSchemaMapping, } - // add a signal to change CDC properties - cdcPropertiesSignalChannel := workflow.GetSignalChannel(ctx, shared.CDCDynamicPropertiesSignalName) - cdcPropertiesSelector := workflow.NewSelector(ctx) - cdcPropertiesSelector.AddReceive(cdcPropertiesSignalChannel, func(c workflow.ReceiveChannel, more bool) { - var cdcConfigUpdate *protos.CDCFlowConfigUpdate - c.Receive(ctx, &cdcConfigUpdate) - // only modify for options since SyncFlow uses it - if cdcConfigUpdate.BatchSize > 0 { - syncFlowOptions.BatchSize = cdcConfigUpdate.BatchSize - } - if cdcConfigUpdate.IdleTimeout > 0 { - syncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout - } - if len(cdcConfigUpdate.AdditionalTables) > 0 { - state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcConfigUpdate) - } - - slog.Info("CDC Signal received. Parameters on signal reception:", - slog.Int("BatchSize", int(syncFlowOptions.BatchSize)), - slog.Int("IdleTimeout", int(syncFlowOptions.IdleTimeoutSeconds)), - slog.Any("AdditionalTables", cdcConfigUpdate.AdditionalTables)) - }) - - cdcPropertiesSelector.AddDefault(func() { - w.logger.Info("no batch size signal received, batch size remains: ", - syncFlowOptions.BatchSize) - }) - currentSyncFlowNum := 0 totalRecordsSynced := int64(0) @@ -416,7 +395,7 @@ func CDCFlowWorkflowWithConfig( normCtx, NormalizeFlowWorkflow, cfg, - normalizeFlowOptions, + state.NormalizeFlowOptions, ) var normWaitChan workflow.ReceiveChannel @@ -454,6 +433,27 @@ func CDCFlowWorkflowWithConfig( c.ReceiveAsync(&signalVal) state.ActiveSignal = shared.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger) }) + // add a signal to change CDC properties + cdcPropertiesSignalChan := workflow.GetSignalChannel(ctx, shared.CDCDynamicPropertiesSignalName) + mainLoopSelector.AddReceive(cdcPropertiesSignalChan, func(c workflow.ReceiveChannel, more bool) { + var cdcConfigUpdate *protos.CDCFlowConfigUpdate + c.Receive(ctx, &cdcConfigUpdate) + // only modify for options since SyncFlow uses it + if cdcConfigUpdate.BatchSize > 0 { + state.SyncFlowOptions.BatchSize = cdcConfigUpdate.BatchSize + } + if cdcConfigUpdate.IdleTimeout > 0 { + state.SyncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout + } + if len(cdcConfigUpdate.AdditionalTables) > 0 { + state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcConfigUpdate) + } + + slog.Info("CDC Signal received. Parameters on signal reception:", + slog.Int("BatchSize", int(state.SyncFlowOptions.BatchSize)), + slog.Int("IdleTimeout", int(state.SyncFlowOptions.IdleTimeoutSeconds)), + slog.Any("AdditionalTables", cdcConfigUpdate.AdditionalTables)) + }) for { for !canceled && mainLoopSelector.HasPending() { @@ -466,31 +466,22 @@ func CDCFlowWorkflowWithConfig( if state.ActiveSignal == shared.PauseSignal { startTime := time.Now() state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED - signalChan := workflow.GetSignalChannel(ctx, shared.FlowSignalName) - var signalVal shared.CDCFlowSignal for state.ActiveSignal == shared.PauseSignal { w.logger.Info("mirror has been paused for ", time.Since(startTime)) // only place we block on receive, so signal processing is immediate - ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal) - if ok { - state.ActiveSignal = shared.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger) - // only process config updates when going from STATUS_PAUSED to STATUS_RUNNING - if state.ActiveSignal == shared.NoopSignal { - err = w.processCDCFlowConfigUpdates(ctx, cfg, state, limits, mirrorNameSearch) - if err != nil { - return state, err - } + mainLoopSelector.Select(ctx) + if state.ActiveSignal == shared.NoopSignal { + err = w.processCDCFlowConfigUpdates(ctx, cfg, state, limits, mirrorNameSearch) + if err != nil { + return state, err } - } else if err := ctx.Err(); err != nil { - return nil, err } } w.logger.Info("mirror has been resumed after ", time.Since(startTime)) } - cdcPropertiesSelector.Select(ctx) state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING // check if total sync flows have been completed @@ -526,12 +517,12 @@ func CDCFlowWorkflowWithConfig( WaitForCancellation: true, } syncCtx := workflow.WithChildOptions(ctx, childSyncFlowOpts) - syncFlowOptions.RelationMessageMapping = state.RelationMessageMapping + state.SyncFlowOptions.RelationMessageMapping = state.RelationMessageMapping childSyncFlowFuture := workflow.ExecuteChildWorkflow( syncCtx, SyncFlowWorkflow, cfg, - syncFlowOptions, + state.SyncFlowOptions, ) var syncDone bool @@ -554,7 +545,6 @@ func CDCFlowWorkflowWithConfig( if childSyncFlowRes != nil { tableSchemaDeltasCount := len(childSyncFlowRes.TableSchemaDeltas) - var normalizeTableNameSchemaMapping map[string]*protos.TableSchema // slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes. if tableSchemaDeltasCount != 0 { modifiedSrcTables := make([]string, 0, tableSchemaDeltasCount) @@ -583,14 +573,13 @@ func CDCFlowWorkflowWithConfig( dstTable := modifiedDstTables[i] state.TableNameSchemaMapping[dstTable] = getModifiedSchemaRes.TableNameSchemaMapping[srcTable] } - normalizeTableNameSchemaMapping = state.TableNameSchemaMapping } } signalFuture := childNormalizeFlowFuture.SignalChildWorkflow(ctx, shared.NormalizeSyncSignalName, model.NormalizeSignal{ Done: false, SyncBatchID: childSyncFlowRes.CurrentSyncBatchID, - TableNameSchemaMapping: normalizeTableNameSchemaMapping, + TableNameSchemaMapping: state.TableNameSchemaMapping, }) normalizeSignalError = signalFuture.Get(ctx, nil) } else {