diff --git a/flow/shared/constants.go b/flow/shared/constants.go index a10d529189..fa53bb007e 100644 --- a/flow/shared/constants.go +++ b/flow/shared/constants.go @@ -7,9 +7,10 @@ import ( ) const ( - peerFlowTaskQueue = "peer-flow-task-queue" - snapshotFlowTaskQueue = "snapshot-flow-task-queue" - CDCFlowSignalName = "peer-flow-signal" + peerFlowTaskQueue = "peer-flow-task-queue" + snapshotFlowTaskQueue = "snapshot-flow-task-queue" + CDCFlowSignalName = "peer-flow-signal" + CDCBatchSizeSignalName = "cdc-batch-size-signal" ) const MirrorNameSearchAttribute = "MirrorName" diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index c42f64c9d1..a7558e00c4 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -284,6 +284,20 @@ func CDCFlowWorkflowWithConfig( BatchSize: int32(limits.MaxBatchSize), } + // add a signal to change the batch size + batchSizeSignalChan := workflow.GetSignalChannel(ctx, shared.CDCBatchSizeSignalName) + batchSizeSelector := workflow.NewSelector(ctx) + batchSizeSelector.AddReceive(batchSizeSignalChan, func(c workflow.ReceiveChannel, more bool) { + var batchSize int32 + c.Receive(ctx, &batchSize) + w.logger.Info("received batch size signal: ", batchSize) + syncFlowOptions.BatchSize = batchSize + }) + batchSizeSelector.AddDefault(func() { + w.logger.Info("no batch size signal received, batch size remains: ", + syncFlowOptions.BatchSize) + }) + currentSyncFlowNum := 0 totalRecordsSynced := 0 @@ -435,6 +449,7 @@ func CDCFlowWorkflowWithConfig( } }) selector.Select(ctx) + batchSizeSelector.Select(ctx) } state.TruncateProgress()