Skip to content

Commit

Permalink
custom threshold support for slack alerts
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Feb 13, 2024
1 parent 9081af8 commit 8e4be75
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 62 deletions.
24 changes: 2 additions & 22 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@ import (

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/dynamicconf"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared/alerting"
)

Expand Down Expand Up @@ -935,33 +933,15 @@ func (c *PostgresConnector) HandleSlotInfo(
return nil
}

deploymentUIDPrefix := ""
if peerdbenv.PeerDBDeploymentUID() != "" {
deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID())
}

slotLagInMBThreshold := dynamicconf.PeerDBSlotLagMBAlertThreshold(ctx)
if (slotLagInMBThreshold > 0) && (slotInfo[0].LagInMb >= float32(slotLagInMBThreshold)) {
alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName),
fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB!
cc: <!channel>`,
deploymentUIDPrefix, slotName, peerName, slotLagInMBThreshold, slotInfo[0].LagInMb))
}
alerter.AlertIfSlotLag(ctx, peerName, slotInfo[0])

// Also handles alerts for PeerDB user connections exceeding a given limit here
maxOpenConnectionsThreshold := dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx)
res, err := getOpenConnectionsForUser(ctx, conn, c.config.User)
if err != nil {
logger.Warn("warning: failed to get current open connections", "error", err)
return err
}
if (maxOpenConnectionsThreshold > 0) && (res.CurrentOpenConnections >= int64(maxOpenConnectionsThreshold)) {
alerter.AlertIf(ctx, fmt.Sprintf("%s-max-open-connections-threshold-exceeded", peerName),
fmt.Sprintf(`%sOpen connections from PeerDB user `+"`%s`"+` on peer `+"`%s`"+
` has exceeded threshold size of %d connections, currently at %d connections!
cc: <!channel>`,
deploymentUIDPrefix, res.UserName, peerName, maxOpenConnectionsThreshold, res.CurrentOpenConnections))
}
alerter.AlertIfOpenConnections(ctx, peerName, res)

if len(slotInfo) != 0 {
return monitoring.AppendSlotSizeInfo(ctx, catalogPool, peerName, slotInfo[0])
Expand Down
154 changes: 120 additions & 34 deletions flow/shared/alerting/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package alerting
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"time"
Expand All @@ -11,16 +12,18 @@ import (
"github.com/jackc/pgx/v5/pgxpool"

"github.com/PeerDB-io/peer-flow/dynamicconf"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/peerdbenv"
)

// alerting service, no cool name :(
type Alerter struct {
catalogPool *pgxpool.Pool
}

func registerSendersFromPool(ctx context.Context, catalogPool *pgxpool.Pool) ([]*slackAlertSender, error) {
rows, err := catalogPool.Query(ctx,
func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]*slackAlertSender, error) {
rows, err := a.catalogPool.Query(ctx,
"SELECT service_type,service_config FROM peerdb_stats.alerting_config")
if err != nil {
return nil, fmt.Errorf("failed to read alerter config from catalog: %w", err)
Expand Down Expand Up @@ -50,68 +53,151 @@ func registerSendersFromPool(ctx context.Context, catalogPool *pgxpool.Pool) ([]
// doesn't take care of closing pool, needs to be done externally.
func NewAlerter(catalogPool *pgxpool.Pool) (*Alerter, error) {
if catalogPool == nil {
return nil, fmt.Errorf("catalog pool is nil for Alerter")
return nil, errors.New("catalog pool is nil for Alerter")
}

return &Alerter{
catalogPool: catalogPool,
}, nil
}

// Only raises an alert if another alert with the same key hasn't been raised
// in the past X minutes, where X is configurable and defaults to 15 minutes
func (a *Alerter) AlertIf(ctx context.Context, alertKey string, alertMessage string) {
dur := dynamicconf.PeerDBAlertingGapMinutesAsDuration(ctx)
if dur == 0 {
logger.LoggerFromCtx(ctx).Warn("Alerting disabled via environment variable, returning")
func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo *protos.SlotInfo) {
slackAlertSenders, err := a.registerSendersFromPool(ctx)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to set Slack senders", slog.Any("error", err))
return
}

var err error
row := a.catalogPool.QueryRow(ctx,
`SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1
ORDER BY created_timestamp DESC LIMIT 1`,
alertKey)
var createdTimestamp time.Time
err = row.Scan(&createdTimestamp)
if err != nil && err != pgx.ErrNoRows {
logger.LoggerFromCtx(ctx).Warn("failed to send alert: ", slog.String("err", err.Error()))
return
deploymentUIDPrefix := ""
if peerdbenv.PeerDBDeploymentUID() != "" {
deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID())
}

if time.Since(createdTimestamp) >= dur {
a.AddAlertToCatalog(ctx, alertKey, alertMessage)
a.AlertToSlack(ctx, alertKey, alertMessage)
defaultSlotLagMBAlertThreshold := dynamicconf.PeerDBSlotLagMBAlertThreshold(ctx)
// catalog cannot use default threshold to space alerts properly, use the lowest set threshold instead
lowestSlotLagMBAlertThreshold := defaultSlotLagMBAlertThreshold
for _, slackAlertSender := range slackAlertSenders {
if slackAlertSender.slotLagMBAlertThreshold > 0 {
lowestSlotLagMBAlertThreshold = min(lowestSlotLagMBAlertThreshold, slackAlertSender.slotLagMBAlertThreshold)
}
}

alertKey := peerName + "-slot-lag-threshold-exceeded"
alertMessageTemplate := fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of `,
deploymentUIDPrefix, slotInfo.SlotName, peerName) +
"%d" +
fmt.Sprintf(`MB, currently at %.2fMB!
cc: <!channel>`, slotInfo.LagInMb)

if slotInfo.LagInMb > float32(lowestSlotLagMBAlertThreshold) &&
a.checkAndAddAlertToCatalog(ctx, alertKey, fmt.Sprintf(alertMessageTemplate, lowestSlotLagMBAlertThreshold)) {
for _, slackAlertSender := range slackAlertSenders {
if slackAlertSender.slotLagMBAlertThreshold > 0 {
if slotInfo.LagInMb > float32(slackAlertSender.slotLagMBAlertThreshold) {
a.alertToSlack(ctx, slackAlertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, slackAlertSender.slotLagMBAlertThreshold))
}
} else {
if slotInfo.LagInMb > float32(defaultSlotLagMBAlertThreshold) {
a.alertToSlack(ctx, slackAlertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, defaultSlotLagMBAlertThreshold))
}
}
}
}
}

func (a *Alerter) AlertToSlack(ctx context.Context, alertKey string, alertMessage string) {
slackAlertSenders, err := registerSendersFromPool(ctx, a.catalogPool)
func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string,
openConnections *protos.GetOpenConnectionsForUserResult,
) {
slackAlertSenders, err := a.registerSendersFromPool(ctx)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to set Slack senders", slog.Any("error", err))
return
}

deploymentUIDPrefix := ""
if peerdbenv.PeerDBDeploymentUID() != "" {
deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID())
}

// same as with slot lag, use lowest threshold for catalog
defaultOpenConnectionsThreshold := dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx)
lowestOpenConnectionsThreshold := defaultOpenConnectionsThreshold
for _, slackAlertSender := range slackAlertSenders {
err = slackAlertSender.sendAlert(ctx,
fmt.Sprintf(":rotating_light:Alert:rotating_light:: %s", alertKey), alertMessage)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to send alert", slog.Any("error", err))
return
if slackAlertSender.openConnectionsAlertThreshold > 0 {
lowestOpenConnectionsThreshold = min(lowestOpenConnectionsThreshold, slackAlertSender.openConnectionsAlertThreshold)
}
}

alertKey := peerName + "-max-open-connections-threshold-exceeded"
alertMessageTemplate := fmt.Sprintf(`%sOpen connections from PeerDB user `+"`%s`"+` on peer `+"`%s`"+
" has exceeded threshold size of ", deploymentUIDPrefix, openConnections.UserName, peerName) +
"%d" +
fmt.Sprintf(` connections, currently at %d connections!
cc: <!channel>`, openConnections.CurrentOpenConnections)

if openConnections.CurrentOpenConnections > int64(lowestOpenConnectionsThreshold) &&
a.checkAndAddAlertToCatalog(ctx, alertKey, fmt.Sprintf(alertMessageTemplate, lowestOpenConnectionsThreshold)) {
for _, slackAlertSender := range slackAlertSenders {
if slackAlertSender.openConnectionsAlertThreshold > 0 {
if openConnections.CurrentOpenConnections > int64(slackAlertSender.openConnectionsAlertThreshold) {
a.alertToSlack(ctx, slackAlertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, slackAlertSender.openConnectionsAlertThreshold))
}
} else {
if openConnections.CurrentOpenConnections > int64(defaultOpenConnectionsThreshold) {
a.alertToSlack(ctx, slackAlertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, defaultOpenConnectionsThreshold))
}
}
}
}
}

func (a *Alerter) AddAlertToCatalog(ctx context.Context, alertKey string, alertMessage string) {
_, err := a.catalogPool.Exec(ctx,
"INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message) VALUES($1,$2)",
alertKey, alertMessage)
func (a *Alerter) alertToSlack(ctx context.Context, slackAlertSender *slackAlertSender, alertKey string, alertMessage string) {
err := slackAlertSender.sendAlert(ctx,
":rotating_light:Alert:rotating_light:: "+alertKey, alertMessage)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to insert alert", slog.Any("error", err))
logger.LoggerFromCtx(ctx).Warn("failed to send alert", slog.Any("error", err))
return
}
}

// Only raises an alert if another alert with the same key hasn't been raised
// in the past X minutes, where X is configurable and defaults to 15 minutes
// returns true if alert added to catalog, so proceed with processing alerts to slack
func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string, alertMessage string) bool {
dur := dynamicconf.PeerDBAlertingGapMinutesAsDuration(ctx)
if dur == 0 {
logger.LoggerFromCtx(ctx).Warn("Alerting disabled via environment variable, returning")
return false
}

row := a.catalogPool.QueryRow(ctx,
`SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1
ORDER BY created_timestamp DESC LIMIT 1`,
alertKey)
var createdTimestamp time.Time
err := row.Scan(&createdTimestamp)
if err != nil && err != pgx.ErrNoRows {
logger.LoggerFromCtx(ctx).Warn("failed to send alert: ", slog.String("err", err.Error()))
return false
}

if time.Since(createdTimestamp) >= dur {
_, err = a.catalogPool.Exec(ctx,
"INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message) VALUES($1,$2)",
alertKey, alertMessage)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to insert alert", slog.Any("error", err))
return false
}
return true
}
return false
}

func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) {
errorWithStack := fmt.Sprintf("%+v", err)
_, err = a.catalogPool.Exec(ctx,
Expand Down
18 changes: 12 additions & 6 deletions flow/shared/alerting/slack_alert_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,25 @@ import (
)

type slackAlertSender struct {
client *slack.Client
channelIDs []string
client *slack.Client
channelIDs []string
slotLagMBAlertThreshold uint32
openConnectionsAlertThreshold uint32
}

type slackAlertConfig struct {
AuthToken string `json:"auth_token"`
ChannelIDs []string `json:"channel_ids"`
AuthToken string `json:"auth_token"`
ChannelIDs []string `json:"channel_ids"`
SlotLagMBAlertThreshold uint32 `json:"slot_lag_mb_alert_threshold"`
OpenConnectionsAlertThreshold uint32 `json:"open_connections_alert_threshold"`
}

func newSlackAlertSender(config *slackAlertConfig) *slackAlertSender {
return &slackAlertSender{
client: slack.New(config.AuthToken),
channelIDs: config.ChannelIDs,
client: slack.New(config.AuthToken),
channelIDs: config.ChannelIDs,
slotLagMBAlertThreshold: config.SlotLagMBAlertThreshold,
openConnectionsAlertThreshold: config.OpenConnectionsAlertThreshold,
}
}

Expand Down
30 changes: 30 additions & 0 deletions ui/app/alert-config/new.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,23 @@ const NewAlertConfig = () => {
const [serviceType, setServiceType] = useState<string>();
const [authToken, setAuthToken] = useState<string>();
const [channelIdString, setChannelIdString] = useState<string>();
const [slotLagMBAlertThreshold, setSlotLagMBAlertThreshold] = useState<number>();
const [openConnectionsAlertThreshold, setOpenConnectionsAlertThreshold] = useState<number>();
const [loading, setLoading] = useState(false);
const handleAdd = async () => {
if (serviceType !== 'slack') {
notifyErr('Service Type must be selected');
return;
}
console.log(slotLagMBAlertThreshold);
console.log(openConnectionsAlertThreshold);
const alertConfigReq: alertConfigType = {
serviceType: serviceType,
serviceConfig: {
auth_token: authToken ?? '',
channel_ids: channelIdString?.split(',')!,
slot_lag_mb_alert_threshold: slotLagMBAlertThreshold || 0,
open_connections_alert_threshold: openConnectionsAlertThreshold || 0,
},
};
const alertReqValidity = alertConfigReqSchema.safeParse(alertConfigReq);
Expand Down Expand Up @@ -104,6 +110,30 @@ const NewAlertConfig = () => {
/>
</div>

<div>
<p>Slot Lag Alert Threshold (in MB)</p>
<TextField
style={{ height: '2.5rem', marginTop: '0.5rem' }}
variant='simple'
type={'number'}
placeholder='optional'
value={slotLagMBAlertThreshold}
onChange={(e) => setSlotLagMBAlertThreshold(e.target.valueAsNumber)}
/>
</div>

<div>
<p>Open Connections Alert Threshold</p>
<TextField
style={{ height: '2.5rem', marginTop: '0.5rem' }}
variant='simple'
type={'number'}
placeholder='optional'
value={openConnectionsAlertThreshold}
onChange={(e) => setOpenConnectionsAlertThreshold(e.target.valueAsNumber)}
/>
</div>

<Button
style={{ marginTop: '1rem', width: '20%', height: '2.5rem' }}
onClick={handleAdd}
Expand Down
12 changes: 12 additions & 0 deletions ui/app/alert-config/validation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@ export const alertConfigReqSchema = z.object({
required_error: 'We need a channel ID',
})
.min(1, { message: 'Atleast one channel ID is needed' }),
slot_lag_mb_alert_threshold: z
.number({
invalid_type_error: 'Threshold must be a number',
})
.int()
.min(0, 'Threshold must be non-negative'),
open_connections_alert_threshold: z
.number({
invalid_type_error: 'Threshold must be a number',
})
.int()
.min(0, 'Threshold must be non-negative'),
}),
});

Expand Down

0 comments on commit 8e4be75

Please sign in to comment.