diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 9767ac8108..9fa859b5b4 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -640,8 +640,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, var rowsSynced int bufferSize := shared.FetchAndChannelSize if config.SourcePeer.Type == protos.DBType_POSTGRES { - taskCtx, taskCancel := context.WithCancel(ctx) - errGroup, errCtx := errgroup.WithContext(taskCtx) + errGroup, errCtx := errgroup.WithContext(ctx) stream := model.NewQRecordStream(bufferSize) errGroup.Go(func() error { pgConn := srcConn.(*connpostgres.PostgresConnector) @@ -666,8 +665,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return fmt.Errorf("failed to sync records: %w", err) } - taskCancel() - return nil + return context.Canceled }) err = errGroup.Wait()