Skip to content

Commit

Permalink
remove session id check
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed May 27, 2024
1 parent fe87482 commit 8e2a2d1
Showing 1 changed file with 7 additions and 20 deletions.
27 changes: 7 additions & 20 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/yuin/gopher-lua"
lua "github.com/yuin/gopher-lua"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"
Expand Down Expand Up @@ -78,25 +78,12 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
}

var srcConn TPull
if sessionID == "" {
srcConn, err = connectors.GetAs[TPull](ctx, config.Source)
if err != nil {
return nil, err
}
defer connectors.CloseConnector(ctx, srcConn)

if err := srcConn.SetupReplConn(ctx); err != nil {
return nil, err
}
} else {
srcConn, err = waitForCdcCache[TPull](ctx, a, sessionID)
if err != nil {
return nil, err
}
if err := srcConn.ConnectionActive(ctx); err != nil {
return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil)
}
srcConn, err := waitForCdcCache[TPull](ctx, a, sessionID)
if err != nil {
return nil, err
}
if err := srcConn.ConnectionActive(ctx); err != nil {
return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil)
}

shutdown := utils.HeartbeatRoutine(ctx, func() string {
Expand Down

0 comments on commit 8e2a2d1

Please sign in to comment.