Skip to content

Commit

Permalink
fix: no errgroup and done
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Dec 8, 2023
1 parent 79fcafb commit e6fab37
Showing 1 changed file with 6 additions and 16 deletions.
22 changes: 6 additions & 16 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,8 @@ func (a *FlowableActivity) recordSlotSizePeriodically(
ctx context.Context,
srcConn connectors.CDCPullConnector,
slotName string,
done <-chan struct{},
peerName string,
) error {

) {
timeout := 10 * time.Minute
ticker := time.NewTicker(timeout)

Expand All @@ -195,12 +193,10 @@ func (a *FlowableActivity) recordSlotSizePeriodically(
case <-ticker.C:
err := a.handleSlotInfo(ctx, srcConn, slotName, peerName)
if err != nil {
return err
return
}
case <-done:
return a.handleSlotInfo(ctx, srcConn, slotName, peerName)
case <-ctx.Done():
return nil
return
}
ticker.Stop()
ticker = time.NewTicker(timeout)
Expand All @@ -211,12 +207,8 @@ func (a *FlowableActivity) recordSlotSizePeriodically(
func (a *FlowableActivity) StartFlow(ctx context.Context,
input *protos.StartFlowInput) (*model.SyncResponse, error) {
activity.RecordHeartbeat(ctx, "starting flow...")
done := make(chan struct{})
defer close(done)
conn := input.FlowConnectionConfigs

ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogMirrorMonitor)

dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
if err != nil {
return nil, fmt.Errorf("failed to get destination connector: %w", err)
Expand Down Expand Up @@ -259,9 +251,8 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
slotNameForMetrics = input.FlowConnectionConfigs.ReplicationSlotName
}

errGroup.Go(func() error {
return a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, done, input.FlowConnectionConfigs.Source.Name)
})
go a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, input.FlowConnectionConfigs.Source.Name)

// start a goroutine to pull records from the source
errGroup.Go(func() error {
return srcConn.PullRecords(&model.PullRecordsRequest{
Expand Down Expand Up @@ -304,7 +295,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
}

if !hasRecords {
done <- struct{}{}
// wait for the pull goroutine to finish
err = errGroup.Wait()
if err != nil {
Expand Down Expand Up @@ -339,7 +329,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
log.Warnf("failed to push records: %v", err)
return nil, fmt.Errorf("failed to push records: %w", err)
}
done <- struct{}{}

err = errGroup.Wait()
if err != nil {
return nil, fmt.Errorf("failed to pull records: %w", err)
Expand Down

0 comments on commit e6fab37

Please sign in to comment.