Skip to content

Commit

Permalink
slot lag and conn alerts
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Apr 9, 2024
1 parent d47a140 commit 5ac7933
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 46 deletions.
26 changes: 3 additions & 23 deletions flow/activities/slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@ package activities

import (
"context"
"fmt"
"log/slog"
"time"

"github.com/PeerDB-io/peer-flow/connectors"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/dynamicconf"
"github.com/PeerDB-io/peer-flow/peerdbenv"
)

func (a *FlowableActivity) handleSlotInfo(
Expand All @@ -29,33 +26,16 @@ func (a *FlowableActivity) handleSlotInfo(
return nil
}

deploymentUIDPrefix := ""
if peerdbenv.PeerDBDeploymentUID() != "" {
deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID())
}

slotLagInMBThreshold := dynamicconf.PeerDBSlotLagMBAlertThreshold(ctx)
if (slotLagInMBThreshold > 0) && (slotInfo[0].LagInMb >= float32(slotLagInMBThreshold)) {
a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName),
fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB!
cc: <!channel>`,
deploymentUIDPrefix, slotName, peerName, slotLagInMBThreshold, slotInfo[0].LagInMb))
}
a.Alerter.AlertIfSlotLag(ctx, peerName, slotInfo[0])

// Also handles alerts for PeerDB user connections exceeding a given limit here
maxOpenConnectionsThreshold := dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx)
res, err := srcConn.GetOpenConnectionsForUser()
if err != nil {
slog.WarnContext(ctx, "warning: failed to get current open connections", slog.Any("error", err))
return err
}
if (maxOpenConnectionsThreshold > 0) && (res.CurrentOpenConnections >= int64(maxOpenConnectionsThreshold)) {
a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-max-open-connections-threshold-exceeded", peerName),
fmt.Sprintf(`%sOpen connections from PeerDB user `+"`%s`"+` on peer `+"`%s`"+
` has exceeded threshold size of %d connections, currently at %d connections!
cc: <!channel>`,
deploymentUIDPrefix, res.UserName, peerName, maxOpenConnectionsThreshold, res.CurrentOpenConnections))
}

a.Alerter.AlertIfOpenConnections(ctx, peerName, res)

if len(slotInfo) != 0 {
return monitoring.AppendSlotSizeInfo(ctx, a.CatalogPool, peerName, slotInfo[0])
Expand Down
126 changes: 114 additions & 12 deletions flow/shared/alerting/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@ package alerting
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"time"

"github.com/PeerDB-io/peer-flow/dynamicconf"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/peerdbenv"
)

// alerting service, no cool name :(
Expand All @@ -18,8 +22,8 @@ type Alerter struct {
logger *slog.Logger
}

func registerSendersFromPool(ctx context.Context, catalogPool *pgxpool.Pool) ([]*slackAlertSender, error) {
rows, err := catalogPool.Query(ctx,
func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]*slackAlertSender, error) {
rows, err := a.catalogPool.Query(ctx,
"SELECT service_type,service_config FROM peerdb_stats.alerting_config")
if err != nil {
return nil, fmt.Errorf("failed to read alerter config from catalog: %w", err)
Expand Down Expand Up @@ -50,8 +54,7 @@ func registerSendersFromPool(ctx context.Context, catalogPool *pgxpool.Pool) ([]
func NewAlerter(catalogPool *pgxpool.Pool) (*Alerter, error) {
logger := slog.Default()
if catalogPool == nil {
logger.Error("catalog pool is nil for Alerter")
return nil, fmt.Errorf("catalog pool is nil for Alerter")
return nil, errors.New("catalog pool is nil for Alerter")
}

return &Alerter{
Expand All @@ -60,35 +63,134 @@ func NewAlerter(catalogPool *pgxpool.Pool) (*Alerter, error) {
}, nil
}

// AlertIfSlotLag raises a slot lag alert for the given slot on peerName when
// the slot's lag exceeds the configured threshold.
//
// Thresholds are expressed in MB. Each Slack sender may carry its own
// threshold; a sender without one (zero value) falls back to the default
// returned by dynamicconf.PeerDBSlotLagMBAlertThreshold. A threshold that
// resolves to 0 is treated as "alerts disabled", matching the `> 0` guard of
// the inline check this method replaced.
//
// The alert is recorded in the catalog via checkAndAddAlertToCatalog, and
// Slack senders are only notified when that call returns true. Failures are
// logged and never returned to the caller.
func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo *protos.SlotInfo) {
	if slotInfo == nil {
		// Nothing to evaluate; avoid dereferencing a nil slot.
		return
	}

	slackAlertSenders, err := a.registerSendersFromPool(ctx)
	if err != nil {
		// Same handling as AlertToSlack: surface the failure instead of dropping it.
		a.logger.WarnContext(ctx, "failed to set Slack senders", slog.Any("error", err))
		return
	}

	deploymentUIDPrefix := ""
	if deploymentUID := peerdbenv.PeerDBDeploymentUID(); deploymentUID != "" {
		deploymentUIDPrefix = fmt.Sprintf("[%s] ", deploymentUID)
	}

	defaultSlotLagMBAlertThreshold := dynamicconf.PeerDBSlotLagMBAlertThreshold(ctx)
	// The catalog cannot use the default threshold to space alerts properly,
	// so use the lowest positive threshold instead. Zero thresholds are
	// skipped so that a disabled default does not pin the minimum at 0.
	lowestSlotLagMBAlertThreshold := defaultSlotLagMBAlertThreshold
	for _, slackAlertSender := range slackAlertSenders {
		threshold := slackAlertSender.slotLagMBAlertThreshold
		if threshold > 0 && (lowestSlotLagMBAlertThreshold == 0 || threshold < lowestSlotLagMBAlertThreshold) {
			lowestSlotLagMBAlertThreshold = threshold
		}
	}
	if lowestSlotLagMBAlertThreshold == 0 {
		// Neither the default nor any sender defines a threshold: alerting is off.
		return
	}

	alertKey := peerName + "-slot-lag-threshold-exceeded"
	// Format the message in a single pass so that a '%' inside the slot or
	// peer name can never be interpreted as a formatting verb.
	alertMessage := func(thresholdMB uint32) string {
		return fmt.Sprintf("%sSlot `%s` on peer `%s` has exceeded threshold size of %dMB, currently at %.2fMB!\ncc: <!channel>",
			deploymentUIDPrefix, slotInfo.SlotName, peerName, thresholdMB, slotInfo.LagInMb)
	}

	if slotInfo.LagInMb > float32(lowestSlotLagMBAlertThreshold) &&
		a.checkAndAddAlertToCatalog(ctx, alertKey, alertMessage(lowestSlotLagMBAlertThreshold)) {
		for _, slackAlertSender := range slackAlertSenders {
			// A sender-specific threshold wins; otherwise fall back to the default.
			threshold := slackAlertSender.slotLagMBAlertThreshold
			if threshold == 0 {
				threshold = defaultSlotLagMBAlertThreshold
			}
			if threshold > 0 && slotInfo.LagInMb > float32(threshold) {
				a.alertToSlack(ctx, slackAlertSender, alertKey, alertMessage(threshold))
			}
		}
	}
}

// AlertIfOpenConnections raises an alert when the number of open connections
// held by the PeerDB user on peerName exceeds the configured threshold.
//
// Each Slack sender may carry its own threshold; a sender without one (zero
// value) falls back to the default returned by
// dynamicconf.PeerDBOpenConnectionsAlertThreshold. A threshold that resolves
// to 0 is treated as "alerts disabled", matching the `> 0` guard of the inline
// check this method replaced.
//
// The alert is recorded in the catalog via checkAndAddAlertToCatalog, and
// Slack senders are only notified when that call returns true. Failures are
// logged and never returned to the caller.
func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string,
	openConnections *protos.GetOpenConnectionsForUserResult,
) {
	if openConnections == nil {
		// Nothing to evaluate; avoid dereferencing a nil result.
		return
	}

	slackAlertSenders, err := a.registerSendersFromPool(ctx)
	if err != nil {
		// Same handling as AlertToSlack: surface the failure instead of dropping it.
		a.logger.WarnContext(ctx, "failed to set Slack senders", slog.Any("error", err))
		return
	}

	deploymentUIDPrefix := ""
	if deploymentUID := peerdbenv.PeerDBDeploymentUID(); deploymentUID != "" {
		deploymentUIDPrefix = fmt.Sprintf("[%s] ", deploymentUID)
	}

	// Same as with slot lag: the catalog entry uses the lowest positive
	// threshold. Zero thresholds are skipped so that a disabled default does
	// not pin the minimum at 0.
	defaultOpenConnectionsThreshold := dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx)
	lowestOpenConnectionsThreshold := defaultOpenConnectionsThreshold
	for _, slackAlertSender := range slackAlertSenders {
		threshold := slackAlertSender.openConnectionsAlertThreshold
		if threshold > 0 && (lowestOpenConnectionsThreshold == 0 || threshold < lowestOpenConnectionsThreshold) {
			lowestOpenConnectionsThreshold = threshold
		}
	}
	if lowestOpenConnectionsThreshold == 0 {
		// Neither the default nor any sender defines a threshold: alerting is off.
		return
	}

	alertKey := peerName + "-max-open-connections-threshold-exceeded"
	// Format the message in a single pass so that a '%' inside the user or
	// peer name can never be interpreted as a formatting verb.
	alertMessage := func(threshold uint32) string {
		return fmt.Sprintf("%sOpen connections from PeerDB user `%s` on peer `%s`"+
			" has exceeded threshold size of %d connections, currently at %d connections!\ncc: <!channel>",
			deploymentUIDPrefix, openConnections.UserName, peerName, threshold, openConnections.CurrentOpenConnections)
	}

	if openConnections.CurrentOpenConnections > int64(lowestOpenConnectionsThreshold) &&
		a.checkAndAddAlertToCatalog(ctx, alertKey, alertMessage(lowestOpenConnectionsThreshold)) {
		for _, slackAlertSender := range slackAlertSenders {
			// A sender-specific threshold wins; otherwise fall back to the default.
			threshold := slackAlertSender.openConnectionsAlertThreshold
			if threshold == 0 {
				threshold = defaultOpenConnectionsThreshold
			}
			if threshold > 0 && openConnections.CurrentOpenConnections > int64(threshold) {
				a.alertToSlack(ctx, slackAlertSender, alertKey, alertMessage(threshold))
			}
		}
	}
}

// alertToSlack sends alertMessage through the given Slack sender, using a
// title built from alertKey. Delivery is best-effort: a send failure is
// logged as a warning and not returned, so one failing sender does not stop
// the caller from notifying the others.
func (a *Alerter) alertToSlack(ctx context.Context, slackAlertSender *slackAlertSender, alertKey string, alertMessage string) {
	err := slackAlertSender.sendAlert(ctx,
		":rotating_light:Alert:rotating_light:: "+alertKey, alertMessage)
	if err != nil {
		// Previously the error was dropped silently, hiding misconfigured tokens or channels.
		a.logger.WarnContext(ctx, "failed to send alert", slog.Any("error", err))
	}
}

// Only raises an alert if another alert with the same key hasn't been raised
// in the past X minutes, where X is configurable and defaults to 15 minutes
func (a *Alerter) AlertIf(ctx context.Context, alertKey string, alertMessage string) {
// returns true if alert added to catalog, so proceed with processing alerts to slack
func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string, alertMessage string) bool {
dur := dynamicconf.PeerDBAlertingGapMinutesAsDuration(ctx)
if dur == 0 {
a.logger.WarnContext(ctx, "Alerting disabled via environment variable, returning")
return
return false
}

var err error
row := a.catalogPool.QueryRow(ctx,
`SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1
ORDER BY created_timestamp DESC LIMIT 1`,
alertKey)
var createdTimestamp time.Time
err = row.Scan(&createdTimestamp)
err := row.Scan(&createdTimestamp)
if err != nil && err != pgx.ErrNoRows {
a.logger.Warn("failed to send alert: ", slog.String("err", err.Error()))
return
return false
}

if time.Since(createdTimestamp) >= dur {
a.AddAlertToCatalog(ctx, alertKey, alertMessage)
a.AlertToSlack(ctx, alertKey, alertMessage)
}
return true
}

func (a *Alerter) AlertToSlack(ctx context.Context, alertKey string, alertMessage string) {
slackAlertSenders, err := registerSendersFromPool(ctx, a.catalogPool)
slackAlertSenders, err := a.registerSendersFromPool(ctx)
if err != nil {
a.logger.WarnContext(ctx, "failed to set Slack senders", slog.Any("error", err))
return
Expand Down
18 changes: 12 additions & 6 deletions flow/shared/alerting/slack_alert_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,25 @@ import (
)

type slackAlertSender struct {
client *slack.Client
channelIDs []string
client *slack.Client
channelIDs []string
slotLagMBAlertThreshold uint32
openConnectionsAlertThreshold uint32
}

type slackAlertConfig struct {
AuthToken string `json:"auth_token"`
ChannelIDs []string `json:"channel_ids"`
AuthToken string `json:"auth_token"`
ChannelIDs []string `json:"channel_ids"`
SlotLagMBAlertThreshold uint32 `json:"slot_lag_mb_alert_threshold"`
OpenConnectionsAlertThreshold uint32 `json:"open_connections_alert_threshold"`
}

func newSlackAlertSender(config *slackAlertConfig) *slackAlertSender {
return &slackAlertSender{
client: slack.New(config.AuthToken),
channelIDs: config.ChannelIDs,
client: slack.New(config.AuthToken),
channelIDs: config.ChannelIDs,
slotLagMBAlertThreshold: config.SlotLagMBAlertThreshold,
openConnectionsAlertThreshold: config.OpenConnectionsAlertThreshold,
}
}

Expand Down
33 changes: 33 additions & 0 deletions ui/app/alert-config/new.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,24 @@ const NewAlertConfig = () => {
const [serviceType, setServiceType] = useState<string>();
const [authToken, setAuthToken] = useState<string>();
const [channelIdString, setChannelIdString] = useState<string>();
const [slotLagMBAlertThreshold, setSlotLagMBAlertThreshold] =
useState<number>();
const [openConnectionsAlertThreshold, setOpenConnectionsAlertThreshold] =
useState<number>();
const [loading, setLoading] = useState(false);
const handleAdd = async () => {
if (serviceType !== 'slack') {
notifyErr('Service Type must be selected');
return;
}

const alertConfigReq: alertConfigType = {
serviceType: serviceType,
serviceConfig: {
auth_token: authToken ?? '',
channel_ids: channelIdString?.split(',')!,
slot_lag_mb_alert_threshold: slotLagMBAlertThreshold || 5000,
open_connections_alert_threshold: openConnectionsAlertThreshold || 5,
},
};
const alertReqValidity = alertConfigReqSchema.safeParse(alertConfigReq);
Expand Down Expand Up @@ -104,6 +111,32 @@ const NewAlertConfig = () => {
/>
</div>

<div>
<p>Slot Lag Alert Threshold (in MB)</p>
<TextField
style={{ height: '2.5rem', marginTop: '0.5rem' }}
variant='simple'
type={'number'}
placeholder='optional'
value={slotLagMBAlertThreshold}
onChange={(e) => setSlotLagMBAlertThreshold(e.target.valueAsNumber)}
/>
</div>

<div>
<p>Open Connections Alert Threshold</p>
<TextField
style={{ height: '2.5rem', marginTop: '0.5rem' }}
variant='simple'
type={'number'}
placeholder='optional'
value={openConnectionsAlertThreshold}
onChange={(e) =>
setOpenConnectionsAlertThreshold(e.target.valueAsNumber)
}
/>
</div>

<Button
style={{ marginTop: '1rem', width: '20%', height: '2.5rem' }}
onClick={handleAdd}
Expand Down
12 changes: 12 additions & 0 deletions ui/app/alert-config/validation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@ export const alertConfigReqSchema = z.object({
required_error: 'We need a channel ID',
})
.min(1, { message: 'Atleast one channel ID is needed' }),
slot_lag_mb_alert_threshold: z
.number({
invalid_type_error: 'Threshold must be a number',
})
.int()
.min(0, 'Threshold must be non-negative'),
open_connections_alert_threshold: z
.number({
invalid_type_error: 'Threshold must be a number',
})
.int()
.min(0, 'Threshold must be non-negative'),
}),
});

Expand Down
3 changes: 1 addition & 2 deletions ui/app/api/alert-config/route.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { alertConfigType } from '@/app/alert-config/validation';
import prisma from '@/app/utils/prisma';
import { alerting_config } from '@prisma/client';

export async function GET() {
const configs: alerting_config[] = await prisma.alerting_config.findMany();
const configs: any[] = await prisma.alerting_config.findMany();
const serializedConfigs = configs.map((config) => ({
...config,
id: String(config.id),
Expand Down
4 changes: 1 addition & 3 deletions ui/app/dto/AlertDTO.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import { Prisma } from '@prisma/client';

export type UAlertConfigResponse = {
id: bigint;
service_type: string;
service_config: Prisma.JsonValue;
service_config: any;
};

export type MirrorLogsRequest = {
Expand Down

0 comments on commit 5ac7933

Please sign in to comment.