From 89bc6e1657f6aa5f734e9781d86985e30373b0f6 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 28 Feb 2024 02:25:32 +0530 Subject: [PATCH 1/5] qrep flowable errgroup --- flow/activities/flowable.go | 28 +++++++++++----------------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index dcf1fb3ec4..8ffbef14a2 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -637,31 +637,25 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, var stream *model.QRecordStream bufferSize := shared.FetchAndChannelSize - var wg sync.WaitGroup - - var goroutineErr error = nil + errGroup, errCtx := errgroup.WithContext(ctx) if config.SourcePeer.Type == protos.DBType_POSTGRES { stream = model.NewQRecordStream(bufferSize) - wg.Add(1) - - go func() { + errGroup.Go(func() error { pgConn := srcConn.(*connpostgres.PostgresConnector) - tmp, err := pgConn.PullQRepRecordStream(ctx, config, partition, stream) + tmp, err := pgConn.PullQRepRecordStream(errCtx, config, partition, stream) numRecords := int64(tmp) if err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) - logger.Error("failed to pull records", slog.Any("error", err)) - goroutineErr = err + return fmt.Errorf("failed to pull records: %v", err) } else { - err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, + err = monitoring.UpdatePullEndTimeAndRowsForPartition(errCtx, a.CatalogPool, runUUID, partition, numRecords) if err != nil { logger.Error(err.Error()) - goroutineErr = err } } - wg.Done() - }() + return nil + }) } else { recordBatch, err := srcConn.PullQRepRecords(ctx, config, partition) if err != nil { @@ -692,10 +686,10 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, logger.Info("no records to push for partition " + partition.PartitionId) pullCancel() } else { - wg.Wait() - if goroutineErr != nil { - a.Alerter.LogFlowError(ctx, config.FlowJobName, goroutineErr) - return goroutineErr + err = errGroup.Wait() + if err != nil { + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + return err } err := monitoring.UpdateRowsSyncedForPartition(ctx, a.CatalogPool, rowsSynced, runUUID, partition) From e114960d08fefe1719c653ebefccf65127be3206 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 28 Feb 2024 21:18:17 +0530 Subject: [PATCH 2/5] refactor replicateQRepPartition --- flow/activities/flowable.go | 51 ++++++++++++++++++++++++------------- 1 file changed, 33 insertions(+), 18 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 8ffbef14a2..ddef3a8c95 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -613,9 +613,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, return fmt.Errorf("failed to update start time for partition: %w", err) } - pullCtx, pullCancel := context.WithCancel(ctx) - defer pullCancel() - srcConn, err := connectors.GetQRepPullConnector(pullCtx, config.SourcePeer) + srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer) if err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return fmt.Errorf("failed to get qrep source connector: %w", err) @@ -638,6 +636,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, var stream *model.QRecordStream bufferSize := shared.FetchAndChannelSize errGroup, errCtx := errgroup.WithContext(ctx) + var rowsSynced int if config.SourcePeer.Type == protos.DBType_POSTGRES { stream = model.NewQRecordStream(bufferSize) errGroup.Go(func() error { @@ -656,6 +655,21 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, } return nil }) + + errGroup.Go(func() error { + rowsSynced, err = dstConn.SyncQRepRecords(ctx, config, partition, stream) + if err != nil { + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + return fmt.Errorf("failed to sync records: %w", err) + } + return nil + }) + + err = errGroup.Wait() + if err != nil { + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + return err + } } else { recordBatch, err := srcConn.PullQRepRecords(ctx, config, partition) if err != nil { @@ -674,30 +688,31 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return fmt.Errorf("failed to convert to qrecord stream: %w", err) } - } - - rowsSynced, err := dstConn.SyncQRepRecords(ctx, config, partition, stream) - if err != nil { - a.Alerter.LogFlowError(ctx, config.FlowJobName, err) - return fmt.Errorf("failed to sync records: %w", err) - } - if rowsSynced == 0 { - logger.Info("no records to push for partition " + partition.PartitionId) - pullCancel() - } else { - err = errGroup.Wait() + rowsSynced, err = dstConn.SyncQRepRecords(ctx, config, partition, stream) if err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) - return err + return fmt.Errorf("failed to sync records: %w", err) } + if rowsSynced == 0 { + logger.Info("no records to push for partition " + partition.PartitionId) + } else { + err = errGroup.Wait() + if err != nil { + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + return err + } + } + } + + logger.Info(fmt.Sprintf("pushed %d records", rowsSynced)) + + if rowsSynced > 0 { err := monitoring.UpdateRowsSyncedForPartition(ctx, a.CatalogPool, rowsSynced, runUUID, partition) if err != nil { return err } - - logger.Info(fmt.Sprintf("pushed %d records", rowsSynced)) } err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition) From 496783ec66758dba5ec6614ba7dec426851119c0 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 28 Feb 2024 21:20:19 +0530 Subject: [PATCH 3/5] minor change --- flow/activities/flowable.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index ddef3a8c95..d4511491e0 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -645,7 +645,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, numRecords := int64(tmp) if err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) - return fmt.Errorf("failed to pull records: %v", err) + return fmt.Errorf("failed to pull records: %w", err) } else { err = monitoring.UpdatePullEndTimeAndRowsForPartition(errCtx, a.CatalogPool, runUUID, partition, numRecords) From 82b6eec58703096470c4dfb3fcc0dbb07d5fca7a Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 28 Feb 2024 21:31:01 +0530 Subject: [PATCH 4/5] more refactoring --- flow/activities/flowable.go | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index d4511491e0..2739b398bc 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -633,11 +633,11 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, }) defer shutdown() + var rowsSynced int var stream *model.QRecordStream bufferSize := shared.FetchAndChannelSize - errGroup, errCtx := errgroup.WithContext(ctx) - var rowsSynced int if config.SourcePeer.Type == protos.DBType_POSTGRES { + errGroup, errCtx := errgroup.WithContext(ctx) stream = model.NewQRecordStream(bufferSize) errGroup.Go(func() error { pgConn := srcConn.(*connpostgres.PostgresConnector) @@ -694,25 +694,16 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return fmt.Errorf("failed to sync records: %w", err) } - - if rowsSynced == 0 { - logger.Info("no records to push for partition " + partition.PartitionId) - } else { - err = errGroup.Wait() - if err != nil { - a.Alerter.LogFlowError(ctx, config.FlowJobName, err) - return err - } - } } - logger.Info(fmt.Sprintf("pushed %d records", rowsSynced)) - if rowsSynced > 0 { + logger.Info(fmt.Sprintf("pushed %d records", rowsSynced)) err := monitoring.UpdateRowsSyncedForPartition(ctx, a.CatalogPool, rowsSynced, runUUID, partition) if err != nil { return err } + } else { + logger.Info("no records to push for partition " + partition.PartitionId) } err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition) From 4fb3a7bb854ad59d518938ec3dbc9992b1bb6b09 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 28 Feb 2024 21:41:30 +0530 Subject: [PATCH 5/5] minor change --- flow/activities/flowable.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 2739b398bc..f37bc55ff5 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -634,11 +634,10 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, defer shutdown() var rowsSynced int - var stream *model.QRecordStream bufferSize := shared.FetchAndChannelSize if config.SourcePeer.Type == protos.DBType_POSTGRES { errGroup, errCtx := errgroup.WithContext(ctx) - stream = model.NewQRecordStream(bufferSize) + stream := model.NewQRecordStream(bufferSize) errGroup.Go(func() error { pgConn := srcConn.(*connpostgres.PostgresConnector) tmp, err := pgConn.PullQRepRecordStream(errCtx, config, partition, stream) @@ -683,7 +682,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, return err } - stream, err = recordBatch.ToQRecordStream(bufferSize) + stream, err := recordBatch.ToQRecordStream(bufferSize) if err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return fmt.Errorf("failed to convert to qrecord stream: %w", err)