Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Guardian now produces rollup based on interval time and available bat… #1536

Merged
merged 9 commits into from
Sep 25, 2023
4 changes: 2 additions & 2 deletions go/common/rpc/generated/enclave.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

162 changes: 63 additions & 99 deletions go/common/rpc/generated/enclave_grpc.pb.go

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions go/config/host_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ type HostInputConfig struct {

// Whether inbound p2p is enabled or not
IsInboundP2PDisabled bool

// MaxRollupSize specifies the threshold size which the sequencer-host publishes a rollup
MaxRollupSize uint64
}

// ToHostConfig returns a HostConfig given a HostInputConfig
Expand Down Expand Up @@ -131,6 +134,7 @@ func (p HostInputConfig) ToHostConfig() *HostConfig {
RollupInterval: p.RollupInterval,
L1BlockTime: p.L1BlockTime,
IsInboundP2PDisabled: p.IsInboundP2PDisabled,
MaxRollupSize: p.MaxRollupSize,
}
}

Expand All @@ -156,6 +160,8 @@ type HostConfig struct {
BatchInterval time.Duration
// Min interval before creating the next rollup (only used by Sequencer nodes)
RollupInterval time.Duration
// MaxRollupSize is the max size of the rollup
MaxRollupSize uint64
// The expected time between blocks on the L1 network
L1BlockTime time.Duration

Expand Down Expand Up @@ -255,5 +261,6 @@ func DefaultHostParsedConfig() *HostInputConfig {
RollupInterval: 5 * time.Second,
L1BlockTime: 15 * time.Second,
IsInboundP2PDisabled: false,
MaxRollupSize: 1024 * 64,
otherview marked this conversation as resolved.
Show resolved Hide resolved
}
}
1 change: 0 additions & 1 deletion go/enclave/components/batch_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"sync"

"github.com/ethereum/go-ethereum/core/types"

"github.com/obscuronet/go-obscuro/go/enclave/storage"

"github.com/ethereum/go-ethereum/core/state"
Expand Down
2 changes: 2 additions & 0 deletions go/host/container/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func ParseConfig() (*config.HostInputConfig, error) {
batchInterval := flag.String(batchIntervalName, cfg.BatchInterval.String(), flagUsageMap[batchIntervalName])
rollupInterval := flag.String(rollupIntervalName, cfg.RollupInterval.String(), flagUsageMap[rollupIntervalName])
isInboundP2PDisabled := flag.Bool(isInboundP2PDisabledName, cfg.IsInboundP2PDisabled, flagUsageMap[isInboundP2PDisabledName])
maxRollupSize := flag.Uint64(maxRollupSizeFlagName, cfg.MaxRollupSize, flagUsageMap[maxRollupSizeFlagName])

flag.Parse()

Expand Down Expand Up @@ -139,6 +140,7 @@ func ParseConfig() (*config.HostInputConfig, error) {
return nil, err
}
cfg.IsInboundP2PDisabled = *isInboundP2PDisabled
cfg.MaxRollupSize = *maxRollupSize

return cfg, nil
}
Expand Down
2 changes: 2 additions & 0 deletions go/host/container/cli_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
batchIntervalName = "batchInterval"
rollupIntervalName = "rollupInterval"
isInboundP2PDisabledName = "isInboundP2PDisabled"
maxRollupSizeFlagName = "maxRollupSize"
)

// Returns a map of the flag usages.
Expand Down Expand Up @@ -72,5 +73,6 @@ func getFlagUsageMap() map[string]string {
batchIntervalName: "Duration between each batch. Can be put down as 1.0s",
rollupIntervalName: "Duration between each rollup. Can be put down as 1.0s",
isInboundP2PDisabledName: "Whether inbound p2p is enabled",
maxRollupSizeFlagName: "Max size of a rollup",
}
}
109 changes: 92 additions & 17 deletions go/host/enclave/guardian.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type Guardian struct {
batchInterval time.Duration
rollupInterval time.Duration
l1StartHash gethcommon.Hash
maxRollupSize uint64

hostInterrupter *stopcontrol.StopControl // host hostInterrupter so we can stop quickly

Expand All @@ -78,6 +79,7 @@ func NewGuardian(cfg *config.HostConfig, hostData host.Identity, serviceLocator
batchInterval: cfg.BatchInterval,
rollupInterval: cfg.RollupInterval,
l1StartHash: cfg.L1StartHash,
maxRollupSize: cfg.MaxRollupSize,
db: db,
hostInterrupter: interrupter,
logger: logger,
Expand Down Expand Up @@ -515,45 +517,81 @@ func (g *Guardian) periodicBatchProduction() {
}
}

func (g *Guardian) periodicRollupProduction() {
func (g *Guardian) periodicRollupProduction() { //nolint:gocognit
defer g.logger.Info("Stopping rollup production")

interval := g.rollupInterval
if interval == 0 {
interval = 3 * time.Second
// rollup at least at every g.rollupInterval
rollupInterval := g.rollupInterval
if rollupInterval == 0 {
g.logger.Crit("invalid rollup interval 0")
}
rollupTicker := time.NewTicker(interval)
// attempt to produce rollup every time the timer ticks until we are stopped/interrupted
rollupIntervalBasedTicker := time.NewTicker(rollupInterval)
otherview marked this conversation as resolved.
Show resolved Hide resolved

// every 5sec check if it should create a new rollup
rollupSizeCheckTicker := time.NewTicker(5 * time.Second)

for {
if g.hostInterrupter.IsStopping() {
rollupTicker.Stop()
return // stop periodic rollup production
}
select {
case <-rollupTicker.C:
case <-rollupIntervalBasedTicker.C:
if !g.state.IsUpToDate() {
// if we're behind the L1, we don't want to produce rollups
g.logger.Debug("skipping rollup production because L1 is not up to date", "state", g.state)
continue
}
lastBatchNo, err := g.sl.L1Publisher().FetchLatestSeqNo()

fromBatch, err := g.getLastestBatchNo()
if err != nil {
g.logger.Warn("encountered error while trying to retrieve latest sequence number", log.ErrKey, err)
continue
}
fromBatch := lastBatchNo.Uint64()
if lastBatchNo.Uint64() > common.L2GenesisSeqNo {
fromBatch++
}

producedRollup, err := g.enclaveClient.CreateRollup(fromBatch)
if err != nil {
g.logger.Error("unable to produce rollup", log.ErrKey, err)
} else {
fmt.Println(time.Now().String(), " - PRODUCING rollup from config'd rollupInterval - lastBatchNo: ", fromBatch)
g.sl.L1Publisher().PublishRollup(producedRollup)
fmt.Println(time.Now().String(), " - FINISHED PRODUCED rollup from config'd rollupInterval ")
otherview marked this conversation as resolved.
Show resolved Hide resolved
}

case <-rollupSizeCheckTicker.C:
if !g.state.IsUpToDate() {
// if we're behind the L1, we don't want to produce rollups
g.logger.Debug("skipping rollup production because L1 is not up to date", "state", g.state)
continue
}

fromBatch, err := g.getLastestBatchNo()
if err != nil {
g.logger.Warn("encountered error while trying to retrieve latest sequence number", log.ErrKey, err)
continue
}

fmt.Println(time.Now().String(), " - 5sec rollup creation CHECK - lastBatchNo: ", fromBatch)
availBatchesSumSize, err := g.getBatchesAfterSize(fromBatch)
if err != nil {
g.logger.Error("unable to GetBatchesAfterSize rollup", log.ErrKey, err)
continue
}

fmt.Println(time.Now().String(), " - Sum of bytes of Batches not yet rolled up: - ", availBatchesSumSize, " bytes")
if availBatchesSumSize >= g.maxRollupSize {
producedRollup, err := g.enclaveClient.CreateRollup(fromBatch)
otherview marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
g.logger.Error("unable to produce rollup", log.ErrKey, err)
} else {
fmt.Println(time.Now().String(), " - PRODUCING rollup from 5sec check ")
g.sl.L1Publisher().PublishRollup(producedRollup)
}
rollupSizeCheckTicker.Reset(5 * time.Second)
otherview marked this conversation as resolved.
Show resolved Hide resolved
rollupIntervalBasedTicker.Reset(rollupInterval)
fmt.Println(time.Now().String(), " - FINISHED PRODUCING rollup from 5sec check ")
}

case <-g.hostInterrupter.Done():
// interrupted - end periodic process
rollupTicker.Stop()
rollupIntervalBasedTicker.Stop()
rollupSizeCheckTicker.Stop()
return
}
}
Expand Down Expand Up @@ -608,3 +646,40 @@ func (g *Guardian) streamEnclaveData() {
}
}
}

func (g *Guardian) getBatchesAfterSize(seqNo uint64) (uint64, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Find this method name a bit confusing. Maybe could be something like getSizeOfBatchesAfter() (maybe since instead of after)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I found that confusing as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Third here, hate the name. WDYT ofcalculateUnrolledupBatchesSize ?

var size uint64

currentNo := seqNo
for {
batch, err := g.sl.L2Repo().FetchBatchBySeqNo(big.NewInt(int64(currentNo)))
if err != nil {
if errors.Is(err, errutil.ErrNotFound) {
break // no more batches
}
return 0, err
}

bSize, err := batch.Size()
if err != nil {
return 0, err
}
size += uint64(bSize)
otherview marked this conversation as resolved.
Show resolved Hide resolved
currentNo++
}

return size, nil
}

func (g *Guardian) getLastestBatchNo() (uint64, error) {
lastBatchNo, err := g.sl.L1Publisher().FetchLatestSeqNo()
if err != nil {
fmt.Println(err)
return 0, err
}
fromBatch := lastBatchNo.Uint64()
if lastBatchNo.Uint64() > common.L2GenesisSeqNo {
fromBatch++
}
return fromBatch, nil
}
1 change: 1 addition & 0 deletions integration/simulation/devnetwork/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func (n *InMemNodeOperator) createHostContainer() *hostcontainer.HostContainer {
BatchInterval: n.config.BatchInterval,
RollupInterval: n.config.RollupInterval,
L1BlockTime: n.config.L1BlockTime,
MaxRollupSize: 1024 * 64,
}

hostLogger := testlog.Logger().New(log.NodeIDKey, n.l1Wallet.Address(), log.CmpKey, log.HostCmp)
Expand Down
Loading