Skip to content

Commit

Permalink
Revert "move the dst connector retreival further down"
Browse files Browse the repository at this point in the history
This reverts commit b404aaa.
  • Loading branch information
Amogh-Bharadwaj committed May 24, 2024
1 parent 9c13713 commit c23151e
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
lua "github.com/yuin/gopher-lua"
"github.com/yuin/gopher-lua"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"
Expand Down Expand Up @@ -143,6 +143,11 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
hasRecords := !recordBatch.WaitAndCheckEmpty()
logger.Info("current sync flow has records?", slog.Bool("hasRecords", hasRecords))

dstConn, err = connectors.GetAs[TSync](ctx, config.Destination)
if err != nil {
return nil, fmt.Errorf("failed to recreate destination connector: %w", err)
}

if !hasRecords {
// wait for the pull goroutine to finish
err = errGroup.Wait()
Expand All @@ -156,11 +161,6 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
}
logger.Info("no records to push")

dstConn, err = connectors.GetAs[TSync](ctx, config.Destination)
if err != nil {
return nil, fmt.Errorf("failed to recreate destination connector: %w", err)
}

err := dstConn.ReplayTableSchemaDeltas(ctx, flowName, recordBatch.SchemaDeltas)
if err != nil {
return nil, fmt.Errorf("failed to sync schema: %w", err)
Expand Down

0 comments on commit c23151e

Please sign in to comment.