Skip to content

Commit

Permalink
make slotsize part of errgroup
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Dec 8, 2023
1 parent 92e50b3 commit 6eeaeae
Showing 1 changed file with 22 additions and 14 deletions.
36 changes: 22 additions & 14 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,38 +160,44 @@ func (a *FlowableActivity) CreateNormalizedTable(
return conn.SetupNormalizedTables(config)
}

func (a *FlowableActivity) handleSlotInfo(ctx context.Context, srcConn connectors.CDCPullConnector, slotName string, peerName string) {
slotInfo, err := srcConn.GetSlotInfo(slotName)
if err != nil {
log.Warnf("warning: failed to get slot info: %v", err)
time.Sleep(30 * time.Second)
return
}

if len(slotInfo) != 0 {
a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, peerName, slotInfo[0])
}
}

func (a *FlowableActivity) recordSlotSizePeriodically(
ctx context.Context,
srcConn connectors.CDCPullConnector,
slotName string,
done <-chan struct{},
peerName string,
) {
) error {

timeout := 10 * time.Minute
ticker := time.NewTicker(timeout)

defer ticker.Stop()
for {
slotInfo, err := srcConn.GetSlotInfo(slotName)
if err != nil {
log.Warnf("warning: failed to get slot info: %v", err)
}

if len(slotInfo) == 0 {
continue
}

select {
case <-ticker.C:
a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, peerName, slotInfo[0])
a.handleSlotInfo(ctx, srcConn, slotName, peerName)
case <-done:
a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, peerName, slotInfo[0])
a.handleSlotInfo(ctx, srcConn, slotName, peerName)
case <-ctx.Done():
log.Warn("recordSlotSize: context is done. ignoring")
time.Sleep(30 * time.Second)
}
ticker.Stop()
ticker = time.NewTicker(timeout)
}

}

// StartFlow implements StartFlow.
Expand Down Expand Up @@ -246,7 +252,9 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
slotNameForMetrics = input.FlowConnectionConfigs.ReplicationSlotName
}

go a.recordSlotSizePeriodically(ctx, srcConn, slotNameForMetrics, done, input.FlowConnectionConfigs.Source.Name)
errGroup.Go(func() error {
return a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, done, input.FlowConnectionConfigs.Source.Name)
})
// start a goroutine to pull records from the source
errGroup.Go(func() error {
return srcConn.PullRecords(&model.PullRecordsRequest{
Expand Down

0 comments on commit 6eeaeae

Please sign in to comment.