diff --git a/flow/alerting/alerting.go b/flow/alerting/alerting.go index a6c1549816..e78b410f58 100644 --- a/flow/alerting/alerting.go +++ b/flow/alerting/alerting.go @@ -24,17 +24,23 @@ type Alerter struct { telemetrySender telemetry.Sender } -func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]AlertSender, error) { +type AlertSenderConfig struct { + Id int64 + Sender AlertSender +} + +func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]AlertSenderConfig, error) { rows, err := a.catalogPool.Query(ctx, - "SELECT service_type,service_config FROM peerdb_stats.alerting_config") + "SELECT id,service_type,service_config FROM peerdb_stats.alerting_config") if err != nil { return nil, fmt.Errorf("failed to read alerter config from catalog: %w", err) } - var alertSenders []AlertSender + var alertSenderConfigs []AlertSenderConfig var serviceType ServiceType var serviceConfig string - _, err = pgx.ForEachRow(rows, []any{&serviceType, &serviceConfig}, func() error { + var id int64 + _, err = pgx.ForEachRow(rows, []any{&id, &serviceType, &serviceConfig}, func() error { switch serviceType { case SLACK: var slackServiceConfig slackAlertConfig @@ -43,7 +49,7 @@ func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]AlertSender, e return fmt.Errorf("failed to unmarshal %s service config: %w", serviceType, err) } - alertSenders = append(alertSenders, newSlackAlertSender(&slackServiceConfig)) + alertSenderConfigs = append(alertSenderConfigs, AlertSenderConfig{Id: id, Sender: newSlackAlertSender(&slackServiceConfig)}) case EMAIL: emailServiceConfig := EmailAlertSenderConfig{ sourceEmail: peerdbenv.PeerDBAlertingEmailSenderSourceEmail(), @@ -65,14 +71,14 @@ func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]AlertSender, e if err2 != nil { return fmt.Errorf("failed to initialize email alerter: %w", err2) } - alertSenders = append(alertSenders, alertSender) + alertSenderConfigs = append(alertSenderConfigs, AlertSenderConfig{Id: id, Sender: alertSender}) default: return fmt.Errorf("unknown service type: %s", serviceType) } return nil }) - return alertSenders, nil + return alertSenderConfigs, nil } // doesn't take care of closing pool, needs to be done externally. @@ -99,7 +105,7 @@ func NewAlerter(ctx context.Context, catalogPool *pgxpool.Pool) *Alerter { } func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo *protos.SlotInfo) { - alertSenders, err := a.registerSendersFromPool(ctx) + alertSenderConfigs, err := a.registerSendersFromPool(ctx) if err != nil { logger.LoggerFromCtx(ctx).Warn("failed to set alert senders", slog.Any("error", err)) return @@ -113,9 +119,9 @@ func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo defaultSlotLagMBAlertThreshold := dynamicconf.PeerDBSlotLagMBAlertThreshold(ctx) // catalog cannot use default threshold to space alerts properly, use the lowest set threshold instead lowestSlotLagMBAlertThreshold := defaultSlotLagMBAlertThreshold - for _, alertSender := range alertSenders { - if alertSender.getSlotLagMBAlertThreshold() > 0 { - lowestSlotLagMBAlertThreshold = min(lowestSlotLagMBAlertThreshold, alertSender.getSlotLagMBAlertThreshold()) + for _, alertSender := range alertSenderConfigs { + if alertSender.Sender.getSlotLagMBAlertThreshold() > 0 { + lowestSlotLagMBAlertThreshold = min(lowestSlotLagMBAlertThreshold, alertSender.Sender.getSlotLagMBAlertThreshold()) } } @@ -123,18 +129,19 @@ func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo alertMessageTemplate := fmt.Sprintf("%sSlot `%s` on peer `%s` has exceeded threshold size of %%dMB, "+ `currently at %.2fMB!`, deploymentUIDPrefix, slotInfo.SlotName, peerName, slotInfo.LagInMb) - if slotInfo.LagInMb > float32(lowestSlotLagMBAlertThreshold) && - a.checkAndAddAlertToCatalog(ctx, alertKey, fmt.Sprintf(alertMessageTemplate, lowestSlotLagMBAlertThreshold)) { - for _, alertSender := range alertSenders { - if alertSender.getSlotLagMBAlertThreshold() > 0 { - if slotInfo.LagInMb > float32(alertSender.getSlotLagMBAlertThreshold()) { - a.alertToProvider(ctx, alertSender, alertKey, - fmt.Sprintf(alertMessageTemplate, alertSender.getSlotLagMBAlertThreshold())) - } - } else { - if slotInfo.LagInMb > float32(defaultSlotLagMBAlertThreshold) { - a.alertToProvider(ctx, alertSender, alertKey, - fmt.Sprintf(alertMessageTemplate, defaultSlotLagMBAlertThreshold)) + if slotInfo.LagInMb > float32(lowestSlotLagMBAlertThreshold) { + for _, alertSenderConfig := range alertSenderConfigs { + if a.checkAndAddAlertToCatalog(ctx, alertSenderConfig.Id, alertKey, fmt.Sprintf(alertMessageTemplate, lowestSlotLagMBAlertThreshold)) { + if alertSenderConfig.Sender.getSlotLagMBAlertThreshold() > 0 { + if slotInfo.LagInMb > float32(alertSenderConfig.Sender.getSlotLagMBAlertThreshold()) { + a.alertToProvider(ctx, alertSenderConfig, alertKey, + fmt.Sprintf(alertMessageTemplate, alertSenderConfig.Sender.getSlotLagMBAlertThreshold())) + } + } else { + if slotInfo.LagInMb > float32(defaultSlotLagMBAlertThreshold) { + a.alertToProvider(ctx, alertSenderConfig, alertKey, + fmt.Sprintf(alertMessageTemplate, defaultSlotLagMBAlertThreshold)) + } } } } @@ -144,7 +151,7 @@ func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string, openConnections *protos.GetOpenConnectionsForUserResult, ) { - alertSenders, err := a.registerSendersFromPool(ctx) + alertSenderConfigs, err := a.registerSendersFromPool(ctx) if err != nil { logger.LoggerFromCtx(ctx).Warn("failed to set Slack senders", slog.Any("error", err)) return @@ -158,9 +165,9 @@ func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string, // same as with slot lag, use lowest threshold for catalog defaultOpenConnectionsThreshold := dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx) lowestOpenConnectionsThreshold := defaultOpenConnectionsThreshold - for _, alertSender := range alertSenders { - if alertSender.getOpenConnectionsAlertThreshold() > 0 { - lowestOpenConnectionsThreshold = min(lowestOpenConnectionsThreshold, alertSender.getOpenConnectionsAlertThreshold()) + for _, alertSender := range alertSenderConfigs { + if alertSender.Sender.getOpenConnectionsAlertThreshold() > 0 { + lowestOpenConnectionsThreshold = min(lowestOpenConnectionsThreshold, alertSender.Sender.getOpenConnectionsAlertThreshold()) } } @@ -169,26 +176,27 @@ func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string, ` has exceeded threshold size of %%d connections, currently at %d connections!`, deploymentUIDPrefix, openConnections.UserName, peerName, openConnections.CurrentOpenConnections) - if openConnections.CurrentOpenConnections > int64(lowestOpenConnectionsThreshold) && - a.checkAndAddAlertToCatalog(ctx, alertKey, fmt.Sprintf(alertMessageTemplate, lowestOpenConnectionsThreshold)) { - for _, alertSender := range alertSenders { - if alertSender.getOpenConnectionsAlertThreshold() > 0 { - if openConnections.CurrentOpenConnections > int64(alertSender.getOpenConnectionsAlertThreshold()) { - a.alertToProvider(ctx, alertSender, alertKey, - fmt.Sprintf(alertMessageTemplate, alertSender.getOpenConnectionsAlertThreshold())) - } - } else { - if openConnections.CurrentOpenConnections > int64(defaultOpenConnectionsThreshold) { - a.alertToProvider(ctx, alertSender, alertKey, - fmt.Sprintf(alertMessageTemplate, defaultOpenConnectionsThreshold)) + if openConnections.CurrentOpenConnections > int64(lowestOpenConnectionsThreshold) { + for _, alertSenderConfig := range alertSenderConfigs { + if a.checkAndAddAlertToCatalog(ctx, alertSenderConfig.Id, alertKey, fmt.Sprintf(alertMessageTemplate, lowestOpenConnectionsThreshold)) { + if alertSenderConfig.Sender.getOpenConnectionsAlertThreshold() > 0 { + if openConnections.CurrentOpenConnections > int64(alertSenderConfig.Sender.getOpenConnectionsAlertThreshold()) { + a.alertToProvider(ctx, alertSenderConfig, alertKey, + fmt.Sprintf(alertMessageTemplate, alertSenderConfig.Sender.getOpenConnectionsAlertThreshold())) + } + } else { + if openConnections.CurrentOpenConnections > int64(defaultOpenConnectionsThreshold) { + a.alertToProvider(ctx, alertSenderConfig, alertKey, + fmt.Sprintf(alertMessageTemplate, defaultOpenConnectionsThreshold)) + } } } } } } -func (a *Alerter) alertToProvider(ctx context.Context, alertSender AlertSender, alertKey string, alertMessage string) { - err := alertSender.sendAlert(ctx, alertKey, alertMessage) +func (a *Alerter) alertToProvider(ctx context.Context, alertSenderConfig AlertSenderConfig, alertKey string, alertMessage string) { + err := alertSenderConfig.Sender.sendAlert(ctx, alertKey, alertMessage) if err != nil { logger.LoggerFromCtx(ctx).Warn("failed to send alert", slog.Any("error", err)) return @@ -198,7 +206,7 @@ func (a *Alerter) alertToProvider(ctx context.Context, alertSender AlertSender, // Only raises an alert if another alert with the same key hasn't been raised // in the past X minutes, where X is configurable and defaults to 15 minutes // returns true if alert added to catalog, so proceed with processing alerts to slack -func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string, alertMessage string) bool { +func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertConfigId int64, alertKey string, alertMessage string) bool { dur := dynamicconf.PeerDBAlertingGapMinutesAsDuration(ctx) if dur == 0 { logger.LoggerFromCtx(ctx).Warn("Alerting disabled via environment variable, returning") @@ -206,9 +214,9 @@ func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string } row := a.catalogPool.QueryRow(ctx, - `SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1 + `SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1 AND alert_config_id=$2 ORDER BY created_timestamp DESC LIMIT 1`, - alertKey) + alertKey, alertConfigId) var createdTimestamp time.Time err := row.Scan(&createdTimestamp) if err != nil && err != pgx.ErrNoRows { @@ -218,8 +226,8 @@ func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string if time.Since(createdTimestamp) >= dur { _, err = a.catalogPool.Exec(ctx, - "INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message) VALUES($1,$2)", - alertKey, alertMessage) + "INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message,alert_config_id) VALUES($1,$2,$3)", + alertKey, alertMessage, alertConfigId) if err != nil { logger.LoggerFromCtx(ctx).Warn("failed to insert alert", slog.Any("error", err)) return false diff --git a/nexus/catalog/migrations/V22__alert_column_add_config_id.sql b/nexus/catalog/migrations/V22__alert_column_add_config_id.sql new file mode 100644 index 0000000000..ec69219fc5 --- /dev/null +++ b/nexus/catalog/migrations/V22__alert_column_add_config_id.sql @@ -0,0 +1,2 @@ +ALTER TABLE peerdb_stats.alerts_v1 +ADD COLUMN alert_config_id BIGINT NOT NULL DEFAULT NULL;