diff --git a/flow/shared/workflow.go b/flow/shared/workflow.go index c9cafc37e..4dfb19251 100644 --- a/flow/shared/workflow.go +++ b/flow/shared/workflow.go @@ -6,6 +6,7 @@ import ( "log/slog" "go.temporal.io/sdk/client" + "go.temporal.io/sdk/workflow" "github.com/PeerDB-io/peer-flow/generated/protos" ) @@ -25,3 +26,9 @@ func GetWorkflowStatus(ctx context.Context, temporalClient client.Client, workfl } return state, nil } + +func ShouldWorkflowContinueAsNew(ctx workflow.Context) bool { + info := workflow.GetInfo(ctx) + return info.GetContinueAsNewSuggested() && + (info.GetCurrentHistoryLength() > 40960 || info.GetCurrentHistorySize() > 40*1024*1024) +} diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 0c97af9b7..403df26da 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -610,7 +610,7 @@ func CDCFlowWorkflow( return state, err } - if state.ActiveSignal == model.PauseSignal || workflow.GetInfo(ctx).GetContinueAsNewSuggested() { + if state.ActiveSignal == model.PauseSignal || shared.ShouldWorkflowContinueAsNew(ctx) { restart = true if syncFlowFuture != nil { err := model.SyncStopSignal.SignalChildWorkflow(ctx, syncFlowFuture, struct{}{}).Get(ctx, nil) diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 8b00364dd..ba86d7bac 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -161,7 +161,7 @@ func SyncFlowWorkflow( break } - restart := syncErr || workflow.GetInfo(ctx).GetContinueAsNewSuggested() + restart := syncErr || shared.ShouldWorkflowContinueAsNew(ctx) if !stop && !syncErr && mustWait { waitSelector.Select(ctx) if restart {