Skip to content

Commit

Permalink
fix: exit on error
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Dec 8, 2023
1 parent 6eeaeae commit 8666290
Showing 1 changed file with 16 additions and 7 deletions.
23 changes: 16 additions & 7 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,17 +160,22 @@ func (a *FlowableActivity) CreateNormalizedTable(
return conn.SetupNormalizedTables(config)
}

func (a *FlowableActivity) handleSlotInfo(ctx context.Context, srcConn connectors.CDCPullConnector, slotName string, peerName string) {
func (a *FlowableActivity) handleSlotInfo(
ctx context.Context,
srcConn connectors.CDCPullConnector,
slotName string,
peerName string,
) error {
slotInfo, err := srcConn.GetSlotInfo(slotName)
if err != nil {
log.Warnf("warning: failed to get slot info: %v", err)
time.Sleep(30 * time.Second)
return
return err
}

if len(slotInfo) != 0 {
a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, peerName, slotInfo[0])
return a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, peerName, slotInfo[0])
}
return nil
}

func (a *FlowableActivity) recordSlotSizePeriodically(
Expand All @@ -188,12 +193,16 @@ func (a *FlowableActivity) recordSlotSizePeriodically(
for {
select {
case <-ticker.C:
a.handleSlotInfo(ctx, srcConn, slotName, peerName)
err := a.handleSlotInfo(ctx, srcConn, slotName, peerName)
if err != nil {
time.Sleep(30 * time.Second)
return nil
}
case <-done:
a.handleSlotInfo(ctx, srcConn, slotName, peerName)
return nil
case <-ctx.Done():
log.Warn("recordSlotSize: context is done. ignoring")
time.Sleep(30 * time.Second)
return nil
}
ticker.Stop()
ticker = time.NewTicker(timeout)
Expand Down

0 comments on commit 8666290

Please sign in to comment.