From f3adc5299084700c80bd7df4d5cdef7b52c75449 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Mon, 11 Dec 2023 17:37:37 -0500 Subject: [PATCH] cancel pull when no rows synced --- flow/activities/flowable.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 2bb3136a1e..a8cc819214 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -564,7 +564,8 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, return fmt.Errorf("failed to update start time for partition: %w", err) } - srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer) + pullCtx, pullCancel := context.WithCancel(ctx) + srcConn, err := connectors.GetQRepPullConnector(pullCtx, config.SourcePeer) if err != nil { return fmt.Errorf("failed to get qrep source connector: %w", err) } @@ -642,6 +643,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, } if rowsSynced == 0 { + pullCancel() log.WithFields(log.Fields{ "flowName": config.FlowJobName, }).Infof("no records to push for partition %s\n", partition.PartitionId)