From f97f78be107f8bb62ed186b6e91a76fe0dd1a4dc Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Wed, 29 May 2024 20:25:54 +0530 Subject: [PATCH] fix: close and recreate destination connections during syncflow when needed (#1735) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Amogh Bharadwaj Co-authored-by: Philip Dubé Co-authored-by: Kaushik Iska --- flow/activities/flowable_core.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index f7c5175e97..24a98bdca6 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -113,6 +113,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon if err != nil { return nil, err } + connectors.CloseConnector(ctx, dstConn) logger.Info("pulling records...", slog.Int64("LastOffset", lastOffset)) consumedOffset := atomic.Int64{} consumedOffset.Store(lastOffset) @@ -149,6 +150,11 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon hasRecords := !recordBatchSync.WaitAndCheckEmpty() logger.Info("current sync flow has records?", slog.Bool("hasRecords", hasRecords)) + dstConn, err = connectors.GetAs[TSync](ctx, config.Destination) + if err != nil { + return nil, fmt.Errorf("failed to recreate destination connector: %w", err) + } + if !hasRecords { // wait for the pull goroutine to finish if err := errGroup.Wait(); err != nil {