Skip to content

Commit

Permalink
soroban-rpc: add generic panic handing (stellar#856)
Browse files Browse the repository at this point in the history
* add generic panic handling.

* update

* rename

* update per cr
  • Loading branch information
tsachiherman authored Aug 23, 2023
1 parent 7b48b16 commit 3758c2e
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 8 deletions.
10 changes: 6 additions & 4 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/ledgerbucketwindow"
"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/preflight"
"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/transactions"
"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/util"
)

const (
Expand Down Expand Up @@ -281,19 +282,20 @@ func (d *Daemon) Run() {
"addr": d.server.Addr,
}).Info("starting Soroban JSON RPC server")

go func() {
panicGroup := util.UnrecoverablePanicGroup.Log(d.logger)
panicGroup.Go(func() {
if err := d.server.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
// Error starting or closing listener:
d.logger.WithError(err).Fatal("soroban JSON RPC server encountered fatal error")
}
}()
})

if d.adminServer != nil {
go func() {
panicGroup.Go(func() {
if err := d.adminServer.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
d.logger.WithError(err).Error("soroban admin server encountered fatal error")
}
}()
})
}

// Shutdown gracefully when we receive an interrupt signal.
Expand Down
11 changes: 7 additions & 4 deletions cmd/soroban-rpc/internal/ingest/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/daemon/interfaces"
"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/db"
"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/util"

"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/events"
"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/transactions"
Expand Down Expand Up @@ -81,7 +82,8 @@ func NewService(cfg Config) *Service {
ledgerStatsMetric: ledgerStatsMetric,
}
service.wg.Add(1)
go func() {
panicGroup := util.UnrecoverablePanicGroup.Log(cfg.Logger)
panicGroup.Go(func() {
defer service.wg.Done()
// Retry running ingestion every second for 5 seconds.
constantBackoff := backoff.WithMaxRetries(backoff.NewConstantBackOff(1*time.Second), 5)
Expand All @@ -101,7 +103,7 @@ func NewService(cfg Config) *Service {
if err != nil && !errors.Is(err, context.Canceled) {
service.logger.WithError(err).Fatal("could not run ingestion")
}
}()
})
return service
}

Expand Down Expand Up @@ -170,9 +172,10 @@ func (s *Service) maybeFillEntriesFromCheckpoint(ctx context.Context, archive hi
// DB is empty, let's fill it from the History Archive, using the latest available checkpoint
// Do it in parallel with the upcoming captive core preparation to save time
s.logger.Infof("found an empty database, creating ledger-entry baseline from the most recent checkpoint (%d). This can take up to 30 minutes, depending on the network", checkpointLedger)
go func() {
panicGroup := util.UnrecoverablePanicGroup.Log(s.logger)
panicGroup.Go(func() {
checkPointFillErr <- s.fillEntriesFromCheckpoint(ctx, archive, checkpointLedger)
}()
})
return checkpointLedger + 1, checkPointFillErr, nil
} else if err != nil {
return 0, checkPointFillErr, err
Expand Down
106 changes: 106 additions & 0 deletions cmd/soroban-rpc/internal/util/panicgroup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package util

import (
"fmt"
"os"
"reflect"
"runtime"
"runtime/debug"
"strings"

"github.com/prometheus/client_golang/prometheus"
"github.com/stellar/go/support/log"
)

var UnrecoverablePanicGroup = panicGroup{
logPanicsToStdErr: true,
exitProcessOnPanic: true,
}

var RecoverablePanicGroup = panicGroup{
logPanicsToStdErr: true,
exitProcessOnPanic: false,
}

type panicGroup struct {
log *log.Entry
logPanicsToStdErr bool
exitProcessOnPanic bool
panicsCounter prometheus.Counter
}

func (pg *panicGroup) Log(log *log.Entry) *panicGroup {
return &panicGroup{
log: log,
logPanicsToStdErr: pg.logPanicsToStdErr,
exitProcessOnPanic: pg.exitProcessOnPanic,
panicsCounter: pg.panicsCounter,
}
}

func (pg *panicGroup) Counter(counter prometheus.Counter) *panicGroup {
return &panicGroup{
log: pg.log,
logPanicsToStdErr: pg.logPanicsToStdErr,
exitProcessOnPanic: pg.exitProcessOnPanic,
panicsCounter: counter,
}
}

// panicGroup give us the ability to spin a goroutine, with clear upfront definitions on what should be done in the
// case of an internal panic.
func (pg *panicGroup) Go(fn func()) {
go func() {
defer pg.recoverRoutine(fn)
fn()
}()
}

func (pg *panicGroup) recoverRoutine(fn func()) {
recoverRes := recover()
if recoverRes == nil {
return
}
var cs []string
if pg.log != nil {
cs = getPanicCallStack(fn)
for _, line := range cs {
pg.log.Warn(line)
}
}
if pg.logPanicsToStdErr {
if len(cs) == 0 {
cs = getPanicCallStack(fn)
}
for _, line := range cs {
fmt.Fprintln(os.Stderr, line)
}
}

if pg.panicsCounter != nil {
pg.panicsCounter.Inc()
}
if pg.exitProcessOnPanic {
os.Exit(1)
}
}

func getPanicCallStack(fn func()) (outCallStack []string) {
functionName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
outCallStack = append(outCallStack, fmt.Sprintf("panicing root function '%s'", functionName))
// while we're within the recoverRoutine, the debug.Stack() would return the
// call stack where the panic took place.
callStackStrings := string(debug.Stack())
for i, callStackLine := range strings.FieldsFunc(callStackStrings, func(r rune) bool { return r == '\n' || r == '\t' }) {
// skip the first 5 entries, since these are the "debug.Stack()" entries, which aren't really useful.
if i < 5 {
continue
}
outCallStack = append(outCallStack, callStackLine)
// once we reached the panicGroup entry, stop.
if strings.Contains(callStackLine, "(*panicGroup).Go") {
break
}
}
return outCallStack
}
111 changes: 111 additions & 0 deletions cmd/soroban-rpc/internal/util/panicgroup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package util

import (
"os"
"sync"
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/stellar/go/support/log"
"github.com/stretchr/testify/require"
)

func TestTrivialPanicGroup(t *testing.T) {
ch := make(chan int)

panicGroup := panicGroup{}
panicGroup.Go(func() { ch <- 1 })

<-ch
}

type TestLogsCounter struct {
entry *log.Entry
mu sync.Mutex
writtenLogEntries [logrus.TraceLevel + 1]int
}

func makeTestLogCounter() *TestLogsCounter {
out := &TestLogsCounter{
entry: log.New(),
}
out.entry.AddHook(out)
out.entry.SetLevel(logrus.DebugLevel)
return out
}
func (te *TestLogsCounter) Entry() *log.Entry {
return te.entry
}
func (te *TestLogsCounter) Levels() []logrus.Level {
return []logrus.Level{logrus.PanicLevel, logrus.FatalLevel, logrus.ErrorLevel, logrus.WarnLevel, logrus.InfoLevel, logrus.DebugLevel, logrus.TraceLevel}
}
func (te *TestLogsCounter) Fire(e *logrus.Entry) error {
te.mu.Lock()
defer te.mu.Unlock()
te.writtenLogEntries[e.Level]++
return nil
}
func (te *TestLogsCounter) GetLevel(i int) int {
te.mu.Lock()
defer te.mu.Unlock()
return te.writtenLogEntries[i]
}

func PanicingFunctionA(w *int) {
*w = 0
}

func IndirectPanicingFunctionB() {
PanicingFunctionA(nil)
}

func IndirectPanicingFunctionC() {
IndirectPanicingFunctionB()
}

func TestPanicGroupLog(t *testing.T) {
logCounter := makeTestLogCounter()
panicGroup := panicGroup{
log: logCounter.Entry(),
}
panicGroup.Go(IndirectPanicingFunctionC)
// wait until we get all the log entries.
waitStarted := time.Now()
for time.Since(waitStarted) < 5*time.Second {
warningCount := logCounter.GetLevel(3)
if warningCount >= 10 {
return
}
time.Sleep(1 * time.Millisecond)
}
t.FailNow()
}

func TestPanicGroupStdErr(t *testing.T) {
tmpFile, err := os.CreateTemp("", "TestPanicGroupStdErr")
require.NoError(t, err)
defaultStdErr := os.Stderr
os.Stderr = tmpFile
defer func() {
os.Stderr = defaultStdErr
tmpFile.Close()
os.Remove(tmpFile.Name())
}()

panicGroup := panicGroup{
logPanicsToStdErr: true,
}
panicGroup.Go(IndirectPanicingFunctionC)
// wait until we get all the log entries.
waitStarted := time.Now()
for time.Since(waitStarted) < 5*time.Second {
outErrBytes, err := os.ReadFile(tmpFile.Name())
require.NoError(t, err)
if len(outErrBytes) >= 100 {
return
}
time.Sleep(1 * time.Millisecond)
}
t.FailNow()
}

0 comments on commit 3758c2e

Please sign in to comment.