Skip to content

Commit

Permalink
basic alerting, refactored to use slack-go instead
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Dec 21, 2023
1 parent 9ec86d4 commit 3994468
Show file tree
Hide file tree
Showing 10 changed files with 184 additions and 3 deletions.
3 changes: 2 additions & 1 deletion docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ x-flow-worker-env: &flow-worker-env
# enables worker profiling using Grafana Pyroscope
ENABLE_PROFILING: "true"
PYROSCOPE_SERVER_ADDRESS: http://pyroscope:4040
PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD: 10

services:
catalog:
Expand Down Expand Up @@ -133,7 +134,7 @@ services:
dockerfile: stacks/flow.Dockerfile
target: flow-snapshot-worker
environment:
<<: [*flow-worker-env]
<<: [*catalog-config, *flow-worker-env]
depends_on:
temporal-admin-tools:
condition: service_healthy
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ services:
container_name: flow-snapshot-worker
image: ghcr.io/peerdb-io/flow-snapshot-worker:latest-dev
environment:
<<: [*flow-worker-env]
<<: [*catalog-config, *flow-worker-env]
depends_on:
temporal-admin-tools:
condition: service_healthy
Expand Down
18 changes: 17 additions & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/PeerDB-io/peer-flow/shared/alerting"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
Expand All @@ -42,6 +43,7 @@ type SlotSnapshotSignal struct {

type FlowableActivity struct {
CatalogPool *pgxpool.Pool
Alerter *alerting.Alerter
}

// CheckConnection implements CheckConnection.
Expand Down Expand Up @@ -178,6 +180,14 @@ func (a *FlowableActivity) handleSlotInfo(
return err
}

slotLagInMBThreshold := peerdbenv.GetPeerDBSlotLagMBAlertThreshold()
if uint32(slotInfo[0].LagInMb) >= slotLagInMBThreshold {
a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName),
fmt.Sprintf(`Slot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB!
cc: <!channel>`,
slotName, peerName, slotLagInMBThreshold, slotInfo[0].LagInMb))
}

if len(slotInfo) != 0 {
return monitoring.AppendSlotSizeInfo(ctx, a.CatalogPool, peerName, slotInfo[0])
}
Expand All @@ -190,7 +200,13 @@ func (a *FlowableActivity) recordSlotSizePeriodically(
slotName string,
peerName string,
) {
timeout := 10 * time.Minute
// ensures slot info is logged atleast once per SyncFlow
err := a.handleSlotInfo(ctx, srcConn, slotName, peerName)
if err != nil {
return
}

timeout := 5 * time.Minute
ticker := time.NewTicker(timeout)

defer ticker.Stop()
Expand Down
2 changes: 2 additions & 0 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/PeerDB-io/peer-flow/activities"
utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/PeerDB-io/peer-flow/shared/alerting"
peerflow "github.com/PeerDB-io/peer-flow/workflows"

"github.com/grafana/pyroscope-go"
Expand Down Expand Up @@ -133,6 +134,7 @@ func WorkerMain(opts *WorkerOptions) error {
w.RegisterWorkflow(peerflow.HeartbeatFlowWorkflow)
w.RegisterActivity(&activities.FlowableActivity{
CatalogPool: conn,
Alerter: alerting.NewAlerter(conn),
})

err = w.Run(worker.InterruptCh())
Expand Down
2 changes: 2 additions & 0 deletions flow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
github.com/linkedin/goavro/v2 v2.12.0
github.com/microsoft/go-mssqldb v1.6.0
github.com/orcaman/concurrent-map/v2 v2.0.1
github.com/slack-go/slack v0.12.3
github.com/snowflakedb/gosnowflake v1.7.1
github.com/stretchr/testify v1.8.4
github.com/twpayne/go-geos v0.14.0
Expand Down Expand Up @@ -53,6 +54,7 @@ require (
github.com/getsentry/sentry-go v0.25.0 // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
Expand Down
8 changes: 8 additions & 0 deletions flow/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre
github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-test/deep v1.0.4 h1:u2CU3YKy9I2pmu9pX0eq50wCgjfGIt539SqR7FbHiho=
github.com/go-test/deep v1.0.4/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0=
Expand Down Expand Up @@ -221,6 +223,7 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/martian/v3 v3.3.2 h1:IqNFLAmvJOgVlpdEBiQbDc2EwKW77amAycfTuWKdfvw=
Expand All @@ -235,6 +238,9 @@ github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfF
github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0=
github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56etFpas=
github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qKpsEkdD5+I6QGU=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
github.com/grafana/pyroscope-go v1.0.4 h1:oyQX0BOkL+iARXzHuCdIF5TQ7/sRSel1YFViMHC7Bm0=
github.com/grafana/pyroscope-go v1.0.4/go.mod h1:0d7ftwSMBV/Awm7CCiYmHQEG8Y44Ma3YSjt+nWcWztY=
github.com/grafana/pyroscope-go/godeltaprof v0.1.6 h1:nEdZ8louGAplSvIJi1HVp7kWvFvdiiYg3COLlTwJiFo=
Expand Down Expand Up @@ -351,6 +357,8 @@ github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/slack-go/slack v0.12.3 h1:92/dfFU8Q5XP6Wp5rr5/T5JHLM5c5Smtn53fhToAP88=
github.com/slack-go/slack v0.12.3/go.mod h1:hlGi5oXA+Gt+yWTPP0plCdRKmjsDxecdHxYQdlMQKOw=
github.com/snowflakedb/gosnowflake v1.7.1 h1:c9JjyjjDlvxex9ud71TwKL+Wu54Vfx+39h4DAwbIdqU=
github.com/snowflakedb/gosnowflake v1.7.1/go.mod h1:JI3eRZL8CpimPek6CJO0aTbDQjDGOt7Rxv9A/ti4f5c=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
5 changes: 5 additions & 0 deletions flow/peerdbenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,8 @@ func GetPeerDBCatalogPassword() string {
func GetPeerDBCatalogDatabase() string {
return getEnvString("PEERDB_CATALOG_DATABASE", "")
}

// PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD
func GetPeerDBSlotLagMBAlertThreshold() uint32 {
return getEnvUint32("PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000)
}
96 changes: 96 additions & 0 deletions flow/shared/alerting/alerting.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package alerting

import (
"context"
"encoding/json"
"fmt"
"time"

"log/slog"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)

// alerting service, no cool name :(
type Alerter struct {
catalogPool *pgxpool.Pool
logger *slog.Logger
}

func registerSendersFromPool(catalogPool *pgxpool.Pool) ([]*slackAlertSender, error) {
rows, err := catalogPool.Query(context.Background(),
"SELECT service_type,service_config FROM peerdb_stats.alerting_config")
if err != nil {
return nil, fmt.Errorf("failed to read alerter config from catalog: %w", err)
}

var slackAlertSenders []*slackAlertSender
var serviceType, serviceConfig string
_, err = pgx.ForEachRow(rows, []any{&serviceType, &serviceConfig}, func() error {
switch serviceType {
case "slack":
var slackServiceConfig slackAlertConfig
err = json.Unmarshal([]byte(serviceConfig), &slackServiceConfig)
if err != nil {
return fmt.Errorf("failed to unmarshal Slack service config: %w", err)
}

slackAlertSenders = append(slackAlertSenders, newSlackAlertSender(&slackServiceConfig))
default:
return fmt.Errorf("unknown service type: %s", serviceType)
}
return nil
})

return slackAlertSenders, nil
}

// doesn't take care of closing pool, needs to be done externally.
func NewAlerter(catalogPool *pgxpool.Pool) *Alerter {
return &Alerter{
catalogPool: catalogPool,
logger: slog.Default(),
}
}

// Only raises an alert if another alert with the same key hasn't been raised
// in the past 15 minutes
func (a *Alerter) AlertIf(ctx context.Context, alertKey string, alertMessage string) {
if a.catalogPool != nil {
slackAlertSenders, err := registerSendersFromPool(a.catalogPool)
if err != nil {
a.logger.WarnContext(ctx, "failed to set Slack senders", slog.Any("error", err))
return
}
if len(slackAlertSenders) == 0 {
a.logger.Warn("no Slack senders configured, returning")
return
}

row := a.catalogPool.QueryRow(context.Background(),
`SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1
ORDER BY created_timestamp DESC LIMIT 1`,
alertKey)
var createdTimestamp time.Time
err = row.Scan(&createdTimestamp)
if err != nil && err != pgx.ErrNoRows {
a.logger.Warn("failed to send alert: %v", err)
return
}

if time.Since(createdTimestamp) >= 15*time.Minute {
for _, slackAlertSender := range slackAlertSenders {
err = slackAlertSender.sendAlert(context.Background(),
fmt.Sprintf(":rotating_light:Alert:rotating_light:: %s", alertKey), alertMessage)
if err != nil {
a.logger.WarnContext(ctx, "failed to send alert", slog.Any("error", err))
return
}
_, _ = a.catalogPool.Exec(context.Background(),
"INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message) VALUES($1,$2)",
alertKey, alertMessage)
}
}
}
}
38 changes: 38 additions & 0 deletions flow/shared/alerting/slack_alert_sender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package alerting

import (
"context"
"fmt"

"github.com/slack-go/slack"
)

type slackAlertSender struct {
client *slack.Client
channelIDs []string
}

type slackAlertConfig struct {
AuthToken string `json:"auth_token"`
ChannelIDs []string `json:"channel_ids"`
}

func newSlackAlertSender(config *slackAlertConfig) *slackAlertSender {
return &slackAlertSender{
client: slack.New(config.AuthToken),
channelIDs: config.ChannelIDs,
}
}

func (s *slackAlertSender) sendAlert(ctx context.Context, alertTitle string, alertMessage string) error {
for _, channelID := range s.channelIDs {
_, _, _, err := s.client.SendMessageContext(ctx, channelID, slack.MsgOptionBlocks(
slack.NewHeaderBlock(slack.NewTextBlockObject("plain_text", alertTitle, true, false)),
slack.NewSectionBlock(slack.NewTextBlockObject("mrkdwn", alertMessage, false, false), nil, nil),
))
if err != nil {
return fmt.Errorf("failed to send message to Slack channel %s: %w", channelID, err)
}
}
return nil
}
13 changes: 13 additions & 0 deletions nexus/catalog/migrations/V16__alerting_config_init.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
CREATE TABLE IF NOT EXISTS peerdb_stats.alerting_config (
id BIGINT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
service_type TEXT NOT NULL CHECK (service_type IN ('slack')),
service_config JSONB NOT NULL
);

CREATE TABLE IF NOT EXISTS peerdb_stats.alerts_v1 (
id BIGINT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
alert_key TEXT NOT NULL,
alert_level TEXT NOT NULL CHECK (alert_level IN ('critical')) DEFAULT 'critical',
alert_message TEXT NOT NULL,
created_timestamp TIMESTAMP DEFAULT now()
);

0 comments on commit 3994468

Please sign in to comment.