Skip to content

Commit

Permalink
Make alerter independent of slack
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Dec 22, 2023
1 parent 60044de commit b87f4a7
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 38 deletions.
8 changes: 7 additions & 1 deletion flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,15 @@ func WorkerMain(opts *WorkerOptions) error {
w.RegisterWorkflow(peerflow.XminFlowWorkflow)
w.RegisterWorkflow(peerflow.DropFlowWorkflow)
w.RegisterWorkflow(peerflow.HeartbeatFlowWorkflow)

alerter, err := alerting.NewAlerter(conn)
if err != nil {
return fmt.Errorf("unable to create alerter: %w", err)
}

w.RegisterActivity(&activities.FlowableActivity{
CatalogPool: conn,
Alerter: alerting.NewAlerter(conn),
Alerter: alerter,
})

err = w.Run(worker.InterruptCh())
Expand Down
8 changes: 7 additions & 1 deletion flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,15 @@ func RegisterWorkflowsAndActivities(env *testsuite.TestWorkflowEnvironment, t *t
env.RegisterWorkflow(peerflow.QRepFlowWorkflow)
env.RegisterWorkflow(peerflow.XminFlowWorkflow)
env.RegisterWorkflow(peerflow.QRepPartitionWorkflow)

alerter, err := alerting.NewAlerter(conn)
if err != nil {
t.Fatalf("unable to create alerter: %v", err)
}

env.RegisterActivity(&activities.FlowableActivity{
CatalogPool: conn,
Alerter: alerting.NewAlerter(conn),
Alerter: alerter,
})
env.RegisterActivity(&activities.SnapshotActivity{})
}
Expand Down
85 changes: 49 additions & 36 deletions flow/shared/alerting/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ type Alerter struct {
logger *slog.Logger
}

func registerSendersFromPool(catalogPool *pgxpool.Pool) ([]*slackAlertSender, error) {
rows, err := catalogPool.Query(context.Background(),
func registerSendersFromPool(ctx context.Context, catalogPool *pgxpool.Pool) ([]*slackAlertSender, error) {
rows, err := catalogPool.Query(ctx,
"SELECT service_type,service_config FROM peerdb_stats.alerting_config")
if err != nil {
return nil, fmt.Errorf("failed to read alerter config from catalog: %w", err)
Expand Down Expand Up @@ -47,11 +47,17 @@ func registerSendersFromPool(catalogPool *pgxpool.Pool) ([]*slackAlertSender, er
}

// doesn't take care of closing pool, needs to be done externally.
func NewAlerter(catalogPool *pgxpool.Pool) *Alerter {
func NewAlerter(catalogPool *pgxpool.Pool) (*Alerter, error) {
logger := slog.Default()
if catalogPool == nil {
logger.Error("catalog pool is nil for Alerter")
return nil, fmt.Errorf("catalog pool is nil for Alerter")
}

return &Alerter{
catalogPool: catalogPool,
logger: slog.Default(),
}
logger: logger,
}, nil
}

// Only raises an alert if another alert with the same key hasn't been raised
Expand All @@ -62,40 +68,47 @@ func (a *Alerter) AlertIf(ctx context.Context, alertKey string, alertMessage str
return
}

if a.catalogPool != nil {
slackAlertSenders, err := registerSendersFromPool(a.catalogPool)
if err != nil {
a.logger.WarnContext(ctx, "failed to set Slack senders", slog.Any("error", err))
return
}
if len(slackAlertSenders) == 0 {
a.logger.WarnContext(ctx, "no Slack senders configured, returning")
return
}

row := a.catalogPool.QueryRow(context.Background(),
`SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1
var err error
row := a.catalogPool.QueryRow(ctx,
`SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1
ORDER BY created_timestamp DESC LIMIT 1`,
alertKey)
var createdTimestamp time.Time
err = row.Scan(&createdTimestamp)
if err != nil && err != pgx.ErrNoRows {
a.logger.Warn("failed to send alert: %v", err)
alertKey)
var createdTimestamp time.Time
err = row.Scan(&createdTimestamp)
if err != nil && err != pgx.ErrNoRows {
a.logger.Warn("failed to send alert: %v", err)
return
}

if time.Since(createdTimestamp) >= peerdbenv.PeerDBAlertingGapMinutesAsDuration() {
a.AddAlertToCatalog(ctx, alertKey, alertMessage)
a.AlertToSlack(ctx, alertKey, alertMessage)
}
}

func (a *Alerter) AlertToSlack(ctx context.Context, alertKey string, alertMessage string) {
slackAlertSenders, err := registerSendersFromPool(ctx, a.catalogPool)
if err != nil {
a.logger.WarnContext(ctx, "failed to set Slack senders", slog.Any("error", err))
return
}

for _, slackAlertSender := range slackAlertSenders {
err = slackAlertSender.sendAlert(ctx,
fmt.Sprintf(":rotating_light:Alert:rotating_light:: %s", alertKey), alertMessage)
if err != nil {
a.logger.WarnContext(ctx, "failed to send alert", slog.Any("error", err))
return
}
}
}

if time.Since(createdTimestamp) >= peerdbenv.PeerDBAlertingGapMinutesAsDuration() {
for _, slackAlertSender := range slackAlertSenders {
err = slackAlertSender.sendAlert(context.Background(),
fmt.Sprintf(":rotating_light:Alert:rotating_light:: %s", alertKey), alertMessage)
if err != nil {
a.logger.WarnContext(ctx, "failed to send alert", slog.Any("error", err))
return
}
_, _ = a.catalogPool.Exec(context.Background(),
"INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message) VALUES($1,$2)",
alertKey, alertMessage)
}
}
func (a *Alerter) AddAlertToCatalog(ctx context.Context, alertKey string, alertMessage string) {
_, err := a.catalogPool.Exec(ctx,
"INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message) VALUES($1,$2)",
alertKey, alertMessage)
if err != nil {
a.logger.WarnContext(ctx, "failed to insert alert", slog.Any("error", err))
return
}
}

0 comments on commit b87f4a7

Please sign in to comment.