diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 961c967b30..4a314a30a1 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -182,10 +182,8 @@ func (a *FlowableActivity) recordSlotSizePeriodically( ctx context.Context, srcConn connectors.CDCPullConnector, slotName string, - done <-chan struct{}, peerName string, -) error { - +) { timeout := 10 * time.Minute ticker := time.NewTicker(timeout) @@ -195,12 +193,10 @@ func (a *FlowableActivity) recordSlotSizePeriodically( case <-ticker.C: err := a.handleSlotInfo(ctx, srcConn, slotName, peerName) if err != nil { - return err + return } - case <-done: - return a.handleSlotInfo(ctx, srcConn, slotName, peerName) case <-ctx.Done(): - return nil + return } ticker.Stop() ticker = time.NewTicker(timeout) @@ -211,12 +207,8 @@ func (a *FlowableActivity) recordSlotSizePeriodically( func (a *FlowableActivity) StartFlow(ctx context.Context, input *protos.StartFlowInput) (*model.SyncResponse, error) { activity.RecordHeartbeat(ctx, "starting flow...") - done := make(chan struct{}) - defer close(done) conn := input.FlowConnectionConfigs - ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogMirrorMonitor) - dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination) if err != nil { return nil, fmt.Errorf("failed to get destination connector: %w", err) @@ -259,9 +251,8 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, slotNameForMetrics = input.FlowConnectionConfigs.ReplicationSlotName } - errGroup.Go(func() error { - return a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, done, input.FlowConnectionConfigs.Source.Name) - }) + go a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, input.FlowConnectionConfigs.Source.Name) + // start a goroutine to pull records from the source errGroup.Go(func() error { return srcConn.PullRecords(&model.PullRecordsRequest{ @@ -304,7 +295,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, } if !hasRecords { - done <- struct{}{} // wait for the pull goroutine to finish err = errGroup.Wait() if err != nil { @@ -339,7 +329,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, log.Warnf("failed to push records: %v", err) return nil, fmt.Errorf("failed to push records: %w", err) } - done <- struct{}{} + err = errGroup.Wait() if err != nil { return nil, fmt.Errorf("failed to pull records: %w", err)