Skip to content

Commit

Permalink
Add Enclave block and Batch health checks (#1727)
Browse files Browse the repository at this point in the history
* Add Enclave block and Batch health checks

* lint

* fix healthchecks on sims

* lint

* pr comments
  • Loading branch information
otherview authored Dec 29, 2023
1 parent f159fb8 commit b2e41d3
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 13 deletions.
35 changes: 35 additions & 0 deletions go/common/async/timestamp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package async

import (
"sync"
"time"
)

// Timestamp is a thread safe timestamp
type Timestamp struct {
lastTimestamp time.Time
mutex sync.RWMutex
}

func NewAsyncTimestamp(lastTimestamp time.Time) *Timestamp {
return &Timestamp{
lastTimestamp: lastTimestamp,
mutex: sync.RWMutex{},
}
}

// Mark sets the timestamp with the current time
func (at *Timestamp) Mark() {
at.mutex.Lock()
defer at.mutex.Unlock()
at.lastTimestamp = time.Now()
}

// LastTimestamp returns the last set timestamp
func (at *Timestamp) LastTimestamp() time.Time {
at.mutex.RLock()
defer at.mutex.RUnlock()

newTimestamp := at.lastTimestamp
return newTimestamp
}
28 changes: 23 additions & 5 deletions go/enclave/components/batch_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (
"fmt"
"math/big"
"sync"
"time"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ten-protocol/go-ten/go/enclave/storage"

"github.com/ethereum/go-ethereum/core/state"
gethlog "github.com/ethereum/go-ethereum/log"
gethrpc "github.com/ethereum/go-ethereum/rpc"
"github.com/ten-protocol/go-ten/go/common/async"
"github.com/ten-protocol/go-ten/go/common/errutil"
"github.com/ten-protocol/go-ten/go/common/log"
"github.com/ten-protocol/go-ten/go/common/measure"
Expand All @@ -24,8 +26,10 @@ type batchRegistry struct {
logger gethlog.Logger
headBatchSeq *big.Int // keep track of the last executed batch to optimise db access

batchesCallback func(*core.Batch, types.Receipts)
callbackMutex sync.RWMutex
batchesCallback func(*core.Batch, types.Receipts)
callbackMutex sync.RWMutex
healthTimeout time.Duration
lastExecutedBatch *async.Timestamp
}

func NewBatchRegistry(storage storage.Storage, logger gethlog.Logger) BatchRegistry {
Expand All @@ -42,9 +46,11 @@ func NewBatchRegistry(storage storage.Storage, logger gethlog.Logger) BatchRegis
headBatchSeq = headBatch.SeqNo()
}
return &batchRegistry{
storage: storage,
headBatchSeq: headBatchSeq,
logger: logger,
storage: storage,
headBatchSeq: headBatchSeq,
logger: logger,
healthTimeout: time.Minute,
lastExecutedBatch: async.NewAsyncTimestamp(time.Now().Add(-time.Minute)),
}
}

Expand Down Expand Up @@ -75,6 +81,8 @@ func (br *batchRegistry) OnBatchExecuted(batch *core.Batch, receipts types.Recei
if br.batchesCallback != nil {
br.batchesCallback(batch, receipts)
}

br.lastExecutedBatch.Mark()
}

func (br *batchRegistry) HasGenesisBatch() (bool, error) {
Expand Down Expand Up @@ -193,3 +201,13 @@ func (br *batchRegistry) GetBatchAtHeight(height gethrpc.BlockNumber) (*core.Bat
}
return batch, nil
}

// HealthCheck checks if the last executed batch was more than healthTimeout ago
func (br *batchRegistry) HealthCheck() (bool, error) {
lastExecutedBatchTime := br.lastExecutedBatch.LastTimestamp()
if time.Now().After(lastExecutedBatchTime.Add(br.healthTimeout)) {
return false, fmt.Errorf("last executed batch was %s ago", time.Since(lastExecutedBatchTime))
}

return true, nil
}
20 changes: 18 additions & 2 deletions go/enclave/components/block_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package components
import (
"errors"
"fmt"
"time"

"github.com/ten-protocol/go-ten/go/common/async"
"github.com/ten-protocol/go-ten/go/enclave/core"

"github.com/ten-protocol/go-ten/go/enclave/gas"
"github.com/ten-protocol/go-ten/go/enclave/storage"

Expand All @@ -27,7 +28,9 @@ type l1BlockProcessor struct {

// we store the l1 head to avoid expensive db access
// the host is responsible to always submitting the head l1 block
currentL1Head *common.L1BlockHash
currentL1Head *common.L1BlockHash
healthTimeout time.Duration
lastIngestedBlock *async.Timestamp
}

func NewBlockProcessor(storage storage.Storage, cc *crosschain.Processors, gasOracle gas.Oracle, logger gethlog.Logger) L1BlockProcessor {
Expand All @@ -48,6 +51,8 @@ func NewBlockProcessor(storage storage.Storage, cc *crosschain.Processors, gasOr
gasOracle: gasOracle,
crossChainProcessors: cc,
currentL1Head: l1BlockHash,
healthTimeout: time.Minute,
lastIngestedBlock: async.NewAsyncTimestamp(time.Now().Add(-time.Minute)),
}
}

Expand Down Expand Up @@ -77,9 +82,20 @@ func (bp *l1BlockProcessor) Process(br *common.BlockAndReceipts) (*BlockIngestio

h := br.Block.Hash()
bp.currentL1Head = &h
bp.lastIngestedBlock.Mark()
return ingestion, nil
}

// HealthCheck checks if the last ingested block was more than healthTimeout ago
func (bp *l1BlockProcessor) HealthCheck() (bool, error) {
lastIngestedBlockTime := bp.lastIngestedBlock.LastTimestamp()
if time.Now().After(lastIngestedBlockTime.Add(bp.healthTimeout)) {
return false, fmt.Errorf("last ingested block was %s ago", time.Since(lastIngestedBlockTime))
}

return true, nil
}

func (bp *l1BlockProcessor) tryAndInsertBlock(br *common.BlockAndReceipts) (*BlockIngestionType, error) {
block := br.Block

Expand Down
3 changes: 3 additions & 0 deletions go/enclave/components/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type L1BlockProcessor interface {
Process(br *common.BlockAndReceipts) (*BlockIngestionType, error)
GetHead() (*common.L1Block, error)
GetCrossChainContractAddress() *gethcommon.Address
HealthCheck() (bool, error)
}

// BatchExecutionContext - Contains all of the data that each batch depends on
Expand Down Expand Up @@ -102,6 +103,8 @@ type BatchRegistry interface {
HasGenesisBatch() (bool, error)

HeadBatchSeq() *big.Int

HealthCheck() (bool, error)
}

type RollupProducer interface {
Expand Down
18 changes: 16 additions & 2 deletions go/enclave/enclave.go
Original file line number Diff line number Diff line change
Expand Up @@ -1248,9 +1248,23 @@ func (e *enclaveImpl) HealthCheck() (bool, common.SystemError) {
e.logger.Info("HealthCheck failed for the enclave storage", log.ErrKey, err)
return false, nil
}

// todo (#1148) - enclave healthcheck operations
enclaveHealthy := true
return storageHealthy && enclaveHealthy, nil
l1blockHealthy, err := e.l1BlockProcessor.HealthCheck()
if err != nil {
// simplest iteration, log the error and just return that it's not healthy
e.logger.Info("HealthCheck failed for the l1 block processor", log.ErrKey, err)
return false, nil
}

l2batchHealthy, err := e.registry.HealthCheck()
if err != nil {
// simplest iteration, log the error and just return that it's not healthy
e.logger.Info("HealthCheck failed for the l2 batch registry", log.ErrKey, err)
return false, nil
}

return storageHealthy && l1blockHealthy && l2batchHealthy, nil
}

func (e *enclaveImpl) DebugTraceTransaction(txHash gethcommon.Hash, config *tracers.TraceConfig) (json.RawMessage, common.SystemError) {
Expand Down
8 changes: 6 additions & 2 deletions go/enclave/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,10 +266,14 @@ func (s *storageImpl) HealthCheck() (bool, error) {
defer s.logDuration("HealthCheck", measure.NewStopwatch())
headBatch, err := s.FetchHeadBatch()
if err != nil {
s.logger.Info("HealthCheck failed for enclave storage", log.ErrKey, err)
return false, err
}
return headBatch != nil, nil

if headBatch == nil {
return false, fmt.Errorf("head batch is nil")
}

return true, nil
}

func (s *storageImpl) FetchHeadBatchForBlock(blockHash common.L1BlockHash) (*core.Batch, error) {
Expand Down
5 changes: 5 additions & 0 deletions go/enclave/txpool/txpool_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ func (m *mockBatchRegistry) HasGenesisBatch() (bool, error) {
panic("implement me")
}

func (m *mockBatchRegistry) HealthCheck() (bool, error) {
// TODO implement me
panic("implement me")
}

func (m *mockBatchRegistry) HeadBatchSeq() *big.Int {
return m.currentBatch.SeqNo()
}
Expand Down
12 changes: 10 additions & 2 deletions integration/simulation/simulation.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/ten-protocol/go-ten/go/common"
"github.com/ten-protocol/go-ten/go/common/errutil"
"github.com/ten-protocol/go-ten/go/common/log"
"github.com/ten-protocol/go-ten/go/common/retry"
"github.com/ten-protocol/go-ten/go/ethadapter"
"github.com/ten-protocol/go-ten/go/wallet"
"github.com/ten-protocol/go-ten/integration/common/testlog"
Expand Down Expand Up @@ -288,8 +289,15 @@ func (s *Simulation) prefundL1Accounts() {

func (s *Simulation) checkHealthStatus() {
for _, client := range s.RPCHandles.ObscuroClients {
if healthy, err := client.Health(); !healthy || err != nil {
panic("Client is not healthy: " + err.Error())
err := retry.Do(func() error {
healthy, err := client.Health()
if !healthy || err != nil {
return fmt.Errorf("client is not healthy: %w", err)
}
return nil
}, retry.NewTimeoutStrategy(30*time.Second, 100*time.Millisecond))
if err != nil {
panic(err)
}
}
}
Expand Down

0 comments on commit b2e41d3

Please sign in to comment.