diff --git a/flow/alerting/alerting.go b/flow/alerting/alerting.go index 6fc903aa79..688cdff3de 100644 --- a/flow/alerting/alerting.go +++ b/flow/alerting/alerting.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "log/slog" + "runtime/debug" "time" "github.com/jackc/pgx/v5" @@ -74,7 +75,26 @@ func NewAlerter(ctx context.Context, catalogPool *pgxpool.Pool) *Alerter { } } +func withPanicGuard(ctx context.Context, functionName string, f func()) { + defer func(ctx context.Context) { + if r := recover(); r != nil { + errorMessage := fmt.Errorf("panic occurred during Alerter.%s: %v", functionName, r) + stack := string(debug.Stack()) + logger.LoggerFromCtx(ctx).Error("panic during Alerter."+functionName, + slog.Any("error", errorMessage), slog.String("stack", stack), + ) + } + }(ctx) + f() +} + func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo *protos.SlotInfo) { + withPanicGuard(ctx, "AlertIfSlotLag", func() { + a.alertIfSlotLag(ctx, peerName, slotInfo) + }) +} + +func (a *Alerter) alertIfSlotLag(ctx context.Context, peerName string, slotInfo *protos.SlotInfo) { slackAlertSenders, err := a.registerSendersFromPool(ctx) if err != nil { logger.LoggerFromCtx(ctx).Warn("failed to set Slack senders", slog.Any("error", err)) @@ -120,6 +140,14 @@ func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string, openConnections *protos.GetOpenConnectionsForUserResult, +) { + withPanicGuard(ctx, "AlertIfOpenConnections", func() { + a.alertIfOpenConnections(ctx, peerName, openConnections) + }) +} + +func (a *Alerter) alertIfOpenConnections(ctx context.Context, peerName string, + openConnections *protos.GetOpenConnectionsForUserResult, ) { slackAlertSenders, err := a.registerSendersFromPool(ctx) if err != nil { @@ -224,6 +252,12 @@ func (a *Alerter) sendTelemetryMessage(ctx context.Context, flowName string, mor } func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) { + withPanicGuard(ctx, "LogFlowError", func() { + a.logFlowError(ctx, flowName, err) + }) +} + +func (a *Alerter) logFlowError(ctx context.Context, flowName string, err error) { errorWithStack := fmt.Sprintf("%+v", err) _, err = a.catalogPool.Exec(ctx, "INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)", @@ -236,10 +270,22 @@ func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) } func (a *Alerter) LogFlowEvent(ctx context.Context, flowName string, info string) { + withPanicGuard(ctx, "LogFlowEvent", func() { + a.logFlowEvent(ctx, flowName, info) + }) +} + +func (a *Alerter) logFlowEvent(ctx context.Context, flowName string, info string) { a.sendTelemetryMessage(ctx, flowName, info, telemetry.INFO) } func (a *Alerter) LogFlowInfo(ctx context.Context, flowName string, info string) { + withPanicGuard(ctx, "LogFlowInfo", func() { + a.logFlowInfo(ctx, flowName, info) + }) +} + +func (a *Alerter) logFlowInfo(ctx context.Context, flowName string, info string) { _, err := a.catalogPool.Exec(ctx, "INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)", flowName, info, "info")