diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index e880b17518..2728d29db0 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -640,17 +640,18 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, var rowsSynced int bufferSize := shared.FetchAndChannelSize if config.SourcePeer.Type == protos.DBType_POSTGRES { - errGroup, errCtx := errgroup.WithContext(ctx) + errGroup, errCtx := errgroup.WithContext(taskCtx) + taskCtx, taskCancel := context.WithCancel(errCtx) stream := model.NewQRecordStream(bufferSize) errGroup.Go(func() error { pgConn := srcConn.(*connpostgres.PostgresConnector) - tmp, err := pgConn.PullQRepRecordStream(errCtx, config, partition, stream) + tmp, err := pgConn.PullQRepRecordStream(taskCtx, config, partition, stream) numRecords := int64(tmp) if err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return fmt.Errorf("failed to pull records: %w", err) } else { - err = monitoring.UpdatePullEndTimeAndRowsForPartition(errCtx, + err = monitoring.UpdatePullEndTimeAndRowsForPartition(taskCtx, a.CatalogPool, runUUID, partition, numRecords) if err != nil { logger.Error(err.Error()) @@ -665,11 +666,12 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return fmt.Errorf("failed to sync records: %w", err) } + taskCancel() return nil }) err = errGroup.Wait() - if err != nil { + if err != nil && err != context.Canceled { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return err }