diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index e4f4559b0a..f2a5a9cddd 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -53,11 +53,6 @@ type CDCFlowWorkflowState struct { FlowConfigUpdates []*protos.CDCFlowConfigUpdate } -type SignalProps struct { - BatchSize uint32 - IdleTimeout uint64 -} - // returns a new empty PeerFlowState func NewCDCFlowWorkflowState(numTables int) *CDCFlowWorkflowState { return &CDCFlowWorkflowState{ @@ -168,8 +163,6 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont additionalTablesWorkflowCfg.DoInitialSnapshot = true additionalTablesWorkflowCfg.InitialSnapshotOnly = true additionalTablesWorkflowCfg.TableMappings = flowConfigUpdate.AdditionalTables - additionalTablesWorkflowCfg.FlowJobName = fmt.Sprintf("%s_additional_tables_%s", cfg.FlowJobName, - strings.ToLower(shared.RandomString(8))) childAdditionalTablesCDCFlowID, err := GetChildWorkflowID(ctx, "cdc-flow", additionalTablesWorkflowCfg.FlowJobName) @@ -375,18 +368,23 @@ func CDCFlowWorkflowWithConfig( cdcPropertiesSignalChannel := workflow.GetSignalChannel(ctx, shared.CDCDynamicPropertiesSignalName) cdcPropertiesSelector := workflow.NewSelector(ctx) cdcPropertiesSelector.AddReceive(cdcPropertiesSignalChannel, func(c workflow.ReceiveChannel, more bool) { - var cdcSignal SignalProps - c.Receive(ctx, &cdcSignal) + var cdcConfigUpdate *protos.CDCFlowConfigUpdate + c.Receive(ctx, &cdcConfigUpdate) // only modify for options since SyncFlow uses it - if cdcSignal.BatchSize > 0 { - syncFlowOptions.BatchSize = cdcSignal.BatchSize + if cdcConfigUpdate.BatchSize > 0 { + syncFlowOptions.BatchSize = cdcConfigUpdate.BatchSize + } + if cdcConfigUpdate.IdleTimeout > 0 { + syncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout } - if cdcSignal.IdleTimeout > 0 { - syncFlowOptions.IdleTimeoutSeconds = cdcSignal.IdleTimeout + if len(cdcConfigUpdate.AdditionalTables) > 0 { + state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcConfigUpdate) } - slog.Info("CDC Signal received. Parameters on signal reception:", slog.Int("BatchSize", int(cfg.MaxBatchSize)), - slog.Int("IdleTimeout", int(cfg.IdleTimeoutSeconds))) + slog.Info("CDC Signal received. Parameters on signal reception:", + slog.Int("BatchSize", int(syncFlowOptions.BatchSize)), + slog.Int("IdleTimeout", int(syncFlowOptions.IdleTimeoutSeconds)), + slog.Any("AdditionalTables", cdcConfigUpdate.AdditionalTables)) }) cdcPropertiesSelector.AddDefault(func() { diff --git a/protos/flow.proto b/protos/flow.proto index 46451674a4..3a54ff2727 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -389,6 +389,8 @@ enum FlowStatus { message CDCFlowConfigUpdate { repeated TableMapping additional_tables = 1; + uint32 batch_size = 2; + uint64 idle_timeout = 3; } message QRepFlowConfigUpdate { @@ -411,3 +413,4 @@ message AddTablesToPublicationInput{ string publication_name = 2; repeated TableMapping additional_tables = 3; } +