Skip to content

Commit

Permalink
Merge branch 'master' into redis-stream-creation
Browse files Browse the repository at this point in the history
  • Loading branch information
tsahee authored May 24, 2024
2 parents 0dfb1c1 + 917f828 commit 41d0e59
Showing 1 changed file with 56 additions and 1 deletion.
57 changes: 56 additions & 1 deletion execution/gethexec/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,24 @@
package gethexec

import (
"bytes"
"context"
"errors"
"fmt"
"math"
"math/big"
"os"
"path"
"runtime/debug"
"runtime/pprof"
"runtime/trace"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/google/uuid"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/util/arbmath"
Expand Down Expand Up @@ -76,6 +82,7 @@ type SequencerConfig struct {
NonceFailureCacheExpiry time.Duration `koanf:"nonce-failure-cache-expiry" reload:"hot"`
ExpectedSurplusSoftThreshold string `koanf:"expected-surplus-soft-threshold" reload:"hot"`
ExpectedSurplusHardThreshold string `koanf:"expected-surplus-hard-threshold" reload:"hot"`
EnableProfiling bool `koanf:"enable-profiling"`
expectedSurplusSoftThreshold int
expectedSurplusHardThreshold int
}
Expand Down Expand Up @@ -125,6 +132,7 @@ var DefaultSequencerConfig = SequencerConfig{
NonceFailureCacheExpiry: time.Second,
ExpectedSurplusSoftThreshold: "default",
ExpectedSurplusHardThreshold: "default",
EnableProfiling: true,
}

var TestSequencerConfig = SequencerConfig{
Expand All @@ -142,6 +150,7 @@ var TestSequencerConfig = SequencerConfig{
NonceFailureCacheExpiry: time.Second,
ExpectedSurplusSoftThreshold: "default",
ExpectedSurplusHardThreshold: "default",
EnableProfiling: false,
}

func SequencerConfigAddOptions(prefix string, f *flag.FlagSet) {
Expand All @@ -159,6 +168,7 @@ func SequencerConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Duration(prefix+".nonce-failure-cache-expiry", DefaultSequencerConfig.NonceFailureCacheExpiry, "maximum amount of time to wait for a predecessor before rejecting a tx with nonce too high")
f.String(prefix+".expected-surplus-soft-threshold", DefaultSequencerConfig.ExpectedSurplusSoftThreshold, "if expected surplus is lower than this value, warnings are posted")
f.String(prefix+".expected-surplus-hard-threshold", DefaultSequencerConfig.ExpectedSurplusHardThreshold, "if expected surplus is lower than this value, new incoming transactions will be denied")
f.Bool(prefix+".enable-profiling", DefaultSequencerConfig.EnableProfiling, "enable CPU profiling and tracing")
}

type txQueueItem struct {
Expand Down Expand Up @@ -327,6 +337,7 @@ type Sequencer struct {
expectedSurplusMutex sync.RWMutex
expectedSurplus int64
expectedSurplusUpdated bool
enableProfiling bool
}

func NewSequencer(execEngine *ExecutionEngine, l1Reader *headerreader.HeaderReader, configFetcher SequencerConfigFetcher) (*Sequencer, error) {
Expand All @@ -353,6 +364,7 @@ func NewSequencer(execEngine *ExecutionEngine, l1Reader *headerreader.HeaderRead
l1Timestamp: 0,
pauseChan: nil,
onForwarderSet: make(chan struct{}, 1),
enableProfiling: config.EnableProfiling,
}
s.nonceFailures = &nonceFailureCache{
containers.NewLruCacheWithOnEvict(config.NonceCacheSize, s.onNonceFailureEvict),
Expand Down Expand Up @@ -758,6 +770,44 @@ func (s *Sequencer) precheckNonces(queueItems []txQueueItem) []txQueueItem {
return outputQueueItems
}

// createBlockWithProfiling runs create block with tracing and CPU profiling
// enabled. If the block creation takes longer than 5 seconds, it keeps both
// and prints out filenames in an error log line.
func (s *Sequencer) createBlockWithProfiling(ctx context.Context) bool {
pprofBuf, traceBuf := bytes.NewBuffer(nil), bytes.NewBuffer(nil)
if err := pprof.StartCPUProfile(pprofBuf); err != nil {
log.Error("Starting CPU profiling", "error", err)
}
if err := trace.Start(traceBuf); err != nil {
log.Error("Starting tracing", "error", err)
}
start := time.Now()
res := s.createBlock(ctx)
elapsed := time.Since(start)
pprof.StopCPUProfile()
trace.Stop()
if elapsed > 2*time.Second {
writeAndLog(pprofBuf, traceBuf)
return res
}
return res
}

func writeAndLog(pprof, trace *bytes.Buffer) {
id := uuid.NewString()
pprofFile := path.Join(os.TempDir(), id+".pprof")
if err := os.WriteFile(pprofFile, pprof.Bytes(), 0o600); err != nil {
log.Error("Creating temporary file for pprof", "fileName", pprofFile, "error", err)
return
}
traceFile := path.Join(os.TempDir(), id+".trace")
if err := os.WriteFile(traceFile, trace.Bytes(), 0o600); err != nil {
log.Error("Creating temporary file for trace", "fileName", traceFile, "error", err)
return
}
log.Debug("Block creation took longer than 5 seconds, created pprof and trace files", "pprof", pprofFile, "traceFile", traceFile)
}

func (s *Sequencer) createBlock(ctx context.Context) (returnValue bool) {
var queueItems []txQueueItem
var totalBatchSize int
Expand Down Expand Up @@ -1088,7 +1138,12 @@ func (s *Sequencer) Start(ctxIn context.Context) error {

s.CallIteratively(func(ctx context.Context) time.Duration {
nextBlock := time.Now().Add(s.config().MaxBlockSpeed)
madeBlock := s.createBlock(ctx)
var madeBlock bool
if s.enableProfiling {
madeBlock = s.createBlockWithProfiling(ctx)
} else {
madeBlock = s.createBlock(ctx)
}
if madeBlock {
// Note: this may return a negative duration, but timers are fine with that (they treat negative durations as 0).
return time.Until(nextBlock)
Expand Down

0 comments on commit 41d0e59

Please sign in to comment.