Skip to content

Commit

Permalink
fix: close and recreate destination connections during syncflow when …
Browse files Browse the repository at this point in the history
…needed (#1735)

Co-authored-by: Amogh Bharadwaj <amogh@peerdb.io>
Co-authored-by: Philip Dubé <serprex@users.noreply.github.com>
Co-authored-by: Kaushik Iska <iska.kaushik@gmail.com>
  • Loading branch information
4 people authored May 29, 2024
1 parent 74d149a commit f97f78b
Showing 1 changed file with 6 additions and 0 deletions.
6 changes: 6 additions & 0 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
if err != nil {
return nil, err
}
connectors.CloseConnector(ctx, dstConn)
logger.Info("pulling records...", slog.Int64("LastOffset", lastOffset))
consumedOffset := atomic.Int64{}
consumedOffset.Store(lastOffset)
Expand Down Expand Up @@ -149,6 +150,11 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
hasRecords := !recordBatchSync.WaitAndCheckEmpty()
logger.Info("current sync flow has records?", slog.Bool("hasRecords", hasRecords))

dstConn, err = connectors.GetAs[TSync](ctx, config.Destination)
if err != nil {
return nil, fmt.Errorf("failed to recreate destination connector: %w", err)
}

if !hasRecords {
// wait for the pull goroutine to finish
if err := errGroup.Wait(); err != nil {
Expand Down

0 comments on commit f97f78b

Please sign in to comment.