Skip to content

Commit

Permalink
Flowable: Use errgroup in replicateQRep (#1390)
Browse files Browse the repository at this point in the history
Replaces a usage waitgroup with errgroup in replicateQRepPartitions in
flowable.go
This is a preceding PR to #1368
  • Loading branch information
Amogh-Bharadwaj authored Feb 28, 2024
1 parent 49491f0 commit 96ca628
Showing 1 changed file with 35 additions and 36 deletions.
71 changes: 35 additions & 36 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,9 +617,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
return fmt.Errorf("failed to update start time for partition: %w", err)
}

pullCtx, pullCancel := context.WithCancel(ctx)
defer pullCancel()
srcConn, err := connectors.GetQRepPullConnector(pullCtx, config.SourcePeer)
srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to get qrep source connector: %w", err)
Expand All @@ -639,33 +637,42 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
})
defer shutdown()

var stream *model.QRecordStream
var rowsSynced int
bufferSize := shared.FetchAndChannelSize
var wg sync.WaitGroup

var goroutineErr error = nil
if config.SourcePeer.Type == protos.DBType_POSTGRES {
stream = model.NewQRecordStream(bufferSize)
wg.Add(1)

go func() {
errGroup, errCtx := errgroup.WithContext(ctx)
stream := model.NewQRecordStream(bufferSize)
errGroup.Go(func() error {
pgConn := srcConn.(*connpostgres.PostgresConnector)
tmp, err := pgConn.PullQRepRecordStream(ctx, config, partition, stream)
tmp, err := pgConn.PullQRepRecordStream(errCtx, config, partition, stream)
numRecords := int64(tmp)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
logger.Error("failed to pull records", slog.Any("error", err))
goroutineErr = err
return fmt.Errorf("failed to pull records: %w", err)
} else {
err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx,
err = monitoring.UpdatePullEndTimeAndRowsForPartition(errCtx,
a.CatalogPool, runUUID, partition, numRecords)
if err != nil {
logger.Error(err.Error())
goroutineErr = err
}
}
wg.Done()
}()
return nil
})

errGroup.Go(func() error {
rowsSynced, err = dstConn.SyncQRepRecords(ctx, config, partition, stream)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to sync records: %w", err)
}
return nil
})

err = errGroup.Wait()
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return err
}
} else {
recordBatch, err := srcConn.PullQRepRecords(ctx, config, partition)
if err != nil {
Expand All @@ -679,35 +686,27 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
return err
}

stream, err = recordBatch.ToQRecordStream(bufferSize)
stream, err := recordBatch.ToQRecordStream(bufferSize)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to convert to qrecord stream: %w", err)
}
}

rowsSynced, err := dstConn.SyncQRepRecords(ctx, config, partition, stream)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to sync records: %w", err)
}

if rowsSynced == 0 {
logger.Info("no records to push for partition " + partition.PartitionId)
pullCancel()
} else {
wg.Wait()
if goroutineErr != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, goroutineErr)
return goroutineErr
rowsSynced, err = dstConn.SyncQRepRecords(ctx, config, partition, stream)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to sync records: %w", err)
}
}

if rowsSynced > 0 {
logger.Info(fmt.Sprintf("pushed %d records", rowsSynced))
err := monitoring.UpdateRowsSyncedForPartition(ctx, a.CatalogPool, rowsSynced, runUUID, partition)
if err != nil {
return err
}

logger.Info(fmt.Sprintf("pushed %d records", rowsSynced))
} else {
logger.Info("no records to push for partition " + partition.PartitionId)
}

err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition)
Expand Down

0 comments on commit 96ca628

Please sign in to comment.