diff --git a/flow/alerting/alerting.go b/flow/alerting/alerting.go index 6fc903aa79..57df628b62 100644 --- a/flow/alerting/alerting.go +++ b/flow/alerting/alerting.go @@ -3,6 +3,7 @@ package alerting import ( "context" "encoding/json" + "errors" "fmt" "log/slog" "time" @@ -23,32 +24,50 @@ type Alerter struct { telemetrySender telemetry.Sender } -func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]*slackAlertSender, error) { +func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]AlertSender, error) { rows, err := a.catalogPool.Query(ctx, "SELECT service_type,service_config FROM peerdb_stats.alerting_config") if err != nil { return nil, fmt.Errorf("failed to read alerter config from catalog: %w", err) } - var slackAlertSenders []*slackAlertSender - var serviceType, serviceConfig string + var alertSenders []AlertSender + var serviceType ServiceType + var serviceConfig string _, err = pgx.ForEachRow(rows, []any{&serviceType, &serviceConfig}, func() error { switch serviceType { - case "slack": + case SLACK: var slackServiceConfig slackAlertConfig err = json.Unmarshal([]byte(serviceConfig), &slackServiceConfig) if err != nil { - return fmt.Errorf("failed to unmarshal Slack service config: %w", err) + return fmt.Errorf("failed to unmarshal %s service config: %w", serviceType, err) } - slackAlertSenders = append(slackAlertSenders, newSlackAlertSender(&slackServiceConfig)) + alertSenders = append(alertSenders, newSlackAlertSender(&slackServiceConfig)) + case EMAIL: + emailServiceConfig := EmailAlertSenderConfig{ + sourceEmail: peerdbenv.PeerDBAlertingEmailSenderSourceEmail(), + configurationSetName: peerdbenv.PeerDBAlertingEmailSenderConfigurationSet(), + } + if emailServiceConfig.sourceEmail == "" { + return errors.New("missing sourceEmail for Email alerting service") + } + err = json.Unmarshal([]byte(serviceConfig), &emailServiceConfig) + if err != nil { + return fmt.Errorf("failed to unmarshal %s service config: %w", serviceType, err) + } + alertSender, err2 := NewEmailAlertSenderWithNewClient(ctx, &emailServiceConfig) + if err2 != nil { + return fmt.Errorf("failed to initialize email alerter: %w", err2) + } + alertSenders = append(alertSenders, alertSender) default: return fmt.Errorf("unknown service type: %s", serviceType) } return nil }) - return slackAlertSenders, nil + return alertSenders, nil } // doesn't take care of closing pool, needs to be done externally. @@ -75,9 +94,9 @@ func NewAlerter(ctx context.Context, catalogPool *pgxpool.Pool) *Alerter { } func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo *protos.SlotInfo) { - slackAlertSenders, err := a.registerSendersFromPool(ctx) + alertSenders, err := a.registerSendersFromPool(ctx) if err != nil { - logger.LoggerFromCtx(ctx).Warn("failed to set Slack senders", slog.Any("error", err)) + logger.LoggerFromCtx(ctx).Warn("failed to set alert senders", slog.Any("error", err)) return } @@ -89,28 +108,27 @@ func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo defaultSlotLagMBAlertThreshold := dynamicconf.PeerDBSlotLagMBAlertThreshold(ctx) // catalog cannot use default threshold to space alerts properly, use the lowest set threshold instead lowestSlotLagMBAlertThreshold := defaultSlotLagMBAlertThreshold - for _, slackAlertSender := range slackAlertSenders { - if slackAlertSender.slotLagMBAlertThreshold > 0 { - lowestSlotLagMBAlertThreshold = min(lowestSlotLagMBAlertThreshold, slackAlertSender.slotLagMBAlertThreshold) + for _, alertSender := range alertSenders { + if alertSender.getSlotLagMBAlertThreshold() > 0 { + lowestSlotLagMBAlertThreshold = min(lowestSlotLagMBAlertThreshold, alertSender.getSlotLagMBAlertThreshold()) } } alertKey := peerName + "-slot-lag-threshold-exceeded" alertMessageTemplate := fmt.Sprintf("%sSlot `%s` on peer `%s` has exceeded threshold size of %%dMB, "+ - `currently at %.2fMB! - cc: `, deploymentUIDPrefix, slotInfo.SlotName, peerName, slotInfo.LagInMb) + `currently at %.2fMB!`, deploymentUIDPrefix, slotInfo.SlotName, peerName, slotInfo.LagInMb) if slotInfo.LagInMb > float32(lowestSlotLagMBAlertThreshold) && a.checkAndAddAlertToCatalog(ctx, alertKey, fmt.Sprintf(alertMessageTemplate, lowestSlotLagMBAlertThreshold)) { - for _, slackAlertSender := range slackAlertSenders { - if slackAlertSender.slotLagMBAlertThreshold > 0 { - if slotInfo.LagInMb > float32(slackAlertSender.slotLagMBAlertThreshold) { - a.alertToSlack(ctx, slackAlertSender, alertKey, - fmt.Sprintf(alertMessageTemplate, slackAlertSender.slotLagMBAlertThreshold)) + for _, alertSender := range alertSenders { + if alertSender.getSlotLagMBAlertThreshold() > 0 { + if slotInfo.LagInMb > float32(alertSender.getSlotLagMBAlertThreshold()) { + a.alertToProvider(ctx, alertSender, alertKey, + fmt.Sprintf(alertMessageTemplate, alertSender.getSlotLagMBAlertThreshold())) } } else { if slotInfo.LagInMb > float32(defaultSlotLagMBAlertThreshold) { - a.alertToSlack(ctx, slackAlertSender, alertKey, + a.alertToProvider(ctx, alertSender, alertKey, fmt.Sprintf(alertMessageTemplate, defaultSlotLagMBAlertThreshold)) } } @@ -121,7 +139,7 @@ func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string, openConnections *protos.GetOpenConnectionsForUserResult, ) { - slackAlertSenders, err := a.registerSendersFromPool(ctx) + alertSenders, err := a.registerSendersFromPool(ctx) if err != nil { logger.LoggerFromCtx(ctx).Warn("failed to set Slack senders", slog.Any("error", err)) return @@ -135,28 +153,28 @@ func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string, // same as with slot lag, use lowest threshold for catalog defaultOpenConnectionsThreshold := dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx) lowestOpenConnectionsThreshold := defaultOpenConnectionsThreshold - for _, slackAlertSender := range slackAlertSenders { - if slackAlertSender.openConnectionsAlertThreshold > 0 { - lowestOpenConnectionsThreshold = min(lowestOpenConnectionsThreshold, slackAlertSender.openConnectionsAlertThreshold) + for _, alertSender := range alertSenders { + if alertSender.getOpenConnectionsAlertThreshold() > 0 { + lowestOpenConnectionsThreshold = min(lowestOpenConnectionsThreshold, alertSender.getOpenConnectionsAlertThreshold()) } } alertKey := peerName + "-max-open-connections-threshold-exceeded" alertMessageTemplate := fmt.Sprintf("%sOpen connections from PeerDB user `%s` on peer `%s`"+ - ` has exceeded threshold size of %%d connections, currently at %d connections! - cc: `, deploymentUIDPrefix, openConnections.UserName, peerName, openConnections.CurrentOpenConnections) + ` has exceeded threshold size of %%d connections, currently at %d connections!`, + deploymentUIDPrefix, openConnections.UserName, peerName, openConnections.CurrentOpenConnections) if openConnections.CurrentOpenConnections > int64(lowestOpenConnectionsThreshold) && a.checkAndAddAlertToCatalog(ctx, alertKey, fmt.Sprintf(alertMessageTemplate, lowestOpenConnectionsThreshold)) { - for _, slackAlertSender := range slackAlertSenders { - if slackAlertSender.openConnectionsAlertThreshold > 0 { - if openConnections.CurrentOpenConnections > int64(slackAlertSender.openConnectionsAlertThreshold) { - a.alertToSlack(ctx, slackAlertSender, alertKey, - fmt.Sprintf(alertMessageTemplate, slackAlertSender.openConnectionsAlertThreshold)) + for _, alertSender := range alertSenders { + if alertSender.getOpenConnectionsAlertThreshold() > 0 { + if openConnections.CurrentOpenConnections > int64(alertSender.getOpenConnectionsAlertThreshold()) { + a.alertToProvider(ctx, alertSender, alertKey, + fmt.Sprintf(alertMessageTemplate, alertSender.getOpenConnectionsAlertThreshold())) } } else { if openConnections.CurrentOpenConnections > int64(defaultOpenConnectionsThreshold) { - a.alertToSlack(ctx, slackAlertSender, alertKey, + a.alertToProvider(ctx, alertSender, alertKey, fmt.Sprintf(alertMessageTemplate, defaultOpenConnectionsThreshold)) } } @@ -164,9 +182,8 @@ func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string, } } -func (a *Alerter) alertToSlack(ctx context.Context, slackAlertSender *slackAlertSender, alertKey string, alertMessage string) { - err := slackAlertSender.sendAlert(ctx, - ":rotating_light:Alert:rotating_light:: "+alertKey, alertMessage) +func (a *Alerter) alertToProvider(ctx context.Context, alertSender AlertSender, alertKey string, alertMessage string) { + err := alertSender.sendAlert(ctx, alertKey, alertMessage) if err != nil { logger.LoggerFromCtx(ctx).Warn("failed to send alert", slog.Any("error", err)) return diff --git a/flow/alerting/email_alert_sender.go b/flow/alerting/email_alert_sender.go new file mode 100644 index 0000000000..4faf812e15 --- /dev/null +++ b/flow/alerting/email_alert_sender.go @@ -0,0 +1,100 @@ +package alerting + +import ( + "context" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/ses" + "github.com/aws/aws-sdk-go-v2/service/ses/types" + + "github.com/PeerDB-io/peer-flow/peerdbenv" +) + +type EmailAlertSender interface { + AlertSender +} + +type emailAlertSenderImpl struct { + client *ses.Client + sourceEmail string + configurationSetName string + slotLagMBAlertThreshold uint32 + openConnectionsAlertThreshold uint32 +} + +func (e *emailAlertSenderImpl) getSlotLagMBAlertThreshold() uint32 { + return e.slotLagMBAlertThreshold +} + +func (e *emailAlertSenderImpl) getOpenConnectionsAlertThreshold() uint32 { + return e.openConnectionsAlertThreshold +} + +type EmailAlertSenderConfig struct { + sourceEmail string + configurationSetName string + SlotLagMBAlertThreshold uint32 `json:"slot_lag_mb_alert_threshold"` + OpenConnectionsAlertThreshold uint32 `json:"open_connections_alert_threshold"` +} + +func (e *emailAlertSenderImpl) sendAlert(ctx context.Context, alertTitle string, alertMessage string) error { + _, err := e.client.SendEmail(ctx, &ses.SendEmailInput{ + Destination: &types.Destination{ + ToAddresses: []string{}, + }, + Message: &types.Message{ + Body: &types.Body{ + Text: &types.Content{ + Data: aws.String(alertMessage), + Charset: aws.String("utf-8"), + }, + }, + Subject: &types.Content{ + Data: aws.String(alertTitle), + Charset: aws.String("utf-8"), + }, + }, + Source: aws.String(e.sourceEmail), + ConfigurationSetName: aws.String(e.configurationSetName), + Tags: []types.MessageTag{ + {Name: aws.String("DeploymentUUID"), Value: aws.String(peerdbenv.PeerDBDeploymentUID())}, + }, + }) + if err != nil { + return err + } + return nil +} + +func NewEmailAlertSenderWithNewClient(ctx context.Context, config *EmailAlertSenderConfig) (EmailAlertSender, error) { + client, err := newSesClient(ctx, nil) + if err != nil { + return nil, err + } + return NewEmailAlertSender(client, config), nil +} + +func NewEmailAlertSender(client *ses.Client, config *EmailAlertSenderConfig) EmailAlertSender { + return &emailAlertSenderImpl{ + client: client, + sourceEmail: config.sourceEmail, + configurationSetName: config.configurationSetName, + slotLagMBAlertThreshold: config.SlotLagMBAlertThreshold, + openConnectionsAlertThreshold: config.OpenConnectionsAlertThreshold, + } +} + +func newSesClient(ctx context.Context, region *string) (*ses.Client, error) { + sdkConfig, err := config.LoadDefaultConfig(ctx, func(options *config.LoadOptions) error { + if region != nil { + options.Region = *region + } + return nil + }) + if err != nil { + return nil, err + } + snsClient := ses.NewFromConfig(sdkConfig) + return snsClient, nil +} diff --git a/flow/alerting/interface.go b/flow/alerting/interface.go new file mode 100644 index 0000000000..a9dd2d51b9 --- /dev/null +++ b/flow/alerting/interface.go @@ -0,0 +1,9 @@ +package alerting + +import "context" + +type AlertSender interface { + sendAlert(ctx context.Context, alertTitle string, alertMessage string) error + getSlotLagMBAlertThreshold() uint32 + getOpenConnectionsAlertThreshold() uint32 +} diff --git a/flow/alerting/slack_alert_sender.go b/flow/alerting/slack_alert_sender.go index 1ad007536b..d3800db6a0 100644 --- a/flow/alerting/slack_alert_sender.go +++ b/flow/alerting/slack_alert_sender.go @@ -7,13 +7,25 @@ import ( "github.com/slack-go/slack" ) -type slackAlertSender struct { +type SlackAlertSender interface { + AlertSender +} + +type slackAlertSenderImpl struct { client *slack.Client channelIDs []string slotLagMBAlertThreshold uint32 openConnectionsAlertThreshold uint32 } +func (s *slackAlertSenderImpl) getSlotLagMBAlertThreshold() uint32 { + return s.slotLagMBAlertThreshold +} + +func (s *slackAlertSenderImpl) getOpenConnectionsAlertThreshold() uint32 { + return s.openConnectionsAlertThreshold +} + type slackAlertConfig struct { AuthToken string `json:"auth_token"` ChannelIDs []string `json:"channel_ids"` @@ -21,8 +33,8 @@ type slackAlertConfig struct { OpenConnectionsAlertThreshold uint32 `json:"open_connections_alert_threshold"` } -func newSlackAlertSender(config *slackAlertConfig) *slackAlertSender { - return &slackAlertSender{ +func newSlackAlertSender(config *slackAlertConfig) SlackAlertSender { + return &slackAlertSenderImpl{ client: slack.New(config.AuthToken), channelIDs: config.ChannelIDs, slotLagMBAlertThreshold: config.SlotLagMBAlertThreshold, @@ -30,11 +42,11 @@ func newSlackAlertSender(config *slackAlertConfig) *slackAlertSender { } } -func (s *slackAlertSender) sendAlert(ctx context.Context, alertTitle string, alertMessage string) error { +func (s *slackAlertSenderImpl) sendAlert(ctx context.Context, alertTitle string, alertMessage string) error { for _, channelID := range s.channelIDs { _, _, _, err := s.client.SendMessageContext(ctx, channelID, slack.MsgOptionBlocks( - slack.NewHeaderBlock(slack.NewTextBlockObject("plain_text", alertTitle, true, false)), - slack.NewSectionBlock(slack.NewTextBlockObject("mrkdwn", alertMessage, false, false), nil, nil), + slack.NewHeaderBlock(slack.NewTextBlockObject("plain_text", ":rotating_light:Alert:rotating_light:: "+alertTitle, true, false)), + slack.NewSectionBlock(slack.NewTextBlockObject("mrkdwn", alertMessage+"\ncc: ", false, false), nil, nil), )) if err != nil { return fmt.Errorf("failed to send message to Slack channel %s: %w", channelID, err) diff --git a/flow/alerting/types.go b/flow/alerting/types.go new file mode 100644 index 0000000000..6277010ee0 --- /dev/null +++ b/flow/alerting/types.go @@ -0,0 +1,8 @@ +package alerting + +type ServiceType string + +const ( + SLACK ServiceType = "slack" + EMAIL ServiceType = "email" +) diff --git a/flow/go.mod b/flow/go.mod index da03d355b3..d5efe0ea42 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -15,6 +15,7 @@ require ( github.com/aws/aws-sdk-go-v2/credentials v1.17.0 github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1 github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0 + github.com/aws/aws-sdk-go-v2/service/ses v1.22.1 github.com/aws/aws-sdk-go-v2/service/sns v1.29.1 github.com/cockroachdb/pebble v1.1.0 github.com/google/uuid v1.6.0 diff --git a/flow/go.sum b/flow/go.sum index fc03edf962..e835618c92 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -94,6 +94,8 @@ github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0 h1:l5puwOHr7IxECu github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0/go.mod h1:Oov79flWa/n7Ni+lQC3z+VM7PoRM47omRqbJU9B5Y7E= github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0 h1:jZAdMD1ioZdqirzzVVRhpHHWJmcGGCn8JqDYBs5nmYA= github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0/go.mod h1:1o/W6JFUuREj2ExoQ21vHJgO7wakvjhol91M9eknFgs= +github.com/aws/aws-sdk-go-v2/service/ses v1.22.1 h1:FOkVxvctmbFpp9QYyu6tcDsRa8ZXM1EuozwoKG0OnOM= +github.com/aws/aws-sdk-go-v2/service/ses v1.22.1/go.mod h1:jAAwtV9eq69pttQ8d24aQh+JD4RgotYZaz/XvvEJ5bI= github.com/aws/aws-sdk-go-v2/service/sns v1.29.1 h1:K2FiR/547lI9vGuDL0Ghin4QPSEvOKxbHY9aXFq8wfU= github.com/aws/aws-sdk-go-v2/service/sns v1.29.1/go.mod h1:PBmfgVv83oBgZVFhs/+oWsL6r0hLyB6qHRFEWwHyHn4= github.com/aws/aws-sdk-go-v2/service/sso v1.19.0 h1:u6OkVDxtBPnxPkZ9/63ynEe+8kHbtS5IfaC4PzVxzWM= diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index 9a59bad5ef..7be6fb8e53 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -95,3 +95,11 @@ func PeerDBEnableParallelSyncNormalize() bool { func PeerDBTelemetryAWSSNSTopicArn() string { return getEnvString("PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN", "") } + +func PeerDBAlertingEmailSenderSourceEmail() string { + return getEnvString("PEERDB_ALERTING_EMAIL_SENDER_SOURCE_EMAIL", "") +} + +func PeerDBAlertingEmailSenderConfigurationSet() string { + return getEnvString("PEERDB_ALERTING_EMAIL_SENDER_CONFIGURATION_SET", "") +}