Skip to content

Commit

Permalink
Add the ability to dynamically configure some thresholds (#1057)
Browse files Browse the repository at this point in the history
Co-authored-by: Kevin Biju <[email protected]>
  • Loading branch information
iskakaushik and heavycrystal authored Jan 11, 2024
1 parent 19da0fb commit 7cdfb86
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 21 deletions.
5 changes: 3 additions & 2 deletions flow/activities/slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/PeerDB-io/peer-flow/connectors"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/dynamicconf"
"github.com/PeerDB-io/peer-flow/peerdbenv"
)

Expand All @@ -33,7 +34,7 @@ func (a *FlowableActivity) handleSlotInfo(
deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID())
}

slotLagInMBThreshold := peerdbenv.PeerDBSlotLagMBAlertThreshold()
slotLagInMBThreshold := dynamicconf.PeerDBSlotLagMBAlertThreshold(ctx)
if (slotLagInMBThreshold > 0) && (slotInfo[0].LagInMb >= float32(slotLagInMBThreshold)) {
a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName),
fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB!
Expand All @@ -42,7 +43,7 @@ cc: <!channel>`,
}

// Also handles alerts for PeerDB user connections exceeding a given limit here
maxOpenConnectionsThreshold := peerdbenv.PeerDBOpenConnectionsAlertThreshold()
maxOpenConnectionsThreshold := dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx)
res, err := srcConn.GetOpenConnectionsForUser()
if err != nil {
slog.WarnContext(ctx, "warning: failed to get current open connections", slog.Any("error", err))
Expand Down
68 changes: 68 additions & 0 deletions flow/dynamicconf/dynamicconf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package dynamicconf

import (
"context"
"log/slog"
"strconv"
"time"

utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
)

// dynamicConfKeyExists reports whether the alerting_settings table contains a
// row whose config_name equals key.
//
// The lookup runs on conn using ctx. If the query or scan fails, the error is
// logged and false is returned, so a failed lookup is indistinguishable from a
// missing key for the caller.
func dynamicConfKeyExists(ctx context.Context, conn *pgxpool.Pool, key string) bool {
	var exists pgtype.Bool
	query := "SELECT EXISTS(SELECT 1 FROM alerting_settings WHERE config_name = $1)"
	if err := conn.QueryRow(ctx, query, key).Scan(&exists); err != nil {
		// slog takes key/value attributes rather than printf verbs, so the
		// error has to be attached as an attribute to be rendered properly.
		slog.ErrorContext(ctx, "Failed to check if key exists",
			slog.String("key", key), slog.Any("error", err))
		return false
	}

	return exists.Bool
}

// dynamicConfUint32 reads the setting named key from the alerting_settings
// catalog table and returns it as a uint32.
//
// defaultValue is returned when any of the following happens:
//   - the catalog connection pool cannot be obtained,
//   - no row exists for key (or the existence check itself fails),
//   - the value cannot be read,
//   - the stored text is not a base-10 unsigned integer that fits in 32 bits.
//
// Every failure except a missing key is logged.
func dynamicConfUint32(ctx context.Context, key string, defaultValue uint32) uint32 {
	conn, err := utils.GetCatalogConnectionPoolFromEnv()
	if err != nil {
		// slog takes key/value attributes rather than printf verbs.
		slog.ErrorContext(ctx, "Failed to get catalog connection pool", slog.Any("error", err))
		return defaultValue
	}

	// A missing key is the normal "not overridden" case, so it is not logged here.
	if !dynamicConfKeyExists(ctx, conn, key) {
		return defaultValue
	}

	var value pgtype.Text
	query := "SELECT config_value FROM alerting_settings WHERE config_name = $1"
	if err := conn.QueryRow(ctx, query, key).Scan(&value); err != nil {
		slog.ErrorContext(ctx, "Failed to get key",
			slog.String("key", key), slog.Any("error", err))
		return defaultValue
	}

	// bitSize 32 makes ParseUint reject anything that would not fit in a uint32.
	result, err := strconv.ParseUint(value.String, 10, 32)
	if err != nil {
		slog.ErrorContext(ctx, "Failed to parse uint32",
			slog.String("key", key), slog.String("value", value.String), slog.Any("error", err))
		return defaultValue
	}

	return uint32(result)
}

// PeerDBSlotLagMBAlertThreshold returns the dynamic setting
// PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, using 5000 as the default passed to
// dynamicConfUint32. A value of 0 disables slot lag alerting entirely.
func PeerDBSlotLagMBAlertThreshold(ctx context.Context) uint32 {
	const (
		settingName = "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD"
		fallbackMB  = 5000
	)
	return dynamicConfUint32(ctx, settingName, fallbackMB)
}

// PeerDBAlertingGapMinutesAsDuration returns the dynamic setting
// PEERDB_ALERTING_GAP_MINUTES (default 15) converted from minutes to a
// time.Duration. A value of 0 disables all alerting entirely.
func PeerDBAlertingGapMinutesAsDuration(ctx context.Context) time.Duration {
	gapMinutes := dynamicConfUint32(ctx, "PEERDB_ALERTING_GAP_MINUTES", 15)
	// A uint32 always fits in time.Duration's int64, so the direct conversion is lossless.
	return time.Duration(gapMinutes) * time.Minute
}

// PeerDBOpenConnectionsAlertThreshold returns the dynamic setting
// PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD, using 5 as the default passed
// to dynamicConfUint32. A value of 0 disables open connections alerting entirely.
func PeerDBOpenConnectionsAlertThreshold(ctx context.Context) uint32 {
	const (
		settingName   = "PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD"
		fallbackCount = 5
	)
	return dynamicConfUint32(ctx, settingName, fallbackCount)
}
16 changes: 0 additions & 16 deletions flow/peerdbenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,22 +70,6 @@ func PeerDBCatalogDatabase() string {
return getEnvString("PEERDB_CATALOG_DATABASE", "")
}

// PeerDBSlotLagMBAlertThreshold returns the PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD
// setting via getEnvUint32, with 5000 as the default.
// A value of 0 disables slot lag alerting entirely.
func PeerDBSlotLagMBAlertThreshold() uint32 {
	const (
		envName    = "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD"
		fallbackMB = 5000
	)
	return getEnvUint32(envName, fallbackMB)
}

// PeerDBAlertingGapMinutesAsDuration returns the PEERDB_ALERTING_GAP_MINUTES
// setting (default 15) via getEnvUint32, converted from minutes to a
// time.Duration. A value of 0 disables all alerting entirely.
func PeerDBAlertingGapMinutesAsDuration() time.Duration {
	gapMinutes := getEnvUint32("PEERDB_ALERTING_GAP_MINUTES", 15)
	// A uint32 always fits in time.Duration's int64, so the direct conversion is lossless.
	return time.Duration(gapMinutes) * time.Minute
}

// PeerDBOpenConnectionsAlertThreshold returns the
// PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD setting via getEnvUint32, with
// 5 as the default. A value of 0 disables open connections alerting entirely.
func PeerDBOpenConnectionsAlertThreshold() uint32 {
	const (
		envName       = "PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD"
		fallbackCount = 5
	)
	return getEnvUint32(envName, fallbackCount)
}

// PEERDB_ENABLE_WAL_HEARTBEAT
func PeerDBEnableWALHeartbeat() bool {
return getEnvBool("PEERDB_ENABLE_WAL_HEARTBEAT", false)
Expand Down
7 changes: 4 additions & 3 deletions flow/shared/alerting/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"log/slog"
"time"

"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/dynamicconf"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
Expand Down Expand Up @@ -63,7 +63,8 @@ func NewAlerter(catalogPool *pgxpool.Pool) (*Alerter, error) {
// Only raises an alert if another alert with the same key hasn't been raised
// in the past X minutes, where X is configurable and defaults to 15 minutes
func (a *Alerter) AlertIf(ctx context.Context, alertKey string, alertMessage string) {
if peerdbenv.PeerDBAlertingGapMinutesAsDuration() == 0 {
dur := dynamicconf.PeerDBAlertingGapMinutesAsDuration(ctx)
if dur == 0 {
a.logger.WarnContext(ctx, "Alerting disabled via environment variable, returning")
return
}
Expand All @@ -80,7 +81,7 @@ func (a *Alerter) AlertIf(ctx context.Context, alertKey string, alertMessage str
return
}

if time.Since(createdTimestamp) >= peerdbenv.PeerDBAlertingGapMinutesAsDuration() {
if time.Since(createdTimestamp) >= dur {
a.AddAlertToCatalog(ctx, alertKey, alertMessage)
a.AlertToSlack(ctx, alertKey, alertMessage)
}
Expand Down
7 changes: 7 additions & 0 deletions nexus/catalog/migrations/V18__alerting_config.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Key/value settings table for alerting configuration.
-- Each row maps a setting name (config_name) to its value stored as text
-- (config_value); both columns are mandatory.
CREATE TABLE alerting_settings(
-- Surrogate key, always generated by the database.
id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
-- Name of the setting, e.g. PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD.
config_name TEXT NOT NULL,
-- Value of the setting, kept as text; readers are expected to parse it.
config_value TEXT NOT NULL
);

-- At most one row per setting name; also serves lookups by config_name.
CREATE UNIQUE INDEX idx_alerting_settings_config_name ON alerting_settings (config_name);

0 comments on commit 7cdfb86

Please sign in to comment.