Skip to content

Commit

Permalink
Add maxBatchInterval to skip some empty batches (#1599)
Browse files Browse the repository at this point in the history
  • Loading branch information
BedrockSquirrel authored Oct 14, 2023
1 parent e13ad33 commit 51b0038
Show file tree
Hide file tree
Showing 24 changed files with 680 additions and 644 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/manual-deploy-testnet-l2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ jobs:
echo "WORKER_ADDR=${{vars.WORKER_ADDR}}"
echo "BATCH_INTERVAL=${{vars.BATCH_INTERVAL}}"
echo "L2_MAX_BATCH_INTERVAL=${{vars.L2_MAX_BATCH_INTERVAL}}"
echo "ROLLUP_INTERVAL=${{vars.ROLLUP_INTERVAL}}"
echo "FAUCET_FUNDS=${{vars.FAUCET_FUNDS}}"
Expand Down Expand Up @@ -251,6 +252,7 @@ jobs:
-is_debug_namespace_enabled=true \
-log_level=${{ github.event.inputs.log_level }} \
-batch_interval=${{vars.BATCH_INTERVAL}} \
-max_batch_interval=${{vars.L2_MAX_BATCH_INTERVAL}} \
-rollup_interval=${{vars.ROLLUP_INTERVAL}} \
-l1_chain_id=${{vars.L1_CHAIN_ID}} \
start'
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/manual-upgrade-testnet-l2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ jobs:
echo "WORKER_ADDR=${{vars.WORKER_ADDR}}"
echo "BATCH_INTERVAL=${{vars.BATCH_INTERVAL}}"
echo "L2_MAX_BATCH_INTERVAL=${{vars.L2_MAX_BATCH_INTERVAL}}"
echo "ROLLUP_INTERVAL=${{vars.ROLLUP_INTERVAL}}"
- name: 'Login via Azure CLI'
Expand Down Expand Up @@ -177,6 +178,7 @@ jobs:
-host_docker_image=${{vars.L2_HOST_DOCKER_BUILD_TAG}} \
-log_level=${{ github.event.inputs.log_level }} \
-batch_interval=${{vars.BATCH_INTERVAL}} \
-max_batch_interval=${{vars.L2_MAX_BATCH_INTERVAL}} \
-rollup_interval=${{vars.ROLLUP_INTERVAL}} \
-l1_chain_id=${{vars.L1_CHAIN_ID}} \
upgrade'
Expand Down
2 changes: 1 addition & 1 deletion go/common/enclave.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ type Enclave interface {

// CreateBatch - creates a new head batch extending the previous one for the latest known L1 head if the node is
// a sequencer. Will panic otherwise.
CreateBatch() SystemError
CreateBatch(skipIfEmpty bool) SystemError

// CreateRollup - will create a new rollup by going through the sequencer if the node is a sequencer
// or panic otherwise.
Expand Down
1,001 changes: 506 additions & 495 deletions go/common/rpc/generated/enclave.pb.go

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion go/common/rpc/generated/enclave.proto
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,9 @@ message DebugTraceTransactionResponse{
SystemError systemError = 2;
}

message CreateBatchRequest{}
message CreateBatchRequest{
bool skipIfEmpty = 1;
}
message CreateBatchResponse{
string error = 2;
}
Expand Down
162 changes: 63 additions & 99 deletions go/common/rpc/generated/enclave_grpc.pb.go

Large diffs are not rendered by default.

20 changes: 14 additions & 6 deletions go/config/host_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ type HostInputConfig struct {
// Min interval before creating the next batch (only used by Sequencer nodes)
BatchInterval time.Duration

// MaxBatchInterval is the max interval between batches, if this is set higher than BatchInterval, the host will
// not create empty batches until the MaxBatchInterval is reached or a transaction is received.
MaxBatchInterval time.Duration

// Min interval before creating the next rollup (only used by Sequencer nodes)
RollupInterval time.Duration

Expand Down Expand Up @@ -131,6 +135,7 @@ func (p HostInputConfig) ToHostConfig() *HostConfig {
LevelDBPath: p.LevelDBPath,
DebugNamespaceEnabled: p.DebugNamespaceEnabled,
BatchInterval: p.BatchInterval,
MaxBatchInterval: p.MaxBatchInterval,
RollupInterval: p.RollupInterval,
L1BlockTime: p.L1BlockTime,
IsInboundP2PDisabled: p.IsInboundP2PDisabled,
Expand Down Expand Up @@ -158,6 +163,9 @@ type HostConfig struct {
MessageBusAddress gethcommon.Address
// Min interval before creating the next batch (only used by Sequencer nodes)
BatchInterval time.Duration
// MaxBatchInterval is the max interval between batches, if this is set higher than BatchInterval, the host will
// not create empty batches until the MaxBatchInterval is reached or a transaction is received.
MaxBatchInterval time.Duration
// Min interval before creating the next rollup (only used by Sequencer nodes)
RollupInterval time.Duration
// MaxRollupSize is the max size of the rollup
Expand Down Expand Up @@ -256,11 +264,11 @@ func DefaultHostParsedConfig() *HostInputConfig {
MetricsEnabled: true,
MetricsHTTPPort: 14000,
UseInMemoryDB: true,
DebugNamespaceEnabled: false,
BatchInterval: 1 * time.Second,
RollupInterval: 5 * time.Second,
L1BlockTime: 15 * time.Second,
IsInboundP2PDisabled: false,
MaxRollupSize: 1024 * 64,
DebugNamespaceEnabled: false, BatchInterval: 1 * time.Second,
MaxBatchInterval: 1 * time.Second,
RollupInterval: 5 * time.Second,
L1BlockTime: 15 * time.Second,
IsInboundP2PDisabled: false,
MaxRollupSize: 1024 * 64,
}
}
15 changes: 13 additions & 2 deletions go/enclave/components/batch_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/obscuronet/go-obscuro/go/enclave/genesis"
)

var ErrNoTransactionsToProcess = fmt.Errorf("no transactions to process")

// batchExecutor - the component responsible for executing batches
type batchExecutor struct {
storage storage.Storage
Expand Down Expand Up @@ -128,7 +130,7 @@ func (executor *batchExecutor) refundL1Fees(stateDB *state.StateDB, context *Bat
}
}

func (executor *batchExecutor) ComputeBatch(context *BatchExecutionContext) (*ComputedBatch, error) {
func (executor *batchExecutor) ComputeBatch(context *BatchExecutionContext, failForEmptyBatch bool) (*ComputedBatch, error) { //nolint:gocognit
defer core.LogMethodDuration(executor.logger, measure.NewStopwatch(), "Batch context processed")

// sanity check that the l1 block exists. We don't have to execute batches of forks.
Expand Down Expand Up @@ -207,6 +209,15 @@ func (executor *batchExecutor) ComputeBatch(context *BatchExecutionContext) (*Co
}

executor.populateHeader(&copyBatch, allReceipts(txReceipts, ccReceipts))
if failForEmptyBatch &&
len(txReceipts) == 0 &&
len(ccReceipts) == 0 &&
len(transactionsToProcess) == 0 &&
len(crossChainTransactions) == 0 &&
len(messages) == 0 &&
len(transfers) == 0 {
return nil, ErrNoTransactionsToProcess
}

// the logs and receipts produced by the EVM have the wrong hash which must be adjusted
for _, receipt := range txReceipts {
Expand Down Expand Up @@ -250,7 +261,7 @@ func (executor *batchExecutor) ExecuteBatch(batch *core.Batch) (types.Receipts,
SequencerNo: batch.Header.SequencerOrderNo,
Creator: batch.Header.Coinbase,
BaseFee: batch.Header.BaseFee,
})
}, false) // this execution is not used when first producing a batch, we never want to fail for empty batches
if err != nil {
return nil, fmt.Errorf("failed computing batch %s. Cause: %w", batch.Hash(), err)
}
Expand Down
3 changes: 2 additions & 1 deletion go/enclave/components/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ type BatchExecutor interface {
// ComputeBatch - a more primitive ExecuteBatch
// Call with same BatchContext should always produce identical extBatch - idempotent
// Should be safe to call in parallel
ComputeBatch(*BatchExecutionContext) (*ComputedBatch, error)
// failForEmptyBatch bool is used to skip batch production
ComputeBatch(batchContext *BatchExecutionContext, failForEmptyBatch bool) (*ComputedBatch, error)

// ExecuteBatch - executes the transactions and xchain messages, returns the receipts, and updates the stateDB
ExecuteBatch(*core.Batch) (types.Receipts, error)
Expand Down
2 changes: 1 addition & 1 deletion go/enclave/components/rollup_compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ func (rc *RollupCompression) computeBatch(
ChainConfig: rc.chainConfig,
SequencerNo: SequencerNo,
BaseFee: big.NewInt(0).Set(BaseFee),
})
}, false)
}

func transformToByteArray(reorgs []*common.BatchHeader) ([][]byte, error) {
Expand Down
4 changes: 2 additions & 2 deletions go/enclave/enclave.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ func (e *enclaveImpl) SubmitBatch(extBatch *common.ExtBatch) common.SystemError
return nil
}

func (e *enclaveImpl) CreateBatch() common.SystemError {
func (e *enclaveImpl) CreateBatch(skipBatchIfEmpty bool) common.SystemError {
defer core.LogMethodDuration(e.logger, measure.NewStopwatch(), "CreateBatch call ended")
if e.stopControl.IsStopping() {
return responses.ToInternalError(fmt.Errorf("requested CreateBatch with the enclave stopping"))
Expand All @@ -590,7 +590,7 @@ func (e *enclaveImpl) CreateBatch() common.SystemError {
e.mainMutex.Lock()
defer e.mainMutex.Unlock()

err := e.Sequencer().CreateBatch()
err := e.Sequencer().CreateBatch(skipBatchIfEmpty)
if err != nil {
return responses.ToInternalError(err)
}
Expand Down
2 changes: 1 addition & 1 deletion go/enclave/nodetype/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type NodeType interface {

type Sequencer interface {
// CreateBatch - creates a new head batch for the latest known L1 head block.
CreateBatch() error
CreateBatch(skipBatchIfEmpty bool) error

// CreateRollup - creates a new rollup from the latest recorded rollup in the head l1 chain
// and adds as many batches to it as possible.
Expand Down
21 changes: 13 additions & 8 deletions go/enclave/nodetype/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func NewSequencer(
}
}

func (s *sequencer) CreateBatch() error {
func (s *sequencer) CreateBatch(skipBatchIfEmpty bool) error {
hasGenesis, err := s.batchRegistry.HasGenesisBatch()
if err != nil {
return fmt.Errorf("unknown genesis batch state. Cause: %w", err)
Expand All @@ -114,7 +114,7 @@ func (s *sequencer) CreateBatch() error {
return s.initGenesis(l1HeadBlock)
}

return s.createNewHeadBatch(l1HeadBlock)
return s.createNewHeadBatch(l1HeadBlock, skipBatchIfEmpty)
}

// TODO - This is iffy, the producer commits the stateDB. The producer
Expand Down Expand Up @@ -149,7 +149,7 @@ func (s *sequencer) initGenesis(block *common.L1Block) error {
return nil
}

func (s *sequencer) createNewHeadBatch(l1HeadBlock *common.L1Block) error {
func (s *sequencer) createNewHeadBatch(l1HeadBlock *common.L1Block, skipBatchIfEmpty bool) error {
headBatchSeq := s.batchRegistry.HeadBatchSeq()
if headBatchSeq == nil {
headBatchSeq = big.NewInt(int64(common.L2GenesisSeqNo))
Expand Down Expand Up @@ -186,7 +186,12 @@ func (s *sequencer) createNewHeadBatch(l1HeadBlock *common.L1Block) error {
}

// todo - time is set only here; take from l1 block?
if _, err := s.produceBatch(sequencerNo.Add(sequencerNo, big.NewInt(1)), l1HeadBlock.Hash(), headBatch.Hash(), transactions, uint64(time.Now().Unix())); err != nil {
if _, err := s.produceBatch(sequencerNo.Add(sequencerNo, big.NewInt(1)), l1HeadBlock.Hash(), headBatch.Hash(), transactions, uint64(time.Now().Unix()), skipBatchIfEmpty); err != nil {
if errors.Is(err, components.ErrNoTransactionsToProcess) {
// skip batch production when there are no transactions to process
s.logger.Info("Skipping batch production, no transactions to execute")
return nil
}
return fmt.Errorf(" failed producing batch. Cause: %w", err)
}

Expand All @@ -197,7 +202,7 @@ func (s *sequencer) createNewHeadBatch(l1HeadBlock *common.L1Block) error {
return nil
}

func (s *sequencer) produceBatch(sequencerNo *big.Int, l1Hash common.L1BlockHash, headBatch common.L2BatchHash, transactions common.L2Transactions, batchTime uint64) (*core.Batch, error) {
func (s *sequencer) produceBatch(sequencerNo *big.Int, l1Hash common.L1BlockHash, headBatch common.L2BatchHash, transactions common.L2Transactions, batchTime uint64, failForEmptyBatch bool) (*core.Batch, error) {
cb, err := s.batchProducer.ComputeBatch(&components.BatchExecutionContext{
BlockPtr: l1Hash,
ParentPtr: headBatch,
Expand All @@ -207,7 +212,7 @@ func (s *sequencer) produceBatch(sequencerNo *big.Int, l1Hash common.L1BlockHash
BaseFee: s.settings.BaseFee,
ChainConfig: s.chainConfig,
SequencerNo: sequencerNo,
})
}, failForEmptyBatch)
if err != nil {
return nil, fmt.Errorf("failed computing batch. Cause: %w", err)
}
Expand Down Expand Up @@ -317,8 +322,8 @@ func (s *sequencer) duplicateBatches(l1Head *types.Block, nonCanonicalL1Path []c
return fmt.Errorf("could not fetch sequencer no. Cause %w", err)
}
sequencerNo = sequencerNo.Add(sequencerNo, big.NewInt(1))
// create the duplicate and store/broadcast it
b, err := s.produceBatch(sequencerNo, l1Head.ParentHash(), currentHead, orphanBatch.Transactions, orphanBatch.Header.Time)
// create the duplicate and store/broadcast it, recreate batch even if it was empty
b, err := s.produceBatch(sequencerNo, l1Head.ParentHash(), currentHead, orphanBatch.Transactions, orphanBatch.Header.Time, false)
if err != nil {
return fmt.Errorf("could not produce batch. Cause %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions go/enclave/rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,8 @@ func (s *RPCServer) CreateRollup(_ context.Context, req *generated.CreateRollupR
}, nil
}

func (s *RPCServer) CreateBatch(_ context.Context, _ *generated.CreateBatchRequest) (*generated.CreateBatchResponse, error) {
sysError := s.enclave.CreateBatch()
func (s *RPCServer) CreateBatch(_ context.Context, r *generated.CreateBatchRequest) (*generated.CreateBatchResponse, error) {
sysError := s.enclave.CreateBatch(r.SkipIfEmpty)
if sysError != nil {
s.logger.Error("Error creating batch", log.ErrKey, sysError)
}
Expand Down
12 changes: 11 additions & 1 deletion go/host/container/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type HostConfigToml struct {
LevelDBPath string
DebugNamespaceEnabled bool
BatchInterval string
MaxBatchInterval string
RollupInterval string
IsInboundP2PDisabled bool
L1BlockTime int
Expand Down Expand Up @@ -88,6 +89,7 @@ func ParseConfig() (*config.HostInputConfig, error) {
levelDBPath := flag.String(levelDBPathName, cfg.LevelDBPath, flagUsageMap[levelDBPathName])
debugNamespaceEnabled := flag.Bool(debugNamespaceEnabledName, cfg.DebugNamespaceEnabled, flagUsageMap[debugNamespaceEnabledName])
batchInterval := flag.String(batchIntervalName, cfg.BatchInterval.String(), flagUsageMap[batchIntervalName])
maxBatchInterval := flag.String(maxBatchIntervalName, cfg.MaxBatchInterval.String(), flagUsageMap[maxBatchIntervalName])
rollupInterval := flag.String(rollupIntervalName, cfg.RollupInterval.String(), flagUsageMap[rollupIntervalName])
isInboundP2PDisabled := flag.Bool(isInboundP2PDisabledName, cfg.IsInboundP2PDisabled, flagUsageMap[isInboundP2PDisabledName])
maxRollupSize := flag.Uint64(maxRollupSizeFlagName, cfg.MaxRollupSize, flagUsageMap[maxRollupSizeFlagName])
Expand Down Expand Up @@ -136,6 +138,10 @@ func ParseConfig() (*config.HostInputConfig, error) {
if err != nil {
return nil, err
}
cfg.MaxBatchInterval, err = time.ParseDuration(*maxBatchInterval)
if err != nil {
return nil, err
}
cfg.RollupInterval, err = time.ParseDuration(*rollupInterval)
if err != nil {
return nil, err
Expand Down Expand Up @@ -164,13 +170,16 @@ func fileBasedConfig(configPath string) (*config.HostInputConfig, error) {
return &config.HostInputConfig{}, fmt.Errorf("unrecognised node type '%s'", tomlConfig.NodeType)
}

batchInterval, rollupInterval := 1*time.Second, 5*time.Second
batchInterval, maxBatchInterval, rollupInterval := 1*time.Second, 1*time.Second, 5*time.Second
if interval, err := time.ParseDuration(tomlConfig.BatchInterval); err == nil {
batchInterval = interval
}
if interval, err := time.ParseDuration(tomlConfig.RollupInterval); err == nil {
rollupInterval = interval
}
if interval, err := time.ParseDuration(tomlConfig.MaxBatchInterval); err == nil {
maxBatchInterval = interval
}

return &config.HostInputConfig{
IsGenesis: tomlConfig.IsGenesis,
Expand Down Expand Up @@ -202,6 +211,7 @@ func fileBasedConfig(configPath string) (*config.HostInputConfig, error) {
UseInMemoryDB: tomlConfig.UseInMemoryDB,
LevelDBPath: tomlConfig.LevelDBPath,
BatchInterval: batchInterval,
MaxBatchInterval: maxBatchInterval,
RollupInterval: rollupInterval,
IsInboundP2PDisabled: tomlConfig.IsInboundP2PDisabled,
L1BlockTime: time.Duration(tomlConfig.L1BlockTime) * time.Second,
Expand Down
2 changes: 2 additions & 0 deletions go/host/container/cli_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
levelDBPathName = "levelDBPath"
debugNamespaceEnabledName = "debugNamespaceEnabled"
batchIntervalName = "batchInterval"
maxBatchIntervalName = "maxBatchInterval"
rollupIntervalName = "rollupInterval"
isInboundP2PDisabledName = "isInboundP2PDisabled"
maxRollupSizeFlagName = "maxRollupSize"
Expand Down Expand Up @@ -71,6 +72,7 @@ func getFlagUsageMap() map[string]string {
levelDBPathName: "Filepath for the levelDB persistence dir (can be empty if a throwaway file in /tmp/ is acceptable or if using InMemory DB)",
debugNamespaceEnabledName: "Whether the debug names is enabled",
batchIntervalName: "Duration between each batch. Can be put down as 1.0s",
maxBatchIntervalName: "Max interval between each batch, if greater than batchInterval then some empty batches will be skipped. Can be put down as 1.0s",
rollupIntervalName: "Duration between each rollup. Can be put down as 1.0s",
isInboundP2PDisabledName: "Whether inbound p2p is enabled",
maxRollupSizeFlagName: "Max size of a rollup",
Expand Down
35 changes: 21 additions & 14 deletions go/host/enclave/guardian.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,23 +68,26 @@ type Guardian struct {

hostInterrupter *stopcontrol.StopControl // host hostInterrupter so we can stop quickly

logger gethlog.Logger
logger gethlog.Logger
maxBatchInterval time.Duration
lastBatchCreated time.Time
}

func NewGuardian(cfg *config.HostConfig, hostData host.Identity, serviceLocator guardianServiceLocator, enclaveClient common.Enclave, db *db.DB, interrupter *stopcontrol.StopControl, logger gethlog.Logger) *Guardian {
return &Guardian{
hostData: hostData,
state: NewStateTracker(logger),
enclaveClient: enclaveClient,
sl: serviceLocator,
batchInterval: cfg.BatchInterval,
rollupInterval: cfg.RollupInterval,
l1StartHash: cfg.L1StartHash,
maxRollupSize: cfg.MaxRollupSize,
blockTime: cfg.L1BlockTime,
db: db,
hostInterrupter: interrupter,
logger: logger,
hostData: hostData,
state: NewStateTracker(logger),
enclaveClient: enclaveClient,
sl: serviceLocator,
batchInterval: cfg.BatchInterval,
maxBatchInterval: cfg.MaxBatchInterval,
rollupInterval: cfg.RollupInterval,
l1StartHash: cfg.L1StartHash,
maxRollupSize: cfg.MaxRollupSize,
blockTime: cfg.L1BlockTime,
db: db,
hostInterrupter: interrupter,
logger: logger,
}
}

Expand Down Expand Up @@ -508,7 +511,10 @@ func (g *Guardian) periodicBatchProduction() {
continue
}
g.logger.Debug("Create batch")
err := g.enclaveClient.CreateBatch()
// if maxBatchInterval is set higher than batchInterval then we are happy to skip creating batches when there is no data
// (up to a maximum time of maxBatchInterval)
skipBatchIfEmpty := g.maxBatchInterval > g.batchInterval && time.Since(g.lastBatchCreated) < g.maxBatchInterval
err := g.enclaveClient.CreateBatch(skipBatchIfEmpty)
if err != nil {
g.logger.Error("Unable to produce batch", log.ErrKey, err)
}
Expand Down Expand Up @@ -608,6 +614,7 @@ func (g *Guardian) streamEnclaveData() {
}

if g.hostData.IsSequencer { // if we are the sequencer we need to broadcast this new batch to the network
g.lastBatchCreated = time.Now()
g.logger.Info("Batch produced. Sending to peers..", log.BatchHeightKey, resp.Batch.Header.Number, log.BatchHashKey, resp.Batch.Hash())

err = g.sl.P2P().BroadcastBatches([]*common.ExtBatch{resp.Batch})
Expand Down
Loading

0 comments on commit 51b0038

Please sign in to comment.