diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index a42ac76b47..eea0e9184f 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -132,9 +132,15 @@ func WorkerMain(opts *WorkerOptions) error { w.RegisterWorkflow(peerflow.XminFlowWorkflow) w.RegisterWorkflow(peerflow.DropFlowWorkflow) w.RegisterWorkflow(peerflow.HeartbeatFlowWorkflow) + + alerter, err := alerting.NewAlerter(conn) + if err != nil { + return fmt.Errorf("unable to create alerter: %w", err) + } + w.RegisterActivity(&activities.FlowableActivity{ CatalogPool: conn, - Alerter: alerting.NewAlerter(conn), + Alerter: alerter, }) err = w.Run(worker.InterruptCh()) diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 13ca8044e5..8bea8cf984 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -60,9 +60,15 @@ func RegisterWorkflowsAndActivities(env *testsuite.TestWorkflowEnvironment, t *t env.RegisterWorkflow(peerflow.QRepFlowWorkflow) env.RegisterWorkflow(peerflow.XminFlowWorkflow) env.RegisterWorkflow(peerflow.QRepPartitionWorkflow) + + alerter, err := alerting.NewAlerter(conn) + if err != nil { + t.Fatalf("unable to create alerter: %v", err) + } + env.RegisterActivity(&activities.FlowableActivity{ CatalogPool: conn, - Alerter: alerting.NewAlerter(conn), + Alerter: alerter, }) env.RegisterActivity(&activities.SnapshotActivity{}) } diff --git a/flow/shared/alerting/alerting.go b/flow/shared/alerting/alerting.go index a3a3d6c6e0..394623825d 100644 --- a/flow/shared/alerting/alerting.go +++ b/flow/shared/alerting/alerting.go @@ -18,8 +18,8 @@ type Alerter struct { logger *slog.Logger } -func registerSendersFromPool(catalogPool *pgxpool.Pool) ([]*slackAlertSender, error) { - rows, err := catalogPool.Query(context.Background(), +func registerSendersFromPool(ctx context.Context, catalogPool *pgxpool.Pool) ([]*slackAlertSender, error) { + rows, err := catalogPool.Query(ctx, "SELECT service_type,service_config FROM peerdb_stats.alerting_config") if err != nil { return nil, fmt.Errorf("failed to read alerter config from catalog: %w", err) @@ -47,11 +47,17 @@ func registerSendersFromPool(catalogPool *pgxpool.Pool) ([]*slackAlertSender, er } // doesn't take care of closing pool, needs to be done externally. -func NewAlerter(catalogPool *pgxpool.Pool) *Alerter { +func NewAlerter(catalogPool *pgxpool.Pool) (*Alerter, error) { + logger := slog.Default() + if catalogPool == nil { + logger.Error("catalog pool is nil for Alerter") + return nil, fmt.Errorf("catalog pool is nil for Alerter") + } + return &Alerter{ catalogPool: catalogPool, - logger: slog.Default(), - } + logger: logger, + }, nil } // Only raises an alert if another alert with the same key hasn't been raised @@ -62,40 +68,47 @@ func (a *Alerter) AlertIf(ctx context.Context, alertKey string, alertMessage str return } - if a.catalogPool != nil { - slackAlertSenders, err := registerSendersFromPool(a.catalogPool) - if err != nil { - a.logger.WarnContext(ctx, "failed to set Slack senders", slog.Any("error", err)) - return - } - if len(slackAlertSenders) == 0 { - a.logger.WarnContext(ctx, "no Slack senders configured, returning") - return - } - - row := a.catalogPool.QueryRow(context.Background(), - `SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1 + var err error + row := a.catalogPool.QueryRow(ctx, + `SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1 ORDER BY created_timestamp DESC LIMIT 1`, - alertKey) - var createdTimestamp time.Time - err = row.Scan(&createdTimestamp) - if err != nil && err != pgx.ErrNoRows { - a.logger.Warn("failed to send alert: %v", err) + alertKey) + var createdTimestamp time.Time + err = row.Scan(&createdTimestamp) + if err != nil && err != pgx.ErrNoRows { + a.logger.Warn("failed to send alert: %v", err) + return + } + + if time.Since(createdTimestamp) >= peerdbenv.PeerDBAlertingGapMinutesAsDuration() { + a.AddAlertToCatalog(ctx, alertKey, alertMessage) + a.AlertToSlack(ctx, alertKey, alertMessage) + } +} + +func (a *Alerter) AlertToSlack(ctx context.Context, alertKey string, alertMessage string) { + slackAlertSenders, err := registerSendersFromPool(ctx, a.catalogPool) + if err != nil { + a.logger.WarnContext(ctx, "failed to set Slack senders", slog.Any("error", err)) + return + } + + for _, slackAlertSender := range slackAlertSenders { + err = slackAlertSender.sendAlert(ctx, + fmt.Sprintf(":rotating_light:Alert:rotating_light:: %s", alertKey), alertMessage) + if err != nil { + a.logger.WarnContext(ctx, "failed to send alert", slog.Any("error", err)) return } + } +} - if time.Since(createdTimestamp) >= peerdbenv.PeerDBAlertingGapMinutesAsDuration() { - for _, slackAlertSender := range slackAlertSenders { - err = slackAlertSender.sendAlert(context.Background(), - fmt.Sprintf(":rotating_light:Alert:rotating_light:: %s", alertKey), alertMessage) - if err != nil { - a.logger.WarnContext(ctx, "failed to send alert", slog.Any("error", err)) - return - } - _, _ = a.catalogPool.Exec(context.Background(), - "INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message) VALUES($1,$2)", - alertKey, alertMessage) - } - } +func (a *Alerter) AddAlertToCatalog(ctx context.Context, alertKey string, alertMessage string) { + _, err := a.catalogPool.Exec(ctx, + "INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message) VALUES($1,$2)", + alertKey, alertMessage) + if err != nil { + a.logger.WarnContext(ctx, "failed to insert alert", slog.Any("error", err)) + return } }