diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 655fd7dd63..7c3d433f8b 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -5,7 +5,9 @@ import ( "database/sql" "errors" "fmt" + "os" "runtime" + "strconv" "strings" "sync" "time" @@ -194,6 +196,21 @@ func (a *FlowableActivity) handleSlotInfo( return err } + slotLagInMBThresholdStr, ok := os.LookupEnv("PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD") + if ok { + slotLagInMBThreshold, err := strconv.ParseInt(slotLagInMBThresholdStr, 10, 64) + if err != nil { + log.Warnf("failed to parse PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD as integer!") + return nil + } + + if int64(slotInfo[0].LagInMb) >= slotLagInMBThreshold { + a.Vigil.AlertIf(fmt.Sprintf("%s-slot-size-exceeded", peerName), + fmt.Sprintf("Slot %s on peer %s has exceeded threshold size of %dMB, currently at %fMB!", + slotName, peerName, slotLagInMBThreshold, slotInfo[0].LagInMb)) + } + } + if len(slotInfo) != 0 { return a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, peerName, slotInfo[0]) } @@ -206,6 +223,10 @@ func (a *FlowableActivity) recordSlotSizePeriodically( slotName string, peerName string, ) { + err := a.handleSlotInfo(ctx, srcConn, slotName, peerName) + if err != nil { + return + } timeout := 10 * time.Minute ticker := time.NewTicker(timeout)