diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index e880b17518..ecdfedc92a 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -641,16 +641,17 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, bufferSize := shared.FetchAndChannelSize if config.SourcePeer.Type == protos.DBType_POSTGRES { errGroup, errCtx := errgroup.WithContext(ctx) + taskCtx, taskCancel := context.WithCancel(errCtx) stream := model.NewQRecordStream(bufferSize) errGroup.Go(func() error { pgConn := srcConn.(*connpostgres.PostgresConnector) - tmp, err := pgConn.PullQRepRecordStream(errCtx, config, partition, stream) + tmp, err := pgConn.PullQRepRecordStream(taskCtx, config, partition, stream) numRecords := int64(tmp) if err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return fmt.Errorf("failed to pull records: %w", err) } else { - err = monitoring.UpdatePullEndTimeAndRowsForPartition(errCtx, + err = monitoring.UpdatePullEndTimeAndRowsForPartition(taskCtx, a.CatalogPool, runUUID, partition, numRecords) if err != nil { logger.Error(err.Error()) @@ -660,16 +661,17 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, }) errGroup.Go(func() error { - rowsSynced, err = dstConn.SyncQRepRecords(ctx, config, partition, stream) + rowsSynced, err = dstConn.SyncQRepRecords(taskCtx, config, partition, stream) if err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return fmt.Errorf("failed to sync records: %w", err) } + taskCancel() return nil }) err = errGroup.Wait() - if err != nil { + if err != nil && err != context.Canceled { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return err }