From af255140657350bd9ab054af89206f16cf5439e6 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Sat, 7 Dec 2024 00:54:30 +0530 Subject: [PATCH 1/2] [temporal] adjust threshold for workflows to ContinueAsNew --- flow/shared/workflow.go | 7 +++++++ flow/workflows/cdc_flow.go | 3 ++- flow/workflows/sync_flow.go | 2 +- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/flow/shared/workflow.go b/flow/shared/workflow.go index c9cafc37e2..4dfb192511 100644 --- a/flow/shared/workflow.go +++ b/flow/shared/workflow.go @@ -6,6 +6,7 @@ import ( "log/slog" "go.temporal.io/sdk/client" + "go.temporal.io/sdk/workflow" "github.com/PeerDB-io/peer-flow/generated/protos" ) @@ -25,3 +26,9 @@ func GetWorkflowStatus(ctx context.Context, temporalClient client.Client, workfl } return state, nil } + +func ShouldWorkflowContinueAsNew(ctx workflow.Context) bool { + info := workflow.GetInfo(ctx) + return info.GetContinueAsNewSuggested() && + (info.GetCurrentHistoryLength() > 40960 || info.GetCurrentHistorySize() > 40*1024*1024) +} diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 0c97af9b7d..0965eb6b78 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -610,7 +610,7 @@ func CDCFlowWorkflow( return state, err } - if state.ActiveSignal == model.PauseSignal || workflow.GetInfo(ctx).GetContinueAsNewSuggested() { + if state.ActiveSignal == model.PauseSignal || shared.ShouldWorkflowContinueAsNew(ctx) { restart = true if syncFlowFuture != nil { err := model.SyncStopSignal.SignalChildWorkflow(ctx, syncFlowFuture, struct{}{}).Get(ctx, nil) @@ -620,6 +620,7 @@ func CDCFlowWorkflow( } } } + workflow.GetInfo(ctx).GetCurrentHistorySize() if restart { if state.ActiveSignal == model.PauseSignal { diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 8b00364dd2..ba86d7bacd 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -161,7 +161,7 @@ func SyncFlowWorkflow( break } - restart := syncErr || workflow.GetInfo(ctx).GetContinueAsNewSuggested() + restart := syncErr || shared.ShouldWorkflowContinueAsNew(ctx) if !stop && !syncErr && mustWait { waitSelector.Select(ctx) if restart { From dd5b558afba298494afbf75fe15bdb6b3844144a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 6 Dec 2024 19:33:02 +0000 Subject: [PATCH 2/2] Update flow/workflows/cdc_flow.go --- flow/workflows/cdc_flow.go | 1 - 1 file changed, 1 deletion(-) diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 0965eb6b78..403df26da5 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -620,7 +620,6 @@ func CDCFlowWorkflow( } } } - workflow.GetInfo(ctx).GetCurrentHistorySize() if restart { if state.ActiveSignal == model.PauseSignal {