Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

custom threshold support for slack alerts #1277

Merged
merged 5 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 2 additions & 22 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@ import (

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/dynamicconf"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared/alerting"
)

Expand Down Expand Up @@ -928,33 +926,15 @@ func (c *PostgresConnector) HandleSlotInfo(
return nil
}

deploymentUIDPrefix := ""
if peerdbenv.PeerDBDeploymentUID() != "" {
deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID())
}

slotLagInMBThreshold := dynamicconf.PeerDBSlotLagMBAlertThreshold(ctx)
if (slotLagInMBThreshold > 0) && (slotInfo[0].LagInMb >= float32(slotLagInMBThreshold)) {
alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName),
fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB!
cc: <!channel>`,
deploymentUIDPrefix, slotName, peerName, slotLagInMBThreshold, slotInfo[0].LagInMb))
}
alerter.AlertIfSlotLag(ctx, peerName, slotInfo[0])

// Also handles alerts for PeerDB user connections exceeding a given limit here
maxOpenConnectionsThreshold := dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx)
res, err := getOpenConnectionsForUser(ctx, conn, c.config.User)
if err != nil {
logger.Warn("warning: failed to get current open connections", "error", err)
return err
}
if (maxOpenConnectionsThreshold > 0) && (res.CurrentOpenConnections >= int64(maxOpenConnectionsThreshold)) {
alerter.AlertIf(ctx, fmt.Sprintf("%s-max-open-connections-threshold-exceeded", peerName),
fmt.Sprintf(`%sOpen connections from PeerDB user `+"`%s`"+` on peer `+"`%s`"+
` has exceeded threshold size of %d connections, currently at %d connections!
cc: <!channel>`,
deploymentUIDPrefix, res.UserName, peerName, maxOpenConnectionsThreshold, res.CurrentOpenConnections))
}
alerter.AlertIfOpenConnections(ctx, peerName, res)

if len(slotInfo) != 0 {
return monitoring.AppendSlotSizeInfo(ctx, catalogPool, peerName, slotInfo[0])
Expand Down
150 changes: 116 additions & 34 deletions flow/shared/alerting/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package alerting
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"time"
Expand All @@ -11,16 +12,18 @@ import (
"github.com/jackc/pgx/v5/pgxpool"

"github.com/PeerDB-io/peer-flow/dynamicconf"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/peerdbenv"
)

// alerting service, no cool name :(
type Alerter struct {
catalogPool *pgxpool.Pool
}

func registerSendersFromPool(ctx context.Context, catalogPool *pgxpool.Pool) ([]*slackAlertSender, error) {
rows, err := catalogPool.Query(ctx,
func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]*slackAlertSender, error) {
rows, err := a.catalogPool.Query(ctx,
"SELECT service_type,service_config FROM peerdb_stats.alerting_config")
if err != nil {
return nil, fmt.Errorf("failed to read alerter config from catalog: %w", err)
Expand Down Expand Up @@ -50,68 +53,147 @@ func registerSendersFromPool(ctx context.Context, catalogPool *pgxpool.Pool) ([]
// doesn't take care of closing pool, needs to be done externally.
func NewAlerter(catalogPool *pgxpool.Pool) (*Alerter, error) {
if catalogPool == nil {
return nil, fmt.Errorf("catalog pool is nil for Alerter")
return nil, errors.New("catalog pool is nil for Alerter")
}

return &Alerter{
catalogPool: catalogPool,
}, nil
}

// Only raises an alert if another alert with the same key hasn't been raised
// in the past X minutes, where X is configurable and defaults to 15 minutes
func (a *Alerter) AlertIf(ctx context.Context, alertKey string, alertMessage string) {
dur := dynamicconf.PeerDBAlertingGapMinutesAsDuration(ctx)
if dur == 0 {
logger.LoggerFromCtx(ctx).Warn("Alerting disabled via environment variable, returning")
func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo *protos.SlotInfo) {
slackAlertSenders, err := a.registerSendersFromPool(ctx)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to set Slack senders", slog.Any("error", err))
return
}

var err error
row := a.catalogPool.QueryRow(ctx,
`SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1
ORDER BY created_timestamp DESC LIMIT 1`,
alertKey)
var createdTimestamp time.Time
err = row.Scan(&createdTimestamp)
if err != nil && err != pgx.ErrNoRows {
logger.LoggerFromCtx(ctx).Warn("failed to send alert: ", slog.String("err", err.Error()))
return
deploymentUIDPrefix := ""
if peerdbenv.PeerDBDeploymentUID() != "" {
deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID())
}

if time.Since(createdTimestamp) >= dur {
a.AddAlertToCatalog(ctx, alertKey, alertMessage)
a.AlertToSlack(ctx, alertKey, alertMessage)
defaultSlotLagMBAlertThreshold := dynamicconf.PeerDBSlotLagMBAlertThreshold(ctx)
// catalog cannot use default threshold to space alerts properly, use the lowest set threshold instead
lowestSlotLagMBAlertThreshold := defaultSlotLagMBAlertThreshold
for _, slackAlertSender := range slackAlertSenders {
if slackAlertSender.slotLagMBAlertThreshold > 0 {
lowestSlotLagMBAlertThreshold = min(lowestSlotLagMBAlertThreshold, slackAlertSender.slotLagMBAlertThreshold)
}
}

alertKey := peerName + "-slot-lag-threshold-exceeded"
alertMessageTemplate := fmt.Sprintf("%sSlot `%s` on peer `%s` has exceeded threshold size of %%dMB, "+
`currently at %.2fMB!
cc: <!channel>`, deploymentUIDPrefix, slotInfo.SlotName, peerName, slotInfo.LagInMb)

if slotInfo.LagInMb > float32(lowestSlotLagMBAlertThreshold) &&
a.checkAndAddAlertToCatalog(ctx, alertKey, fmt.Sprintf(alertMessageTemplate, lowestSlotLagMBAlertThreshold)) {
for _, slackAlertSender := range slackAlertSenders {
if slackAlertSender.slotLagMBAlertThreshold > 0 {
if slotInfo.LagInMb > float32(slackAlertSender.slotLagMBAlertThreshold) {
a.alertToSlack(ctx, slackAlertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, slackAlertSender.slotLagMBAlertThreshold))
}
} else {
if slotInfo.LagInMb > float32(defaultSlotLagMBAlertThreshold) {
a.alertToSlack(ctx, slackAlertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, defaultSlotLagMBAlertThreshold))
}
}
}
}
}

func (a *Alerter) AlertToSlack(ctx context.Context, alertKey string, alertMessage string) {
slackAlertSenders, err := registerSendersFromPool(ctx, a.catalogPool)
func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string,
openConnections *protos.GetOpenConnectionsForUserResult,
) {
slackAlertSenders, err := a.registerSendersFromPool(ctx)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to set Slack senders", slog.Any("error", err))
return
}

deploymentUIDPrefix := ""
if peerdbenv.PeerDBDeploymentUID() != "" {
deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID())
}

// same as with slot lag, use lowest threshold for catalog
defaultOpenConnectionsThreshold := dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx)
lowestOpenConnectionsThreshold := defaultOpenConnectionsThreshold
for _, slackAlertSender := range slackAlertSenders {
err = slackAlertSender.sendAlert(ctx,
fmt.Sprintf(":rotating_light:Alert:rotating_light:: %s", alertKey), alertMessage)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to send alert", slog.Any("error", err))
return
if slackAlertSender.openConnectionsAlertThreshold > 0 {
lowestOpenConnectionsThreshold = min(lowestOpenConnectionsThreshold, slackAlertSender.openConnectionsAlertThreshold)
}
}

alertKey := peerName + "-max-open-connections-threshold-exceeded"
alertMessageTemplate := fmt.Sprintf("%sOpen connections from PeerDB user `%s` on peer `%s`"+
` has exceeded threshold size of %%d connections, currently at %d connections!
cc: <!channel>`, deploymentUIDPrefix, openConnections.UserName, peerName, openConnections.CurrentOpenConnections)

if openConnections.CurrentOpenConnections > int64(lowestOpenConnectionsThreshold) &&
a.checkAndAddAlertToCatalog(ctx, alertKey, fmt.Sprintf(alertMessageTemplate, lowestOpenConnectionsThreshold)) {
for _, slackAlertSender := range slackAlertSenders {
if slackAlertSender.openConnectionsAlertThreshold > 0 {
if openConnections.CurrentOpenConnections > int64(slackAlertSender.openConnectionsAlertThreshold) {
a.alertToSlack(ctx, slackAlertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, slackAlertSender.openConnectionsAlertThreshold))
}
} else {
if openConnections.CurrentOpenConnections > int64(defaultOpenConnectionsThreshold) {
a.alertToSlack(ctx, slackAlertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, defaultOpenConnectionsThreshold))
}
}
}
}
}

func (a *Alerter) AddAlertToCatalog(ctx context.Context, alertKey string, alertMessage string) {
_, err := a.catalogPool.Exec(ctx,
"INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message) VALUES($1,$2)",
alertKey, alertMessage)
func (a *Alerter) alertToSlack(ctx context.Context, slackAlertSender *slackAlertSender, alertKey string, alertMessage string) {
err := slackAlertSender.sendAlert(ctx,
":rotating_light:Alert:rotating_light:: "+alertKey, alertMessage)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to insert alert", slog.Any("error", err))
logger.LoggerFromCtx(ctx).Warn("failed to send alert", slog.Any("error", err))
return
}
}

// Only raises an alert if another alert with the same key hasn't been raised
// in the past X minutes, where X is configurable and defaults to 15 minutes
// returns true if alert added to catalog, so proceed with processing alerts to slack
func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string, alertMessage string) bool {
dur := dynamicconf.PeerDBAlertingGapMinutesAsDuration(ctx)
if dur == 0 {
logger.LoggerFromCtx(ctx).Warn("Alerting disabled via environment variable, returning")
return false
}

row := a.catalogPool.QueryRow(ctx,
`SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1
ORDER BY created_timestamp DESC LIMIT 1`,
alertKey)
var createdTimestamp time.Time
err := row.Scan(&createdTimestamp)
if err != nil && err != pgx.ErrNoRows {
logger.LoggerFromCtx(ctx).Warn("failed to send alert: ", slog.String("err", err.Error()))
return false
}

if time.Since(createdTimestamp) >= dur {
_, err = a.catalogPool.Exec(ctx,
"INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message) VALUES($1,$2)",
alertKey, alertMessage)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to insert alert", slog.Any("error", err))
return false
}
return true
}
return false
}

func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) {
errorWithStack := fmt.Sprintf("%+v", err)
_, err = a.catalogPool.Exec(ctx,
Expand Down
18 changes: 12 additions & 6 deletions flow/shared/alerting/slack_alert_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,25 @@ import (
)

type slackAlertSender struct {
client *slack.Client
channelIDs []string
client *slack.Client
channelIDs []string
slotLagMBAlertThreshold uint32
openConnectionsAlertThreshold uint32
}

type slackAlertConfig struct {
AuthToken string `json:"auth_token"`
ChannelIDs []string `json:"channel_ids"`
AuthToken string `json:"auth_token"`
ChannelIDs []string `json:"channel_ids"`
SlotLagMBAlertThreshold uint32 `json:"slot_lag_mb_alert_threshold"`
OpenConnectionsAlertThreshold uint32 `json:"open_connections_alert_threshold"`
}

func newSlackAlertSender(config *slackAlertConfig) *slackAlertSender {
return &slackAlertSender{
client: slack.New(config.AuthToken),
channelIDs: config.ChannelIDs,
client: slack.New(config.AuthToken),
channelIDs: config.ChannelIDs,
slotLagMBAlertThreshold: config.SlotLagMBAlertThreshold,
openConnectionsAlertThreshold: config.OpenConnectionsAlertThreshold,
}
}

Expand Down
34 changes: 34 additions & 0 deletions ui/app/alert-config/new.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,25 @@ const NewAlertConfig = () => {
const [serviceType, setServiceType] = useState<string>();
const [authToken, setAuthToken] = useState<string>();
const [channelIdString, setChannelIdString] = useState<string>();
const [slotLagMBAlertThreshold, setSlotLagMBAlertThreshold] =
useState<number>();
const [openConnectionsAlertThreshold, setOpenConnectionsAlertThreshold] =
useState<number>();
const [loading, setLoading] = useState(false);
const handleAdd = async () => {
if (serviceType !== 'slack') {
notifyErr('Service Type must be selected');
return;
}
console.log(slotLagMBAlertThreshold);
console.log(openConnectionsAlertThreshold);
const alertConfigReq: alertConfigType = {
serviceType: serviceType,
serviceConfig: {
auth_token: authToken ?? '',
channel_ids: channelIdString?.split(',')!,
slot_lag_mb_alert_threshold: slotLagMBAlertThreshold || 0,
open_connections_alert_threshold: openConnectionsAlertThreshold || 0,
},
};
const alertReqValidity = alertConfigReqSchema.safeParse(alertConfigReq);
Expand Down Expand Up @@ -104,6 +112,32 @@ const NewAlertConfig = () => {
/>
</div>

<div>
<p>Slot Lag Alert Threshold (in MB)</p>
<TextField
style={{ height: '2.5rem', marginTop: '0.5rem' }}
variant='simple'
type={'number'}
placeholder='optional'
value={slotLagMBAlertThreshold}
onChange={(e) => setSlotLagMBAlertThreshold(e.target.valueAsNumber)}
/>
</div>

<div>
<p>Open Connections Alert Threshold</p>
<TextField
style={{ height: '2.5rem', marginTop: '0.5rem' }}
variant='simple'
type={'number'}
placeholder='optional'
value={openConnectionsAlertThreshold}
onChange={(e) =>
setOpenConnectionsAlertThreshold(e.target.valueAsNumber)
}
/>
</div>

<Button
style={{ marginTop: '1rem', width: '20%', height: '2.5rem' }}
onClick={handleAdd}
Expand Down
12 changes: 12 additions & 0 deletions ui/app/alert-config/validation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@ export const alertConfigReqSchema = z.object({
required_error: 'We need a channel ID',
})
.min(1, { message: 'Atleast one channel ID is needed' }),
slot_lag_mb_alert_threshold: z
.number({
invalid_type_error: 'Threshold must be a number',
})
.int()
.min(0, 'Threshold must be non-negative'),
open_connections_alert_threshold: z
.number({
invalid_type_error: 'Threshold must be a number',
})
.int()
.min(0, 'Threshold must be non-negative'),
}),
});

Expand Down
Loading