From 9892807e577fc93daf2561cd2c0ee96adb96cbad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 28 Feb 2024 02:07:43 +0000 Subject: [PATCH] cdc: trigger session reconnect when source connector connections fail --- flow/activities/flowable.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index dcf1fb3ec4..dbe9459642 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -12,6 +12,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "go.temporal.io/sdk/activity" + "go.temporal.io/sdk/temporal" "golang.org/x/sync/errgroup" "google.golang.org/protobuf/proto" @@ -233,7 +234,10 @@ func (a *FlowableActivity) MaintainPull( case <-ticker.C: activity.RecordHeartbeat(ctx, "keep session alive") if err := srcConn.ReplPing(ctx); err != nil { - activity.GetLogger(ctx).Error("Failed to send keep alive ping to replication connection", slog.Any("error", err)) + a.CdcCacheRw.Lock() + delete(a.CdcCache, sessionID) + a.CdcCacheRw.Unlock() + return err } case <-ctx.Done(): a.CdcCacheRw.Lock() @@ -293,7 +297,7 @@ func (a *FlowableActivity) SyncFlow( return nil, err } if err := srcConn.ConnectionActive(ctx); err != nil { - return nil, err + return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil) } shutdown := utils.HeartbeatRoutine(ctx, func() string {