diff --git a/flow/alerting/alerting.go b/flow/alerting/alerting.go index 93cb946f78..acdbe6c290 100644 --- a/flow/alerting/alerting.go +++ b/flow/alerting/alerting.go @@ -3,8 +3,10 @@ package alerting import ( "context" "encoding/json" + "errors" "fmt" "log/slog" + "strings" "time" "github.com/jackc/pgx/v5" @@ -23,32 +25,67 @@ type Alerter struct { telemetrySender telemetry.Sender } -func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]*slackAlertSender, error) { +type AlertSenderConfig struct { + Id int64 + Sender AlertSender +} + +func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]AlertSenderConfig, error) { rows, err := a.catalogPool.Query(ctx, - "SELECT service_type,service_config FROM peerdb_stats.alerting_config") + "SELECT id,service_type,service_config FROM peerdb_stats.alerting_config") if err != nil { return nil, fmt.Errorf("failed to read alerter config from catalog: %w", err) } - var slackAlertSenders []*slackAlertSender - var serviceType, serviceConfig string - _, err = pgx.ForEachRow(rows, []any{&serviceType, &serviceConfig}, func() error { + var alertSenderConfigs []AlertSenderConfig + var serviceType ServiceType + var serviceConfig string + var id int64 + _, err = pgx.ForEachRow(rows, []any{&id, &serviceType, &serviceConfig}, func() error { switch serviceType { - case "slack": + case SLACK: var slackServiceConfig slackAlertConfig err = json.Unmarshal([]byte(serviceConfig), &slackServiceConfig) if err != nil { - return fmt.Errorf("failed to unmarshal Slack service config: %w", err) + return fmt.Errorf("failed to unmarshal %s service config: %w", serviceType, err) + } + + alertSenderConfigs = append(alertSenderConfigs, AlertSenderConfig{Id: id, Sender: newSlackAlertSender(&slackServiceConfig)}) + case EMAIL: + var replyToAddresses []string + if replyToEnvString := strings.TrimSpace( + peerdbenv.PeerDBAlertingEmailSenderReplyToAddresses()); replyToEnvString != "" { + replyToAddresses = strings.Split(replyToEnvString, ",") + } + emailServiceConfig := EmailAlertSenderConfig{ + sourceEmail: peerdbenv.PeerDBAlertingEmailSenderSourceEmail(), + configurationSetName: peerdbenv.PeerDBAlertingEmailSenderConfigurationSet(), + replyToAddresses: replyToAddresses, + } + if emailServiceConfig.sourceEmail == "" { + return errors.New("missing sourceEmail for Email alerting service") + } + err = json.Unmarshal([]byte(serviceConfig), &emailServiceConfig) + if err != nil { + return fmt.Errorf("failed to unmarshal %s service config: %w", serviceType, err) + } + var region *string + if envRegion := peerdbenv.PeerDBAlertingEmailSenderRegion(); envRegion != "" { + region = &envRegion } - slackAlertSenders = append(slackAlertSenders, newSlackAlertSender(&slackServiceConfig)) + alertSender, alertSenderErr := NewEmailAlertSenderWithNewClient(ctx, region, &emailServiceConfig) + if alertSenderErr != nil { + return fmt.Errorf("failed to initialize email alerter: %w", alertSenderErr) + } + alertSenderConfigs = append(alertSenderConfigs, AlertSenderConfig{Id: id, Sender: alertSender}) default: return fmt.Errorf("unknown service type: %s", serviceType) } return nil }) - return slackAlertSenders, nil + return alertSenderConfigs, nil } // doesn't take care of closing pool, needs to be done externally. @@ -75,9 +112,9 @@ func NewAlerter(ctx context.Context, catalogPool *pgxpool.Pool) *Alerter { } func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo *protos.SlotInfo) { - slackAlertSenders, err := a.registerSendersFromPool(ctx) + alertSenderConfigs, err := a.registerSendersFromPool(ctx) if err != nil { - logger.LoggerFromCtx(ctx).Warn("failed to set Slack senders", slog.Any("error", err)) + logger.LoggerFromCtx(ctx).Warn("failed to set alert senders", slog.Any("error", err)) return } @@ -89,29 +126,30 @@ func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo defaultSlotLagMBAlertThreshold := dynamicconf.PeerDBSlotLagMBAlertThreshold(ctx) // catalog cannot use default threshold to space alerts properly, use the lowest set threshold instead lowestSlotLagMBAlertThreshold := defaultSlotLagMBAlertThreshold - for _, slackAlertSender := range slackAlertSenders { - if slackAlertSender.slotLagMBAlertThreshold > 0 { - lowestSlotLagMBAlertThreshold = min(lowestSlotLagMBAlertThreshold, slackAlertSender.slotLagMBAlertThreshold) + for _, alertSender := range alertSenderConfigs { + if alertSender.Sender.getSlotLagMBAlertThreshold() > 0 { + lowestSlotLagMBAlertThreshold = min(lowestSlotLagMBAlertThreshold, alertSender.Sender.getSlotLagMBAlertThreshold()) } } - alertKey := peerName + "-slot-lag-threshold-exceeded" + alertKey := fmt.Sprintf("%s Slot Lag Threshold Exceeded for Peer %s", deploymentUIDPrefix, peerName) alertMessageTemplate := fmt.Sprintf("%sSlot `%s` on peer `%s` has exceeded threshold size of %%dMB, "+ - `currently at %.2fMB! - cc: `, deploymentUIDPrefix, slotInfo.SlotName, peerName, slotInfo.LagInMb) - - if slotInfo.LagInMb > float32(lowestSlotLagMBAlertThreshold) && - a.checkAndAddAlertToCatalog(ctx, alertKey, fmt.Sprintf(alertMessageTemplate, lowestSlotLagMBAlertThreshold)) { - for _, slackAlertSender := range slackAlertSenders { - if slackAlertSender.slotLagMBAlertThreshold > 0 { - if slotInfo.LagInMb > float32(slackAlertSender.slotLagMBAlertThreshold) { - a.alertToSlack(ctx, slackAlertSender, alertKey, - fmt.Sprintf(alertMessageTemplate, slackAlertSender.slotLagMBAlertThreshold)) - } - } else { - if slotInfo.LagInMb > float32(defaultSlotLagMBAlertThreshold) { - a.alertToSlack(ctx, slackAlertSender, alertKey, - fmt.Sprintf(alertMessageTemplate, defaultSlotLagMBAlertThreshold)) + `currently at %.2fMB!`, deploymentUIDPrefix, slotInfo.SlotName, peerName, slotInfo.LagInMb) + + if slotInfo.LagInMb > float32(lowestSlotLagMBAlertThreshold) { + for _, alertSenderConfig := range alertSenderConfigs { + if a.checkAndAddAlertToCatalog(ctx, + alertSenderConfig.Id, alertKey, fmt.Sprintf(alertMessageTemplate, lowestSlotLagMBAlertThreshold)) { + if alertSenderConfig.Sender.getSlotLagMBAlertThreshold() > 0 { + if slotInfo.LagInMb > float32(alertSenderConfig.Sender.getSlotLagMBAlertThreshold()) { + a.alertToProvider(ctx, alertSenderConfig, alertKey, + fmt.Sprintf(alertMessageTemplate, alertSenderConfig.Sender.getSlotLagMBAlertThreshold())) + } + } else { + if slotInfo.LagInMb > float32(defaultSlotLagMBAlertThreshold) { + a.alertToProvider(ctx, alertSenderConfig, alertKey, + fmt.Sprintf(alertMessageTemplate, defaultSlotLagMBAlertThreshold)) + } } } } @@ -121,7 +159,7 @@ func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string, openConnections *protos.GetOpenConnectionsForUserResult, ) { - slackAlertSenders, err := a.registerSendersFromPool(ctx) + alertSenderConfigs, err := a.registerSendersFromPool(ctx) if err != nil { logger.LoggerFromCtx(ctx).Warn("failed to set Slack senders", slog.Any("error", err)) return @@ -129,44 +167,45 @@ func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string, deploymentUIDPrefix := "" if peerdbenv.PeerDBDeploymentUID() != "" { - deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID()) + deploymentUIDPrefix = fmt.Sprintf("[%s] - ", peerdbenv.PeerDBDeploymentUID()) } // same as with slot lag, use lowest threshold for catalog defaultOpenConnectionsThreshold := dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx) lowestOpenConnectionsThreshold := defaultOpenConnectionsThreshold - for _, slackAlertSender := range slackAlertSenders { - if slackAlertSender.openConnectionsAlertThreshold > 0 { - lowestOpenConnectionsThreshold = min(lowestOpenConnectionsThreshold, slackAlertSender.openConnectionsAlertThreshold) + for _, alertSender := range alertSenderConfigs { + if alertSender.Sender.getOpenConnectionsAlertThreshold() > 0 { + lowestOpenConnectionsThreshold = min(lowestOpenConnectionsThreshold, alertSender.Sender.getOpenConnectionsAlertThreshold()) } } - alertKey := peerName + "-max-open-connections-threshold-exceeded" + alertKey := fmt.Sprintf("%s Max Open Connections Threshold Exceeded for Peer %s", deploymentUIDPrefix, peerName) alertMessageTemplate := fmt.Sprintf("%sOpen connections from PeerDB user `%s` on peer `%s`"+ - ` has exceeded threshold size of %%d connections, currently at %d connections! - cc: `, deploymentUIDPrefix, openConnections.UserName, peerName, openConnections.CurrentOpenConnections) - - if openConnections.CurrentOpenConnections > int64(lowestOpenConnectionsThreshold) && - a.checkAndAddAlertToCatalog(ctx, alertKey, fmt.Sprintf(alertMessageTemplate, lowestOpenConnectionsThreshold)) { - for _, slackAlertSender := range slackAlertSenders { - if slackAlertSender.openConnectionsAlertThreshold > 0 { - if openConnections.CurrentOpenConnections > int64(slackAlertSender.openConnectionsAlertThreshold) { - a.alertToSlack(ctx, slackAlertSender, alertKey, - fmt.Sprintf(alertMessageTemplate, slackAlertSender.openConnectionsAlertThreshold)) - } - } else { - if openConnections.CurrentOpenConnections > int64(defaultOpenConnectionsThreshold) { - a.alertToSlack(ctx, slackAlertSender, alertKey, - fmt.Sprintf(alertMessageTemplate, defaultOpenConnectionsThreshold)) + ` has exceeded threshold size of %%d connections, currently at %d connections!`, + deploymentUIDPrefix, openConnections.UserName, peerName, openConnections.CurrentOpenConnections) + + if openConnections.CurrentOpenConnections > int64(lowestOpenConnectionsThreshold) { + for _, alertSenderConfig := range alertSenderConfigs { + if a.checkAndAddAlertToCatalog(ctx, + alertSenderConfig.Id, alertKey, fmt.Sprintf(alertMessageTemplate, lowestOpenConnectionsThreshold)) { + if alertSenderConfig.Sender.getOpenConnectionsAlertThreshold() > 0 { + if openConnections.CurrentOpenConnections > int64(alertSenderConfig.Sender.getOpenConnectionsAlertThreshold()) { + a.alertToProvider(ctx, alertSenderConfig, alertKey, + fmt.Sprintf(alertMessageTemplate, alertSenderConfig.Sender.getOpenConnectionsAlertThreshold())) + } + } else { + if openConnections.CurrentOpenConnections > int64(defaultOpenConnectionsThreshold) { + a.alertToProvider(ctx, alertSenderConfig, alertKey, + fmt.Sprintf(alertMessageTemplate, defaultOpenConnectionsThreshold)) + } } } } } } -func (a *Alerter) alertToSlack(ctx context.Context, slackAlertSender *slackAlertSender, alertKey string, alertMessage string) { - err := slackAlertSender.sendAlert(ctx, - ":rotating_light:Alert:rotating_light:: "+alertKey, alertMessage) +func (a *Alerter) alertToProvider(ctx context.Context, alertSenderConfig AlertSenderConfig, alertKey string, alertMessage string) { + err := alertSenderConfig.Sender.sendAlert(ctx, alertKey, alertMessage) if err != nil { logger.LoggerFromCtx(ctx).Warn("failed to send alert", slog.Any("error", err)) return @@ -176,7 +215,7 @@ func (a *Alerter) alertToSlack(ctx context.Context, slackAlertSender *slackAlert // Only raises an alert if another alert with the same key hasn't been raised // in the past X minutes, where X is configurable and defaults to 15 minutes // returns true if alert added to catalog, so proceed with processing alerts to slack -func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string, alertMessage string) bool { +func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertConfigId int64, alertKey string, alertMessage string) bool { dur := dynamicconf.PeerDBAlertingGapMinutesAsDuration(ctx) if dur == 0 { logger.LoggerFromCtx(ctx).Warn("Alerting disabled via environment variable, returning") @@ -184,9 +223,9 @@ func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string } row := a.catalogPool.QueryRow(ctx, - `SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1 + `SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1 AND alert_config_id=$2 ORDER BY created_timestamp DESC LIMIT 1`, - alertKey) + alertKey, alertConfigId) var createdTimestamp time.Time err := row.Scan(&createdTimestamp) if err != nil && err != pgx.ErrNoRows { @@ -196,14 +235,18 @@ func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string if time.Since(createdTimestamp) >= dur { _, err = a.catalogPool.Exec(ctx, - "INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message) VALUES($1,$2)", - alertKey, alertMessage) + "INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message,alert_config_id) VALUES($1,$2,$3)", + alertKey, alertMessage, alertConfigId) if err != nil { logger.LoggerFromCtx(ctx).Warn("failed to insert alert", slog.Any("error", err)) return false } return true } + + logger.LoggerFromCtx(ctx).Info( + fmt.Sprintf("Skipped sending alerts: last alert was sent at %s, which was >=%s ago", + createdTimestamp.String(), dur.String())) return false } diff --git a/flow/alerting/email_alert_sender.go b/flow/alerting/email_alert_sender.go new file mode 100644 index 0000000000..2a534318c0 --- /dev/null +++ b/flow/alerting/email_alert_sender.go @@ -0,0 +1,104 @@ +package alerting + +import ( + "context" + "fmt" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/ses" + "github.com/aws/aws-sdk-go-v2/service/ses/types" + + "github.com/PeerDB-io/peer-flow/logger" + "github.com/PeerDB-io/peer-flow/peerdbenv" + "github.com/PeerDB-io/peer-flow/shared/aws_common" +) + +type EmailAlertSender struct { + AlertSender + client *ses.Client + sourceEmail string + configurationSetName string + replyToAddresses []string + slotLagMBAlertThreshold uint32 + openConnectionsAlertThreshold uint32 + emailAddresses []string +} + +func (e *EmailAlertSender) getSlotLagMBAlertThreshold() uint32 { + return e.slotLagMBAlertThreshold +} + +func (e *EmailAlertSender) getOpenConnectionsAlertThreshold() uint32 { + return e.openConnectionsAlertThreshold +} + +type EmailAlertSenderConfig struct { + sourceEmail string + configurationSetName string + replyToAddresses []string + SlotLagMBAlertThreshold uint32 `json:"slot_lag_mb_alert_threshold"` + OpenConnectionsAlertThreshold uint32 `json:"open_connections_alert_threshold"` + EmailAddresses []string `json:"email_addresses"` +} + +func (e *EmailAlertSender) sendAlert(ctx context.Context, alertTitle string, alertMessage string) error { + _, err := e.client.SendEmail(ctx, &ses.SendEmailInput{ + Destination: &types.Destination{ + ToAddresses: e.emailAddresses, + }, + Message: &types.Message{ + Body: &types.Body{ + Text: &types.Content{ + Data: aws.String(alertMessage), + Charset: aws.String("utf-8"), + }, + }, + Subject: &types.Content{ + Data: aws.String(alertTitle), + Charset: aws.String("utf-8"), + }, + }, + Source: aws.String(e.sourceEmail), + ConfigurationSetName: aws.String(e.configurationSetName), + ReplyToAddresses: e.replyToAddresses, + Tags: []types.MessageTag{ + {Name: aws.String("DeploymentUUID"), Value: aws.String(peerdbenv.PeerDBDeploymentUID())}, + }, + }) + if err != nil { + logger.LoggerFromCtx(ctx).Warn(fmt.Sprintf( + "Error sending email alert from %v to %s subject=[%s], body=[%s], configurationSet=%s, replyToAddresses=[%v]", + e.sourceEmail, e.emailAddresses, alertTitle, alertMessage, e.configurationSetName, e.replyToAddresses)) + return err + } + return nil +} + +func NewEmailAlertSenderWithNewClient(ctx context.Context, region *string, config *EmailAlertSenderConfig) (*EmailAlertSender, error) { + client, err := newSesClient(ctx, region) + if err != nil { + return nil, err + } + return NewEmailAlertSender(client, config), nil +} + +func NewEmailAlertSender(client *ses.Client, config *EmailAlertSenderConfig) *EmailAlertSender { + return &EmailAlertSender{ + client: client, + sourceEmail: config.sourceEmail, + configurationSetName: config.configurationSetName, + replyToAddresses: config.replyToAddresses, + slotLagMBAlertThreshold: config.SlotLagMBAlertThreshold, + openConnectionsAlertThreshold: config.OpenConnectionsAlertThreshold, + emailAddresses: config.EmailAddresses, + } +} + +func newSesClient(ctx context.Context, region *string) (*ses.Client, error) { + sdkConfig, err := aws_common.LoadSdkConfig(ctx, region) + if err != nil { + return nil, err + } + snsClient := ses.NewFromConfig(*sdkConfig) + return snsClient, nil +} diff --git a/flow/alerting/interface.go b/flow/alerting/interface.go new file mode 100644 index 0000000000..a9dd2d51b9 --- /dev/null +++ b/flow/alerting/interface.go @@ -0,0 +1,9 @@ +package alerting + +import "context" + +type AlertSender interface { + sendAlert(ctx context.Context, alertTitle string, alertMessage string) error + getSlotLagMBAlertThreshold() uint32 + getOpenConnectionsAlertThreshold() uint32 +} diff --git a/flow/alerting/slack_alert_sender.go b/flow/alerting/slack_alert_sender.go index 1ad007536b..85f0657d11 100644 --- a/flow/alerting/slack_alert_sender.go +++ b/flow/alerting/slack_alert_sender.go @@ -7,13 +7,22 @@ import ( "github.com/slack-go/slack" ) -type slackAlertSender struct { +type SlackAlertSender struct { + AlertSender client *slack.Client channelIDs []string slotLagMBAlertThreshold uint32 openConnectionsAlertThreshold uint32 } +func (s *SlackAlertSender) getSlotLagMBAlertThreshold() uint32 { + return s.slotLagMBAlertThreshold +} + +func (s *SlackAlertSender) getOpenConnectionsAlertThreshold() uint32 { + return s.openConnectionsAlertThreshold +} + type slackAlertConfig struct { AuthToken string `json:"auth_token"` ChannelIDs []string `json:"channel_ids"` @@ -21,8 +30,8 @@ type slackAlertConfig struct { OpenConnectionsAlertThreshold uint32 `json:"open_connections_alert_threshold"` } -func newSlackAlertSender(config *slackAlertConfig) *slackAlertSender { - return &slackAlertSender{ +func newSlackAlertSender(config *slackAlertConfig) *SlackAlertSender { + return &SlackAlertSender{ client: slack.New(config.AuthToken), channelIDs: config.ChannelIDs, slotLagMBAlertThreshold: config.SlotLagMBAlertThreshold, @@ -30,11 +39,11 @@ func newSlackAlertSender(config *slackAlertConfig) *slackAlertSender { } } -func (s *slackAlertSender) sendAlert(ctx context.Context, alertTitle string, alertMessage string) error { +func (s *SlackAlertSender) sendAlert(ctx context.Context, alertTitle string, alertMessage string) error { for _, channelID := range s.channelIDs { _, _, _, err := s.client.SendMessageContext(ctx, channelID, slack.MsgOptionBlocks( - slack.NewHeaderBlock(slack.NewTextBlockObject("plain_text", alertTitle, true, false)), - slack.NewSectionBlock(slack.NewTextBlockObject("mrkdwn", alertMessage, false, false), nil, nil), + slack.NewHeaderBlock(slack.NewTextBlockObject("plain_text", ":rotating_light:Alert:rotating_light:: "+alertTitle, true, false)), + slack.NewSectionBlock(slack.NewTextBlockObject("mrkdwn", alertMessage+"\ncc: ", false, false), nil, nil), )) if err != nil { return fmt.Errorf("failed to send message to Slack channel %s: %w", channelID, err) diff --git a/flow/alerting/types.go b/flow/alerting/types.go new file mode 100644 index 0000000000..6277010ee0 --- /dev/null +++ b/flow/alerting/types.go @@ -0,0 +1,8 @@ +package alerting + +type ServiceType string + +const ( + SLACK ServiceType = "slack" + EMAIL ServiceType = "email" +) diff --git a/flow/go.mod b/flow/go.mod index 6c319239ae..5126e3146d 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -15,6 +15,7 @@ require ( github.com/aws/aws-sdk-go-v2/credentials v1.17.7 github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.9 github.com/aws/aws-sdk-go-v2/service/s3 v1.51.4 + github.com/aws/aws-sdk-go-v2/service/ses v1.22.2 github.com/aws/aws-sdk-go-v2/service/sns v1.29.2 github.com/cockroachdb/pebble v1.1.0 github.com/google/uuid v1.6.0 diff --git a/flow/go.sum b/flow/go.sum index acb83d06ba..377b2c2a16 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -92,6 +92,8 @@ github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.3 h1:4t+QEX7BsXz98W github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.3/go.mod h1:oFcjjUq5Hm09N9rpxTdeMeLeQcxS7mIkBkL8qUKng+A= github.com/aws/aws-sdk-go-v2/service/s3 v1.51.4 h1:lW5xUzOPGAMY7HPuNF4FdyBwRc3UJ/e8KsapbesVeNU= github.com/aws/aws-sdk-go-v2/service/s3 v1.51.4/go.mod h1:MGTaf3x/+z7ZGugCGvepnx2DS6+caCYYqKhzVoLNYPk= +github.com/aws/aws-sdk-go-v2/service/ses v1.22.2 h1:cW5JtW23Lio3KDJ4l3jqRiOcCPKxJg7ooRA1SpIiuMo= +github.com/aws/aws-sdk-go-v2/service/ses v1.22.2/go.mod h1:MLj/NROJoperecxBME2zMN/O8Zrm0wv+6ah1Uqwpa1E= github.com/aws/aws-sdk-go-v2/service/sns v1.29.2 h1:kHm1SYs/NkxZpKINc4zOXOLJHVMzKtU4d7FlAMtDm50= github.com/aws/aws-sdk-go-v2/service/sns v1.29.2/go.mod h1:ZIs7/BaYel9NODoYa8PW39o15SFAXDEb4DxOG2It15U= github.com/aws/aws-sdk-go-v2/service/sso v1.20.2 h1:XOPfar83RIRPEzfihnp+U6udOveKZJvPQ76SKWrLRHc= diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index 9a59bad5ef..4903bec091 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -95,3 +95,20 @@ func PeerDBEnableParallelSyncNormalize() bool { func PeerDBTelemetryAWSSNSTopicArn() string { return getEnvString("PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN", "") } + +func PeerDBAlertingEmailSenderSourceEmail() string { + return getEnvString("PEERDB_ALERTING_EMAIL_SENDER_SOURCE_EMAIL", "") +} + +func PeerDBAlertingEmailSenderConfigurationSet() string { + return getEnvString("PEERDB_ALERTING_EMAIL_SENDER_CONFIGURATION_SET", "") +} + +func PeerDBAlertingEmailSenderRegion() string { + return getEnvString("PEERDB_ALERTING_EMAIL_SENDER_REGION", "") +} + +// Comma-separated reply-to addresses +func PeerDBAlertingEmailSenderReplyToAddresses() string { + return getEnvString("PEERDB_ALERTING_EMAIL_SENDER_REPLY_TO_ADDRESSES", "") +} diff --git a/flow/shared/aws_common/config.go b/flow/shared/aws_common/config.go new file mode 100644 index 0000000000..6eced96f05 --- /dev/null +++ b/flow/shared/aws_common/config.go @@ -0,0 +1,21 @@ +package aws_common + +import ( + "context" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" +) + +func LoadSdkConfig(ctx context.Context, region *string) (*aws.Config, error) { + sdkConfig, err := config.LoadDefaultConfig(ctx, func(options *config.LoadOptions) error { + if region != nil { + options.Region = *region + } + return nil + }) + if err != nil { + return nil, err + } + return &sdkConfig, nil +} diff --git a/flow/shared/telemetry/sns_message_sender.go b/flow/shared/telemetry/sns_message_sender.go index 9d32dcf8f9..218b693b38 100644 --- a/flow/shared/telemetry/sns_message_sender.go +++ b/flow/shared/telemetry/sns_message_sender.go @@ -8,10 +8,11 @@ import ( "unicode" "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/sns" "github.com/aws/aws-sdk-go-v2/service/sns/types" "go.temporal.io/sdk/activity" + + "github.com/PeerDB-io/peer-flow/shared/aws_common" ) type SNSMessageSender interface { @@ -109,15 +110,10 @@ func NewSNSMessageSender(client *sns.Client, config *SNSMessageSenderConfig) SNS } func newSnsClient(ctx context.Context, region *string) (*sns.Client, error) { - sdkConfig, err := config.LoadDefaultConfig(ctx, func(options *config.LoadOptions) error { - if region != nil { - options.Region = *region - } - return nil - }) + sdkConfig, err := aws_common.LoadSdkConfig(ctx, region) if err != nil { return nil, err } - snsClient := sns.NewFromConfig(sdkConfig) + snsClient := sns.NewFromConfig(*sdkConfig) return snsClient, nil } diff --git a/nexus/catalog/migrations/V21__alert_constraint_update.sql b/nexus/catalog/migrations/V21__alert_constraint_update.sql new file mode 100644 index 0000000000..18ddb7d9f8 --- /dev/null +++ b/nexus/catalog/migrations/V21__alert_constraint_update.sql @@ -0,0 +1,6 @@ +ALTER TABLE peerdb_stats.alerting_config +DROP CONSTRAINT alerting_config_service_type_check; + +ALTER TABLE peerdb_stats.alerting_config +ADD CONSTRAINT alerting_config_service_type_check +CHECK (service_type IN ('slack', 'email')); \ No newline at end of file diff --git a/nexus/catalog/migrations/V22__alert_column_add_config_id.sql b/nexus/catalog/migrations/V22__alert_column_add_config_id.sql new file mode 100644 index 0000000000..35c1147bf0 --- /dev/null +++ b/nexus/catalog/migrations/V22__alert_column_add_config_id.sql @@ -0,0 +1,2 @@ +ALTER TABLE peerdb_stats.alerts_v1 +ADD COLUMN alert_config_id BIGINT DEFAULT NULL; diff --git a/ui/app/alert-config/new.tsx b/ui/app/alert-config/new.tsx index 6a399e4c81..12dfbc9cf5 100644 --- a/ui/app/alert-config/new.tsx +++ b/ui/app/alert-config/new.tsx @@ -1,21 +1,27 @@ import { Button } from '@/lib/Button'; import { TextField } from '@/lib/TextField'; import Image from 'next/image'; -import { useState } from 'react'; +import { Dispatch, SetStateAction, useState } from 'react'; import ReactSelect from 'react-select'; import { PulseLoader } from 'react-spinners'; import { ToastContainer, toast } from 'react-toastify'; import 'react-toastify/dist/ReactToastify.css'; import SelectTheme from '../styles/select'; -import { alertConfigReqSchema, alertConfigType } from './validation'; +import { + alertConfigReqSchema, + alertConfigType, + emailConfigType, + serviceConfigType, + serviceTypeSchemaMap, + slackConfigType, +} from './validation'; + +export type ServiceType = 'slack' | 'email'; export interface AlertConfigProps { id?: bigint; - serviceType: string; - authToken: string; - channelIdString: string; - slotLagGBAlertThreshold: number; - openConnectionsAlertThreshold: number; + serviceType: ServiceType; + alertConfig: serviceConfigType; forEdit?: boolean; } @@ -25,45 +31,152 @@ const notifyErr = (errMsg: string) => { }); }; -function ConfigLabel() { +function ConfigLabel(data: { label: string; value: string }) { return (
- slack + {data.value} + {data.label}
); } -const NewAlertConfig = (alertProps: AlertConfigProps) => { - const [serviceType, setServiceType] = useState('slack'); - const [authToken, setAuthToken] = useState(alertProps.authToken); - const [channelIdString, setChannelIdString] = useState( - alertProps.channelIdString +function getSlackProps( + config: slackConfigType, + setConfig: Dispatch> +) { + return ( + <> +
+

Authorisation Token

+ { + setConfig((previous) => ({ + ...previous, + auth_token: e.target.value, + })); + }} + /> +
+
+

Channel IDs

+ { + setConfig((previous) => ({ + ...previous, + channel_ids: e.target.value.split(','), + })); + }} + /> +
+ + ); +} + +function getEmailProps( + config: emailConfigType, + setConfig: Dispatch> +) { + return ( + <> +
+

Email Addresses

+ { + setConfig((previous) => ({ + ...previous, + email_addresses: e.target.value.split(','), + })); + }} + /> +
+ + ); +} +function getServiceFields( + serviceType: ServiceType, + config: T, + setConfig: Dispatch> +) { + switch (serviceType) { + case 'email': + return getEmailProps( + config as emailConfigType, + setConfig as Dispatch> + ); + case 'slack': { + return getSlackProps( + config as slackConfigType, + setConfig as Dispatch> + ); + } + } +} + +export function NewConfig(alertProps: AlertConfigProps) { + const [serviceType, setServiceType] = useState( + alertProps.serviceType ); - const [slotLagGBAlertThreshold, setSlotLagGBAlertThreshold] = - useState(alertProps.slotLagGBAlertThreshold); - const [openConnectionsAlertThreshold, setOpenConnectionsAlertThreshold] = - useState(alertProps.openConnectionsAlertThreshold); + + const [config, setConfig] = useState( + alertProps.alertConfig + ); + const [loading, setLoading] = useState(false); + const handleAdd = async () => { - if (serviceType !== 'slack') { - notifyErr('Service Type must be selected'); + if (!serviceType) { + notifyErr('Service type must be selected'); + return; + } + + const serviceSchema = serviceTypeSchemaMap[serviceType]; + const serviceValidity = serviceSchema.safeParse(config); + if (!serviceValidity?.success) { + notifyErr( + 'Invalid alert service configuration for ' + + serviceType + + '. ' + + serviceValidity.error.issues[0].message + ); return; } + const serviceConfig = serviceValidity.data; const alertConfigReq: alertConfigType = { + id: Number(alertProps.id || -1), serviceType: serviceType, - serviceConfig: { - auth_token: authToken ?? '', - channel_ids: channelIdString?.split(',')!, - slot_lag_mb_alert_threshold: slotLagGBAlertThreshold * 1000 || 20000, - open_connections_alert_threshold: openConnectionsAlertThreshold || 5, - }, + serviceConfig, }; + const alertReqValidity = alertConfigReqSchema.safeParse(alertConfigReq); if (!alertReqValidity.success) { notifyErr(alertReqValidity.error.issues[0].message); return; } + setLoading(true); if (alertProps.forEdit) { alertConfigReq.id = Number(alertProps.id); @@ -72,6 +185,7 @@ const NewAlertConfig = (alertProps: AlertConfigProps) => { method: alertProps.forEdit ? 'PUT' : 'POST', body: JSON.stringify(alertConfigReq), }); + const createStatus = await createRes.text(); setLoading(false); if (createStatus !== 'success') { @@ -81,7 +195,7 @@ const NewAlertConfig = (alertProps: AlertConfigProps) => { window.location.reload(); }; - + const ServiceFields = getServiceFields(serviceType, config, setConfig); return (
{

Alert Provider

val && setServiceType(val.value)} + onChange={(val, _) => val && setServiceType(val.value as ServiceType)} theme={SelectTheme} />
-
-

Authorisation Token

- setAuthToken(e.target.value)} - /> -
- -
-

Channel IDs

- setChannelIdString(e.target.value)} - /> -
-

Slot Lag Alert Threshold (in GB)

setSlotLagGBAlertThreshold(e.target.valueAsNumber)} + value={config.slot_lag_mb_alert_threshold / 1000} + onChange={(e) => + setConfig((previous) => ({ + ...previous, + slot_lag_mb_alert_threshold: e.target.valueAsNumber * 1000, + })) + } />
-

Open Connections Alert Threshold

- setOpenConnectionsAlertThreshold(e.target.valueAsNumber) + setConfig((previous) => ({ + ...previous, + open_connections_alert_threshold: e.target.valueAsNumber, + })) } />
- + {ServiceFields}
); -}; +} -export default NewAlertConfig; +export default NewConfig; diff --git a/ui/app/alert-config/page.tsx b/ui/app/alert-config/page.tsx index 514f586155..25a92338b9 100644 --- a/ui/app/alert-config/page.tsx +++ b/ui/app/alert-config/page.tsx @@ -12,13 +12,25 @@ import useSWR from 'swr'; import { UAlertConfigResponse } from '../dto/AlertDTO'; import { tableStyle } from '../peers/[peerName]/style'; import { fetcher } from '../utils/swr'; -import NewAlertConfig, { AlertConfigProps } from './new'; -const ServiceIcon = (serviceType: string) => { - switch (serviceType.toLowerCase()) { - default: - return alt; - } +import { AlertConfigProps, NewConfig, ServiceType } from './new'; + +const ServiceIcon = ({ + serviceType, + size, +}: { + serviceType: string; + size: number; +}) => { + return ( + {serviceType} + ); }; + const AlertConfigPage: React.FC = () => { const { data: alerts, @@ -28,14 +40,19 @@ const AlertConfigPage: React.FC = () => { error: any; isLoading: boolean; } = useSWR('/api/alert-config', fetcher); + const blankAlert: AlertConfigProps = { - serviceType: '', - authToken: '', - channelIdString: '', - slotLagGBAlertThreshold: 20, - openConnectionsAlertThreshold: 5, + serviceType: 'slack', + alertConfig: { + email_addresses: [''], + auth_token: '', + channel_ids: [''], + open_connections_alert_threshold: 20, + slot_lag_mb_alert_threshold: 5000, + }, forEdit: false, }; + const [inEditOrAddMode, setInEditOrAddMode] = useState(false); const [editAlertConfig, setEditAlertConfig] = useState(blankAlert); @@ -43,19 +60,15 @@ const AlertConfigPage: React.FC = () => { const onEdit = (alertConfig: UAlertConfigResponse, id: bigint) => { setInEditOrAddMode(true); const configJSON = JSON.stringify(alertConfig.service_config); - const channelIds: string[] = JSON.parse(configJSON)?.channel_ids; + setEditAlertConfig({ id, - serviceType: alertConfig.service_type, - authToken: JSON.parse(configJSON)?.auth_token, - channelIdString: channelIds.join(','), - slotLagGBAlertThreshold: - (JSON.parse(configJSON)?.slot_lag_mb_alert_threshold as number) / 1000, - openConnectionsAlertThreshold: - JSON.parse(configJSON)?.open_connections_alert_threshold, + serviceType: alertConfig.service_type as ServiceType, + alertConfig: JSON.parse(configJSON), forEdit: true, }); }; + return (
@@ -79,11 +92,32 @@ const AlertConfigPage: React.FC = () => { {alerts?.length ? ( alerts.map((alertConfig: UAlertConfigResponse, index) => ( - - {alertConfig.id} - - - {ServiceIcon(alertConfig.service_type)} + +
+
+ + +
+
@@ -141,7 +175,7 @@ const AlertConfigPage: React.FC = () => { {inEditOrAddMode ? 'Cancel' : 'Add Configuration'} - {inEditOrAddMode && } + {inEditOrAddMode && }
); }; diff --git a/ui/app/alert-config/validation.ts b/ui/app/alert-config/validation.ts index 3256c51aa0..9b1a14b2b3 100644 --- a/ui/app/alert-config/validation.ts +++ b/ui/app/alert-config/validation.ts @@ -1,33 +1,77 @@ import z from 'zod'; -export const alertConfigReqSchema = z.object({ - id: z.optional(z.number()), - serviceType: z.enum(['slack'], { - errorMap: (issue, ctx) => ({ message: 'Invalid service type' }), - }), - serviceConfig: z.object({ +const baseServiceConfigSchema = z.object({ + slot_lag_mb_alert_threshold: z + .number({ + invalid_type_error: 'Slot threshold must be a number', + }) + .min(0, 'Slot threshold must be non-negative'), + open_connections_alert_threshold: z + .number({ + invalid_type_error: 'Threshold must be a number', + }) + .int({ message: 'Connections threshold must be a valid integer' }) + .min(0, 'Connections threshold must be non-negative'), +}); + +export const slackServiceConfigSchema = z.intersection( + baseServiceConfigSchema, + z.object({ auth_token: z .string({ required_error: 'Auth Token is needed.' }) .min(1, { message: 'Auth Token cannot be empty' }) .max(256, { message: 'Auth Token is too long' }), channel_ids: z - .array(z.string().min(1, { message: 'Channel IDs cannot be empty' }), { - required_error: 'We need a channel ID', - }) + .array( + z.string().trim().min(1, { message: 'Channel IDs cannot be empty' }), + { + required_error: 'We need a channel ID', + } + ) .min(1, { message: 'Atleast one channel ID is needed' }), - slot_lag_mb_alert_threshold: z - .number({ - invalid_type_error: 'Threshold must be a number', - }) - .int() - .min(0, 'Threshold must be non-negative'), - open_connections_alert_threshold: z - .number({ - invalid_type_error: 'Threshold must be a number', - }) - .int() - .min(0, 'Threshold must be non-negative'), + }) +); + +export const emailServiceConfigSchema = z.intersection( + baseServiceConfigSchema, + z.object({ + email_addresses: z + .array( + z + .string() + .trim() + .min(1, { message: 'Email Addresses cannot be empty' }) + .includes('@'), + { + required_error: 'We need an Email Address', + } + ) + .min(1, { message: 'Atleast one email address is needed' }), + }) +); + +export const serviceConfigSchema = z.union([ + slackServiceConfigSchema, + emailServiceConfigSchema, +]); +export const alertConfigReqSchema = z.object({ + id: z.optional(z.number({ invalid_type_error: 'ID must be a valid number' })), + serviceType: z.enum(['slack', 'email'], { + errorMap: (issue, ctx) => ({ message: 'Invalid service type' }), }), + serviceConfig: serviceConfigSchema, }); +export type baseServiceConfigType = z.infer; + +export type slackConfigType = z.infer; +export type emailConfigType = z.infer; + +export type serviceConfigType = z.infer; + export type alertConfigType = z.infer; + +export const serviceTypeSchemaMap = { + slack: slackServiceConfigSchema, + email: emailServiceConfigSchema, +}; diff --git a/ui/app/api/alert-config/route.ts b/ui/app/api/alert-config/route.ts index 7f66da191f..9b8ff019ed 100644 --- a/ui/app/api/alert-config/route.ts +++ b/ui/app/api/alert-config/route.ts @@ -23,6 +23,7 @@ export async function POST(request: Request) { if (createRes.id) { createStatus = 'success'; } + return new Response(createStatus); } diff --git a/ui/components/AlertDropdown.tsx b/ui/components/AlertDropdown.tsx index dd3ae482e3..bfc30648a7 100644 --- a/ui/components/AlertDropdown.tsx +++ b/ui/components/AlertDropdown.tsx @@ -40,19 +40,16 @@ const AlertDropdown = ({ borderRadius: '0.5rem', }} > - - - - - - + + + diff --git a/ui/public/images/email.png b/ui/public/images/email.png new file mode 100644 index 0000000000..0d302af32a Binary files /dev/null and b/ui/public/images/email.png differ diff --git a/ui/public/images/slack.png b/ui/public/images/slack.png index 32fe41912a..a045ed1962 100644 Binary files a/ui/public/images/slack.png and b/ui/public/images/slack.png differ diff --git a/ui/public/images/slack_full.png b/ui/public/images/slack_full.png new file mode 100644 index 0000000000..32fe41912a Binary files /dev/null and b/ui/public/images/slack_full.png differ