// Timestamp is a thread-safe holder for a single point in time.
//
// Every read and write of the stored value is guarded by mutex, so one
// goroutine may call Mark while others call LastTimestamp. The zero value is
// ready to use and reports the zero time.Time until Mark is first called.
// A Timestamp contains a sync.RWMutex and therefore must not be copied after
// first use; share it by pointer.
type Timestamp struct {
	lastTimestamp time.Time
	mutex         sync.RWMutex
}

// NewAsyncTimestamp returns a Timestamp whose initial value is lastTimestamp.
func NewAsyncTimestamp(lastTimestamp time.Time) *Timestamp {
	// The zero value of sync.RWMutex is an unlocked mutex, so the mutex field
	// needs no explicit initialisation.
	return &Timestamp{lastTimestamp: lastTimestamp}
}

// Mark sets the timestamp to the current time.
func (at *Timestamp) Mark() {
	at.mutex.Lock()
	defer at.mutex.Unlock()
	at.lastTimestamp = time.Now()
}

// LastTimestamp returns the most recently stored time: the value recorded by
// the latest call to Mark, or the value given to NewAsyncTimestamp if Mark has
// not been called yet.
func (at *Timestamp) LastTimestamp() time.Time {
	at.mutex.RLock()
	defer at.mutex.RUnlock()
	// time.Time is a value type, so returning the field already hands the
	// caller an independent copy.
	return at.lastTimestamp
}
} func NewBatchRegistry(storage storage.Storage, logger gethlog.Logger) BatchRegistry { @@ -42,9 +46,11 @@ func NewBatchRegistry(storage storage.Storage, logger gethlog.Logger) BatchRegis headBatchSeq = headBatch.SeqNo() } return &batchRegistry{ - storage: storage, - headBatchSeq: headBatchSeq, - logger: logger, + storage: storage, + headBatchSeq: headBatchSeq, + logger: logger, + healthTimeout: time.Minute, + lastExecutedBatch: async.NewAsyncTimestamp(time.Now().Add(-time.Minute)), } } @@ -75,6 +81,8 @@ func (br *batchRegistry) OnBatchExecuted(batch *core.Batch, receipts types.Recei if br.batchesCallback != nil { br.batchesCallback(batch, receipts) } + + br.lastExecutedBatch.Mark() } func (br *batchRegistry) HasGenesisBatch() (bool, error) { @@ -193,3 +201,13 @@ func (br *batchRegistry) GetBatchAtHeight(height gethrpc.BlockNumber) (*core.Bat } return batch, nil } + +// HealthCheck checks if the last executed batch was more than healthTimeout ago +func (br *batchRegistry) HealthCheck() (bool, error) { + lastExecutedBatchTime := br.lastExecutedBatch.LastTimestamp() + if time.Now().After(lastExecutedBatchTime.Add(br.healthTimeout)) { + return false, fmt.Errorf("last executed batch was %s ago", time.Since(lastExecutedBatchTime)) + } + + return true, nil +} diff --git a/go/enclave/components/block_processor.go b/go/enclave/components/block_processor.go index 13ea6edbcd..a80b893004 100644 --- a/go/enclave/components/block_processor.go +++ b/go/enclave/components/block_processor.go @@ -3,9 +3,10 @@ package components import ( "errors" "fmt" + "time" + "github.com/ten-protocol/go-ten/go/common/async" "github.com/ten-protocol/go-ten/go/enclave/core" - "github.com/ten-protocol/go-ten/go/enclave/gas" "github.com/ten-protocol/go-ten/go/enclave/storage" @@ -27,7 +28,9 @@ type l1BlockProcessor struct { // we store the l1 head to avoid expensive db access // the host is responsible to always submitting the head l1 block - currentL1Head *common.L1BlockHash + currentL1Head 
*common.L1BlockHash + healthTimeout time.Duration + lastIngestedBlock *async.Timestamp } func NewBlockProcessor(storage storage.Storage, cc *crosschain.Processors, gasOracle gas.Oracle, logger gethlog.Logger) L1BlockProcessor { @@ -48,6 +51,8 @@ func NewBlockProcessor(storage storage.Storage, cc *crosschain.Processors, gasOr gasOracle: gasOracle, crossChainProcessors: cc, currentL1Head: l1BlockHash, + healthTimeout: time.Minute, + lastIngestedBlock: async.NewAsyncTimestamp(time.Now().Add(-time.Minute)), } } @@ -77,9 +82,20 @@ func (bp *l1BlockProcessor) Process(br *common.BlockAndReceipts) (*BlockIngestio h := br.Block.Hash() bp.currentL1Head = &h + bp.lastIngestedBlock.Mark() return ingestion, nil } +// HealthCheck checks if the last ingested block was more than healthTimeout ago +func (bp *l1BlockProcessor) HealthCheck() (bool, error) { + lastIngestedBlockTime := bp.lastIngestedBlock.LastTimestamp() + if time.Now().After(lastIngestedBlockTime.Add(bp.healthTimeout)) { + return false, fmt.Errorf("last ingested block was %s ago", time.Since(lastIngestedBlockTime)) + } + + return true, nil +} + func (bp *l1BlockProcessor) tryAndInsertBlock(br *common.BlockAndReceipts) (*BlockIngestionType, error) { block := br.Block diff --git a/go/enclave/components/interfaces.go b/go/enclave/components/interfaces.go index 6ed7e7a750..68918849ab 100644 --- a/go/enclave/components/interfaces.go +++ b/go/enclave/components/interfaces.go @@ -37,6 +37,7 @@ type L1BlockProcessor interface { Process(br *common.BlockAndReceipts) (*BlockIngestionType, error) GetHead() (*common.L1Block, error) GetCrossChainContractAddress() *gethcommon.Address + HealthCheck() (bool, error) } // BatchExecutionContext - Contains all of the data that each batch depends on @@ -102,6 +103,8 @@ type BatchRegistry interface { HasGenesisBatch() (bool, error) HeadBatchSeq() *big.Int + + HealthCheck() (bool, error) } type RollupProducer interface { diff --git a/go/enclave/enclave.go b/go/enclave/enclave.go index 
303ac77fa3..21c5d79cc7 100644 --- a/go/enclave/enclave.go +++ b/go/enclave/enclave.go @@ -1248,9 +1248,23 @@ func (e *enclaveImpl) HealthCheck() (bool, common.SystemError) { e.logger.Info("HealthCheck failed for the enclave storage", log.ErrKey, err) return false, nil } + // todo (#1148) - enclave healthcheck operations - enclaveHealthy := true - return storageHealthy && enclaveHealthy, nil + l1blockHealthy, err := e.l1BlockProcessor.HealthCheck() + if err != nil { + // simplest iteration, log the error and just return that it's not healthy + e.logger.Info("HealthCheck failed for the l1 block processor", log.ErrKey, err) + return false, nil + } + + l2batchHealthy, err := e.registry.HealthCheck() + if err != nil { + // simplest iteration, log the error and just return that it's not healthy + e.logger.Info("HealthCheck failed for the l2 batch registry", log.ErrKey, err) + return false, nil + } + + return storageHealthy && l1blockHealthy && l2batchHealthy, nil } func (e *enclaveImpl) DebugTraceTransaction(txHash gethcommon.Hash, config *tracers.TraceConfig) (json.RawMessage, common.SystemError) { diff --git a/go/enclave/storage/storage.go b/go/enclave/storage/storage.go index f96aede4cf..d270f9263d 100644 --- a/go/enclave/storage/storage.go +++ b/go/enclave/storage/storage.go @@ -266,10 +266,14 @@ func (s *storageImpl) HealthCheck() (bool, error) { defer s.logDuration("HealthCheck", measure.NewStopwatch()) headBatch, err := s.FetchHeadBatch() if err != nil { - s.logger.Info("HealthCheck failed for enclave storage", log.ErrKey, err) return false, err } - return headBatch != nil, nil + + if headBatch == nil { + return false, fmt.Errorf("head batch is nil") + } + + return true, nil } func (s *storageImpl) FetchHeadBatchForBlock(blockHash common.L1BlockHash) (*core.Batch, error) { diff --git a/go/enclave/txpool/txpool_mock_test.go b/go/enclave/txpool/txpool_mock_test.go index b46fb7c9b6..81693bbba3 100644 --- a/go/enclave/txpool/txpool_mock_test.go +++ 
b/go/enclave/txpool/txpool_mock_test.go @@ -57,6 +57,11 @@ func (m *mockBatchRegistry) HasGenesisBatch() (bool, error) { panic("implement me") } +func (m *mockBatchRegistry) HealthCheck() (bool, error) { + // TODO implement me + panic("implement me") +} + func (m *mockBatchRegistry) HeadBatchSeq() *big.Int { return m.currentBatch.SeqNo() } diff --git a/integration/simulation/simulation.go b/integration/simulation/simulation.go index 4a3a814320..f39cf86520 100644 --- a/integration/simulation/simulation.go +++ b/integration/simulation/simulation.go @@ -16,6 +16,7 @@ import ( "github.com/ten-protocol/go-ten/go/common" "github.com/ten-protocol/go-ten/go/common/errutil" "github.com/ten-protocol/go-ten/go/common/log" + "github.com/ten-protocol/go-ten/go/common/retry" "github.com/ten-protocol/go-ten/go/ethadapter" "github.com/ten-protocol/go-ten/go/wallet" "github.com/ten-protocol/go-ten/integration/common/testlog" @@ -288,8 +289,15 @@ func (s *Simulation) prefundL1Accounts() { func (s *Simulation) checkHealthStatus() { for _, client := range s.RPCHandles.ObscuroClients { - if healthy, err := client.Health(); !healthy || err != nil { - panic("Client is not healthy: " + err.Error()) + err := retry.Do(func() error { + healthy, err := client.Health() + if !healthy || err != nil { + return fmt.Errorf("client is not healthy: %w", err) + } + return nil + }, retry.NewTimeoutStrategy(30*time.Second, 100*time.Millisecond)) + if err != nil { + panic(err) } } }