Skip to content

Commit

Permalink
Synchronization rework.
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanIliev545 committed Jun 24, 2024
1 parent eb3a24c commit c7c491d
Show file tree
Hide file tree
Showing 10 changed files with 284 additions and 56 deletions.
36 changes: 34 additions & 2 deletions contracts/generated/ManagementContract/ManagementContract.go

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions contracts/src/management/ManagementContract.sol
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,19 @@ contract ManagementContract is Initializable, OwnableUpgradeable {
return GetRollupByHash(hash);
}

function GetUniqueForkID(uint256 number) view public returns(bytes32, Structs.MetaRollup memory) {
(bool success, Structs.MetaRollup memory rollup) = GetRollupByNumber(number);
if (!success) {
return (0x0, rollup);
}

return (rollups.toUniqueForkID[number], rollup);
}

function AppendRollup(Structs.MetaRollup calldata _r) internal {
rollups.byHash[_r.Hash] = _r;
rollups.byOrder[rollups.nextFreeSequenceNumber] = _r.Hash;
rollups.toUniqueForkID[rollups.nextFreeSequenceNumber] = keccak256(abi.encode(_r.Hash, blockhash(block.number-1)));
rollups.nextFreeSequenceNumber++;

if (_r.LastSequenceNumber > lastBatchSeqNo) {
Expand Down
1 change: 1 addition & 0 deletions contracts/src/management/Structs.sol
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ interface Structs {
struct RollupStorage {
mapping(bytes32=>MetaRollup) byHash;
mapping(uint256=>bytes32) byOrder;
mapping(uint256=>bytes32) toUniqueForkID;
uint256 nextFreeSequenceNumber;
}

Expand Down
3 changes: 3 additions & 0 deletions go/common/errutil/errors_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ var (
ErrAncestorBatchNotFound = errors.New("parent for batch not found")
ErrCrossChainBundleRepublished = errors.New("root already added to the message bus")
ErrCrossChainBundleNoBatches = errors.New("no batches for cross chain bundle")
ErrNoNextRollup = errors.New("no next rollup")
ErrRollupForkMismatch = errors.New("rollup fork mismatch")
ErrNoBundleToPublish = errors.New("no bundle to publish")
)

// BlockRejectError is used as a standard format for error response from enclave for block submission errors
Expand Down
3 changes: 2 additions & 1 deletion go/common/host/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
EnclaveServiceName = "enclaves"
LogSubscriptionServiceName = "log-subs"
FilterAPIServiceName = "filter-api"
CrossChainServiceName = "cross-chain"
)

// The host has a number of services that encapsulate the various responsibilities of the host.
Expand Down Expand Up @@ -119,7 +120,7 @@ type L1Publisher interface {
ResyncImportantContracts() error

// GetBundleRangeFromManagementContract returns the range of batches for which to build a bundle
GetBundleRangeFromManagementContract() (*big.Int, *big.Int, error)
GetBundleRangeFromManagementContract(lastRollupNumber *big.Int, lastRollupUID gethcommon.Hash) (*gethcommon.Hash, *big.Int, *big.Int, error)
}

// L2BatchRepository provides an interface for the host to request L2 batch data (live-streaming and historical)
Expand Down
37 changes: 5 additions & 32 deletions go/host/enclave/guardian.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type guardianServiceLocator interface {
L1Repo() host.L1BlockRepository
L2Repo() host.L2BatchRepository
LogSubs() host.LogSubscriptionManager
CrossChainMachine() l1.CrossChainStateMachine
}

// Guardian is a host service which monitors an enclave, it's responsibilities include:
Expand Down Expand Up @@ -634,46 +635,18 @@ func (g *Guardian) periodicBundleSubmission() {

bundleSubmissionTicker := time.NewTicker(interval)

fromSequenceNumber, _, err := g.sl.L1Publisher().GetBundleRangeFromManagementContract()
if err != nil {
g.logger.Error(`Unable to get bundle range from management contract and initialize cross chain publishing`, log.ErrKey, err)
return
}

for {
select {
case <-bundleSubmissionTicker.C:
from, to, err := g.sl.L1Publisher().GetBundleRangeFromManagementContract()
err := g.sl.CrossChainMachine().Synchronize()
if err != nil {
g.logger.Error("Unable to get bundle range from management contract", log.ErrKey, err)
continue
}

if from.Uint64() > fromSequenceNumber.Uint64() {
fromSequenceNumber.Set(from)
}

bundle, err := g.enclaveClient.ExportCrossChainData(context.Background(), fromSequenceNumber.Uint64(), to.Uint64())
if err != nil {
if !errors.Is(err, errutil.ErrCrossChainBundleNoBatches) {
g.logger.Error("Unable to export cross chain bundle from enclave", log.ErrKey, err)
}
if errors.Is(err, context.DeadlineExceeded) {
g.logger.Error(`Cross chain bundle export timed out.`, log.ErrKey, err)
return // stop the process - if we are timing out we are not going to catch up
}
continue
}

if len(bundle.CrossChainRootHashes) == 0 {
g.logger.Debug("No cross chain data to submit")
fromSequenceNumber.SetUint64(to.Uint64() + 1)
g.logger.Error("Failed to synchronize cross chain state machine", log.ErrKey, err)
continue
}

err = g.sl.L1Publisher().PublishCrossChainBundle(bundle)
err = g.sl.CrossChainMachine().PublishNextBundle()
if err != nil {
g.logger.Error("Unable to publish cross chain bundle", log.ErrKey, err)
g.logger.Error("Failed to publish next bundle", log.ErrKey, err)
continue
}
case <-g.hostInterrupter.Done():
Expand Down
3 changes: 3 additions & 0 deletions go/host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,13 @@ func NewHost(config *config.HostConfig, hostServices *ServicesRegistry, p2p host
retryIntervalForL1Receipt,
hostStorage,
)

hostServices.RegisterService(hostcommon.L1PublisherName, l1Publisher)
hostServices.RegisterService(hostcommon.L2BatchRepositoryName, l2Repo)
hostServices.RegisterService(hostcommon.EnclaveServiceName, enclService)
hostServices.RegisterService(hostcommon.LogSubscriptionServiceName, subsService)
l1StateMachine := l1.NewCrossChainStateMachine(l1Publisher, mgmtContractLib, ethClient, hostServices.Enclaves().GetEnclaveClient(), logger, host.stopControl)
hostServices.RegisterService(hostcommon.CrossChainServiceName, l1StateMachine)

var prof *profiler.Profiler
if config.ProfilerEnabled {
Expand Down
45 changes: 24 additions & 21 deletions go/host/l1/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,43 +99,46 @@ func (p *Publisher) Start() error {
return nil
}

func (p *Publisher) GetBundleRangeFromManagementContract() (*big.Int, *big.Int, error) {
func (p *Publisher) GetBundleRangeFromManagementContract(lastRollupNumber *big.Int, lastRollupUID gethcommon.Hash) (*gethcommon.Hash, *big.Int, *big.Int, error) {
if p.mgmtContractLib.IsMock() {
return nil, nil, fmt.Errorf("bundle publishing unavailable for mocked environments")
return nil, nil, nil, fmt.Errorf("bundle publishing unavailable for mocked environments")
}

managementCtr, err := ManagementContract.NewManagementContract(*p.mgmtContractLib.GetContractAddr(), p.ethClient.EthClient())
if err != nil {
p.logger.Error("Unable to instantiate management contract client")
return nil, nil, err
return nil, nil, nil, err
}

lastBatchHash, err := managementCtr.LastBatchHash(&bind.CallOpts{})
hashBytes, rollup, err := managementCtr.GetUniqueForkID(&bind.CallOpts{}, lastRollupNumber)
if err != nil {
p.logger.Error("Unable to fetch last batch hash from management contract", log.ErrKey, err)
return nil, nil, err
p.logger.Error("Unable to get unique fork ID from management contract")
return nil, nil, nil, err
}

var fromSeqNo *big.Int
if lastBatchHash == [32]byte{} {
fromSeqNo = big.NewInt(0)
} else {
batch, err := p.storage.FetchBatch(lastBatchHash)
if err != nil {
p.logger.Error("Unable to fetch last batch from host db", log.ErrKey, err)
return nil, nil, err
}
fromSeqNo = batch.SeqNo()
fromSeqNo = batch.SeqNo().Add(fromSeqNo, big.NewInt(1))
rollupUid := gethcommon.BytesToHash(hashBytes[:])
if rollupUid != lastRollupUID {
return nil, nil, nil, errutil.ErrRollupForkMismatch
}

lastBatchRolledUpSeqNo, err := managementCtr.LastBatchSeqNo(&bind.CallOpts{})
fromSeqNo := big.NewInt(0)
if lastRollupNumber.Cmp(big.NewInt(0)) != 0 {
fromSeqNo = big.NewInt(0).SetUint64(rollup.LastSequenceNumber.Uint64() + 1)
}

nextRollupNumber := big.NewInt(0).SetUint64(lastRollupNumber.Uint64() + 1)
nextHashBytes, nextRollup, err := managementCtr.GetUniqueForkID(&bind.CallOpts{}, nextRollupNumber)
if err != nil {
p.logger.Error("Unable to fetch last batch seq no from management contract", log.ErrKey, err)
return nil, nil, err
p.logger.Error("Unable to get unique fork ID from management contract")
return nil, nil, nil, err
}

nextRollupUID := gethcommon.BytesToHash(nextHashBytes[:])
if nextRollupUID.Big().Cmp(gethcommon.Big0) == 0 {
return nil, nil, nil, errutil.ErrNoNextRollup
}

return fromSeqNo, lastBatchRolledUpSeqNo, nil
return &nextRollupUID, fromSeqNo, nextRollup.LastSequenceNumber, nil
}

func (p *Publisher) Stop() error {
Expand Down
197 changes: 197 additions & 0 deletions go/host/l1/statemachine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package l1

import (
"context"
"errors"
"math/big"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
gethcommon "github.com/ethereum/go-ethereum/common"
gethlog "github.com/ethereum/go-ethereum/log"
"github.com/ten-protocol/go-ten/contracts/generated/ManagementContract"
"github.com/ten-protocol/go-ten/go/common"
"github.com/ten-protocol/go-ten/go/common/errutil"
"github.com/ten-protocol/go-ten/go/common/host"
"github.com/ten-protocol/go-ten/go/common/stopcontrol"
"github.com/ten-protocol/go-ten/go/ethadapter"
"github.com/ten-protocol/go-ten/go/ethadapter/mgmtcontractlib"
)

type ForkUniqueID = gethcommon.Hash
type RollupNumber = uint64

type CrossChainStateMachine interface {
GetRollupData(number RollupNumber) (RollupInfo, error)
Synchronize() error
PublishNextBundle() error
host.Service
}

type crossChainStateMachine struct {
latestRollup RollupInfo
rollupHistory map[RollupNumber]RollupInfo
currentRollup RollupNumber

enclaveClient common.Enclave
publisher host.L1Publisher
ethClient ethadapter.EthClient
mgmtContractLib mgmtcontractlib.MgmtContractLib // Library to handle Management Contract lib operations
logger gethlog.Logger
hostStopper *stopcontrol.StopControl
}

type RollupInfo struct {
Hash gethcommon.Hash
ForkUID ForkUniqueID
Number RollupNumber
}

func NewCrossChainStateMachine(
publisher host.L1Publisher,
mgmtContractLib mgmtcontractlib.MgmtContractLib,
ethClient ethadapter.EthClient,
enclaveClient common.Enclave,
logger gethlog.Logger,
hostStopper *stopcontrol.StopControl,
) CrossChainStateMachine {
return &crossChainStateMachine{
latestRollup: RollupInfo{
Hash: gethcommon.Hash{},
ForkUID: gethcommon.Hash{},
Number: 0,
},
rollupHistory: make(map[RollupNumber]RollupInfo),
currentRollup: 0,
publisher: publisher,
ethClient: ethClient,
mgmtContractLib: mgmtContractLib,
enclaveClient: enclaveClient,
logger: logger,
hostStopper: hostStopper,
}
}

func (c *crossChainStateMachine) Start() error {
return nil
}
func (c *crossChainStateMachine) Stop() error {
return nil
}
func (c *crossChainStateMachine) HealthStatus(context.Context) host.HealthStatus {
errMsg := ""
if c.hostStopper.IsStopping() {
errMsg = "not running"
}
return &host.BasicErrHealthStatus{ErrMsg: errMsg}

Check failure on line 86 in go/host/l1/statemachine.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary trailing newline (whitespace)
}

func (c *crossChainStateMachine) GetRollupData(number RollupNumber) (RollupInfo, error) {
if number == c.latestRollup.Number {
return c.latestRollup, nil
} else if number > c.latestRollup.Number {
return RollupInfo{}, errutil.ErrNotFound
} else {
return c.rollupHistory[number], nil
}
}

func (c *crossChainStateMachine) PublishNextBundle() error {
// If all bundles for the rollups have been published, there is nothing to do.
if c.currentRollup >= c.latestRollup.Number {
return nil
}

// Get the bundle range from the management contract
nextForkUID, begin, end, err := c.publisher.GetBundleRangeFromManagementContract(big.NewInt(0).SetUint64(c.currentRollup), c.latestRollup.ForkUID)
if err != nil {
return err
}

data, err := c.GetRollupData(c.currentRollup + 1)
if err != nil {
return err
}
if data.ForkUID != *nextForkUID {
return errutil.ErrRollupForkMismatch
}

bundle, err := c.enclaveClient.ExportCrossChainData(context.Background(), begin.Uint64(), end.Uint64())
if err != nil {
return err
}

err = c.publisher.PublishCrossChainBundle(bundle)
if err != nil {
return err
}

// Move the current rollup to the next rollup
c.currentRollup++

return nil

Check failure on line 133 in go/host/l1/statemachine.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary trailing newline (whitespace)
}

// Synchronize - checks if there are any new rollups or forks and moves the tracking needle to the latest common ancestor.
func (c *crossChainStateMachine) Synchronize() error {

Check failure on line 137 in go/host/l1/statemachine.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary leading newline (whitespace)

forkUID, _, _, err := c.publisher.GetBundleRangeFromManagementContract(big.NewInt(0).SetUint64(c.latestRollup.Number), c.latestRollup.ForkUID)
if err != nil {
if errors.Is(err, errutil.ErrNoNextRollup) {
c.logger.Debug("No new rollup or fork found")
return nil
}

if errors.Is(err, errutil.ErrRollupForkMismatch) {
return c.RevertToLatestKnownCommonAncestorRollup()
}

c.logger.Error("Failed to get bundle range from management contract", "error", err)
return err
}

c.rollupHistory[c.latestRollup.Number] = c.latestRollup
c.latestRollup = RollupInfo{
Hash: gethcommon.Hash{},
ForkUID: *forkUID,
Number: c.latestRollup.Number + 1,
}

return nil
}

func (c *crossChainStateMachine) RevertToLatestKnownCommonAncestorRollup() error {

Check failure on line 164 in go/host/l1/statemachine.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary leading newline (whitespace)

managementContract, err := ManagementContract.NewManagementContract(*c.mgmtContractLib.GetContractAddr(), c.ethClient.EthClient())
if err != nil {
return err
}

hashBytes, _, err := managementContract.GetUniqueForkID(&bind.CallOpts{}, big.NewInt(0).SetUint64(c.latestRollup.Number))
if err != nil {
return err
}

var forkHash gethcommon.Hash
forkHash = gethcommon.BytesToHash(hashBytes[:])

for forkHash != c.latestRollup.ForkUID {
// Revert to previous rollup; No need to wipe the map as the synchronization reinserts the latest rollup
c.latestRollup = c.rollupHistory[c.latestRollup.Number-1] //go to previous rollup

hashBytes, _, err = managementContract.GetUniqueForkID(&bind.CallOpts{}, big.NewInt(0).SetUint64(c.latestRollup.Number))
if err != nil {
return err
}

forkHash = gethcommon.BytesToHash(hashBytes[:])
}

// Rollback current rollup if it was dumped due to a fork.
if c.currentRollup > c.latestRollup.Number {
c.currentRollup = c.latestRollup.Number
}

return nil
}
Loading

0 comments on commit c7c491d

Please sign in to comment.