diff --git a/flow/activities/slot.go b/flow/activities/slot.go index ef0114102a..8e8bb9aea0 100644 --- a/flow/activities/slot.go +++ b/flow/activities/slot.go @@ -8,6 +8,7 @@ import ( "github.com/PeerDB-io/peer-flow/connectors" "github.com/PeerDB-io/peer-flow/connectors/utils/monitoring" + "github.com/PeerDB-io/peer-flow/dynamicconf" "github.com/PeerDB-io/peer-flow/peerdbenv" ) @@ -33,7 +34,7 @@ func (a *FlowableActivity) handleSlotInfo( deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID()) } - slotLagInMBThreshold := peerdbenv.PeerDBSlotLagMBAlertThreshold() + slotLagInMBThreshold := dynamicconf.PeerDBSlotLagMBAlertThreshold(ctx) if (slotLagInMBThreshold > 0) && (slotInfo[0].LagInMb >= float32(slotLagInMBThreshold)) { a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName), fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB! @@ -42,7 +43,7 @@ cc: `, } // Also handles alerts for PeerDB user connections exceeding a given limit here - maxOpenConnectionsThreshold := peerdbenv.PeerDBOpenConnectionsAlertThreshold() + maxOpenConnectionsThreshold := dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx) res, err := srcConn.GetOpenConnectionsForUser() if err != nil { slog.WarnContext(ctx, "warning: failed to get current open connections", slog.Any("error", err)) diff --git a/flow/dynamicconf/dynamicconf.go b/flow/dynamicconf/dynamicconf.go new file mode 100644 index 0000000000..af0d6afd0f --- /dev/null +++ b/flow/dynamicconf/dynamicconf.go @@ -0,0 +1,69 @@ +package dynamicconf + +import ( + "context" + "log/slog" + "strconv" + "time" + + utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" + "github.com/jackc/pgx/v5/pgtype" + "github.com/jackc/pgx/v5/pgxpool" +) + +func dynamicConfKeyExists(ctx context.Context, conn *pgxpool.Pool, key string) bool { + var exists pgtype.Bool + query := "SELECT EXISTS(SELECT 1 FROM alerting_settings WHERE config_name = $1)" + err := conn.QueryRow(ctx, query, key).Scan(&exists) + if err != nil { + slog.Error("Failed to check if key exists: %v", err) + return false + } + + return exists.Bool +} + +func dynamicConfUint32(ctx context.Context, key string, defaultValue uint32) uint32 { + conn, err := utils.GetCatalogConnectionPoolFromEnv() + if err != nil { + slog.Error("Failed to get catalog connection pool: %v", err) + return defaultValue + } + + if !dynamicConfKeyExists(ctx, conn, key) { + return defaultValue + } + + var value pgtype.Text + query := "SELECT config_value FROM alerting_settings WHERE config_name = $1" + err = conn.QueryRow(ctx, query, key).Scan(&value) + if err != nil { + slog.Error("Failed to get key: %v", err) + return defaultValue + } + + var result uint64 + result, err = strconv.ParseUint(value.String, 10, 32) + if err != nil { + slog.Error("Failed to parse uint32: %v", err) + return defaultValue + } + + return uint32(result) +} + +// PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely +func PeerDBSlotLagMBAlertThreshold(ctx context.Context) uint32 { + return dynamicConfUint32(ctx, "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000) +} + +// PEERDB_ALERTING_GAP_MINUTES, 0 disables all alerting entirely +func PeerDBAlertingGapMinutesAsDuration(ctx context.Context) time.Duration { + why := int64(dynamicConfUint32(ctx, "PEERDB_ALERTING_GAP_MINUTES", 15)) + return time.Duration(why) * time.Minute +} + +// PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD, 0 disables open connections alerting entirely +func PeerDBOpenConnectionsAlertThreshold(ctx context.Context) uint32 { + return dynamicConfUint32(ctx, "PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", 5) +} diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index c17a5b89b7..ca238899ac 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -70,22 +70,6 @@ func PeerDBCatalogDatabase() string { return getEnvString("PEERDB_CATALOG_DATABASE", "") } -// PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely -func PeerDBSlotLagMBAlertThreshold() uint32 { - return getEnvUint32("PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000) -} - -// PEERDB_ALERTING_GAP_MINUTES, 0 disables all alerting entirely -func PeerDBAlertingGapMinutesAsDuration() time.Duration { - why := int64(getEnvUint32("PEERDB_ALERTING_GAP_MINUTES", 15)) - return time.Duration(why) * time.Minute -} - -// PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD, 0 disables open connections alerting entirely -func PeerDBOpenConnectionsAlertThreshold() uint32 { - return getEnvUint32("PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", 5) -} - // PEERDB_ENABLE_WAL_HEARTBEAT func PeerDBEnableWALHeartbeat() bool { return getEnvBool("PEERDB_ENABLE_WAL_HEARTBEAT", false) diff --git a/flow/shared/alerting/alerting.go b/flow/shared/alerting/alerting.go index 95a65c346f..66ea558ada 100644 --- a/flow/shared/alerting/alerting.go +++ b/flow/shared/alerting/alerting.go @@ -7,7 +7,7 @@ import ( "log/slog" "time" - "github.com/PeerDB-io/peer-flow/peerdbenv" + "github.com/PeerDB-io/peer-flow/dynamicconf" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" ) @@ -63,7 +63,7 @@ func NewAlerter(catalogPool *pgxpool.Pool) (*Alerter, error) { // Only raises an alert if another alert with the same key hasn't been raised // in the past X minutes, where X is configurable and defaults to 15 minutes func (a *Alerter) AlertIf(ctx context.Context, alertKey string, alertMessage string) { - if peerdbenv.PeerDBAlertingGapMinutesAsDuration() == 0 { + if dynamicconf.PeerDBAlertingGapMinutesAsDuration(ctx) == 0 { a.logger.WarnContext(ctx, "Alerting disabled via environment variable, returning") return } @@ -80,7 +80,7 @@ func (a *Alerter) AlertIf(ctx context.Context, alertKey string, alertMessage str return } - if time.Since(createdTimestamp) >= peerdbenv.PeerDBAlertingGapMinutesAsDuration() { + if time.Since(createdTimestamp) >= dynamicconf.PeerDBAlertingGapMinutesAsDuration(ctx) { a.AddAlertToCatalog(ctx, alertKey, alertMessage) a.AlertToSlack(ctx, alertKey, alertMessage) } diff --git a/nexus/catalog/migrations/V18__alerting_config.sql b/nexus/catalog/migrations/V18__alerting_config.sql new file mode 100644 index 0000000000..e28875dd2a --- /dev/null +++ b/nexus/catalog/migrations/V18__alerting_config.sql @@ -0,0 +1,7 @@ +CREATE TABLE alerting_settings( + id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + config_name TEXT NOT NULL, + config_value TEXT NOT NULL +); + +CREATE UNIQUE INDEX idx_alerting_settings_config_name ON alerting_settings (config_name);