Skip to content

Commit

Permalink
Merge branch 'main' into generic-simple-schema-changes
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Mar 12, 2024
2 parents 4576b8c + ad65c28 commit d0ef472
Show file tree
Hide file tree
Showing 22 changed files with 594 additions and 194 deletions.
161 changes: 102 additions & 59 deletions flow/alerting/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package alerting
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"strings"
"time"

"github.com/jackc/pgx/v5"
Expand All @@ -23,32 +25,67 @@ type Alerter struct {
telemetrySender telemetry.Sender
}

func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]*slackAlertSender, error) {
type AlertSenderConfig struct {
Id int64
Sender AlertSender
}

func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]AlertSenderConfig, error) {
rows, err := a.catalogPool.Query(ctx,
"SELECT service_type,service_config FROM peerdb_stats.alerting_config")
"SELECT id,service_type,service_config FROM peerdb_stats.alerting_config")
if err != nil {
return nil, fmt.Errorf("failed to read alerter config from catalog: %w", err)
}

var slackAlertSenders []*slackAlertSender
var serviceType, serviceConfig string
_, err = pgx.ForEachRow(rows, []any{&serviceType, &serviceConfig}, func() error {
var alertSenderConfigs []AlertSenderConfig
var serviceType ServiceType
var serviceConfig string
var id int64
_, err = pgx.ForEachRow(rows, []any{&id, &serviceType, &serviceConfig}, func() error {
switch serviceType {
case "slack":
case SLACK:
var slackServiceConfig slackAlertConfig
err = json.Unmarshal([]byte(serviceConfig), &slackServiceConfig)
if err != nil {
return fmt.Errorf("failed to unmarshal Slack service config: %w", err)
return fmt.Errorf("failed to unmarshal %s service config: %w", serviceType, err)
}

alertSenderConfigs = append(alertSenderConfigs, AlertSenderConfig{Id: id, Sender: newSlackAlertSender(&slackServiceConfig)})
case EMAIL:
var replyToAddresses []string
if replyToEnvString := strings.TrimSpace(
peerdbenv.PeerDBAlertingEmailSenderReplyToAddresses()); replyToEnvString != "" {
replyToAddresses = strings.Split(replyToEnvString, ",")
}
emailServiceConfig := EmailAlertSenderConfig{
sourceEmail: peerdbenv.PeerDBAlertingEmailSenderSourceEmail(),
configurationSetName: peerdbenv.PeerDBAlertingEmailSenderConfigurationSet(),
replyToAddresses: replyToAddresses,
}
if emailServiceConfig.sourceEmail == "" {
return errors.New("missing sourceEmail for Email alerting service")
}
err = json.Unmarshal([]byte(serviceConfig), &emailServiceConfig)
if err != nil {
return fmt.Errorf("failed to unmarshal %s service config: %w", serviceType, err)
}
var region *string
if envRegion := peerdbenv.PeerDBAlertingEmailSenderRegion(); envRegion != "" {
region = &envRegion
}

slackAlertSenders = append(slackAlertSenders, newSlackAlertSender(&slackServiceConfig))
alertSender, alertSenderErr := NewEmailAlertSenderWithNewClient(ctx, region, &emailServiceConfig)
if alertSenderErr != nil {
return fmt.Errorf("failed to initialize email alerter: %w", alertSenderErr)
}
alertSenderConfigs = append(alertSenderConfigs, AlertSenderConfig{Id: id, Sender: alertSender})
default:
return fmt.Errorf("unknown service type: %s", serviceType)
}
return nil
})

return slackAlertSenders, nil
return alertSenderConfigs, nil
}

// doesn't take care of closing pool, needs to be done externally.
Expand All @@ -75,9 +112,9 @@ func NewAlerter(ctx context.Context, catalogPool *pgxpool.Pool) *Alerter {
}

func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo *protos.SlotInfo) {
slackAlertSenders, err := a.registerSendersFromPool(ctx)
alertSenderConfigs, err := a.registerSendersFromPool(ctx)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to set Slack senders", slog.Any("error", err))
logger.LoggerFromCtx(ctx).Warn("failed to set alert senders", slog.Any("error", err))
return
}

Expand All @@ -89,29 +126,30 @@ func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo
defaultSlotLagMBAlertThreshold := dynamicconf.PeerDBSlotLagMBAlertThreshold(ctx)
// catalog cannot use default threshold to space alerts properly, use the lowest set threshold instead
lowestSlotLagMBAlertThreshold := defaultSlotLagMBAlertThreshold
for _, slackAlertSender := range slackAlertSenders {
if slackAlertSender.slotLagMBAlertThreshold > 0 {
lowestSlotLagMBAlertThreshold = min(lowestSlotLagMBAlertThreshold, slackAlertSender.slotLagMBAlertThreshold)
for _, alertSender := range alertSenderConfigs {
if alertSender.Sender.getSlotLagMBAlertThreshold() > 0 {
lowestSlotLagMBAlertThreshold = min(lowestSlotLagMBAlertThreshold, alertSender.Sender.getSlotLagMBAlertThreshold())
}
}

alertKey := peerName + "-slot-lag-threshold-exceeded"
alertKey := fmt.Sprintf("%s Slot Lag Threshold Exceeded for Peer %s", deploymentUIDPrefix, peerName)
alertMessageTemplate := fmt.Sprintf("%sSlot `%s` on peer `%s` has exceeded threshold size of %%dMB, "+
`currently at %.2fMB!
cc: <!channel>`, deploymentUIDPrefix, slotInfo.SlotName, peerName, slotInfo.LagInMb)

if slotInfo.LagInMb > float32(lowestSlotLagMBAlertThreshold) &&
a.checkAndAddAlertToCatalog(ctx, alertKey, fmt.Sprintf(alertMessageTemplate, lowestSlotLagMBAlertThreshold)) {
for _, slackAlertSender := range slackAlertSenders {
if slackAlertSender.slotLagMBAlertThreshold > 0 {
if slotInfo.LagInMb > float32(slackAlertSender.slotLagMBAlertThreshold) {
a.alertToSlack(ctx, slackAlertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, slackAlertSender.slotLagMBAlertThreshold))
}
} else {
if slotInfo.LagInMb > float32(defaultSlotLagMBAlertThreshold) {
a.alertToSlack(ctx, slackAlertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, defaultSlotLagMBAlertThreshold))
`currently at %.2fMB!`, deploymentUIDPrefix, slotInfo.SlotName, peerName, slotInfo.LagInMb)

if slotInfo.LagInMb > float32(lowestSlotLagMBAlertThreshold) {
for _, alertSenderConfig := range alertSenderConfigs {
if a.checkAndAddAlertToCatalog(ctx,
alertSenderConfig.Id, alertKey, fmt.Sprintf(alertMessageTemplate, lowestSlotLagMBAlertThreshold)) {
if alertSenderConfig.Sender.getSlotLagMBAlertThreshold() > 0 {
if slotInfo.LagInMb > float32(alertSenderConfig.Sender.getSlotLagMBAlertThreshold()) {
a.alertToProvider(ctx, alertSenderConfig, alertKey,
fmt.Sprintf(alertMessageTemplate, alertSenderConfig.Sender.getSlotLagMBAlertThreshold()))
}
} else {
if slotInfo.LagInMb > float32(defaultSlotLagMBAlertThreshold) {
a.alertToProvider(ctx, alertSenderConfig, alertKey,
fmt.Sprintf(alertMessageTemplate, defaultSlotLagMBAlertThreshold))
}
}
}
}
Expand All @@ -121,52 +159,53 @@ func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo
func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string,
openConnections *protos.GetOpenConnectionsForUserResult,
) {
slackAlertSenders, err := a.registerSendersFromPool(ctx)
alertSenderConfigs, err := a.registerSendersFromPool(ctx)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to set Slack senders", slog.Any("error", err))
return
}

deploymentUIDPrefix := ""
if peerdbenv.PeerDBDeploymentUID() != "" {
deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID())
deploymentUIDPrefix = fmt.Sprintf("[%s] - ", peerdbenv.PeerDBDeploymentUID())
}

// same as with slot lag, use lowest threshold for catalog
defaultOpenConnectionsThreshold := dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx)
lowestOpenConnectionsThreshold := defaultOpenConnectionsThreshold
for _, slackAlertSender := range slackAlertSenders {
if slackAlertSender.openConnectionsAlertThreshold > 0 {
lowestOpenConnectionsThreshold = min(lowestOpenConnectionsThreshold, slackAlertSender.openConnectionsAlertThreshold)
for _, alertSender := range alertSenderConfigs {
if alertSender.Sender.getOpenConnectionsAlertThreshold() > 0 {
lowestOpenConnectionsThreshold = min(lowestOpenConnectionsThreshold, alertSender.Sender.getOpenConnectionsAlertThreshold())
}
}

alertKey := peerName + "-max-open-connections-threshold-exceeded"
alertKey := fmt.Sprintf("%s Max Open Connections Threshold Exceeded for Peer %s", deploymentUIDPrefix, peerName)
alertMessageTemplate := fmt.Sprintf("%sOpen connections from PeerDB user `%s` on peer `%s`"+
` has exceeded threshold size of %%d connections, currently at %d connections!
cc: <!channel>`, deploymentUIDPrefix, openConnections.UserName, peerName, openConnections.CurrentOpenConnections)

if openConnections.CurrentOpenConnections > int64(lowestOpenConnectionsThreshold) &&
a.checkAndAddAlertToCatalog(ctx, alertKey, fmt.Sprintf(alertMessageTemplate, lowestOpenConnectionsThreshold)) {
for _, slackAlertSender := range slackAlertSenders {
if slackAlertSender.openConnectionsAlertThreshold > 0 {
if openConnections.CurrentOpenConnections > int64(slackAlertSender.openConnectionsAlertThreshold) {
a.alertToSlack(ctx, slackAlertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, slackAlertSender.openConnectionsAlertThreshold))
}
} else {
if openConnections.CurrentOpenConnections > int64(defaultOpenConnectionsThreshold) {
a.alertToSlack(ctx, slackAlertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, defaultOpenConnectionsThreshold))
` has exceeded threshold size of %%d connections, currently at %d connections!`,
deploymentUIDPrefix, openConnections.UserName, peerName, openConnections.CurrentOpenConnections)

if openConnections.CurrentOpenConnections > int64(lowestOpenConnectionsThreshold) {
for _, alertSenderConfig := range alertSenderConfigs {
if a.checkAndAddAlertToCatalog(ctx,
alertSenderConfig.Id, alertKey, fmt.Sprintf(alertMessageTemplate, lowestOpenConnectionsThreshold)) {
if alertSenderConfig.Sender.getOpenConnectionsAlertThreshold() > 0 {
if openConnections.CurrentOpenConnections > int64(alertSenderConfig.Sender.getOpenConnectionsAlertThreshold()) {
a.alertToProvider(ctx, alertSenderConfig, alertKey,
fmt.Sprintf(alertMessageTemplate, alertSenderConfig.Sender.getOpenConnectionsAlertThreshold()))
}
} else {
if openConnections.CurrentOpenConnections > int64(defaultOpenConnectionsThreshold) {
a.alertToProvider(ctx, alertSenderConfig, alertKey,
fmt.Sprintf(alertMessageTemplate, defaultOpenConnectionsThreshold))
}
}
}
}
}
}

func (a *Alerter) alertToSlack(ctx context.Context, slackAlertSender *slackAlertSender, alertKey string, alertMessage string) {
err := slackAlertSender.sendAlert(ctx,
":rotating_light:Alert:rotating_light:: "+alertKey, alertMessage)
func (a *Alerter) alertToProvider(ctx context.Context, alertSenderConfig AlertSenderConfig, alertKey string, alertMessage string) {
err := alertSenderConfig.Sender.sendAlert(ctx, alertKey, alertMessage)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to send alert", slog.Any("error", err))
return
Expand All @@ -176,17 +215,17 @@ func (a *Alerter) alertToSlack(ctx context.Context, slackAlertSender *slackAlert
// Only raises an alert if another alert with the same key hasn't been raised
// in the past X minutes, where X is configurable and defaults to 15 minutes
// returns true if alert added to catalog, so proceed with processing alerts to slack
func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string, alertMessage string) bool {
func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertConfigId int64, alertKey string, alertMessage string) bool {
dur := dynamicconf.PeerDBAlertingGapMinutesAsDuration(ctx)
if dur == 0 {
logger.LoggerFromCtx(ctx).Warn("Alerting disabled via environment variable, returning")
return false
}

row := a.catalogPool.QueryRow(ctx,
`SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1
`SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1 AND alert_config_id=$2
ORDER BY created_timestamp DESC LIMIT 1`,
alertKey)
alertKey, alertConfigId)
var createdTimestamp time.Time
err := row.Scan(&createdTimestamp)
if err != nil && err != pgx.ErrNoRows {
Expand All @@ -196,14 +235,18 @@ func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string

if time.Since(createdTimestamp) >= dur {
_, err = a.catalogPool.Exec(ctx,
"INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message) VALUES($1,$2)",
alertKey, alertMessage)
"INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message,alert_config_id) VALUES($1,$2,$3)",
alertKey, alertMessage, alertConfigId)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to insert alert", slog.Any("error", err))
return false
}
return true
}

logger.LoggerFromCtx(ctx).Info(
fmt.Sprintf("Skipped sending alerts: last alert was sent at %s, which was >=%s ago",
createdTimestamp.String(), dur.String()))
return false
}

Expand Down
104 changes: 104 additions & 0 deletions flow/alerting/email_alert_sender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package alerting

import (
"context"
"fmt"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/ses"
"github.com/aws/aws-sdk-go-v2/service/ses/types"

"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared/aws_common"
)

type EmailAlertSender struct {
AlertSender
client *ses.Client
sourceEmail string
configurationSetName string
replyToAddresses []string
slotLagMBAlertThreshold uint32
openConnectionsAlertThreshold uint32
emailAddresses []string
}

func (e *EmailAlertSender) getSlotLagMBAlertThreshold() uint32 {
return e.slotLagMBAlertThreshold
}

func (e *EmailAlertSender) getOpenConnectionsAlertThreshold() uint32 {
return e.openConnectionsAlertThreshold
}

type EmailAlertSenderConfig struct {
sourceEmail string
configurationSetName string
replyToAddresses []string
SlotLagMBAlertThreshold uint32 `json:"slot_lag_mb_alert_threshold"`
OpenConnectionsAlertThreshold uint32 `json:"open_connections_alert_threshold"`
EmailAddresses []string `json:"email_addresses"`
}

func (e *EmailAlertSender) sendAlert(ctx context.Context, alertTitle string, alertMessage string) error {
_, err := e.client.SendEmail(ctx, &ses.SendEmailInput{
Destination: &types.Destination{
ToAddresses: e.emailAddresses,
},
Message: &types.Message{
Body: &types.Body{
Text: &types.Content{
Data: aws.String(alertMessage),
Charset: aws.String("utf-8"),
},
},
Subject: &types.Content{
Data: aws.String(alertTitle),
Charset: aws.String("utf-8"),
},
},
Source: aws.String(e.sourceEmail),
ConfigurationSetName: aws.String(e.configurationSetName),
ReplyToAddresses: e.replyToAddresses,
Tags: []types.MessageTag{
{Name: aws.String("DeploymentUUID"), Value: aws.String(peerdbenv.PeerDBDeploymentUID())},
},
})
if err != nil {
logger.LoggerFromCtx(ctx).Warn(fmt.Sprintf(
"Error sending email alert from %v to %s subject=[%s], body=[%s], configurationSet=%s, replyToAddresses=[%v]",
e.sourceEmail, e.emailAddresses, alertTitle, alertMessage, e.configurationSetName, e.replyToAddresses))
return err
}
return nil
}

func NewEmailAlertSenderWithNewClient(ctx context.Context, region *string, config *EmailAlertSenderConfig) (*EmailAlertSender, error) {
client, err := newSesClient(ctx, region)
if err != nil {
return nil, err
}
return NewEmailAlertSender(client, config), nil
}

func NewEmailAlertSender(client *ses.Client, config *EmailAlertSenderConfig) *EmailAlertSender {
return &EmailAlertSender{
client: client,
sourceEmail: config.sourceEmail,
configurationSetName: config.configurationSetName,
replyToAddresses: config.replyToAddresses,
slotLagMBAlertThreshold: config.SlotLagMBAlertThreshold,
openConnectionsAlertThreshold: config.OpenConnectionsAlertThreshold,
emailAddresses: config.EmailAddresses,
}
}

func newSesClient(ctx context.Context, region *string) (*ses.Client, error) {
sdkConfig, err := aws_common.LoadSdkConfig(ctx, region)
if err != nil {
return nil, err
}
snsClient := ses.NewFromConfig(*sdkConfig)
return snsClient, nil
}
9 changes: 9 additions & 0 deletions flow/alerting/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package alerting

import "context"

type AlertSender interface {
sendAlert(ctx context.Context, alertTitle string, alertMessage string) error
getSlotLagMBAlertThreshold() uint32
getOpenConnectionsAlertThreshold() uint32
}
Loading

0 comments on commit d0ef472

Please sign in to comment.