Skip to content

Commit

Permalink
Make alerter independent of slack (#877)
Browse files Browse the repository at this point in the history
This makes the alerter log to catalog whether slack based alerting is
setup or not. Also fixes usage of Context in the alerting class.
  • Loading branch information
iskakaushik authored Dec 22, 2023
1 parent 60044de commit 7f97c00
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 38 deletions.
8 changes: 7 additions & 1 deletion flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,15 @@ func WorkerMain(opts *WorkerOptions) error {
w.RegisterWorkflow(peerflow.XminFlowWorkflow)
w.RegisterWorkflow(peerflow.DropFlowWorkflow)
w.RegisterWorkflow(peerflow.HeartbeatFlowWorkflow)

alerter, err := alerting.NewAlerter(conn)
if err != nil {
return fmt.Errorf("unable to create alerter: %w", err)
}

w.RegisterActivity(&activities.FlowableActivity{
CatalogPool: conn,
Alerter: alerting.NewAlerter(conn),
Alerter: alerter,
})

err = w.Run(worker.InterruptCh())
Expand Down
8 changes: 7 additions & 1 deletion flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,15 @@ func RegisterWorkflowsAndActivities(env *testsuite.TestWorkflowEnvironment, t *t
env.RegisterWorkflow(peerflow.QRepFlowWorkflow)
env.RegisterWorkflow(peerflow.XminFlowWorkflow)
env.RegisterWorkflow(peerflow.QRepPartitionWorkflow)

alerter, err := alerting.NewAlerter(conn)
if err != nil {
t.Fatalf("unable to create alerter: %v", err)
}

env.RegisterActivity(&activities.FlowableActivity{
CatalogPool: conn,
Alerter: alerting.NewAlerter(conn),
Alerter: alerter,
})
env.RegisterActivity(&activities.SnapshotActivity{})
}
Expand Down
85 changes: 49 additions & 36 deletions flow/shared/alerting/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ type Alerter struct {
logger *slog.Logger
}

func registerSendersFromPool(catalogPool *pgxpool.Pool) ([]*slackAlertSender, error) {
rows, err := catalogPool.Query(context.Background(),
func registerSendersFromPool(ctx context.Context, catalogPool *pgxpool.Pool) ([]*slackAlertSender, error) {
rows, err := catalogPool.Query(ctx,
"SELECT service_type,service_config FROM peerdb_stats.alerting_config")
if err != nil {
return nil, fmt.Errorf("failed to read alerter config from catalog: %w", err)
Expand Down Expand Up @@ -47,11 +47,17 @@ func registerSendersFromPool(catalogPool *pgxpool.Pool) ([]*slackAlertSender, er
}

// doesn't take care of closing pool, needs to be done externally.
func NewAlerter(catalogPool *pgxpool.Pool) *Alerter {
func NewAlerter(catalogPool *pgxpool.Pool) (*Alerter, error) {
logger := slog.Default()
if catalogPool == nil {
logger.Error("catalog pool is nil for Alerter")
return nil, fmt.Errorf("catalog pool is nil for Alerter")
}

return &Alerter{
catalogPool: catalogPool,
logger: slog.Default(),
}
logger: logger,
}, nil
}

// Only raises an alert if another alert with the same key hasn't been raised
Expand All @@ -62,40 +68,47 @@ func (a *Alerter) AlertIf(ctx context.Context, alertKey string, alertMessage str
return
}

if a.catalogPool != nil {
slackAlertSenders, err := registerSendersFromPool(a.catalogPool)
if err != nil {
a.logger.WarnContext(ctx, "failed to set Slack senders", slog.Any("error", err))
return
}
if len(slackAlertSenders) == 0 {
a.logger.WarnContext(ctx, "no Slack senders configured, returning")
return
}

row := a.catalogPool.QueryRow(context.Background(),
`SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1
var err error
row := a.catalogPool.QueryRow(ctx,
`SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1
ORDER BY created_timestamp DESC LIMIT 1`,
alertKey)
var createdTimestamp time.Time
err = row.Scan(&createdTimestamp)
if err != nil && err != pgx.ErrNoRows {
a.logger.Warn("failed to send alert: %v", err)
alertKey)
var createdTimestamp time.Time
err = row.Scan(&createdTimestamp)
if err != nil && err != pgx.ErrNoRows {
a.logger.Warn("failed to send alert: %v", err)
return
}

if time.Since(createdTimestamp) >= peerdbenv.PeerDBAlertingGapMinutesAsDuration() {
a.AddAlertToCatalog(ctx, alertKey, alertMessage)
a.AlertToSlack(ctx, alertKey, alertMessage)
}
}

func (a *Alerter) AlertToSlack(ctx context.Context, alertKey string, alertMessage string) {
slackAlertSenders, err := registerSendersFromPool(ctx, a.catalogPool)
if err != nil {
a.logger.WarnContext(ctx, "failed to set Slack senders", slog.Any("error", err))
return
}

for _, slackAlertSender := range slackAlertSenders {
err = slackAlertSender.sendAlert(ctx,
fmt.Sprintf(":rotating_light:Alert:rotating_light:: %s", alertKey), alertMessage)
if err != nil {
a.logger.WarnContext(ctx, "failed to send alert", slog.Any("error", err))
return
}
}
}

if time.Since(createdTimestamp) >= peerdbenv.PeerDBAlertingGapMinutesAsDuration() {
for _, slackAlertSender := range slackAlertSenders {
err = slackAlertSender.sendAlert(context.Background(),
fmt.Sprintf(":rotating_light:Alert:rotating_light:: %s", alertKey), alertMessage)
if err != nil {
a.logger.WarnContext(ctx, "failed to send alert", slog.Any("error", err))
return
}
_, _ = a.catalogPool.Exec(context.Background(),
"INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message) VALUES($1,$2)",
alertKey, alertMessage)
}
}
func (a *Alerter) AddAlertToCatalog(ctx context.Context, alertKey string, alertMessage string) {
_, err := a.catalogPool.Exec(ctx,
"INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message) VALUES($1,$2)",
alertKey, alertMessage)
if err != nil {
a.logger.WarnContext(ctx, "failed to insert alert", slog.Any("error", err))
return
}
}

0 comments on commit 7f97c00

Please sign in to comment.