From a6c5033e19d59924ca9a835d9941ad45c07bdcdc Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Wed, 14 Feb 2024 01:29:21 +0530 Subject: [PATCH] custom threshold support for slack alerts (#1277) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit set via UI closes #1234 --------- Co-authored-by: Philip Dubé --- flow/connectors/postgres/postgres.go | 24 +--- flow/shared/alerting/alerting.go | 150 ++++++++++++++++----- flow/shared/alerting/slack_alert_sender.go | 18 ++- ui/app/alert-config/new.tsx | 34 +++++ ui/app/alert-config/validation.ts | 12 ++ 5 files changed, 176 insertions(+), 62 deletions(-) diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 6c9d02631f..f41e015672 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -17,12 +17,10 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/connectors/utils/monitoring" - "github.com/PeerDB-io/peer-flow/dynamicconf" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared/alerting" ) @@ -928,33 +926,15 @@ func (c *PostgresConnector) HandleSlotInfo( return nil } - deploymentUIDPrefix := "" - if peerdbenv.PeerDBDeploymentUID() != "" { - deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID()) - } - - slotLagInMBThreshold := dynamicconf.PeerDBSlotLagMBAlertThreshold(ctx) - if (slotLagInMBThreshold > 0) && (slotInfo[0].LagInMb >= float32(slotLagInMBThreshold)) { - alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName), - fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB! -cc: `, - deploymentUIDPrefix, slotName, peerName, slotLagInMBThreshold, slotInfo[0].LagInMb)) - } + alerter.AlertIfSlotLag(ctx, peerName, slotInfo[0]) // Also handles alerts for PeerDB user connections exceeding a given limit here - maxOpenConnectionsThreshold := dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx) res, err := getOpenConnectionsForUser(ctx, conn, c.config.User) if err != nil { logger.Warn("warning: failed to get current open connections", "error", err) return err } - if (maxOpenConnectionsThreshold > 0) && (res.CurrentOpenConnections >= int64(maxOpenConnectionsThreshold)) { - alerter.AlertIf(ctx, fmt.Sprintf("%s-max-open-connections-threshold-exceeded", peerName), - fmt.Sprintf(`%sOpen connections from PeerDB user `+"`%s`"+` on peer `+"`%s`"+ - ` has exceeded threshold size of %d connections, currently at %d connections! -cc: `, - deploymentUIDPrefix, res.UserName, peerName, maxOpenConnectionsThreshold, res.CurrentOpenConnections)) - } + alerter.AlertIfOpenConnections(ctx, peerName, res) if len(slotInfo) != 0 { return monitoring.AppendSlotSizeInfo(ctx, catalogPool, peerName, slotInfo[0]) diff --git a/flow/shared/alerting/alerting.go b/flow/shared/alerting/alerting.go index a817dd285f..4e95f1d5d1 100644 --- a/flow/shared/alerting/alerting.go +++ b/flow/shared/alerting/alerting.go @@ -3,6 +3,7 @@ package alerting import ( "context" "encoding/json" + "errors" "fmt" "log/slog" "time" @@ -11,7 +12,9 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/PeerDB-io/peer-flow/dynamicconf" + "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/logger" + "github.com/PeerDB-io/peer-flow/peerdbenv" ) // alerting service, no cool name :( @@ -19,8 +22,8 @@ type Alerter struct { catalogPool *pgxpool.Pool } -func registerSendersFromPool(ctx context.Context, catalogPool *pgxpool.Pool) ([]*slackAlertSender, error) { - rows, err := catalogPool.Query(ctx, +func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]*slackAlertSender, error) { + rows, err := a.catalogPool.Query(ctx, "SELECT service_type,service_config FROM peerdb_stats.alerting_config") if err != nil { return nil, fmt.Errorf("failed to read alerter config from catalog: %w", err) @@ -50,7 +53,7 @@ func registerSendersFromPool(ctx context.Context, catalogPool *pgxpool.Pool) ([] // doesn't take care of closing pool, needs to be done externally. func NewAlerter(catalogPool *pgxpool.Pool) (*Alerter, error) { if catalogPool == nil { - return nil, fmt.Errorf("catalog pool is nil for Alerter") + return nil, errors.New("catalog pool is nil for Alerter") } return &Alerter{ @@ -58,60 +61,139 @@ func NewAlerter(catalogPool *pgxpool.Pool) (*Alerter, error) { }, nil } -// Only raises an alert if another alert with the same key hasn't been raised -// in the past X minutes, where X is configurable and defaults to 15 minutes -func (a *Alerter) AlertIf(ctx context.Context, alertKey string, alertMessage string) { - dur := dynamicconf.PeerDBAlertingGapMinutesAsDuration(ctx) - if dur == 0 { - logger.LoggerFromCtx(ctx).Warn("Alerting disabled via environment variable, returning") +func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo *protos.SlotInfo) { + slackAlertSenders, err := a.registerSendersFromPool(ctx) + if err != nil { + logger.LoggerFromCtx(ctx).Warn("failed to set Slack senders", slog.Any("error", err)) return } - var err error - row := a.catalogPool.QueryRow(ctx, - `SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1 - ORDER BY created_timestamp DESC LIMIT 1`, - alertKey) - var createdTimestamp time.Time - err = row.Scan(&createdTimestamp) - if err != nil && err != pgx.ErrNoRows { - logger.LoggerFromCtx(ctx).Warn("failed to send alert: ", slog.String("err", err.Error())) - return + deploymentUIDPrefix := "" + if peerdbenv.PeerDBDeploymentUID() != "" { + deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID()) } - if time.Since(createdTimestamp) >= dur { - a.AddAlertToCatalog(ctx, alertKey, alertMessage) - a.AlertToSlack(ctx, alertKey, alertMessage) + defaultSlotLagMBAlertThreshold := dynamicconf.PeerDBSlotLagMBAlertThreshold(ctx) + // catalog cannot use default threshold to space alerts properly, use the lowest set threshold instead + lowestSlotLagMBAlertThreshold := defaultSlotLagMBAlertThreshold + for _, slackAlertSender := range slackAlertSenders { + if slackAlertSender.slotLagMBAlertThreshold > 0 { + lowestSlotLagMBAlertThreshold = min(lowestSlotLagMBAlertThreshold, slackAlertSender.slotLagMBAlertThreshold) + } + } + + alertKey := peerName + "-slot-lag-threshold-exceeded" + alertMessageTemplate := fmt.Sprintf("%sSlot `%s` on peer `%s` has exceeded threshold size of %%dMB, "+ + `currently at %.2fMB! + cc: `, deploymentUIDPrefix, slotInfo.SlotName, peerName, slotInfo.LagInMb) + + if slotInfo.LagInMb > float32(lowestSlotLagMBAlertThreshold) && + a.checkAndAddAlertToCatalog(ctx, alertKey, fmt.Sprintf(alertMessageTemplate, lowestSlotLagMBAlertThreshold)) { + for _, slackAlertSender := range slackAlertSenders { + if slackAlertSender.slotLagMBAlertThreshold > 0 { + if slotInfo.LagInMb > float32(slackAlertSender.slotLagMBAlertThreshold) { + a.alertToSlack(ctx, slackAlertSender, alertKey, + fmt.Sprintf(alertMessageTemplate, slackAlertSender.slotLagMBAlertThreshold)) + } + } else { + if slotInfo.LagInMb > float32(defaultSlotLagMBAlertThreshold) { + a.alertToSlack(ctx, slackAlertSender, alertKey, + fmt.Sprintf(alertMessageTemplate, defaultSlotLagMBAlertThreshold)) + } + } + } } } -func (a *Alerter) AlertToSlack(ctx context.Context, alertKey string, alertMessage string) { - slackAlertSenders, err := registerSendersFromPool(ctx, a.catalogPool) +func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string, + openConnections *protos.GetOpenConnectionsForUserResult, +) { + slackAlertSenders, err := a.registerSendersFromPool(ctx) if err != nil { logger.LoggerFromCtx(ctx).Warn("failed to set Slack senders", slog.Any("error", err)) return } + deploymentUIDPrefix := "" + if peerdbenv.PeerDBDeploymentUID() != "" { + deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID()) + } + + // same as with slot lag, use lowest threshold for catalog + defaultOpenConnectionsThreshold := dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx) + lowestOpenConnectionsThreshold := defaultOpenConnectionsThreshold for _, slackAlertSender := range slackAlertSenders { - err = slackAlertSender.sendAlert(ctx, - fmt.Sprintf(":rotating_light:Alert:rotating_light:: %s", alertKey), alertMessage) - if err != nil { - logger.LoggerFromCtx(ctx).Warn("failed to send alert", slog.Any("error", err)) - return + if slackAlertSender.openConnectionsAlertThreshold > 0 { + lowestOpenConnectionsThreshold = min(lowestOpenConnectionsThreshold, slackAlertSender.openConnectionsAlertThreshold) + } + } + + alertKey := peerName + "-max-open-connections-threshold-exceeded" + alertMessageTemplate := fmt.Sprintf("%sOpen connections from PeerDB user `%s` on peer `%s`"+ + ` has exceeded threshold size of %%d connections, currently at %d connections! + cc: `, deploymentUIDPrefix, openConnections.UserName, peerName, openConnections.CurrentOpenConnections) + + if openConnections.CurrentOpenConnections > int64(lowestOpenConnectionsThreshold) && + a.checkAndAddAlertToCatalog(ctx, alertKey, fmt.Sprintf(alertMessageTemplate, lowestOpenConnectionsThreshold)) { + for _, slackAlertSender := range slackAlertSenders { + if slackAlertSender.openConnectionsAlertThreshold > 0 { + if openConnections.CurrentOpenConnections > int64(slackAlertSender.openConnectionsAlertThreshold) { + a.alertToSlack(ctx, slackAlertSender, alertKey, + fmt.Sprintf(alertMessageTemplate, slackAlertSender.openConnectionsAlertThreshold)) + } + } else { + if openConnections.CurrentOpenConnections > int64(defaultOpenConnectionsThreshold) { + a.alertToSlack(ctx, slackAlertSender, alertKey, + fmt.Sprintf(alertMessageTemplate, defaultOpenConnectionsThreshold)) + } + } } } } -func (a *Alerter) AddAlertToCatalog(ctx context.Context, alertKey string, alertMessage string) { - _, err := a.catalogPool.Exec(ctx, - "INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message) VALUES($1,$2)", - alertKey, alertMessage) +func (a *Alerter) alertToSlack(ctx context.Context, slackAlertSender *slackAlertSender, alertKey string, alertMessage string) { + err := slackAlertSender.sendAlert(ctx, + ":rotating_light:Alert:rotating_light:: "+alertKey, alertMessage) if err != nil { - logger.LoggerFromCtx(ctx).Warn("failed to insert alert", slog.Any("error", err)) + logger.LoggerFromCtx(ctx).Warn("failed to send alert", slog.Any("error", err)) return } } +// Only raises an alert if another alert with the same key hasn't been raised +// in the past X minutes, where X is configurable and defaults to 15 minutes +// returns true if alert added to catalog, so proceed with processing alerts to slack +func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string, alertMessage string) bool { + dur := dynamicconf.PeerDBAlertingGapMinutesAsDuration(ctx) + if dur == 0 { + logger.LoggerFromCtx(ctx).Warn("Alerting disabled via environment variable, returning") + return false + } + + row := a.catalogPool.QueryRow(ctx, + `SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1 + ORDER BY created_timestamp DESC LIMIT 1`, + alertKey) + var createdTimestamp time.Time + err := row.Scan(&createdTimestamp) + if err != nil && err != pgx.ErrNoRows { + logger.LoggerFromCtx(ctx).Warn("failed to send alert: ", slog.String("err", err.Error())) + return false + } + + if time.Since(createdTimestamp) >= dur { + _, err = a.catalogPool.Exec(ctx, + "INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message) VALUES($1,$2)", + alertKey, alertMessage) + if err != nil { + logger.LoggerFromCtx(ctx).Warn("failed to insert alert", slog.Any("error", err)) + return false + } + return true + } + return false +} + func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) { errorWithStack := fmt.Sprintf("%+v", err) _, err = a.catalogPool.Exec(ctx, diff --git a/flow/shared/alerting/slack_alert_sender.go b/flow/shared/alerting/slack_alert_sender.go index 04c9a4ad86..1ad007536b 100644 --- a/flow/shared/alerting/slack_alert_sender.go +++ b/flow/shared/alerting/slack_alert_sender.go @@ -8,19 +8,25 @@ import ( ) type slackAlertSender struct { - client *slack.Client - channelIDs []string + client *slack.Client + channelIDs []string + slotLagMBAlertThreshold uint32 + openConnectionsAlertThreshold uint32 } type slackAlertConfig struct { - AuthToken string `json:"auth_token"` - ChannelIDs []string `json:"channel_ids"` + AuthToken string `json:"auth_token"` + ChannelIDs []string `json:"channel_ids"` + SlotLagMBAlertThreshold uint32 `json:"slot_lag_mb_alert_threshold"` + OpenConnectionsAlertThreshold uint32 `json:"open_connections_alert_threshold"` } func newSlackAlertSender(config *slackAlertConfig) *slackAlertSender { return &slackAlertSender{ - client: slack.New(config.AuthToken), - channelIDs: config.ChannelIDs, + client: slack.New(config.AuthToken), + channelIDs: config.ChannelIDs, + slotLagMBAlertThreshold: config.SlotLagMBAlertThreshold, + openConnectionsAlertThreshold: config.OpenConnectionsAlertThreshold, } } diff --git a/ui/app/alert-config/new.tsx b/ui/app/alert-config/new.tsx index 70f7fbbfd2..aad7789029 100644 --- a/ui/app/alert-config/new.tsx +++ b/ui/app/alert-config/new.tsx @@ -25,17 +25,25 @@ const NewAlertConfig = () => { const [serviceType, setServiceType] = useState(); const [authToken, setAuthToken] = useState(); const [channelIdString, setChannelIdString] = useState(); + const [slotLagMBAlertThreshold, setSlotLagMBAlertThreshold] = + useState(); + const [openConnectionsAlertThreshold, setOpenConnectionsAlertThreshold] = + useState(); const [loading, setLoading] = useState(false); const handleAdd = async () => { if (serviceType !== 'slack') { notifyErr('Service Type must be selected'); return; } + console.log(slotLagMBAlertThreshold); + console.log(openConnectionsAlertThreshold); const alertConfigReq: alertConfigType = { serviceType: serviceType, serviceConfig: { auth_token: authToken ?? '', channel_ids: channelIdString?.split(',')!, + slot_lag_mb_alert_threshold: slotLagMBAlertThreshold || 0, + open_connections_alert_threshold: openConnectionsAlertThreshold || 0, }, }; const alertReqValidity = alertConfigReqSchema.safeParse(alertConfigReq); @@ -104,6 +112,32 @@ const NewAlertConfig = () => { /> +
+

Slot Lag Alert Threshold (in MB)

+ setSlotLagMBAlertThreshold(e.target.valueAsNumber)} + /> +
+ +
+

Open Connections Alert Threshold

+ + setOpenConnectionsAlertThreshold(e.target.valueAsNumber) + } + /> +
+