Skip to content

Commit

Permalink
slot size alert only on env set
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Dec 8, 2023
1 parent 83f19b2 commit 0ad9858
Showing 1 changed file with 21 additions and 0 deletions.
21 changes: 21 additions & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"database/sql"
"errors"
"fmt"
"os"
"runtime"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -194,6 +196,21 @@ func (a *FlowableActivity) handleSlotInfo(
return err
}

slotLagInMBThresholdStr, ok := os.LookupEnv("PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD")
if ok {
slotLagInMBThreshold, err := strconv.ParseInt(slotLagInMBThresholdStr, 10, 64)
if err != nil {
log.Warnf("failed to parse PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD as integer!")
return nil
}

if int64(slotInfo[0].LagInMb) >= slotLagInMBThreshold {
a.Vigil.AlertIf(fmt.Sprintf("%s-slot-size-exceeded", peerName),
fmt.Sprintf("Slot %s on peer %s has exceeded threshold size of %dMB, currently at %fMB!",
slotName, peerName, slotLagInMBThreshold, slotInfo[0].LagInMb))
}
}

if len(slotInfo) != 0 {
return a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, peerName, slotInfo[0])
}
Expand All @@ -206,6 +223,10 @@ func (a *FlowableActivity) recordSlotSizePeriodically(
slotName string,
peerName string,
) {
err := a.handleSlotInfo(ctx, srcConn, slotName, peerName)
if err != nil {
return
}
timeout := 10 * time.Minute
ticker := time.NewTicker(timeout)

Expand Down

0 comments on commit 0ad9858

Please sign in to comment.