From de891b0a7a23b4e2d75d5b3674e48c90c8850167 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Tue, 12 Dec 2023 11:57:39 -0500 Subject: [PATCH] Make sync batch size dynamic (#806) --- flow/shared/constants.go | 7 ++++--- flow/workflows/cdc_flow.go | 15 +++++++++++++++ 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/flow/shared/constants.go b/flow/shared/constants.go index a10d529189..fa53bb007e 100644 --- a/flow/shared/constants.go +++ b/flow/shared/constants.go @@ -7,9 +7,10 @@ import ( ) const ( - peerFlowTaskQueue = "peer-flow-task-queue" - snapshotFlowTaskQueue = "snapshot-flow-task-queue" - CDCFlowSignalName = "peer-flow-signal" + peerFlowTaskQueue = "peer-flow-task-queue" + snapshotFlowTaskQueue = "snapshot-flow-task-queue" + CDCFlowSignalName = "peer-flow-signal" + CDCBatchSizeSignalName = "cdc-batch-size-signal" ) const MirrorNameSearchAttribute = "MirrorName" diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index c42f64c9d1..a7558e00c4 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -284,6 +284,20 @@ func CDCFlowWorkflowWithConfig( BatchSize: int32(limits.MaxBatchSize), } + // add a signal to change the batch size + batchSizeSignalChan := workflow.GetSignalChannel(ctx, shared.CDCBatchSizeSignalName) + batchSizeSelector := workflow.NewSelector(ctx) + batchSizeSelector.AddReceive(batchSizeSignalChan, func(c workflow.ReceiveChannel, more bool) { + var batchSize int32 + c.Receive(ctx, &batchSize) + w.logger.Info("received batch size signal: ", batchSize) + syncFlowOptions.BatchSize = batchSize + }) + batchSizeSelector.AddDefault(func() { + w.logger.Info("no batch size signal received, batch size remains: ", + syncFlowOptions.BatchSize) + }) + currentSyncFlowNum := 0 totalRecordsSynced := 0 @@ -435,6 +449,7 @@ func CDCFlowWorkflowWithConfig( } }) selector.Select(ctx) + batchSizeSelector.Select(ctx) } state.TruncateProgress()