From dff14a5462393a2cd91377056338588145b8c3a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 29 Feb 2024 16:40:13 +0000 Subject: [PATCH 1/2] qrep: stop pulling records if sync completes (#1407) avoids pull continuing after sync returns 0 records also stop syncing records if pull records hits error --- flow/activities/flowable.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index e880b17518..ecdfedc92a 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -641,16 +641,17 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, bufferSize := shared.FetchAndChannelSize if config.SourcePeer.Type == protos.DBType_POSTGRES { errGroup, errCtx := errgroup.WithContext(ctx) + taskCtx, taskCancel := context.WithCancel(errCtx) stream := model.NewQRecordStream(bufferSize) errGroup.Go(func() error { pgConn := srcConn.(*connpostgres.PostgresConnector) - tmp, err := pgConn.PullQRepRecordStream(errCtx, config, partition, stream) + tmp, err := pgConn.PullQRepRecordStream(taskCtx, config, partition, stream) numRecords := int64(tmp) if err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return fmt.Errorf("failed to pull records: %w", err) } else { - err = monitoring.UpdatePullEndTimeAndRowsForPartition(errCtx, + err = monitoring.UpdatePullEndTimeAndRowsForPartition(taskCtx, a.CatalogPool, runUUID, partition, numRecords) if err != nil { logger.Error(err.Error()) @@ -660,16 +661,17 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, }) errGroup.Go(func() error { - rowsSynced, err = dstConn.SyncQRepRecords(ctx, config, partition, stream) + rowsSynced, err = dstConn.SyncQRepRecords(taskCtx, config, partition, stream) if err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return fmt.Errorf("failed to sync records: %w", err) } + taskCancel() return nil }) err = errGroup.Wait() - if err != nil { + if err != nil && err != context.Canceled { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return err } From 5aaf28ba8c00b06c2e419444c0b5d6225aeb71c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 29 Feb 2024 16:46:14 +0000 Subject: [PATCH 2/2] update golangci github action to v4 (#1408) Updates action to node20 --- .github/workflows/golang-lint.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/golang-lint.yml b/.github/workflows/golang-lint.yml index eadc52ed7d..74bbb629df 100644 --- a/.github/workflows/golang-lint.yml +++ b/.github/workflows/golang-lint.yml @@ -27,7 +27,7 @@ jobs: go-version: "1.22" cache: false - name: golangci-lint - uses: golangci/golangci-lint-action@v3 + uses: golangci/golangci-lint-action@v4 with: version: v1.56 working-directory: ./flow