Skip to content

Commit

Permalink
fix(alerting): add recover for possible panics
Browse files Browse the repository at this point in the history
  • Loading branch information
iamKunalGupta committed Mar 7, 2024
1 parent 6b47812 commit 17d75a8
Showing 1 changed file with 46 additions and 0 deletions.
46 changes: 46 additions & 0 deletions flow/alerting/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"log/slog"
"runtime/debug"
"time"

"github.com/jackc/pgx/v5"
Expand Down Expand Up @@ -74,7 +75,26 @@ func NewAlerter(ctx context.Context, catalogPool *pgxpool.Pool) *Alerter {
}
}

func withPanicGuard(ctx context.Context, functionName string, f func()) {
defer func(ctx context.Context) {
if r := recover(); r != nil {
errorMessage := fmt.Errorf("panic occurred during Alerter.%s: %v", functionName, r)
stack := string(debug.Stack())
logger.LoggerFromCtx(ctx).Error("panic during Alerter."+functionName,
slog.Any("error", errorMessage), slog.String("stack", stack),
)
}
}(ctx)
f()
}

func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo *protos.SlotInfo) {
withPanicGuard(ctx, "AlertIfSlotLag", func() {
a.alertIfSlotLag(ctx, peerName, slotInfo)
})
}

func (a *Alerter) alertIfSlotLag(ctx context.Context, peerName string, slotInfo *protos.SlotInfo) {
slackAlertSenders, err := a.registerSendersFromPool(ctx)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to set Slack senders", slog.Any("error", err))
Expand Down Expand Up @@ -120,6 +140,14 @@ func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo

func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string,
openConnections *protos.GetOpenConnectionsForUserResult,
) {
withPanicGuard(ctx, "AlertIfOpenConnections", func() {
a.alertIfOpenConnections(ctx, peerName, openConnections)
})
}

func (a *Alerter) alertIfOpenConnections(ctx context.Context, peerName string,
openConnections *protos.GetOpenConnectionsForUserResult,
) {
slackAlertSenders, err := a.registerSendersFromPool(ctx)
if err != nil {
Expand Down Expand Up @@ -224,6 +252,12 @@ func (a *Alerter) sendTelemetryMessage(ctx context.Context, flowName string, mor
}

func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) {
withPanicGuard(ctx, "LogFlowError", func() {
a.logFlowError(ctx, flowName, err)
})
}

func (a *Alerter) logFlowError(ctx context.Context, flowName string, err error) {
errorWithStack := fmt.Sprintf("%+v", err)
_, err = a.catalogPool.Exec(ctx,
"INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)",
Expand All @@ -236,10 +270,22 @@ func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error)
}

func (a *Alerter) LogFlowEvent(ctx context.Context, flowName string, info string) {
withPanicGuard(ctx, "LogFlowEvent", func() {
a.logFlowEvent(ctx, flowName, info)
})
}

func (a *Alerter) logFlowEvent(ctx context.Context, flowName string, info string) {
a.sendTelemetryMessage(ctx, flowName, info, telemetry.INFO)
}

func (a *Alerter) LogFlowInfo(ctx context.Context, flowName string, info string) {
withPanicGuard(ctx, "LogFlowInfo", func() {
a.logFlowInfo(ctx, flowName, info)
})
}

func (a *Alerter) logFlowInfo(ctx context.Context, flowName string, info string) {
_, err := a.catalogPool.Exec(ctx,
"INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)",
flowName, info, "info")
Expand Down

0 comments on commit 17d75a8

Please sign in to comment.