From 02da43f187d6f86390e6678e72057a08fb317617 Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Tue, 17 Dec 2024 22:54:25 +0530 Subject: [PATCH] [temporal] adjust threshold for workflows to ContinueAsNew (#2328) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Philip Dubé --- flow/shared/workflow.go | 7 +++++++ flow/workflows/cdc_flow.go | 2 +- flow/workflows/sync_flow.go | 2 +- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/flow/shared/workflow.go b/flow/shared/workflow.go index c9cafc37e..4dfb19251 100644 --- a/flow/shared/workflow.go +++ b/flow/shared/workflow.go @@ -6,6 +6,7 @@ import ( "log/slog" "go.temporal.io/sdk/client" + "go.temporal.io/sdk/workflow" "github.com/PeerDB-io/peer-flow/generated/protos" ) @@ -25,3 +26,9 @@ func GetWorkflowStatus(ctx context.Context, temporalClient client.Client, workfl } return state, nil } + +func ShouldWorkflowContinueAsNew(ctx workflow.Context) bool { + info := workflow.GetInfo(ctx) + return info.GetContinueAsNewSuggested() && + (info.GetCurrentHistoryLength() > 40960 || info.GetCurrentHistorySize() > 40*1024*1024) +} diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 9533ecdde..1bb86dd53 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -552,7 +552,7 @@ func CDCFlowWorkflow( return state, err } - if workflow.GetInfo(ctx).GetContinueAsNewSuggested() { + if shared.ShouldWorkflowContinueAsNew(ctx) { restart = true if syncFlowFuture != nil { if err := model.SyncStopSignal.SignalChildWorkflow(ctx, syncFlowFuture, struct{}{}).Get(ctx, nil); err != nil { diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index f6d75d4ee..e3337de2d 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -108,7 +108,7 @@ func SyncFlowWorkflow( } if (options.NumberOfSyncs > 0 && currentSyncFlowNum >= options.NumberOfSyncs) || - syncErr || ctx.Err() != nil || workflow.GetInfo(ctx).GetContinueAsNewSuggested() { + syncErr || ctx.Err() != nil || shared.ShouldWorkflowContinueAsNew(ctx) { break } }