diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 2bb3136a1e..a8cc819214 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -564,7 +564,8 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, return fmt.Errorf("failed to update start time for partition: %w", err) } - srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer) + pullCtx, pullCancel := context.WithCancel(ctx) + srcConn, err := connectors.GetQRepPullConnector(pullCtx, config.SourcePeer) if err != nil { return fmt.Errorf("failed to get qrep source connector: %w", err) } @@ -642,6 +643,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, } if rowsSynced == 0 { + pullCancel() log.WithFields(log.Fields{ "flowName": config.FlowJobName, }).Infof("no records to push for partition %s\n", partition.PartitionId)