From 7efcde43f88e9ccb78b6d06a1855ec61b7d0594a Mon Sep 17 00:00:00 2001 From: Tudor Malene Date: Wed, 3 Jul 2024 13:25:52 +0100 Subject: [PATCH] split up storage and improve caching --- go/common/cache_util.go | 39 --- go/common/gethencoding/geth_encoding.go | 33 +-- go/enclave/enclave.go | 5 +- go/enclave/storage/cache_service.go | 167 +++++++++++++ go/enclave/storage/events_storage.go | 230 +++++++++++++++++ go/enclave/storage/interfaces.go | 3 - go/enclave/storage/storage.go | 317 ++---------------------- 7 files changed, 431 insertions(+), 363 deletions(-) delete mode 100644 go/common/cache_util.go create mode 100644 go/enclave/storage/cache_service.go create mode 100644 go/enclave/storage/events_storage.go diff --git a/go/common/cache_util.go b/go/common/cache_util.go deleted file mode 100644 index b35cc9068b..0000000000 --- a/go/common/cache_util.go +++ /dev/null @@ -1,39 +0,0 @@ -package common - -import ( - "context" - - "github.com/eko/gocache/lib/v4/cache" - gethlog "github.com/ethereum/go-ethereum/log" - "github.com/ten-protocol/go-ten/go/common/log" -) - -// GetCachedValue - returns the cached value for the provided key. If the key is not found, then invoke the 'onFailed' function -// which returns the value, and cache it -func GetCachedValue[V any](ctx context.Context, cache *cache.Cache[*V], logger gethlog.Logger, key any, onCacheMiss func(any) (*V, error)) (*V, error) { - value, err := cache.Get(ctx, key) - if err != nil || value == nil { - // todo metrics for cache misses - v, err := onCacheMiss(key) - if err != nil { - return v, err - } - if v == nil { - logger.Crit("Returned a nil value from the onCacheMiss function. Should not happen.") - } - CacheValue(ctx, cache, logger, key, v) - return v, nil - } - - return value, err -} - -func CacheValue[V any](ctx context.Context, cache *cache.Cache[*V], logger gethlog.Logger, key any, v *V) { - if v == nil { - return - } - err := cache.Set(ctx, key, v) - if err != nil { - logger.Error("Could not store value in cache", log.ErrKey, err) - } -} diff --git a/go/common/gethencoding/geth_encoding.go b/go/common/gethencoding/geth_encoding.go index bac9685bfb..a82bdb1fbf 100644 --- a/go/common/gethencoding/geth_encoding.go +++ b/go/common/gethencoding/geth_encoding.go @@ -10,10 +10,6 @@ import ( "time" "unsafe" - "github.com/dgraph-io/ristretto" - "github.com/eko/gocache/lib/v4/cache" - ristretto_store "github.com/eko/gocache/store/ristretto/v4" - gethlog "github.com/ethereum/go-ethereum/log" "github.com/ten-protocol/go-ten/go/common/log" "github.com/ten-protocol/go-ten/go/enclave/storage" @@ -51,29 +47,16 @@ type EncodingService interface { } type gethEncodingServiceImpl struct { - // conversion is expensive. Cache the converted headers. The key is the hash of the batch. - gethHeaderCache *cache.Cache[*types.Header] - - storage storage.Storage - logger gethlog.Logger + storage storage.Storage + logger gethlog.Logger + cachingService *storage.CacheService } -func NewGethEncodingService(storage storage.Storage, logger gethlog.Logger) EncodingService { - // todo (tudor) figure out the best values - ristrettoCache, err := ristretto.NewCache(&ristretto.Config{ - NumCounters: 5000, // number of keys to track frequency of. - MaxCost: 500, // todo - this represents how many items. - BufferItems: 64, // number of keys per Get buffer. Todo - what is this - }) - if err != nil { - panic(err) - } - ristrettoStore := ristretto_store.NewRistretto(ristrettoCache) - +func NewGethEncodingService(storage storage.Storage, cachingService *storage.CacheService, logger gethlog.Logger) EncodingService { return &gethEncodingServiceImpl{ - gethHeaderCache: cache.New[*types.Header](ristrettoStore), - storage: storage, - logger: logger, + storage: storage, + logger: logger, + cachingService: cachingService, } } @@ -276,7 +259,7 @@ func ExtractEthCall(param interface{}) (*gethapi.TransactionArgs, error) { // Special care must be taken to maintain a valid chain of these converted headers. func (enc *gethEncodingServiceImpl) CreateEthHeaderForBatch(ctx context.Context, h *common.BatchHeader) (*types.Header, error) { // wrap in a caching layer - return common.GetCachedValue(ctx, enc.gethHeaderCache, enc.logger, h.Hash(), func(a any) (*types.Header, error) { + return enc.cachingService.ReadConvertedHeader(ctx, h.Hash(), func(a any) (*types.Header, error) { // deterministically calculate the private randomness that will be exposed to the EVM secret, err := enc.storage.FetchSecret(ctx) if err != nil { diff --git a/go/enclave/enclave.go b/go/enclave/enclave.go index a12e7a7028..ada7f2c0fb 100644 --- a/go/enclave/enclave.go +++ b/go/enclave/enclave.go @@ -114,8 +114,9 @@ func NewEnclave( } // Initialise the database + cachingService := storage.NewCacheService(logger) chainConfig := ethchainadapter.ChainParams(big.NewInt(config.ObscuroChainID)) - storage := storage.NewStorageFromConfig(config, chainConfig, logger) + storage := storage.NewStorageFromConfig(config, cachingService, chainConfig, logger) // Initialise the Ethereum "Blockchain" structure that will allow us to validate incoming blocks // todo (#1056) - valid block @@ -160,7 +161,7 @@ func NewEnclave( obscuroKey := crypto.GetObscuroKey(logger) - gethEncodingService := gethencoding.NewGethEncodingService(storage, logger) + gethEncodingService := gethencoding.NewGethEncodingService(storage, cachingService, logger) dataEncryptionService := crypto.NewDataEncryptionService(logger) dataCompressionService := compression.NewBrotliDataCompressionService() diff --git a/go/enclave/storage/cache_service.go b/go/enclave/storage/cache_service.go new file mode 100644 index 0000000000..8982434aa6 --- /dev/null +++ b/go/enclave/storage/cache_service.go @@ -0,0 +1,167 @@ +package storage + +import ( + "context" + "math/big" + + "github.com/eko/gocache/lib/v4/store" + + "github.com/ten-protocol/go-ten/go/enclave/core" + + "github.com/dgraph-io/ristretto" + "github.com/eko/gocache/lib/v4/cache" + ristretto_store "github.com/eko/gocache/store/ristretto/v4" + + gethcommon "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + gethlog "github.com/ethereum/go-ethereum/log" + "github.com/ten-protocol/go-ten/go/common" + "github.com/ten-protocol/go-ten/go/common/log" +) + +// approximate cost in bytes of the cached values +const ( + blockCost = 1024 + batchCost = 1024 + hashCost = 32 + idCost = 8 +) + +type CacheService struct { + // cache for the immutable blocks and batches. + // this avoids a trip to the database. + blockCache *cache.Cache[*types.Block] + + // stores batches using the sequence number as key + batchCacheBySeqNo *cache.Cache[*common.BatchHeader] + + // mapping between the hash and the sequence number + // note: to fetch a batch by hash will require 2 cache hits + seqCacheByHash *cache.Cache[*big.Int] + + // mapping between the height and the sequence number + // note: to fetch a batch by height will require 2 cache hits + seqCacheByHeight *cache.Cache[*big.Int] + + // batch hash - geth converted hash + convertedHashCache *cache.Cache[*gethcommon.Hash] + + // from address ( either eoa or contract) to the id of the db entry + eoaCache *cache.Cache[*uint64] + contractAddressCache *cache.Cache[*uint64] + + // from contract_address||event_sig to the event_type (id, isLifecycle) object + eventTypeCache *cache.Cache[*EventType] + + // store the converted ethereum header which is passed to the evm + convertedGethHeaderCache *cache.Cache[*types.Header] + + logger gethlog.Logger +} + +func NewCacheService(logger gethlog.Logger) *CacheService { + // todo (tudor) figure out the config + ristrettoCache, err := ristretto.NewCache(&ristretto.Config{ + NumCounters: 100_000_000, // 10 times the expected elements + MaxCost: 1024 * 1024 * 1024, // allocate 1GB + BufferItems: 64, // number of keys per Get buffer. + }) + if err != nil { + logger.Crit("Could not initialise ristretto cache", log.ErrKey, err) + } + ristrettoStore := ristretto_store.NewRistretto(ristrettoCache) + return &CacheService{ + blockCache: cache.New[*types.Block](ristrettoStore), + batchCacheBySeqNo: cache.New[*common.BatchHeader](ristrettoStore), + seqCacheByHash: cache.New[*big.Int](ristrettoStore), + seqCacheByHeight: cache.New[*big.Int](ristrettoStore), + convertedHashCache: cache.New[*gethcommon.Hash](ristrettoStore), + eoaCache: cache.New[*uint64](ristrettoStore), + contractAddressCache: cache.New[*uint64](ristrettoStore), + eventTypeCache: cache.New[*EventType](ristrettoStore), + convertedGethHeaderCache: cache.New[*types.Header](ristrettoStore), + logger: logger, + } +} + +func (cs *CacheService) CacheBlock(ctx context.Context, b *types.Block) { + cacheValue(ctx, cs.blockCache, cs.logger, b.Hash(), b, blockCost) +} + +func (cs *CacheService) CacheBatch(ctx context.Context, batch *core.Batch) { + cacheValue(ctx, cs.batchCacheBySeqNo, cs.logger, batch.SeqNo().Uint64(), batch.Header, batchCost) + cacheValue(ctx, cs.seqCacheByHash, cs.logger, batch.Hash(), batch.SeqNo(), idCost) + // note: the key is (height+1), because for some reason it doesn't like a key of 0 + // should always contain the canonical batch because the cache is overwritten by each new batch after a reorg + cacheValue(ctx, cs.seqCacheByHeight, cs.logger, batch.NumberU64()+1, batch.SeqNo(), idCost) +} + +func (cs *CacheService) ReadBlock(ctx context.Context, key gethcommon.Hash, onCacheMiss func(any) (*types.Block, error)) (*types.Block, error) { + return getCachedValue(ctx, cs.blockCache, cs.logger, key, blockCost, onCacheMiss) +} + +func (cs *CacheService) ReadBatchSeqByHash(ctx context.Context, hash common.L2BatchHash, onCacheMiss func(any) (*big.Int, error)) (*big.Int, error) { + return getCachedValue(ctx, cs.seqCacheByHash, cs.logger, hash, idCost, onCacheMiss) +} + +func (cs *CacheService) ReadBatchSeqByHeight(ctx context.Context, height uint64, onCacheMiss func(any) (*big.Int, error)) (*big.Int, error) { + // the key is (height+1), because for some reason it doesn't like a key of 0 + return getCachedValue(ctx, cs.seqCacheByHeight, cs.logger, height+1, idCost, onCacheMiss) +} + +func (cs *CacheService) ReadConvertedHash(ctx context.Context, hash common.L2BatchHash, onCacheMiss func(any) (*gethcommon.Hash, error)) (*gethcommon.Hash, error) { + return getCachedValue(ctx, cs.convertedHashCache, cs.logger, hash, hashCost, onCacheMiss) +} + +func (cs *CacheService) ReadBatch(ctx context.Context, seqNum uint64, onCacheMiss func(any) (*common.BatchHeader, error)) (*common.BatchHeader, error) { + return getCachedValue(ctx, cs.batchCacheBySeqNo, cs.logger, seqNum, batchCost, onCacheMiss) +} + +func (cs *CacheService) ReadEOA(ctx context.Context, addr gethcommon.Address, onCacheMiss func(any) (*uint64, error)) (*uint64, error) { + return getCachedValue(ctx, cs.eoaCache, cs.logger, addr, idCost, onCacheMiss) +} + +func (cs *CacheService) ReadContractAddr(ctx context.Context, addr gethcommon.Address, onCacheMiss func(any) (*uint64, error)) (*uint64, error) { + return getCachedValue(ctx, cs.contractAddressCache, cs.logger, addr, idCost, onCacheMiss) +} + +func (cs *CacheService) ReadEventType(ctx context.Context, contractAddress gethcommon.Address, eventSignature gethcommon.Hash, onCacheMiss func(any) (*EventType, error)) (*EventType, error) { + key := make([]byte, 0) + key = append(key, contractAddress.Bytes()...) + key = append(key, eventSignature.Bytes()...) + return getCachedValue(ctx, cs.eventTypeCache, cs.logger, key, idCost, onCacheMiss) +} + +func (cs *CacheService) ReadConvertedHeader(ctx context.Context, batchHash common.L2BatchHash, onCacheMiss func(any) (*types.Header, error)) (*types.Header, error) { + return getCachedValue(ctx, cs.convertedGethHeaderCache, cs.logger, batchHash, blockCost, onCacheMiss) +} + +// getCachedValue - returns the cached value for the provided key. If the key is not found, then invoke the 'onCacheMiss' function +// which returns the value, and cache it +func getCachedValue[V any](ctx context.Context, cache *cache.Cache[*V], logger gethlog.Logger, key any, cost int64, onCacheMiss func(any) (*V, error)) (*V, error) { + value, err := cache.Get(ctx, key) + if err != nil || value == nil { + // todo metrics for cache misses + v, err := onCacheMiss(key) + if err != nil { + return v, err + } + if v == nil { + logger.Crit("Returned a nil value from the onCacheMiss function. Should not happen.") + } + cacheValue(ctx, cache, logger, key, v, cost) + return v, nil + } + + return value, err +} + +func cacheValue[V any](ctx context.Context, cache *cache.Cache[*V], logger gethlog.Logger, key any, v *V, cost int64) { + if v == nil { + return + } + err := cache.Set(ctx, key, v, store.WithCost(cost)) + if err != nil { + logger.Error("Could not store value in cache", log.ErrKey, err) + } +} diff --git a/go/enclave/storage/events_storage.go b/go/enclave/storage/events_storage.go new file mode 100644 index 0000000000..20370589ff --- /dev/null +++ b/go/enclave/storage/events_storage.go @@ -0,0 +1,230 @@ +package storage + +import ( + "context" + "database/sql" + "errors" + "fmt" + + gethcommon "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + gethlog "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ten-protocol/go-ten/go/common" + "github.com/ten-protocol/go-ten/go/common/errutil" + "github.com/ten-protocol/go-ten/go/common/measure" + "github.com/ten-protocol/go-ten/go/enclave/core" + "github.com/ten-protocol/go-ten/go/enclave/storage/enclavedb" +) + +// responsible for saving event logs +type eventsStorage struct { + cachingService *CacheService + logger gethlog.Logger +} + +func newEventsStorage(cachingService *CacheService, logger gethlog.Logger) *eventsStorage { + return &eventsStorage{cachingService: cachingService, logger: logger} +} + +func (es *eventsStorage) storeReceiptAndEventLogs(ctx context.Context, dbTX *sql.Tx, batch *common.BatchHeader, receipt *types.Receipt, createdContracts []*gethcommon.Address) error { + txId, senderId, err := enclavedb.ReadTransactionIdAndSender(ctx, dbTX, receipt.TxHash) + if err != nil && !errors.Is(err, errutil.ErrNotFound) { + return fmt.Errorf("could not get transaction id. Cause: %w", err) + } + + for _, createdContract := range createdContracts { + _, err = enclavedb.WriteContractAddress(ctx, dbTX, createdContract, *senderId) + if err != nil { + return fmt.Errorf("could not write contract address. cause %w", err) + } + } + + // Convert the receipt into its storage form and serialize + // this removes information that can be recreated + // todo - in a future iteration, this can be slimmed down further because we already store the logs separately + storageReceipt := (*types.ReceiptForStorage)(receipt) + receiptBytes, err := rlp.EncodeToBytes(storageReceipt) + if err != nil { + return fmt.Errorf("failed to encode block receipts. Cause: %w", err) + } + + execTxId, err := enclavedb.WriteReceipt(ctx, dbTX, batch.SequencerOrderNo.Uint64(), txId, receiptBytes) + if err != nil { + return fmt.Errorf("could not write receipt. Cause: %w", err) + } + + for _, l := range receipt.Logs { + err := es.storeEventLog(ctx, dbTX, execTxId, l) + if err != nil { + return fmt.Errorf("could not store log entry %v. Cause: %w", l, err) + } + } + return nil +} + +func (es *eventsStorage) storeEventLog(ctx context.Context, dbTX *sql.Tx, execTxId uint64, l *types.Log) error { + topicIds, isLifecycle, err := es.handleUserTopics(ctx, dbTX, l) + if err != nil { + return err + } + + eventTypeId, err := es.handleEventType(ctx, dbTX, l, isLifecycle) + if err != nil { + return err + } + + // normalize data + data := l.Data + if len(data) == 0 { + data = nil + } + err = enclavedb.WriteEventLog(ctx, dbTX, eventTypeId, topicIds, data, l.Index, execTxId) + if err != nil { + return fmt.Errorf("could not write event log. Cause: %w", err) + } + + return nil +} + +func (es *eventsStorage) handleEventType(ctx context.Context, dbTX *sql.Tx, l *types.Log, isLifecycle bool) (uint64, error) { + et, err := es.readEventType(ctx, dbTX, l.Address, l.Topics[0]) + if err != nil && !errors.Is(err, errutil.ErrNotFound) { + return 0, fmt.Errorf("could not read event type. Cause: %w", err) + } + if err == nil { + // in case we determined the current emitted event is not lifecycle, we must update the EventType + if !isLifecycle && et.isLifecycle { + err := enclavedb.UpdateEventTopicLifecycle(ctx, dbTX, et.id, isLifecycle) + if err != nil { + return 0, fmt.Errorf("could not update the event type. cause: %w", err) + } + } + return et.id, nil + } + + // the first time an event of this type is emitted we must store it + contractAddId, err := es.readContractAddress(ctx, dbTX, l.Address) + if err != nil { + // the contract was already stored when it was created + return 0, fmt.Errorf("could not read contract address. %s. Cause: %w", l.Address, err) + } + return enclavedb.WriteEventType(ctx, dbTX, contractAddId, l.Topics[0], isLifecycle) +} + +func (es *eventsStorage) handleUserTopics(ctx context.Context, dbTX *sql.Tx, l *types.Log) ([]*uint64, bool, error) { + topicIds := make([]*uint64, 3) + // iterate the topics containing user values + // reuse them if already inserted + // if not, discover if there is a relevant externally owned address + isLifecycle := true + for i := 1; i < len(l.Topics); i++ { + topic := l.Topics[i] + // first check if there is an entry already for this topic + eventTopicId, relAddressId, err := es.findEventTopic(ctx, dbTX, topic.Bytes()) + if err != nil && !errors.Is(err, errutil.ErrNotFound) { + return nil, false, fmt.Errorf("could not read the event topic. Cause: %w", err) + } + if errors.Is(err, errutil.ErrNotFound) { + // check whether the topic is an EOA + relAddressId, err = es.findRelevantAddress(ctx, dbTX, topic) + if err != nil && !errors.Is(err, errutil.ErrNotFound) { + return nil, false, fmt.Errorf("could not read relevant address. Cause %w", err) + } + eventTopicId, err = enclavedb.WriteEventTopic(ctx, dbTX, &topic, relAddressId) + if err != nil { + return nil, false, fmt.Errorf("could not write event topic. Cause: %w", err) + } + } + + if relAddressId != nil { + isLifecycle = false + } + topicIds[i-1] = &eventTopicId + } + return topicIds, isLifecycle, nil +} + +// Of the log's topics, returns those that are (potentially) user addresses. A topic is considered a user address if: +// - It has at least 12 leading zero bytes (since addresses are 20 bytes long, while hashes are 32) and at most 22 leading zero bytes +// - It is not a smart contract address +func (es *eventsStorage) findRelevantAddress(ctx context.Context, dbTX *sql.Tx, topic gethcommon.Hash) (*uint64, error) { + potentialAddr := common.ExtractPotentialAddress(topic) + if potentialAddr == nil { + return nil, errutil.ErrNotFound + } + + // first check whether there is already an entry in the EOA table + eoaID, err := es.readEOA(ctx, dbTX, *potentialAddr) + if err != nil && !errors.Is(err, errutil.ErrNotFound) { + return nil, err + } + if err == nil { + return eoaID, nil + } + + // if the address is a contract then it's clearly not an EOA + _, err = es.readContractAddress(ctx, dbTX, *potentialAddr) + if err != nil && !errors.Is(err, errutil.ErrNotFound) { + return nil, err + } + if err == nil { + return nil, errutil.ErrNotFound + } + + // when we reach this point, the value looks like an address, but we haven't yet seen it + // for the first iteration, we'll just assume it's an EOA + // we can make this smarter by passing in more information about the event + id, err := enclavedb.WriteEoa(ctx, dbTX, *potentialAddr) + if err != nil { + return nil, err + } + + return &id, nil +} + +func (es *eventsStorage) readEventType(ctx context.Context, dbTX *sql.Tx, contractAddress gethcommon.Address, eventSignature gethcommon.Hash) (*EventType, error) { + defer es.logDuration("ReadEventType", measure.NewStopwatch()) + + return es.cachingService.ReadEventType(ctx, contractAddress, eventSignature, func(v any) (*EventType, error) { + contractAddrId, err := enclavedb.ReadContractAddress(ctx, dbTX, contractAddress) + if err != nil { + return nil, err + } + id, isLifecycle, err := enclavedb.ReadEventType(ctx, dbTX, *contractAddrId, eventSignature) + if err != nil { + return nil, err + } + return &EventType{ + id: id, + isLifecycle: isLifecycle, + }, nil + }) +} + +func (es *eventsStorage) readContractAddress(ctx context.Context, dbTX *sql.Tx, addr gethcommon.Address) (*uint64, error) { + defer es.logDuration("readContractAddress", measure.NewStopwatch()) + return es.cachingService.ReadContractAddr(ctx, addr, func(v any) (*uint64, error) { + return enclavedb.ReadContractAddress(ctx, dbTX, addr) + }) +} + +func (es *eventsStorage) findEventTopic(ctx context.Context, dbTX *sql.Tx, topic []byte) (uint64, *uint64, error) { + defer es.logDuration("findEventTopic", measure.NewStopwatch()) + return enclavedb.ReadEventTopic(ctx, dbTX, topic) +} + +func (es *eventsStorage) readEOA(ctx context.Context, dbTX *sql.Tx, addr gethcommon.Address) (*uint64, error) { + defer es.logDuration("ReadEOA", measure.NewStopwatch()) + return es.cachingService.ReadEOA(ctx, addr, func(v any) (*uint64, error) { + id, err := enclavedb.ReadEoa(ctx, dbTX, addr) + if err != nil { + return nil, err + } + return &id, nil + }) +} + +func (es *eventsStorage) logDuration(method string, stopWatch *measure.Stopwatch) { + core.LogMethodDuration(es.logger, stopWatch, fmt.Sprintf("Storage::%s completed", method)) +} diff --git a/go/enclave/storage/interfaces.go b/go/enclave/storage/interfaces.go index 01b7f81586..fb7ac7b73a 100644 --- a/go/enclave/storage/interfaces.go +++ b/go/enclave/storage/interfaces.go @@ -150,9 +150,6 @@ type Storage interface { // StateDB - return the underlying state database StateDB() state.Database - ReadEOA(ctx context.Context, addr gethcommon.Address) (*uint64, error) - - ReadContractAddress(ctx context.Context, addr gethcommon.Address) (*uint64, error) ReadContractOwner(ctx context.Context, address gethcommon.Address) (*gethcommon.Address, error) } diff --git a/go/enclave/storage/storage.go b/go/enclave/storage/storage.go index e6aac396e1..244ffd7c5d 100644 --- a/go/enclave/storage/storage.go +++ b/go/enclave/storage/storage.go @@ -18,12 +18,8 @@ import ( "github.com/ethereum/go-ethereum/triedb" - "github.com/dgraph-io/ristretto" - "github.com/eko/gocache/lib/v4/cache" "github.com/ten-protocol/go-ten/go/common/measure" - ristretto_store "github.com/eko/gocache/store/ristretto/v4" - "github.com/ten-protocol/go-ten/go/config" "github.com/ten-protocol/go-ten/go/enclave/storage/enclavedb" @@ -50,40 +46,16 @@ const ( masterSeedCfg = "MASTER_SEED" ) -type eventType struct { +type EventType struct { id uint64 isLifecycle bool } // todo - this file needs splitting up based on concerns type storageImpl struct { - db enclavedb.EnclaveDB - - // cache for the immutable blocks and batches. - // this avoids a trip to the database. - blockCache *cache.Cache[*types.Block] - - // stores batches using the sequence number as key - batchCacheBySeqNo *cache.Cache[*common.BatchHeader] - - // mapping between the hash and the sequence number - // note: to fetch a batch by hash will require 2 cache hits - seqCacheByHash *cache.Cache[*big.Int] - - // mapping between the height and the sequence number - // note: to fetch a batch by height will require 2 cache hits - seqCacheByHeight *cache.Cache[*big.Int] - - // batch hash - geth converted hash - convertedHashCache *cache.Cache[*gethcommon.Hash] - - // from address ( either eoa or contract) to the id of the db entry - eoaCache *cache.Cache[*uint64] - contractAddressCache *cache.Cache[*uint64] - - // from contract_address||event_sig to the event_type (id, isLifecycle) object - eventTypeCache *cache.Cache[*eventType] - + db enclavedb.EnclaveDB + cachingService *CacheService + eventsStorage *eventsStorage cachedSharedSecret *crypto.SharedEnclaveSecret stateCache state.Database @@ -91,12 +63,12 @@ type storageImpl struct { logger gethlog.Logger } -func NewStorageFromConfig(config *config.EnclaveConfig, chainConfig *params.ChainConfig, logger gethlog.Logger) Storage { +func NewStorageFromConfig(config *config.EnclaveConfig, cachingService *CacheService, chainConfig *params.ChainConfig, logger gethlog.Logger) Storage { backingDB, err := CreateDBFromConfig(config, logger) if err != nil { logger.Crit("Failed to connect to backing database", log.ErrKey, err) } - return NewStorage(backingDB, chainConfig, logger) + return NewStorage(backingDB, cachingService, chainConfig, logger) } var defaultCacheConfig = &gethcore.CacheConfig{ @@ -116,35 +88,19 @@ var trieDBConfig = &triedb.Config{ }, } -func NewStorage(backingDB enclavedb.EnclaveDB, chainConfig *params.ChainConfig, logger gethlog.Logger) Storage { +func NewStorage(backingDB enclavedb.EnclaveDB, cachingService *CacheService, chainConfig *params.ChainConfig, logger gethlog.Logger) Storage { // Open trie database with provided config triedb := triedb.NewDatabase(backingDB, trieDBConfig) stateDB := state.NewDatabaseWithNodeDB(backingDB, triedb) - // todo (tudor) figure out the config - ristrettoCache, err := ristretto.NewCache(&ristretto.Config{ - NumCounters: 20_000, // 10*MaxCost - MaxCost: 2000, // - how many items to cache - BufferItems: 64, // number of keys per Get buffer. - }) - if err != nil { - logger.Crit("Could not initialise ristretto cache", log.ErrKey, err) - } - ristrettoStore := ristretto_store.NewRistretto(ristrettoCache) return &storageImpl{ - db: backingDB, - stateCache: stateDB, - chainConfig: chainConfig, - blockCache: cache.New[*types.Block](ristrettoStore), - batchCacheBySeqNo: cache.New[*common.BatchHeader](ristrettoStore), - seqCacheByHash: cache.New[*big.Int](ristrettoStore), - seqCacheByHeight: cache.New[*big.Int](ristrettoStore), - convertedHashCache: cache.New[*gethcommon.Hash](ristrettoStore), - eoaCache: cache.New[*uint64](ristrettoStore), - contractAddressCache: cache.New[*uint64](ristrettoStore), - eventTypeCache: cache.New[*eventType](ristrettoStore), - logger: logger, + db: backingDB, + stateCache: stateDB, + chainConfig: chainConfig, + cachingService: cachingService, + eventsStorage: newEventsStorage(cachingService, logger), + logger: logger, } } @@ -184,7 +140,7 @@ func (s *storageImpl) FetchBatch(ctx context.Context, hash common.L2BatchHash) ( } func (s *storageImpl) fetchSeqNoByHash(ctx context.Context, hash common.L2BatchHash) (*big.Int, error) { - seqNo, err := common.GetCachedValue(ctx, s.seqCacheByHash, s.logger, hash, func(v any) (*big.Int, error) { + seqNo, err := s.cachingService.ReadBatchSeqByHash(ctx, hash, func(v any) (*big.Int, error) { batch, err := enclavedb.ReadBatchHeaderByHash(ctx, s.db.GetSQLDB(), v.(common.L2BatchHash)) if err != nil { return nil, err @@ -201,7 +157,7 @@ func (s *storageImpl) FetchConvertedHash(ctx context.Context, hash common.L2Batc return gethcommon.Hash{}, err } - convertedHash, err := common.GetCachedValue(ctx, s.convertedHashCache, s.logger, hash, func(v any) (*gethcommon.Hash, error) { + convertedHash, err := s.cachingService.ReadConvertedHash(ctx, hash, func(v any) (*gethcommon.Hash, error) { ch, err := enclavedb.FetchConvertedBatchHash(ctx, s.db.GetSQLDB(), batch.SequencerOrderNo.Uint64()) if err != nil { return nil, err @@ -235,8 +191,7 @@ func (s *storageImpl) FetchBatchTransactionsBySeq(ctx context.Context, seqNo uin func (s *storageImpl) FetchBatchByHeight(ctx context.Context, height uint64) (*core.Batch, error) { defer s.logDuration("FetchBatchByHeight", measure.NewStopwatch()) - // the key is (height+1), because for some reason it doesn't like a key of 0 - seqNo, err := common.GetCachedValue(ctx, s.seqCacheByHeight, s.logger, height+1, func(h any) (*big.Int, error) { + seqNo, err := s.cachingService.ReadBatchSeqByHeight(ctx, height, func(h any) (*big.Int, error) { batch, err := enclavedb.ReadCanonicalBatchHeaderByHeight(ctx, s.db.GetSQLDB(), height) if err != nil { return nil, err @@ -322,14 +277,14 @@ func (s *storageImpl) StoreBlock(ctx context.Context, block *types.Block, chainF return fmt.Errorf("4. could not store block %s. Cause: %w", block.Hash(), err) } - common.CacheValue(ctx, s.blockCache, s.logger, block.Hash(), block) + s.cachingService.CacheBlock(ctx, block) return nil } func (s *storageImpl) FetchBlock(ctx context.Context, blockHash common.L1BlockHash) (*types.Block, error) { defer s.logDuration("FetchBlock", measure.NewStopwatch()) - return common.GetCachedValue(ctx, s.blockCache, s.logger, blockHash, func(hash any) (*types.Block, error) { + return s.cachingService.ReadBlock(ctx, blockHash, func(hash any) (*types.Block, error) { return enclavedb.FetchBlock(ctx, s.db.GetSQLDB(), hash.(common.L1BlockHash)) }) } @@ -511,9 +466,7 @@ func (s *storageImpl) StoreAttestedKey(ctx context.Context, aggregator gethcommo func (s *storageImpl) FetchBatchBySeqNo(ctx context.Context, seqNum uint64) (*core.Batch, error) { defer s.logDuration("FetchBatchBySeqNo", measure.NewStopwatch()) - h, err := common.GetCachedValue(ctx, s.batchCacheBySeqNo, s.logger, seqNum, func(seq any) (*common.BatchHeader, error) { - return enclavedb.ReadBatchHeaderBySeqNo(ctx, s.db.GetSQLDB(), seqNum) - }) + h, err := s.FetchBatchHeaderBySeqNo(ctx, seqNum) if err != nil { return nil, err } @@ -529,7 +482,7 @@ func (s *storageImpl) FetchBatchBySeqNo(ctx context.Context, seqNum uint64) (*co func (s *storageImpl) FetchBatchHeaderBySeqNo(ctx context.Context, seqNum uint64) (*common.BatchHeader, error) { defer s.logDuration("FetchBatchHeaderBySeqNo", measure.NewStopwatch()) - return common.GetCachedValue(ctx, s.batchCacheBySeqNo, s.logger, seqNum, func(seq any) (*common.BatchHeader, error) { + return s.cachingService.ReadBatch(ctx, seqNum, func(seq any) (*common.BatchHeader, error) { return enclavedb.ReadBatchHeaderBySeqNo(ctx, s.db.GetSQLDB(), seqNum) }) } @@ -608,11 +561,7 @@ func (s *storageImpl) StoreBatch(ctx context.Context, batch *core.Batch, convert return fmt.Errorf("could not commit batch %w", err) } - common.CacheValue(ctx, s.batchCacheBySeqNo, s.logger, batch.SeqNo().Uint64(), batch.Header) - common.CacheValue(ctx, s.seqCacheByHash, s.logger, batch.Hash(), batch.SeqNo()) - // note: the key is (height+1), because for some reason it doesn't like a key of 0 - // should always contain the canonical batch because the cache is overwritten by each new batch after a reorg - common.CacheValue(ctx, s.seqCacheByHeight, s.logger, batch.NumberU64()+1, batch.SeqNo()) + s.cachingService.CacheBatch(ctx, batch) return nil } @@ -657,7 +606,7 @@ func (s *storageImpl) StoreExecutedBatch(ctx context.Context, batch *common.Batc } for _, receipt := range receipts { - err = s.storeReceiptAndEventLogs(ctx, dbTx, batch, receipt, newContracts[receipt.TxHash]) + err = s.eventsStorage.storeReceiptAndEventLogs(ctx, dbTx, batch, receipt, newContracts[receipt.TxHash]) if err != nil { return fmt.Errorf("could not store receipt. Cause: %w", err) } @@ -669,185 +618,6 @@ func (s *storageImpl) StoreExecutedBatch(ctx context.Context, batch *common.Batc return nil } -// todo - move this to a separate service -func (s *storageImpl) storeReceiptAndEventLogs(ctx context.Context, dbTX *sql.Tx, batch *common.BatchHeader, receipt *types.Receipt, createdContracts []*gethcommon.Address) error { - txId, senderId, err := enclavedb.ReadTransactionIdAndSender(ctx, dbTX, receipt.TxHash) - if err != nil && !errors.Is(err, errutil.ErrNotFound) { - return fmt.Errorf("could not get transaction id. Cause: %w", err) - } - - for _, createdContract := range createdContracts { - _, err = enclavedb.WriteContractAddress(ctx, dbTX, createdContract, *senderId) - if err != nil { - return fmt.Errorf("could not write contract address. cause %w", err) - } - } - - // Convert the receipt into its storage form and serialize - // this removes information that can be recreated - // todo - in a future iteration, this can be slimmed down further because we already store the logs separately - storageReceipt := (*types.ReceiptForStorage)(receipt) - receiptBytes, err := rlp.EncodeToBytes(storageReceipt) - if err != nil { - return fmt.Errorf("failed to encode block receipts. Cause: %w", err) - } - - execTxId, err := enclavedb.WriteReceipt(ctx, dbTX, batch.SequencerOrderNo.Uint64(), txId, receiptBytes) - if err != nil { - return fmt.Errorf("could not write receipt. Cause: %w", err) - } - - for _, l := range receipt.Logs { - err := s.storeEventLog(ctx, dbTX, execTxId, l) - if err != nil { - return fmt.Errorf("could not store log entry %v. Cause: %w", l, err) - } - } - return nil -} - -func (s *storageImpl) storeEventLog(ctx context.Context, dbTX *sql.Tx, execTxId uint64, l *types.Log) error { - topicIds, isLifecycle, err := s.handleUserTopics(ctx, dbTX, l) - if err != nil { - return err - } - - eventTypeId, err := s.handleEventType(ctx, dbTX, l, isLifecycle) - if err != nil { - return err - } - - // normalize data - data := l.Data - if len(data) == 0 { - data = nil - } - err = enclavedb.WriteEventLog(ctx, dbTX, eventTypeId, topicIds, data, l.Index, execTxId) - if err != nil { - return fmt.Errorf("could not write event log. Cause: %w", err) - } - - return nil -} - -func (s *storageImpl) handleEventType(ctx context.Context, dbTX *sql.Tx, l *types.Log, isLifecycle bool) (uint64, error) { - et, err := s.readEventType(ctx, dbTX, l.Address, l.Topics[0]) - if err != nil && !errors.Is(err, errutil.ErrNotFound) { - return 0, fmt.Errorf("could not read event type. Cause: %w", err) - } - if err == nil { - // in case we determined the current emitted event is not lifecycle, we must update the eventType - if !isLifecycle && et.isLifecycle { - err := enclavedb.UpdateEventTopicLifecycle(ctx, dbTX, et.id, isLifecycle) - if err != nil { - return 0, fmt.Errorf("could not update the event type. cause: %w", err) - } - } - return et.id, nil - } - - // the first time an event of this type is emitted we must store it - contractAddId, err := s.readContractAddress(ctx, dbTX, l.Address) - if err != nil { - // the contract was already stored when it was created - return 0, fmt.Errorf("could not read contract address. %s. Cause: %w", l.Address, err) - } - return enclavedb.WriteEventType(ctx, dbTX, contractAddId, l.Topics[0], isLifecycle) -} - -func (s *storageImpl) handleUserTopics(ctx context.Context, dbTX *sql.Tx, l *types.Log) ([]*uint64, bool, error) { - topicIds := make([]*uint64, 3) - // iterate the topics containing user values - // reuse them if already inserted - // if not, discover if there is a relevant externally owned address - isLifecycle := true - for i := 1; i < len(l.Topics); i++ { - topic := l.Topics[i] - // first check if there is an entry already for this topic - eventTopicId, relAddressId, err := s.findEventTopic(ctx, dbTX, topic.Bytes()) - if err != nil && !errors.Is(err, errutil.ErrNotFound) { - return nil, false, fmt.Errorf("could not read the event topic. Cause: %w", err) - } - if errors.Is(err, errutil.ErrNotFound) { - // check whether the topic is an EOA - relAddressId, err = s.findRelevantAddress(ctx, dbTX, topic) - if err != nil && !errors.Is(err, errutil.ErrNotFound) { - return nil, false, fmt.Errorf("could not read relevant address. Cause %w", err) - } - eventTopicId, err = enclavedb.WriteEventTopic(ctx, dbTX, &topic, relAddressId) - if err != nil { - return nil, false, fmt.Errorf("could not write event topic. Cause: %w", err) - } - } - - if relAddressId != nil { - isLifecycle = false - } - topicIds[i-1] = &eventTopicId - } - return topicIds, isLifecycle, nil -} - -// Of the log's topics, returns those that are (potentially) user addresses. A topic is considered a user address if: -// - It has at least 12 leading zero bytes (since addresses are 20 bytes long, while hashes are 32) and at most 22 leading zero bytes -// - It is not a smart contract address -func (s *storageImpl) findRelevantAddress(ctx context.Context, dbTX *sql.Tx, topic gethcommon.Hash) (*uint64, error) { - potentialAddr := common.ExtractPotentialAddress(topic) - if potentialAddr == nil { - return nil, errutil.ErrNotFound - } - - // first check whether there is already an entry in the EOA table - eoaID, err := s.readEOA(ctx, dbTX, *potentialAddr) - if err != nil && !errors.Is(err, errutil.ErrNotFound) { - return nil, err - } - if err == nil { - return eoaID, nil - } - - // if the address is a contract then it's clearly not an EOA - _, err = s.readContractAddress(ctx, dbTX, *potentialAddr) - if err != nil && !errors.Is(err, errutil.ErrNotFound) { - return nil, err - } - if err == nil { - return nil, errutil.ErrNotFound - } - - // when we reach this point, the value looks like an address, but we haven't yet seen it - // for the first iteration, we'll just assume it's an EOA - // we can make this smarter by passing in more information about the event - id, err := enclavedb.WriteEoa(ctx, dbTX, *potentialAddr) - if err != nil { - return nil, err - } - - return &id, nil -} - -func (s *storageImpl) readEventType(ctx context.Context, dbTX *sql.Tx, contractAddress gethcommon.Address, eventSignature gethcommon.Hash) (*eventType, error) { - defer s.logDuration("readEventType", measure.NewStopwatch()) - - key := make([]byte, 0) - key = append(key, contractAddress.Bytes()...) - key = append(key, eventSignature.Bytes()...) - return common.GetCachedValue(ctx, s.eventTypeCache, s.logger, key, func(v any) (*eventType, error) { - contractAddrId, err := enclavedb.ReadContractAddress(ctx, dbTX, contractAddress) - if err != nil { - return nil, err - } - id, isLifecycle, err := enclavedb.ReadEventType(ctx, dbTX, *contractAddrId, eventSignature) - if err != nil { - return nil, err - } - return &eventType{ - id: id, - isLifecycle: isLifecycle, - }, nil - }) -} - func (s *storageImpl) StoreValueTransfers(ctx context.Context, blockHash common.L1BlockHash, transfers common.ValueTransferEvents) error { defer s.logDuration("StoreValueTransfers", measure.NewStopwatch()) dbtx, err := s.db.NewDBTransaction(ctx) @@ -1020,29 +790,9 @@ func (s *storageImpl) CountTransactionsPerAddress(ctx context.Context, address * return enclavedb.CountTransactionsPerAddress(ctx, s.db.GetSQLDB(), address) } -func (s *storageImpl) ReadEOA(ctx context.Context, addr gethcommon.Address) (*uint64, error) { - dbtx, err := s.db.NewDBTransaction(ctx) - if err != nil { - return nil, err - } - defer dbtx.Rollback() - return s.readEOA(ctx, dbtx, addr) -} - -func (s *storageImpl) readEOA(ctx context.Context, dbTX *sql.Tx, addr gethcommon.Address) (*uint64, error) { - defer s.logDuration("readEOA", measure.NewStopwatch()) - return common.GetCachedValue(ctx, s.eoaCache, s.logger, addr, func(v any) (*uint64, error) { - id, err := enclavedb.ReadEoa(ctx, dbTX, addr) - if err != nil { - return nil, err - } - return &id, nil - }) -} - func (s *storageImpl) readOrWriteEOA(ctx context.Context, dbTX *sql.Tx, addr gethcommon.Address) (*uint64, error) { defer s.logDuration("readOrWriteEOA", measure.NewStopwatch()) - return common.GetCachedValue(ctx, s.eoaCache, s.logger, addr, func(v any) (*uint64, error) { + return s.cachingService.ReadEOA(ctx, addr, func(v any) (*uint64, error) { id, err := enclavedb.ReadEoa(ctx, dbTX, addr) if err != nil { if errors.Is(err, errutil.ErrNotFound) { @@ -1058,31 +808,10 @@ func (s *storageImpl) readOrWriteEOA(ctx context.Context, dbTX *sql.Tx, addr get }) } -func (s *storageImpl) ReadContractAddress(ctx context.Context, addr gethcommon.Address) (*uint64, error) { - dbtx, err := s.db.NewDBTransaction(ctx) - if err != nil { - return nil, err - } - defer dbtx.Commit() - return s.readContractAddress(ctx, dbtx, addr) -} - func (s *storageImpl) ReadContractOwner(ctx context.Context, address gethcommon.Address) (*gethcommon.Address, error) { return enclavedb.ReadContractOwner(ctx, s.db.GetSQLDB(), address) } -func (s *storageImpl) readContractAddress(ctx context.Context, dbTX *sql.Tx, addr gethcommon.Address) (*uint64, error) { - defer s.logDuration("readContractAddress", measure.NewStopwatch()) - return common.GetCachedValue(ctx, s.contractAddressCache, s.logger, addr, func(v any) (*uint64, error) { - return enclavedb.ReadContractAddress(ctx, dbTX, addr) - }) -} - -func (s *storageImpl) findEventTopic(ctx context.Context, dbTX *sql.Tx, topic []byte) (uint64, *uint64, error) { - defer s.logDuration("findEventTopic", measure.NewStopwatch()) - return enclavedb.ReadEventTopic(ctx, dbTX, topic) -} - func (s *storageImpl) logDuration(method string, stopWatch *measure.Stopwatch) { core.LogMethodDuration(s.logger, stopWatch, fmt.Sprintf("Storage::%s completed", method)) }