Skip to content

Commit

Permalink
cdc: trigger session reconnect when source connector connections fail
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Feb 28, 2024
1 parent caf2a0c commit 9892807
Showing 1 changed file with 6 additions and 2 deletions.
8 changes: 6 additions & 2 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/temporal"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -233,7 +234,10 @@ func (a *FlowableActivity) MaintainPull(
case <-ticker.C:
activity.RecordHeartbeat(ctx, "keep session alive")
if err := srcConn.ReplPing(ctx); err != nil {
activity.GetLogger(ctx).Error("Failed to send keep alive ping to replication connection", slog.Any("error", err))
a.CdcCacheRw.Lock()
delete(a.CdcCache, sessionID)
a.CdcCacheRw.Unlock()
return err
}
case <-ctx.Done():
a.CdcCacheRw.Lock()
Expand Down Expand Up @@ -293,7 +297,7 @@ func (a *FlowableActivity) SyncFlow(
return nil, err
}
if err := srcConn.ConnectionActive(ctx); err != nil {
return nil, err
return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil)
}

shutdown := utils.HeartbeatRoutine(ctx, func() string {
Expand Down

0 comments on commit 9892807

Please sign in to comment.