Skip to content

Commit

Permalink
feat(alerting): add email sender
Browse files Browse the repository at this point in the history
  • Loading branch information
iamKunalGupta committed Mar 4, 2024
1 parent 81c9320 commit 61bdc52
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 41 deletions.
87 changes: 52 additions & 35 deletions flow/alerting/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package alerting
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"time"
Expand All @@ -23,32 +24,50 @@ type Alerter struct {
telemetrySender telemetry.Sender
}

func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]*slackAlertSender, error) {
func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]AlertSender, error) {
rows, err := a.catalogPool.Query(ctx,
"SELECT service_type,service_config FROM peerdb_stats.alerting_config")
if err != nil {
return nil, fmt.Errorf("failed to read alerter config from catalog: %w", err)
}

var slackAlertSenders []*slackAlertSender
var serviceType, serviceConfig string
var alertSenders []AlertSender
var serviceType ServiceType
var serviceConfig string
_, err = pgx.ForEachRow(rows, []any{&serviceType, &serviceConfig}, func() error {
switch serviceType {
case "slack":
case SLACK:
var slackServiceConfig slackAlertConfig
err = json.Unmarshal([]byte(serviceConfig), &slackServiceConfig)
if err != nil {
return fmt.Errorf("failed to unmarshal Slack service config: %w", err)
return fmt.Errorf("failed to unmarshal %s service config: %w", serviceType, err)
}

slackAlertSenders = append(slackAlertSenders, newSlackAlertSender(&slackServiceConfig))
alertSenders = append(alertSenders, newSlackAlertSender(&slackServiceConfig))
case EMAIL:
emailServiceConfig := EmailAlertSenderConfig{
sourceEmail: peerdbenv.PeerDBAlertingEmailSenderSourceEmail(),
configurationSetName: peerdbenv.PeerDBAlertingEmailSenderConfigurationSet(),
}
if emailServiceConfig.sourceEmail == "" {
return errors.New("missing sourceEmail for Email alerting service")
}
err = json.Unmarshal([]byte(serviceConfig), &emailServiceConfig)
if err != nil {
return fmt.Errorf("failed to unmarshal %s service config: %w", serviceType, err)
}
alertSender, err2 := NewEmailAlertSenderWithNewClient(ctx, &emailServiceConfig)
if err2 != nil {
return fmt.Errorf("failed to initialize email alerter: %w", err2)
}
alertSenders = append(alertSenders, alertSender)
default:
return fmt.Errorf("unknown service type: %s", serviceType)
}
return nil
})

return slackAlertSenders, nil
return alertSenders, nil
}

// doesn't take care of closing pool, needs to be done externally.
Expand All @@ -75,9 +94,9 @@ func NewAlerter(ctx context.Context, catalogPool *pgxpool.Pool) *Alerter {
}

func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo *protos.SlotInfo) {
slackAlertSenders, err := a.registerSendersFromPool(ctx)
alertSenders, err := a.registerSendersFromPool(ctx)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to set Slack senders", slog.Any("error", err))
logger.LoggerFromCtx(ctx).Warn("failed to set alert senders", slog.Any("error", err))
return
}

Expand All @@ -89,28 +108,27 @@ func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo
defaultSlotLagMBAlertThreshold := dynamicconf.PeerDBSlotLagMBAlertThreshold(ctx)
// catalog cannot use default threshold to space alerts properly, use the lowest set threshold instead
lowestSlotLagMBAlertThreshold := defaultSlotLagMBAlertThreshold
for _, slackAlertSender := range slackAlertSenders {
if slackAlertSender.slotLagMBAlertThreshold > 0 {
lowestSlotLagMBAlertThreshold = min(lowestSlotLagMBAlertThreshold, slackAlertSender.slotLagMBAlertThreshold)
for _, alertSender := range alertSenders {
if alertSender.getSlotLagMBAlertThreshold() > 0 {
lowestSlotLagMBAlertThreshold = min(lowestSlotLagMBAlertThreshold, alertSender.getSlotLagMBAlertThreshold())
}
}

alertKey := peerName + "-slot-lag-threshold-exceeded"
alertMessageTemplate := fmt.Sprintf("%sSlot `%s` on peer `%s` has exceeded threshold size of %%dMB, "+
`currently at %.2fMB!
cc: <!channel>`, deploymentUIDPrefix, slotInfo.SlotName, peerName, slotInfo.LagInMb)
`currently at %.2fMB!`, deploymentUIDPrefix, slotInfo.SlotName, peerName, slotInfo.LagInMb)

if slotInfo.LagInMb > float32(lowestSlotLagMBAlertThreshold) &&
a.checkAndAddAlertToCatalog(ctx, alertKey, fmt.Sprintf(alertMessageTemplate, lowestSlotLagMBAlertThreshold)) {
for _, slackAlertSender := range slackAlertSenders {
if slackAlertSender.slotLagMBAlertThreshold > 0 {
if slotInfo.LagInMb > float32(slackAlertSender.slotLagMBAlertThreshold) {
a.alertToSlack(ctx, slackAlertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, slackAlertSender.slotLagMBAlertThreshold))
for _, alertSender := range alertSenders {
if alertSender.getSlotLagMBAlertThreshold() > 0 {
if slotInfo.LagInMb > float32(alertSender.getSlotLagMBAlertThreshold()) {
a.alertToProvider(ctx, alertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, alertSender.getSlotLagMBAlertThreshold()))
}
} else {
if slotInfo.LagInMb > float32(defaultSlotLagMBAlertThreshold) {
a.alertToSlack(ctx, slackAlertSender, alertKey,
a.alertToProvider(ctx, alertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, defaultSlotLagMBAlertThreshold))
}
}
Expand All @@ -121,7 +139,7 @@ func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo
func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string,
openConnections *protos.GetOpenConnectionsForUserResult,
) {
slackAlertSenders, err := a.registerSendersFromPool(ctx)
alertSenders, err := a.registerSendersFromPool(ctx)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to set Slack senders", slog.Any("error", err))
return
Expand All @@ -135,38 +153,37 @@ func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string,
// same as with slot lag, use lowest threshold for catalog
defaultOpenConnectionsThreshold := dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx)
lowestOpenConnectionsThreshold := defaultOpenConnectionsThreshold
for _, slackAlertSender := range slackAlertSenders {
if slackAlertSender.openConnectionsAlertThreshold > 0 {
lowestOpenConnectionsThreshold = min(lowestOpenConnectionsThreshold, slackAlertSender.openConnectionsAlertThreshold)
for _, alertSender := range alertSenders {
if alertSender.getOpenConnectionsAlertThreshold() > 0 {
lowestOpenConnectionsThreshold = min(lowestOpenConnectionsThreshold, alertSender.getOpenConnectionsAlertThreshold())
}
}

alertKey := peerName + "-max-open-connections-threshold-exceeded"
alertMessageTemplate := fmt.Sprintf("%sOpen connections from PeerDB user `%s` on peer `%s`"+
` has exceeded threshold size of %%d connections, currently at %d connections!
cc: <!channel>`, deploymentUIDPrefix, openConnections.UserName, peerName, openConnections.CurrentOpenConnections)
` has exceeded threshold size of %%d connections, currently at %d connections!`,
deploymentUIDPrefix, openConnections.UserName, peerName, openConnections.CurrentOpenConnections)

if openConnections.CurrentOpenConnections > int64(lowestOpenConnectionsThreshold) &&
a.checkAndAddAlertToCatalog(ctx, alertKey, fmt.Sprintf(alertMessageTemplate, lowestOpenConnectionsThreshold)) {
for _, slackAlertSender := range slackAlertSenders {
if slackAlertSender.openConnectionsAlertThreshold > 0 {
if openConnections.CurrentOpenConnections > int64(slackAlertSender.openConnectionsAlertThreshold) {
a.alertToSlack(ctx, slackAlertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, slackAlertSender.openConnectionsAlertThreshold))
for _, alertSender := range alertSenders {
if alertSender.getOpenConnectionsAlertThreshold() > 0 {
if openConnections.CurrentOpenConnections > int64(alertSender.getOpenConnectionsAlertThreshold()) {
a.alertToProvider(ctx, alertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, alertSender.getOpenConnectionsAlertThreshold()))
}
} else {
if openConnections.CurrentOpenConnections > int64(defaultOpenConnectionsThreshold) {
a.alertToSlack(ctx, slackAlertSender, alertKey,
a.alertToProvider(ctx, alertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, defaultOpenConnectionsThreshold))
}
}
}
}
}

func (a *Alerter) alertToSlack(ctx context.Context, slackAlertSender *slackAlertSender, alertKey string, alertMessage string) {
err := slackAlertSender.sendAlert(ctx,
":rotating_light:Alert:rotating_light:: "+alertKey, alertMessage)
func (a *Alerter) alertToProvider(ctx context.Context, alertSender AlertSender, alertKey string, alertMessage string) {
err := alertSender.sendAlert(ctx, alertKey, alertMessage)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to send alert", slog.Any("error", err))
return
Expand Down
100 changes: 100 additions & 0 deletions flow/alerting/email_alert_sender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package alerting

import (
"context"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/ses"
"github.com/aws/aws-sdk-go-v2/service/ses/types"

"github.com/PeerDB-io/peer-flow/peerdbenv"
)

type EmailAlertSender interface {
AlertSender
}

type emailAlertSenderImpl struct {
client *ses.Client
sourceEmail string
configurationSetName string
slotLagMBAlertThreshold uint32
openConnectionsAlertThreshold uint32
}

func (e *emailAlertSenderImpl) getSlotLagMBAlertThreshold() uint32 {
return e.slotLagMBAlertThreshold
}

func (e *emailAlertSenderImpl) getOpenConnectionsAlertThreshold() uint32 {
return e.openConnectionsAlertThreshold
}

type EmailAlertSenderConfig struct {
sourceEmail string
configurationSetName string
SlotLagMBAlertThreshold uint32 `json:"slot_lag_mb_alert_threshold"`
OpenConnectionsAlertThreshold uint32 `json:"open_connections_alert_threshold"`
}

func (e *emailAlertSenderImpl) sendAlert(ctx context.Context, alertTitle string, alertMessage string) error {
_, err := e.client.SendEmail(ctx, &ses.SendEmailInput{
Destination: &types.Destination{
ToAddresses: []string{},
},
Message: &types.Message{
Body: &types.Body{
Text: &types.Content{
Data: aws.String(alertMessage),
Charset: aws.String("utf-8"),
},
},
Subject: &types.Content{
Data: aws.String(alertTitle),
Charset: aws.String("utf-8"),
},
},
Source: aws.String(e.sourceEmail),
ConfigurationSetName: aws.String(e.configurationSetName),
Tags: []types.MessageTag{
{Name: aws.String("DeploymentUUID"), Value: aws.String(peerdbenv.PeerDBDeploymentUID())},
},
})
if err != nil {
return err
}
return nil
}

func NewEmailAlertSenderWithNewClient(ctx context.Context, config *EmailAlertSenderConfig) (EmailAlertSender, error) {
client, err := newSesClient(ctx, nil)
if err != nil {
return nil, err
}
return NewEmailAlertSender(client, config), nil
}

func NewEmailAlertSender(client *ses.Client, config *EmailAlertSenderConfig) EmailAlertSender {
return &emailAlertSenderImpl{
client: client,
sourceEmail: config.sourceEmail,
configurationSetName: config.configurationSetName,
slotLagMBAlertThreshold: config.SlotLagMBAlertThreshold,
openConnectionsAlertThreshold: config.OpenConnectionsAlertThreshold,
}
}

func newSesClient(ctx context.Context, region *string) (*ses.Client, error) {
sdkConfig, err := config.LoadDefaultConfig(ctx, func(options *config.LoadOptions) error {
if region != nil {
options.Region = *region
}
return nil
})
if err != nil {
return nil, err
}
snsClient := ses.NewFromConfig(sdkConfig)
return snsClient, nil
}
9 changes: 9 additions & 0 deletions flow/alerting/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package alerting

import "context"

type AlertSender interface {
sendAlert(ctx context.Context, alertTitle string, alertMessage string) error
getSlotLagMBAlertThreshold() uint32
getOpenConnectionsAlertThreshold() uint32
}
24 changes: 18 additions & 6 deletions flow/alerting/slack_alert_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,46 @@ import (
"github.com/slack-go/slack"
)

type slackAlertSender struct {
type SlackAlertSender interface {
AlertSender
}

type slackAlertSenderImpl struct {
client *slack.Client
channelIDs []string
slotLagMBAlertThreshold uint32
openConnectionsAlertThreshold uint32
}

func (s *slackAlertSenderImpl) getSlotLagMBAlertThreshold() uint32 {
return s.slotLagMBAlertThreshold
}

func (s *slackAlertSenderImpl) getOpenConnectionsAlertThreshold() uint32 {
return s.openConnectionsAlertThreshold
}

type slackAlertConfig struct {
AuthToken string `json:"auth_token"`
ChannelIDs []string `json:"channel_ids"`
SlotLagMBAlertThreshold uint32 `json:"slot_lag_mb_alert_threshold"`
OpenConnectionsAlertThreshold uint32 `json:"open_connections_alert_threshold"`
}

func newSlackAlertSender(config *slackAlertConfig) *slackAlertSender {
return &slackAlertSender{
func newSlackAlertSender(config *slackAlertConfig) SlackAlertSender {
return &slackAlertSenderImpl{
client: slack.New(config.AuthToken),
channelIDs: config.ChannelIDs,
slotLagMBAlertThreshold: config.SlotLagMBAlertThreshold,
openConnectionsAlertThreshold: config.OpenConnectionsAlertThreshold,
}
}

func (s *slackAlertSender) sendAlert(ctx context.Context, alertTitle string, alertMessage string) error {
func (s *slackAlertSenderImpl) sendAlert(ctx context.Context, alertTitle string, alertMessage string) error {
for _, channelID := range s.channelIDs {
_, _, _, err := s.client.SendMessageContext(ctx, channelID, slack.MsgOptionBlocks(
slack.NewHeaderBlock(slack.NewTextBlockObject("plain_text", alertTitle, true, false)),
slack.NewSectionBlock(slack.NewTextBlockObject("mrkdwn", alertMessage, false, false), nil, nil),
slack.NewHeaderBlock(slack.NewTextBlockObject("plain_text", ":rotating_light:Alert:rotating_light:: "+alertTitle, true, false)),
slack.NewSectionBlock(slack.NewTextBlockObject("mrkdwn", alertMessage+"\ncc: <!channel>", false, false), nil, nil),
))
if err != nil {
return fmt.Errorf("failed to send message to Slack channel %s: %w", channelID, err)
Expand Down
8 changes: 8 additions & 0 deletions flow/alerting/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package alerting

type ServiceType string

const (
SLACK ServiceType = "slack"
EMAIL ServiceType = "email"
)
1 change: 1 addition & 0 deletions flow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/aws/aws-sdk-go-v2/credentials v1.17.0
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1
github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0
github.com/aws/aws-sdk-go-v2/service/ses v1.22.1
github.com/aws/aws-sdk-go-v2/service/sns v1.29.1
github.com/cockroachdb/pebble v1.1.0
github.com/google/uuid v1.6.0
Expand Down
2 changes: 2 additions & 0 deletions flow/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0 h1:l5puwOHr7IxECu
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0/go.mod h1:Oov79flWa/n7Ni+lQC3z+VM7PoRM47omRqbJU9B5Y7E=
github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0 h1:jZAdMD1ioZdqirzzVVRhpHHWJmcGGCn8JqDYBs5nmYA=
github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0/go.mod h1:1o/W6JFUuREj2ExoQ21vHJgO7wakvjhol91M9eknFgs=
github.com/aws/aws-sdk-go-v2/service/ses v1.22.1 h1:FOkVxvctmbFpp9QYyu6tcDsRa8ZXM1EuozwoKG0OnOM=
github.com/aws/aws-sdk-go-v2/service/ses v1.22.1/go.mod h1:jAAwtV9eq69pttQ8d24aQh+JD4RgotYZaz/XvvEJ5bI=
github.com/aws/aws-sdk-go-v2/service/sns v1.29.1 h1:K2FiR/547lI9vGuDL0Ghin4QPSEvOKxbHY9aXFq8wfU=
github.com/aws/aws-sdk-go-v2/service/sns v1.29.1/go.mod h1:PBmfgVv83oBgZVFhs/+oWsL6r0hLyB6qHRFEWwHyHn4=
github.com/aws/aws-sdk-go-v2/service/sso v1.19.0 h1:u6OkVDxtBPnxPkZ9/63ynEe+8kHbtS5IfaC4PzVxzWM=
Expand Down
8 changes: 8 additions & 0 deletions flow/peerdbenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,11 @@ func PeerDBEnableParallelSyncNormalize() bool {
func PeerDBTelemetryAWSSNSTopicArn() string {
return getEnvString("PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN", "")
}

func PeerDBAlertingEmailSenderSourceEmail() string {
return getEnvString("PEERDB_ALERTING_EMAIL_SENDER_SOURCE_EMAIL", "")
}

func PeerDBAlertingEmailSenderConfigurationSet() string {
return getEnvString("PEERDB_ALERTING_EMAIL_SENDER_CONFIGURATION_SET", "")
}

0 comments on commit 61bdc52

Please sign in to comment.