fix(alerting): check alert config id while checking for sent alerts
iamKunalGupta committed Mar 11, 2024
1 parent dce5399 commit 149af0e
Showing 2 changed files with 56 additions and 46 deletions.
100 changes: 54 additions & 46 deletions flow/alerting/alerting.go
@@ -24,17 +24,23 @@ type Alerter struct {
telemetrySender telemetry.Sender
}

func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]AlertSender, error) {
type AlertSenderConfig struct {
Id int64
Sender AlertSender
}

func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]AlertSenderConfig, error) {
rows, err := a.catalogPool.Query(ctx,
"SELECT service_type,service_config FROM peerdb_stats.alerting_config")
"SELECT id,service_type,service_config FROM peerdb_stats.alerting_config")
if err != nil {
return nil, fmt.Errorf("failed to read alerter config from catalog: %w", err)
}

var alertSenders []AlertSender
var alertSenderConfigs []AlertSenderConfig
var serviceType ServiceType
var serviceConfig string
_, err = pgx.ForEachRow(rows, []any{&serviceType, &serviceConfig}, func() error {
var id int64
_, err = pgx.ForEachRow(rows, []any{&id, &serviceType, &serviceConfig}, func() error {
switch serviceType {
case SLACK:
var slackServiceConfig slackAlertConfig
@@ -43,7 +49,7 @@ func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]AlertSender, e
return fmt.Errorf("failed to unmarshal %s service config: %w", serviceType, err)
}

alertSenders = append(alertSenders, newSlackAlertSender(&slackServiceConfig))
alertSenderConfigs = append(alertSenderConfigs, AlertSenderConfig{Id: id, Sender: newSlackAlertSender(&slackServiceConfig)})
case EMAIL:
emailServiceConfig := EmailAlertSenderConfig{
sourceEmail: peerdbenv.PeerDBAlertingEmailSenderSourceEmail(),
@@ -65,14 +71,14 @@ func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]AlertSender, e
if err2 != nil {
return fmt.Errorf("failed to initialize email alerter: %w", err2)
}
alertSenders = append(alertSenders, alertSender)
alertSenderConfigs = append(alertSenderConfigs, AlertSenderConfig{Id: id, Sender: alertSender})
default:
return fmt.Errorf("unknown service type: %s", serviceType)
}
return nil
})

return alertSenders, nil
return alertSenderConfigs, nil
}

// doesn't take care of closing pool, needs to be done externally.
@@ -99,7 +105,7 @@ func NewAlerter(ctx context.Context, catalogPool *pgxpool.Pool) *Alerter {
}

func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo *protos.SlotInfo) {
alertSenders, err := a.registerSendersFromPool(ctx)
alertSenderConfigs, err := a.registerSendersFromPool(ctx)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to set alert senders", slog.Any("error", err))
return
@@ -113,28 +119,29 @@ func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo
defaultSlotLagMBAlertThreshold := dynamicconf.PeerDBSlotLagMBAlertThreshold(ctx)
// catalog cannot use default threshold to space alerts properly, use the lowest set threshold instead
lowestSlotLagMBAlertThreshold := defaultSlotLagMBAlertThreshold
for _, alertSender := range alertSenders {
if alertSender.getSlotLagMBAlertThreshold() > 0 {
lowestSlotLagMBAlertThreshold = min(lowestSlotLagMBAlertThreshold, alertSender.getSlotLagMBAlertThreshold())
for _, alertSender := range alertSenderConfigs {
if alertSender.Sender.getSlotLagMBAlertThreshold() > 0 {
lowestSlotLagMBAlertThreshold = min(lowestSlotLagMBAlertThreshold, alertSender.Sender.getSlotLagMBAlertThreshold())
}
}

alertKey := fmt.Sprintf("%s Slot Lag Threshold Exceeded for Peer %s", deploymentUIDPrefix, peerName)
alertMessageTemplate := fmt.Sprintf("%sSlot `%s` on peer `%s` has exceeded threshold size of %%dMB, "+
`currently at %.2fMB!`, deploymentUIDPrefix, slotInfo.SlotName, peerName, slotInfo.LagInMb)

if slotInfo.LagInMb > float32(lowestSlotLagMBAlertThreshold) &&
a.checkAndAddAlertToCatalog(ctx, alertKey, fmt.Sprintf(alertMessageTemplate, lowestSlotLagMBAlertThreshold)) {
for _, alertSender := range alertSenders {
if alertSender.getSlotLagMBAlertThreshold() > 0 {
if slotInfo.LagInMb > float32(alertSender.getSlotLagMBAlertThreshold()) {
a.alertToProvider(ctx, alertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, alertSender.getSlotLagMBAlertThreshold()))
}
} else {
if slotInfo.LagInMb > float32(defaultSlotLagMBAlertThreshold) {
a.alertToProvider(ctx, alertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, defaultSlotLagMBAlertThreshold))
if slotInfo.LagInMb > float32(lowestSlotLagMBAlertThreshold) {
for _, alertSenderConfig := range alertSenderConfigs {
if a.checkAndAddAlertToCatalog(ctx, alertSenderConfig.Id, alertKey, fmt.Sprintf(alertMessageTemplate, lowestSlotLagMBAlertThreshold)) {

Check failure on line 134 in flow/alerting/alerting.go (GitHub Actions / lint): line is 147 characters (lll)
if alertSenderConfig.Sender.getSlotLagMBAlertThreshold() > 0 {
if slotInfo.LagInMb > float32(alertSenderConfig.Sender.getSlotLagMBAlertThreshold()) {
a.alertToProvider(ctx, alertSenderConfig, alertKey,
fmt.Sprintf(alertMessageTemplate, alertSenderConfig.Sender.getSlotLagMBAlertThreshold()))
}
} else {
if slotInfo.LagInMb > float32(defaultSlotLagMBAlertThreshold) {
a.alertToProvider(ctx, alertSenderConfig, alertKey,
fmt.Sprintf(alertMessageTemplate, defaultSlotLagMBAlertThreshold))
}
}
}
}
@@ -144,7 +151,7 @@ func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo
func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string,
openConnections *protos.GetOpenConnectionsForUserResult,
) {
alertSenders, err := a.registerSendersFromPool(ctx)
alertSenderConfigs, err := a.registerSendersFromPool(ctx)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to set Slack senders", slog.Any("error", err))
return
@@ -158,9 +165,9 @@ func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string,
// same as with slot lag, use lowest threshold for catalog
defaultOpenConnectionsThreshold := dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx)
lowestOpenConnectionsThreshold := defaultOpenConnectionsThreshold
for _, alertSender := range alertSenders {
if alertSender.getOpenConnectionsAlertThreshold() > 0 {
lowestOpenConnectionsThreshold = min(lowestOpenConnectionsThreshold, alertSender.getOpenConnectionsAlertThreshold())
for _, alertSender := range alertSenderConfigs {
if alertSender.Sender.getOpenConnectionsAlertThreshold() > 0 {
lowestOpenConnectionsThreshold = min(lowestOpenConnectionsThreshold, alertSender.Sender.getOpenConnectionsAlertThreshold())
}
}

Expand All @@ -169,26 +176,27 @@ func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string,
` has exceeded threshold size of %%d connections, currently at %d connections!`,
deploymentUIDPrefix, openConnections.UserName, peerName, openConnections.CurrentOpenConnections)

if openConnections.CurrentOpenConnections > int64(lowestOpenConnectionsThreshold) &&
a.checkAndAddAlertToCatalog(ctx, alertKey, fmt.Sprintf(alertMessageTemplate, lowestOpenConnectionsThreshold)) {
for _, alertSender := range alertSenders {
if alertSender.getOpenConnectionsAlertThreshold() > 0 {
if openConnections.CurrentOpenConnections > int64(alertSender.getOpenConnectionsAlertThreshold()) {
a.alertToProvider(ctx, alertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, alertSender.getOpenConnectionsAlertThreshold()))
}
} else {
if openConnections.CurrentOpenConnections > int64(defaultOpenConnectionsThreshold) {
a.alertToProvider(ctx, alertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, defaultOpenConnectionsThreshold))
if openConnections.CurrentOpenConnections > int64(lowestOpenConnectionsThreshold) {
for _, alertSenderConfig := range alertSenderConfigs {
if a.checkAndAddAlertToCatalog(ctx, alertSenderConfig.Id, alertKey, fmt.Sprintf(alertMessageTemplate, lowestOpenConnectionsThreshold)) {

Check failure on line 181 in flow/alerting/alerting.go (GitHub Actions / lint): line is 148 characters (lll)
if alertSenderConfig.Sender.getOpenConnectionsAlertThreshold() > 0 {
if openConnections.CurrentOpenConnections > int64(alertSenderConfig.Sender.getOpenConnectionsAlertThreshold()) {
a.alertToProvider(ctx, alertSenderConfig, alertKey,
fmt.Sprintf(alertMessageTemplate, alertSenderConfig.Sender.getOpenConnectionsAlertThreshold()))
}
} else {
if openConnections.CurrentOpenConnections > int64(defaultOpenConnectionsThreshold) {
a.alertToProvider(ctx, alertSenderConfig, alertKey,
fmt.Sprintf(alertMessageTemplate, defaultOpenConnectionsThreshold))
}
}
}
}
}
}

func (a *Alerter) alertToProvider(ctx context.Context, alertSender AlertSender, alertKey string, alertMessage string) {
err := alertSender.sendAlert(ctx, alertKey, alertMessage)
func (a *Alerter) alertToProvider(ctx context.Context, alertSenderConfig AlertSenderConfig, alertKey string, alertMessage string) {
err := alertSenderConfig.Sender.sendAlert(ctx, alertKey, alertMessage)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to send alert", slog.Any("error", err))
return
@@ -198,17 +206,17 @@ func (a *Alerter) alertToProvider(ctx context.Context, alertSender AlertSender,
// Only raises an alert if another alert with the same key hasn't been raised
// in the past X minutes, where X is configurable and defaults to 15 minutes
// returns true if alert added to catalog, so proceed with processing alerts to slack
func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string, alertMessage string) bool {
func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertConfigId int64, alertKey string, alertMessage string) bool {
dur := dynamicconf.PeerDBAlertingGapMinutesAsDuration(ctx)
if dur == 0 {
logger.LoggerFromCtx(ctx).Warn("Alerting disabled via environment variable, returning")
return false
}

row := a.catalogPool.QueryRow(ctx,
`SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1
`SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1 AND alert_config_id=$2
ORDER BY created_timestamp DESC LIMIT 1`,
alertKey)
alertKey, alertConfigId)
var createdTimestamp time.Time
err := row.Scan(&createdTimestamp)
if err != nil && err != pgx.ErrNoRows {
@@ -218,8 +226,8 @@ func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string

if time.Since(createdTimestamp) >= dur {
_, err = a.catalogPool.Exec(ctx,
"INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message) VALUES($1,$2)",
alertKey, alertMessage)
"INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message,alert_config_id) VALUES($1,$2,$3)",
alertKey, alertMessage, alertConfigId)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to insert alert", slog.Any("error", err))
return false
2 changes: 2 additions & 0 deletions nexus/catalog/migrations/V22__alert_column_add_config_id.sql
@@ -0,0 +1,2 @@
ALTER TABLE peerdb_stats.alerts_v1
ADD COLUMN alert_config_id BIGINT DEFAULT NULL;
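
Taken together, the Go change and this migration scope alert deduplication to a specific alerting configuration: both the lookup for a recently sent alert and the insert of a new one now carry the alert_config_id added above. A minimal sketch of the resulting catalog interaction, with illustrative values for the alert key, message, and config id:

-- Has an alert with this key already been recorded for this particular
-- config within the alerting gap? (values shown here are illustrative)
SELECT created_timestamp FROM peerdb_stats.alerts_v1
  WHERE alert_key = 'Slot Lag Threshold Exceeded for Peer mypeer'
    AND alert_config_id = 1
  ORDER BY created_timestamp DESC LIMIT 1;

-- If no row exists, or the newest one is older than the configured gap,
-- record the alert against that config before notifying its sender:
INSERT INTO peerdb_stats.alerts_v1 (alert_key, alert_message, alert_config_id)
  VALUES ('Slot Lag Threshold Exceeded for Peer mypeer', 'Slot lag exceeded threshold ...', 1);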
