From 4b2d0e93be561d62053c81f7cd3f8b7fb350e833 Mon Sep 17 00:00:00 2001 From: Pedro Gomes Date: Mon, 25 Sep 2023 14:48:53 +0100 Subject: [PATCH] =?UTF-8?q?Guardian=20now=20produces=20rollup=20based=20on?= =?UTF-8?q?=20interval=20time=20and=20available=20bat=E2=80=A6=20(#1536)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Guardian now produces rollup based on interval time and available batches size * tweaks * pr comments * PR Comments * ci fix --- go/config/host_config.go | 7 ++ go/enclave/components/batch_registry.go | 1 - go/host/container/cli.go | 3 + go/host/container/cli_flags.go | 2 + go/host/enclave/guardian.go | 89 ++++++++++++++++++----- go/node/docker_node.go | 1 + integration/simulation/devnetwork/node.go | 1 + 7 files changed, 83 insertions(+), 21 deletions(-) diff --git a/go/config/host_config.go b/go/config/host_config.go index fcafd2127d..d8383aad27 100644 --- a/go/config/host_config.go +++ b/go/config/host_config.go @@ -92,6 +92,9 @@ type HostInputConfig struct { // Whether inbound p2p is enabled or not IsInboundP2PDisabled bool + + // MaxRollupSize specifies the threshold size which the sequencer-host publishes a rollup + MaxRollupSize uint64 } // ToHostConfig returns a HostConfig given a HostInputConfig @@ -131,6 +134,7 @@ func (p HostInputConfig) ToHostConfig() *HostConfig { RollupInterval: p.RollupInterval, L1BlockTime: p.L1BlockTime, IsInboundP2PDisabled: p.IsInboundP2PDisabled, + MaxRollupSize: p.MaxRollupSize, } } @@ -156,6 +160,8 @@ type HostConfig struct { BatchInterval time.Duration // Min interval before creating the next rollup (only used by Sequencer nodes) RollupInterval time.Duration + // MaxRollupSize is the max size of the rollup + MaxRollupSize uint64 // The expected time between blocks on the L1 network L1BlockTime time.Duration @@ -255,5 +261,6 @@ func DefaultHostParsedConfig() *HostInputConfig { RollupInterval: 5 * time.Second, L1BlockTime: 15 * time.Second, IsInboundP2PDisabled: false, + MaxRollupSize: 1024 * 64, } } diff --git a/go/enclave/components/batch_registry.go b/go/enclave/components/batch_registry.go index c291394b00..148f98685c 100644 --- a/go/enclave/components/batch_registry.go +++ b/go/enclave/components/batch_registry.go @@ -9,7 +9,6 @@ import ( "github.com/obscuronet/go-obscuro/go/common" "github.com/ethereum/go-ethereum/core/types" - "github.com/obscuronet/go-obscuro/go/enclave/storage" "github.com/ethereum/go-ethereum/core/state" diff --git a/go/host/container/cli.go b/go/host/container/cli.go index 21bc1b12b6..63767d418e 100644 --- a/go/host/container/cli.go +++ b/go/host/container/cli.go @@ -50,6 +50,7 @@ type HostConfigToml struct { RollupInterval string IsInboundP2PDisabled bool L1BlockTime int + MaxRollupSize int } // ParseConfig returns a config.HostInputConfig based on either the file identified by the `config` flag, or the flags with @@ -89,6 +90,7 @@ func ParseConfig() (*config.HostInputConfig, error) { batchInterval := flag.String(batchIntervalName, cfg.BatchInterval.String(), flagUsageMap[batchIntervalName]) rollupInterval := flag.String(rollupIntervalName, cfg.RollupInterval.String(), flagUsageMap[rollupIntervalName]) isInboundP2PDisabled := flag.Bool(isInboundP2PDisabledName, cfg.IsInboundP2PDisabled, flagUsageMap[isInboundP2PDisabledName]) + maxRollupSize := flag.Uint64(maxRollupSizeFlagName, cfg.MaxRollupSize, flagUsageMap[maxRollupSizeFlagName]) flag.Parse() @@ -139,6 +141,7 @@ func ParseConfig() (*config.HostInputConfig, error) { return nil, err } cfg.IsInboundP2PDisabled = *isInboundP2PDisabled + cfg.MaxRollupSize = *maxRollupSize return cfg, nil } diff --git a/go/host/container/cli_flags.go b/go/host/container/cli_flags.go index 445d28124b..f3ac39e6a0 100644 --- a/go/host/container/cli_flags.go +++ b/go/host/container/cli_flags.go @@ -34,6 +34,7 @@ const ( batchIntervalName = "batchInterval" rollupIntervalName = "rollupInterval" isInboundP2PDisabledName = "isInboundP2PDisabled" + maxRollupSizeFlagName = "maxRollupSize" ) // Returns a map of the flag usages. @@ -72,5 +73,6 @@ func getFlagUsageMap() map[string]string { batchIntervalName: "Duration between each batch. Can be put down as 1.0s", rollupIntervalName: "Duration between each rollup. Can be put down as 1.0s", isInboundP2PDisabledName: "Whether inbound p2p is enabled", + maxRollupSizeFlagName: "Max size of a rollup", } } diff --git a/go/host/enclave/guardian.go b/go/host/enclave/guardian.go index 8deda441fd..83387ebdde 100644 --- a/go/host/enclave/guardian.go +++ b/go/host/enclave/guardian.go @@ -62,7 +62,9 @@ type Guardian struct { batchInterval time.Duration rollupInterval time.Duration + blockTime time.Duration l1StartHash gethcommon.Hash + maxRollupSize uint64 hostInterrupter *stopcontrol.StopControl // host hostInterrupter so we can stop quickly @@ -78,6 +80,8 @@ func NewGuardian(cfg *config.HostConfig, hostData host.Identity, serviceLocator batchInterval: cfg.BatchInterval, rollupInterval: cfg.RollupInterval, l1StartHash: cfg.L1StartHash, + maxRollupSize: cfg.MaxRollupSize, + blockTime: cfg.L1BlockTime, db: db, hostInterrupter: interrupter, logger: logger, @@ -518,42 +522,47 @@ func (g *Guardian) periodicBatchProduction() { func (g *Guardian) periodicRollupProduction() { defer g.logger.Info("Stopping rollup production") - interval := g.rollupInterval - if interval == 0 { - interval = 3 * time.Second - } - rollupTicker := time.NewTicker(interval) - // attempt to produce rollup every time the timer ticks until we are stopped/interrupted + // check rollup every l1 block time + rollupCheckTicker := time.NewTicker(g.blockTime) + lastSuccessfulRollup := time.Now() + for { - if g.hostInterrupter.IsStopping() { - rollupTicker.Stop() - return // stop periodic rollup production - } select { - case <-rollupTicker.C: + case <-rollupCheckTicker.C: if !g.state.IsUpToDate() { // if we're behind the L1, we don't want to produce rollups g.logger.Debug("skipping rollup production because L1 is not up to date", "state", g.state) continue } - lastBatchNo, err := g.sl.L1Publisher().FetchLatestSeqNo() + + fromBatch, err := g.getLatestBatchNo() if err != nil { g.logger.Warn("encountered error while trying to retrieve latest sequence number", log.ErrKey, err) continue } - fromBatch := lastBatchNo.Uint64() - if lastBatchNo.Uint64() > common.L2GenesisSeqNo { - fromBatch++ - } - producedRollup, err := g.enclaveClient.CreateRollup(fromBatch) + + availBatchesSumSize, err := g.calculateNonRolledupBatchesSize(fromBatch) if err != nil { - g.logger.Error("unable to produce rollup", log.ErrKey, err) - } else { + g.logger.Error("unable to GetBatchesAfterSize rollup", log.ErrKey, err) + } + + // produce and issue rollup when either: + // it has passed g.rollupInterval from last lastSuccessfulRollup + // or the size of accumulated batches is > g.maxRollupSize + if time.Since(lastSuccessfulRollup) > g.rollupInterval || availBatchesSumSize >= g.maxRollupSize { + producedRollup, err := g.enclaveClient.CreateRollup(fromBatch) + if err != nil { + g.logger.Error("unable to create rollup", "batchSeqNo", fromBatch) + continue + } + // this method waits until the receipt is received g.sl.L1Publisher().PublishRollup(producedRollup) + lastSuccessfulRollup = time.Now() } + case <-g.hostInterrupter.Done(): // interrupted - end periodic process - rollupTicker.Stop() + rollupCheckTicker.Stop() return } } @@ -608,3 +617,43 @@ func (g *Guardian) streamEnclaveData() { } } } + +func (g *Guardian) calculateNonRolledupBatchesSize(seqNo uint64) (uint64, error) { + var size uint64 + + if seqNo == 0 { // don't calculate for seqNo 0 batches + return 0, nil + } + + currentNo := seqNo + for { + batch, err := g.sl.L2Repo().FetchBatchBySeqNo(big.NewInt(int64(currentNo))) + if err != nil { + if errors.Is(err, errutil.ErrNotFound) { + break // no more batches + } + return 0, err + } + + bSize, err := batch.Size() + if err != nil { + return 0, err + } + size += uint64(bSize) + currentNo++ + } + + return size, nil +} + +func (g *Guardian) getLatestBatchNo() (uint64, error) { + lastBatchNo, err := g.sl.L1Publisher().FetchLatestSeqNo() + if err != nil { + return 0, err + } + fromBatch := lastBatchNo.Uint64() + if lastBatchNo.Uint64() > common.L2GenesisSeqNo { + fromBatch++ + } + return fromBatch, nil +} diff --git a/go/node/docker_node.go b/go/node/docker_node.go index 8a7230e584..4f1a6d296c 100644 --- a/go/node/docker_node.go +++ b/go/node/docker_node.go @@ -111,6 +111,7 @@ func (d *DockerNode) startHost() error { "-p2pBindAddress", fmt.Sprintf("0.0.0.0:%d", d.cfg.hostP2PPort), "-clientRPCPortHttp", fmt.Sprintf("%d", d.cfg.hostHTTPPort), "-clientRPCPortWs", fmt.Sprintf("%d", d.cfg.hostWSPort), + "-maxRollupSize=65536", // host persistence hardcoded to use /data dir within the container, this needs to be mounted fmt.Sprintf("-useInMemoryDB=%t", d.cfg.hostInMemDB), fmt.Sprintf("-debugNamespaceEnabled=%t", d.cfg.debugNamespaceEnabled), diff --git a/integration/simulation/devnetwork/node.go b/integration/simulation/devnetwork/node.go index a3baf9a164..771d63061f 100644 --- a/integration/simulation/devnetwork/node.go +++ b/integration/simulation/devnetwork/node.go @@ -132,6 +132,7 @@ func (n *InMemNodeOperator) createHostContainer() *hostcontainer.HostContainer { BatchInterval: n.config.BatchInterval, RollupInterval: n.config.RollupInterval, L1BlockTime: n.config.L1BlockTime, + MaxRollupSize: 1024 * 64, } hostLogger := testlog.Logger().New(log.NodeIDKey, n.l1Wallet.Address(), log.CmpKey, log.HostCmp)