diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index dcf1fb3ec4..dbe9459642 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -12,6 +12,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "go.temporal.io/sdk/activity" + "go.temporal.io/sdk/temporal" "golang.org/x/sync/errgroup" "google.golang.org/protobuf/proto" @@ -233,7 +234,10 @@ func (a *FlowableActivity) MaintainPull( case <-ticker.C: activity.RecordHeartbeat(ctx, "keep session alive") if err := srcConn.ReplPing(ctx); err != nil { - activity.GetLogger(ctx).Error("Failed to send keep alive ping to replication connection", slog.Any("error", err)) + a.CdcCacheRw.Lock() + delete(a.CdcCache, sessionID) + a.CdcCacheRw.Unlock() + return err } case <-ctx.Done(): a.CdcCacheRw.Lock() @@ -293,7 +297,7 @@ func (a *FlowableActivity) SyncFlow( return nil, err } if err := srcConn.ConnectionActive(ctx); err != nil { - return nil, err + return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil) } shutdown := utils.HeartbeatRoutine(ctx, func() string {