Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

split up storage and improve caching #1977

Merged
merged 2 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 0 additions & 39 deletions go/common/cache_util.go

This file was deleted.

33 changes: 8 additions & 25 deletions go/common/gethencoding/geth_encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@ import (
"time"
"unsafe"

"github.com/dgraph-io/ristretto"
"github.com/eko/gocache/lib/v4/cache"
ristretto_store "github.com/eko/gocache/store/ristretto/v4"

gethlog "github.com/ethereum/go-ethereum/log"
"github.com/ten-protocol/go-ten/go/common/log"
"github.com/ten-protocol/go-ten/go/enclave/storage"
Expand Down Expand Up @@ -51,29 +47,16 @@ type EncodingService interface {
}

type gethEncodingServiceImpl struct {
// conversion is expensive. Cache the converted headers. The key is the hash of the batch.
gethHeaderCache *cache.Cache[*types.Header]

storage storage.Storage
logger gethlog.Logger
storage storage.Storage
logger gethlog.Logger
cachingService *storage.CacheService
}

func NewGethEncodingService(storage storage.Storage, logger gethlog.Logger) EncodingService {
// todo (tudor) figure out the best values
ristrettoCache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: 5000, // number of keys to track frequency of.
MaxCost: 500, // todo - this represents how many items.
BufferItems: 64, // number of keys per Get buffer. Todo - what is this
})
if err != nil {
panic(err)
}
ristrettoStore := ristretto_store.NewRistretto(ristrettoCache)

func NewGethEncodingService(storage storage.Storage, cachingService *storage.CacheService, logger gethlog.Logger) EncodingService {
return &gethEncodingServiceImpl{
gethHeaderCache: cache.New[*types.Header](ristrettoStore),
storage: storage,
logger: logger,
storage: storage,
logger: logger,
cachingService: cachingService,
}
}

Expand Down Expand Up @@ -276,7 +259,7 @@ func ExtractEthCall(param interface{}) (*gethapi.TransactionArgs, error) {
// Special care must be taken to maintain a valid chain of these converted headers.
func (enc *gethEncodingServiceImpl) CreateEthHeaderForBatch(ctx context.Context, h *common.BatchHeader) (*types.Header, error) {
// wrap in a caching layer
return common.GetCachedValue(ctx, enc.gethHeaderCache, enc.logger, h.Hash(), func(a any) (*types.Header, error) {
return enc.cachingService.ReadConvertedHeader(ctx, h.Hash(), func(a any) (*types.Header, error) {
// deterministically calculate the private randomness that will be exposed to the EVM
secret, err := enc.storage.FetchSecret(ctx)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions go/enclave/enclave.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,9 @@ func NewEnclave(
}

// Initialise the database
cachingService := storage.NewCacheService(logger)
chainConfig := ethchainadapter.ChainParams(big.NewInt(config.ObscuroChainID))
storage := storage.NewStorageFromConfig(config, chainConfig, logger)
storage := storage.NewStorageFromConfig(config, cachingService, chainConfig, logger)

// Initialise the Ethereum "Blockchain" structure that will allow us to validate incoming blocks
// todo (#1056) - valid block
Expand Down Expand Up @@ -160,7 +161,7 @@ func NewEnclave(

obscuroKey := crypto.GetObscuroKey(logger)

gethEncodingService := gethencoding.NewGethEncodingService(storage, logger)
gethEncodingService := gethencoding.NewGethEncodingService(storage, cachingService, logger)
dataEncryptionService := crypto.NewDataEncryptionService(logger)
dataCompressionService := compression.NewBrotliDataCompressionService()

Expand Down
4 changes: 2 additions & 2 deletions go/enclave/genesis/genesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestDefaultGenesis(t *testing.T) {
if err != nil {
t.Fatalf("unable to create temp db: %s", err)
}
storageDB := storage.NewStorage(backingDB, nil, gethlog.New())
storageDB := storage.NewStorage(backingDB, storage.NewCacheService(gethlog.New()), nil, gethlog.New())
stateDB, err := gen.applyAllocations(storageDB)
if err != nil {
t.Fatalf("unable to apply genesis allocations")
Expand Down Expand Up @@ -86,7 +86,7 @@ func TestCustomGenesis(t *testing.T) {
if err != nil {
t.Fatalf("unable to create temp db: %s", err)
}
storageDB := storage.NewStorage(backingDB, nil, gethlog.New())
storageDB := storage.NewStorage(backingDB, storage.NewCacheService(gethlog.New()), nil, gethlog.New())
stateDB, err := gen.applyAllocations(storageDB)
if err != nil {
t.Fatalf("unable to apply genesis allocations")
Expand Down
167 changes: 167 additions & 0 deletions go/enclave/storage/cache_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package storage

import (
"context"
"math/big"

"github.com/eko/gocache/lib/v4/store"

"github.com/ten-protocol/go-ten/go/enclave/core"

"github.com/dgraph-io/ristretto"
"github.com/eko/gocache/lib/v4/cache"
ristretto_store "github.com/eko/gocache/store/ristretto/v4"

gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
gethlog "github.com/ethereum/go-ethereum/log"
"github.com/ten-protocol/go-ten/go/common"
"github.com/ten-protocol/go-ten/go/common/log"
)

// approximate cost in bytes of the cached values
const (
blockCost = 1024
batchCost = 1024
hashCost = 32
idCost = 8
)

type CacheService struct {
// cache for the immutable blocks and batches.
// this avoids a trip to the database.
blockCache *cache.Cache[*types.Block]

// stores batches using the sequence number as key
batchCacheBySeqNo *cache.Cache[*common.BatchHeader]

// mapping between the hash and the sequence number
// note: to fetch a batch by hash will require 2 cache hits
seqCacheByHash *cache.Cache[*big.Int]

// mapping between the height and the sequence number
// note: to fetch a batch by height will require 2 cache hits
seqCacheByHeight *cache.Cache[*big.Int]

// batch hash - geth converted hash
convertedHashCache *cache.Cache[*gethcommon.Hash]

// from address ( either eoa or contract) to the id of the db entry
eoaCache *cache.Cache[*uint64]
contractAddressCache *cache.Cache[*uint64]

// from contract_address||event_sig to the event_type (id, isLifecycle) object
eventTypeCache *cache.Cache[*EventType]

// store the converted ethereum header which is passed to the evm
convertedGethHeaderCache *cache.Cache[*types.Header]

logger gethlog.Logger
}

func NewCacheService(logger gethlog.Logger) *CacheService {
// todo (tudor) figure out the config
ristrettoCache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: 100_000_000, // 10 times the expected elements
MaxCost: 1024 * 1024 * 1024, // allocate 1GB
BufferItems: 64, // number of keys per Get buffer.
})
if err != nil {
logger.Crit("Could not initialise ristretto cache", log.ErrKey, err)
}
ristrettoStore := ristretto_store.NewRistretto(ristrettoCache)
return &CacheService{
blockCache: cache.New[*types.Block](ristrettoStore),
batchCacheBySeqNo: cache.New[*common.BatchHeader](ristrettoStore),
seqCacheByHash: cache.New[*big.Int](ristrettoStore),
seqCacheByHeight: cache.New[*big.Int](ristrettoStore),
convertedHashCache: cache.New[*gethcommon.Hash](ristrettoStore),
eoaCache: cache.New[*uint64](ristrettoStore),
contractAddressCache: cache.New[*uint64](ristrettoStore),
eventTypeCache: cache.New[*EventType](ristrettoStore),
convertedGethHeaderCache: cache.New[*types.Header](ristrettoStore),
logger: logger,
}
}

func (cs *CacheService) CacheBlock(ctx context.Context, b *types.Block) {
cacheValue(ctx, cs.blockCache, cs.logger, b.Hash(), b, blockCost)
}

func (cs *CacheService) CacheBatch(ctx context.Context, batch *core.Batch) {
cacheValue(ctx, cs.batchCacheBySeqNo, cs.logger, batch.SeqNo().Uint64(), batch.Header, batchCost)
cacheValue(ctx, cs.seqCacheByHash, cs.logger, batch.Hash(), batch.SeqNo(), idCost)
// note: the key is (height+1), because for some reason it doesn't like a key of 0
// should always contain the canonical batch because the cache is overwritten by each new batch after a reorg
cacheValue(ctx, cs.seqCacheByHeight, cs.logger, batch.NumberU64()+1, batch.SeqNo(), idCost)
}

func (cs *CacheService) ReadBlock(ctx context.Context, key gethcommon.Hash, onCacheMiss func(any) (*types.Block, error)) (*types.Block, error) {
return getCachedValue(ctx, cs.blockCache, cs.logger, key, blockCost, onCacheMiss)
}

func (cs *CacheService) ReadBatchSeqByHash(ctx context.Context, hash common.L2BatchHash, onCacheMiss func(any) (*big.Int, error)) (*big.Int, error) {
return getCachedValue(ctx, cs.seqCacheByHash, cs.logger, hash, idCost, onCacheMiss)
}

func (cs *CacheService) ReadBatchSeqByHeight(ctx context.Context, height uint64, onCacheMiss func(any) (*big.Int, error)) (*big.Int, error) {
// the key is (height+1), because for some reason it doesn't like a key of 0
return getCachedValue(ctx, cs.seqCacheByHeight, cs.logger, height+1, idCost, onCacheMiss)
}

func (cs *CacheService) ReadConvertedHash(ctx context.Context, hash common.L2BatchHash, onCacheMiss func(any) (*gethcommon.Hash, error)) (*gethcommon.Hash, error) {
return getCachedValue(ctx, cs.convertedHashCache, cs.logger, hash, hashCost, onCacheMiss)
}

func (cs *CacheService) ReadBatch(ctx context.Context, seqNum uint64, onCacheMiss func(any) (*common.BatchHeader, error)) (*common.BatchHeader, error) {
return getCachedValue(ctx, cs.batchCacheBySeqNo, cs.logger, seqNum, batchCost, onCacheMiss)
}

func (cs *CacheService) ReadEOA(ctx context.Context, addr gethcommon.Address, onCacheMiss func(any) (*uint64, error)) (*uint64, error) {
return getCachedValue(ctx, cs.eoaCache, cs.logger, addr, idCost, onCacheMiss)
}

func (cs *CacheService) ReadContractAddr(ctx context.Context, addr gethcommon.Address, onCacheMiss func(any) (*uint64, error)) (*uint64, error) {
return getCachedValue(ctx, cs.contractAddressCache, cs.logger, addr, idCost, onCacheMiss)
}

func (cs *CacheService) ReadEventType(ctx context.Context, contractAddress gethcommon.Address, eventSignature gethcommon.Hash, onCacheMiss func(any) (*EventType, error)) (*EventType, error) {
key := make([]byte, 0)
key = append(key, contractAddress.Bytes()...)
key = append(key, eventSignature.Bytes()...)
return getCachedValue(ctx, cs.eventTypeCache, cs.logger, key, idCost, onCacheMiss)
}

func (cs *CacheService) ReadConvertedHeader(ctx context.Context, batchHash common.L2BatchHash, onCacheMiss func(any) (*types.Header, error)) (*types.Header, error) {
return getCachedValue(ctx, cs.convertedGethHeaderCache, cs.logger, batchHash, blockCost, onCacheMiss)
}

// getCachedValue - returns the cached value for the provided key. If the key is not found, then invoke the 'onCacheMiss' function
// which returns the value, and cache it
func getCachedValue[V any](ctx context.Context, cache *cache.Cache[*V], logger gethlog.Logger, key any, cost int64, onCacheMiss func(any) (*V, error)) (*V, error) {
value, err := cache.Get(ctx, key)
if err != nil || value == nil {
// todo metrics for cache misses
v, err := onCacheMiss(key)
if err != nil {
return v, err
}
if v == nil {
logger.Crit("Returned a nil value from the onCacheMiss function. Should not happen.")
}
cacheValue(ctx, cache, logger, key, v, cost)
return v, nil
}

return value, err
}

func cacheValue[V any](ctx context.Context, cache *cache.Cache[*V], logger gethlog.Logger, key any, v *V, cost int64) {
if v == nil {
return
}
err := cache.Set(ctx, key, v, store.WithCost(cost))
if err != nil {
logger.Error("Could not store value in cache", log.ErrKey, err)
}
}
Loading