From 1e408f16a0c41108961be3dbbad7fa7277f39710 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 28 Feb 2024 17:19:10 +0000 Subject: [PATCH] MaintainPull: disable retries When MaintainPull fails the sync flow should be restarted --- flow/activities/flowable.go | 2 +- flow/workflows/sync_flow.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 62e8fe89f2..e880b17518 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -237,7 +237,7 @@ func (a *FlowableActivity) MaintainPull( a.CdcCacheRw.Lock() delete(a.CdcCache, sessionID) a.CdcCacheRw.Unlock() - return err + return temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", err) } case <-ctx.Done(): a.CdcCacheRw.Lock() diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 87af1d1d82..8fc1eaeb6b 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -37,13 +37,13 @@ func SyncFlowWorkflow( defer workflow.CompleteSession(syncSessionCtx) sessionInfo := workflow.GetSessionInfo(syncSessionCtx) - syncCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{ + maintainCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{ StartToCloseTimeout: 14 * 24 * time.Hour, HeartbeatTimeout: time.Minute, WaitForCancellation: true, }) fMaintain := workflow.ExecuteActivity( - syncCtx, + maintainCtx, flowable.MaintainPull, config, sessionInfo.SessionID,