diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index ef0169b1b3..a7c0d5ce50 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -17,12 +17,10 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/connectors/utils/monitoring" - "github.com/PeerDB-io/peer-flow/dynamicconf" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared/alerting" ) @@ -935,33 +933,15 @@ func (c *PostgresConnector) HandleSlotInfo( return nil } - deploymentUIDPrefix := "" - if peerdbenv.PeerDBDeploymentUID() != "" { - deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID()) - } - - slotLagInMBThreshold := dynamicconf.PeerDBSlotLagMBAlertThreshold(ctx) - if (slotLagInMBThreshold > 0) && (slotInfo[0].LagInMb >= float32(slotLagInMBThreshold)) { - alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName), - fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB! -cc: `, - deploymentUIDPrefix, slotName, peerName, slotLagInMBThreshold, slotInfo[0].LagInMb)) - } + alerter.AlertIfSlotLag(ctx, peerName, slotInfo[0]) // Also handles alerts for PeerDB user connections exceeding a given limit here - maxOpenConnectionsThreshold := dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx) res, err := getOpenConnectionsForUser(ctx, conn, c.config.User) if err != nil { logger.Warn("warning: failed to get current open connections", "error", err) return err } - if (maxOpenConnectionsThreshold > 0) && (res.CurrentOpenConnections >= int64(maxOpenConnectionsThreshold)) { - alerter.AlertIf(ctx, fmt.Sprintf("%s-max-open-connections-threshold-exceeded", peerName), - fmt.Sprintf(`%sOpen connections from PeerDB user `+"`%s`"+` on peer `+"`%s`"+ - ` has exceeded threshold size of %d connections, currently at %d connections! -cc: `, - deploymentUIDPrefix, res.UserName, peerName, maxOpenConnectionsThreshold, res.CurrentOpenConnections)) - } + alerter.AlertIfOpenConnections(ctx, peerName, res) if len(slotInfo) != 0 { return monitoring.AppendSlotSizeInfo(ctx, catalogPool, peerName, slotInfo[0]) diff --git a/flow/shared/alerting/alerting.go b/flow/shared/alerting/alerting.go index a817dd285f..e438c80b0e 100644 --- a/flow/shared/alerting/alerting.go +++ b/flow/shared/alerting/alerting.go @@ -3,6 +3,7 @@ package alerting import ( "context" "encoding/json" + "errors" "fmt" "log/slog" "time" @@ -11,7 +12,9 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/PeerDB-io/peer-flow/dynamicconf" + "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/logger" + "github.com/PeerDB-io/peer-flow/peerdbenv" ) // alerting service, no cool name :( @@ -19,8 +22,8 @@ type Alerter struct { catalogPool *pgxpool.Pool } -func registerSendersFromPool(ctx context.Context, catalogPool *pgxpool.Pool) ([]*slackAlertSender, error) { - rows, err := catalogPool.Query(ctx, +func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]*slackAlertSender, error) { + rows, err := a.catalogPool.Query(ctx, "SELECT service_type,service_config FROM peerdb_stats.alerting_config") if err != nil { return nil, fmt.Errorf("failed to read alerter config from catalog: %w", err) @@ -50,7 +53,7 @@ func registerSendersFromPool(ctx context.Context, catalogPool *pgxpool.Pool) ([] // doesn't take care of closing pool, needs to be done externally. func NewAlerter(catalogPool *pgxpool.Pool) (*Alerter, error) { if catalogPool == nil { - return nil, fmt.Errorf("catalog pool is nil for Alerter") + return nil, errors.New("catalog pool is nil for Alerter") } return &Alerter{ @@ -58,60 +61,143 @@ func NewAlerter(catalogPool *pgxpool.Pool) (*Alerter, error) { }, nil } -// Only raises an alert if another alert with the same key hasn't been raised -// in the past X minutes, where X is configurable and defaults to 15 minutes -func (a *Alerter) AlertIf(ctx context.Context, alertKey string, alertMessage string) { - dur := dynamicconf.PeerDBAlertingGapMinutesAsDuration(ctx) - if dur == 0 { - logger.LoggerFromCtx(ctx).Warn("Alerting disabled via environment variable, returning") +func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo *protos.SlotInfo) { + slackAlertSenders, err := a.registerSendersFromPool(ctx) + if err != nil { + logger.LoggerFromCtx(ctx).Warn("failed to set Slack senders", slog.Any("error", err)) return } - var err error - row := a.catalogPool.QueryRow(ctx, - `SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1 - ORDER BY created_timestamp DESC LIMIT 1`, - alertKey) - var createdTimestamp time.Time - err = row.Scan(&createdTimestamp) - if err != nil && err != pgx.ErrNoRows { - logger.LoggerFromCtx(ctx).Warn("failed to send alert: ", slog.String("err", err.Error())) - return + deploymentUIDPrefix := "" + if peerdbenv.PeerDBDeploymentUID() != "" { + deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID()) } - if time.Since(createdTimestamp) >= dur { - a.AddAlertToCatalog(ctx, alertKey, alertMessage) - a.AlertToSlack(ctx, alertKey, alertMessage) + defaultSlotLagMBAlertThreshold := dynamicconf.PeerDBSlotLagMBAlertThreshold(ctx) + // catalog cannot use default threshold to space alerts properly, use the lowest set threshold instead + lowestSlotLagMBAlertThreshold := defaultSlotLagMBAlertThreshold + for _, slackAlertSender := range slackAlertSenders { + if slackAlertSender.slotLagMBAlertThreshold > 0 { + lowestSlotLagMBAlertThreshold = min(lowestSlotLagMBAlertThreshold, slackAlertSender.slotLagMBAlertThreshold) + } + } + + alertKey := peerName + "-slot-lag-threshold-exceeded" + alertMessageTemplate := fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of `, + deploymentUIDPrefix, slotInfo.SlotName, peerName) + + "%d" + + fmt.Sprintf(`MB, currently at %.2fMB! + cc: `, slotInfo.LagInMb) + + if slotInfo.LagInMb > float32(lowestSlotLagMBAlertThreshold) && + a.checkAndAddAlertToCatalog(ctx, alertKey, fmt.Sprintf(alertMessageTemplate, lowestSlotLagMBAlertThreshold)) { + for _, slackAlertSender := range slackAlertSenders { + if slackAlertSender.slotLagMBAlertThreshold > 0 { + if slotInfo.LagInMb > float32(slackAlertSender.slotLagMBAlertThreshold) { + a.alertToSlack(ctx, slackAlertSender, alertKey, + fmt.Sprintf(alertMessageTemplate, slackAlertSender.slotLagMBAlertThreshold)) + } + } else { + if slotInfo.LagInMb > float32(defaultSlotLagMBAlertThreshold) { + a.alertToSlack(ctx, slackAlertSender, alertKey, + fmt.Sprintf(alertMessageTemplate, defaultSlotLagMBAlertThreshold)) + } + } + } } } -func (a *Alerter) AlertToSlack(ctx context.Context, alertKey string, alertMessage string) { - slackAlertSenders, err := registerSendersFromPool(ctx, a.catalogPool) +func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string, + openConnections *protos.GetOpenConnectionsForUserResult, +) { + slackAlertSenders, err := a.registerSendersFromPool(ctx) if err != nil { logger.LoggerFromCtx(ctx).Warn("failed to set Slack senders", slog.Any("error", err)) return } + deploymentUIDPrefix := "" + if peerdbenv.PeerDBDeploymentUID() != "" { + deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID()) + } + + // same as with slot lag, use lowest threshold for catalog + defaultOpenConnectionsThreshold := dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx) + lowestOpenConnectionsThreshold := defaultOpenConnectionsThreshold for _, slackAlertSender := range slackAlertSenders { - err = slackAlertSender.sendAlert(ctx, - fmt.Sprintf(":rotating_light:Alert:rotating_light:: %s", alertKey), alertMessage) - if err != nil { - logger.LoggerFromCtx(ctx).Warn("failed to send alert", slog.Any("error", err)) - return + if slackAlertSender.openConnectionsAlertThreshold > 0 { + lowestOpenConnectionsThreshold = min(lowestOpenConnectionsThreshold, slackAlertSender.openConnectionsAlertThreshold) + } + } + + alertKey := peerName + "-max-open-connections-threshold-exceeded" + alertMessageTemplate := fmt.Sprintf(`%sOpen connections from PeerDB user `+"`%s`"+` on peer `+"`%s`"+ + " has exceeded threshold size of ", deploymentUIDPrefix, openConnections.UserName, peerName) + + "%d" + + fmt.Sprintf(` connections, currently at %d connections! + cc: `, openConnections.CurrentOpenConnections) + + if openConnections.CurrentOpenConnections > int64(lowestOpenConnectionsThreshold) && + a.checkAndAddAlertToCatalog(ctx, alertKey, fmt.Sprintf(alertMessageTemplate, lowestOpenConnectionsThreshold)) { + for _, slackAlertSender := range slackAlertSenders { + if slackAlertSender.openConnectionsAlertThreshold > 0 { + if openConnections.CurrentOpenConnections > int64(slackAlertSender.openConnectionsAlertThreshold) { + a.alertToSlack(ctx, slackAlertSender, alertKey, + fmt.Sprintf(alertMessageTemplate, slackAlertSender.openConnectionsAlertThreshold)) + } + } else { + if openConnections.CurrentOpenConnections > int64(defaultOpenConnectionsThreshold) { + a.alertToSlack(ctx, slackAlertSender, alertKey, + fmt.Sprintf(alertMessageTemplate, defaultOpenConnectionsThreshold)) + } + } } } } -func (a *Alerter) AddAlertToCatalog(ctx context.Context, alertKey string, alertMessage string) { - _, err := a.catalogPool.Exec(ctx, - "INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message) VALUES($1,$2)", - alertKey, alertMessage) +func (a *Alerter) alertToSlack(ctx context.Context, slackAlertSender *slackAlertSender, alertKey string, alertMessage string) { + err := slackAlertSender.sendAlert(ctx, + ":rotating_light:Alert:rotating_light:: "+alertKey, alertMessage) if err != nil { - logger.LoggerFromCtx(ctx).Warn("failed to insert alert", slog.Any("error", err)) + logger.LoggerFromCtx(ctx).Warn("failed to send alert", slog.Any("error", err)) return } } +// Only raises an alert if another alert with the same key hasn't been raised +// in the past X minutes, where X is configurable and defaults to 15 minutes +// returns true if alert added to catalog, so proceed with processing alerts to slack +func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string, alertMessage string) bool { + dur := dynamicconf.PeerDBAlertingGapMinutesAsDuration(ctx) + if dur == 0 { + logger.LoggerFromCtx(ctx).Warn("Alerting disabled via environment variable, returning") + return false + } + + row := a.catalogPool.QueryRow(ctx, + `SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1 + ORDER BY created_timestamp DESC LIMIT 1`, + alertKey) + var createdTimestamp time.Time + err := row.Scan(&createdTimestamp) + if err != nil && err != pgx.ErrNoRows { + logger.LoggerFromCtx(ctx).Warn("failed to send alert: ", slog.String("err", err.Error())) + return false + } + + if time.Since(createdTimestamp) >= dur { + _, err = a.catalogPool.Exec(ctx, + "INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message) VALUES($1,$2)", + alertKey, alertMessage) + if err != nil { + logger.LoggerFromCtx(ctx).Warn("failed to insert alert", slog.Any("error", err)) + return false + } + return true + } + return false +} + func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) { errorWithStack := fmt.Sprintf("%+v", err) _, err = a.catalogPool.Exec(ctx, diff --git a/flow/shared/alerting/slack_alert_sender.go b/flow/shared/alerting/slack_alert_sender.go index 04c9a4ad86..1ad007536b 100644 --- a/flow/shared/alerting/slack_alert_sender.go +++ b/flow/shared/alerting/slack_alert_sender.go @@ -8,19 +8,25 @@ import ( ) type slackAlertSender struct { - client *slack.Client - channelIDs []string + client *slack.Client + channelIDs []string + slotLagMBAlertThreshold uint32 + openConnectionsAlertThreshold uint32 } type slackAlertConfig struct { - AuthToken string `json:"auth_token"` - ChannelIDs []string `json:"channel_ids"` + AuthToken string `json:"auth_token"` + ChannelIDs []string `json:"channel_ids"` + SlotLagMBAlertThreshold uint32 `json:"slot_lag_mb_alert_threshold"` + OpenConnectionsAlertThreshold uint32 `json:"open_connections_alert_threshold"` } func newSlackAlertSender(config *slackAlertConfig) *slackAlertSender { return &slackAlertSender{ - client: slack.New(config.AuthToken), - channelIDs: config.ChannelIDs, + client: slack.New(config.AuthToken), + channelIDs: config.ChannelIDs, + slotLagMBAlertThreshold: config.SlotLagMBAlertThreshold, + openConnectionsAlertThreshold: config.OpenConnectionsAlertThreshold, } } diff --git a/ui/app/alert-config/new.tsx b/ui/app/alert-config/new.tsx index 70f7fbbfd2..2b598ca668 100644 --- a/ui/app/alert-config/new.tsx +++ b/ui/app/alert-config/new.tsx @@ -25,17 +25,23 @@ const NewAlertConfig = () => { const [serviceType, setServiceType] = useState(); const [authToken, setAuthToken] = useState(); const [channelIdString, setChannelIdString] = useState(); + const [slotLagMBAlertThreshold, setSlotLagMBAlertThreshold] = useState(); + const [openConnectionsAlertThreshold, setOpenConnectionsAlertThreshold] = useState(); const [loading, setLoading] = useState(false); const handleAdd = async () => { if (serviceType !== 'slack') { notifyErr('Service Type must be selected'); return; } + console.log(slotLagMBAlertThreshold); + console.log(openConnectionsAlertThreshold); const alertConfigReq: alertConfigType = { serviceType: serviceType, serviceConfig: { auth_token: authToken ?? '', channel_ids: channelIdString?.split(',')!, + slot_lag_mb_alert_threshold: slotLagMBAlertThreshold || 0, + open_connections_alert_threshold: openConnectionsAlertThreshold || 0, }, }; const alertReqValidity = alertConfigReqSchema.safeParse(alertConfigReq); @@ -104,6 +110,30 @@ const NewAlertConfig = () => { /> +
+

Slot Lag Alert Threshold (in MB)

+ setSlotLagMBAlertThreshold(e.target.valueAsNumber)} + /> +
+ +
+

Open Connections Alert Threshold

+ setOpenConnectionsAlertThreshold(e.target.valueAsNumber)} + /> +
+