Skip to content

Commit

Permalink
improve cache (#1866)
Browse files Browse the repository at this point in the history
* improve cache

* fix

* fix

* fix
  • Loading branch information
tudor-malene authored Apr 4, 2024
1 parent b8c1bb6 commit e994ee2
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 66 deletions.
6 changes: 3 additions & 3 deletions go/common/subscription/new_heads_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ func NewNewHeadsService(inputCh chan *common.BatchHeader, convertToEthHeader boo

func (nhs *NewHeadsService) Start() error {
go ForwardFromChannels([]chan *common.BatchHeader{nhs.inputCh}, nhs.stopped, func(head *common.BatchHeader) error {
nhs.notifiersMutex.RLock()
defer nhs.notifiersMutex.RUnlock()

if nhs.onMessage != nil {
err := nhs.onMessage(head)
if err != nil {
Expand All @@ -57,6 +54,9 @@ func (nhs *NewHeadsService) Start() error {
msg = convertBatchHeader(head)
}

nhs.notifiersMutex.RLock()
defer nhs.notifiersMutex.RUnlock()

// for each new head, notify all registered subscriptions
for id, notifier := range nhs.newHeadNotifiers {
if nhs.stopped.Load() {
Expand Down
29 changes: 23 additions & 6 deletions tools/walletextension/cache/RistrettoCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ const (
)

type ristrettoCache struct {
cache *ristretto.Cache
quit chan struct{}
cache *ristretto.Cache
quit chan struct{}
lastEviction time.Time
}

// NewRistrettoCache returns a new ristrettoCache.
func NewRistrettoCache(logger log.Logger) (Cache, error) {
// NewRistrettoCacheWithEviction returns a new ristrettoCache.
func NewRistrettoCacheWithEviction(logger log.Logger) (Cache, error) {
cache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: numCounters,
MaxCost: maxCost,
Expand All @@ -33,8 +34,9 @@ func NewRistrettoCache(logger log.Logger) (Cache, error) {
}

c := &ristrettoCache{
cache: cache,
quit: make(chan struct{}),
cache: cache,
quit: make(chan struct{}),
lastEviction: time.Now(),
}

// Start the metrics logging
Expand All @@ -43,6 +45,21 @@ func NewRistrettoCache(logger log.Logger) (Cache, error) {
return c, nil
}

func (c *ristrettoCache) EvictShortLiving() {
c.lastEviction = time.Now()
}

func (c *ristrettoCache) IsEvicted(key any, originalTTL time.Duration) bool {
remainingTTL, notExpired := c.cache.GetTTL(key)
if !notExpired {
return true
}
cachedTime := time.Now().Add(remainingTTL).Add(-originalTTL)
// ... LE...Cached...Now - Valid
// ... Cached...LE...Now - Evicted
return c.lastEviction.After(cachedTime)
}

// Set adds the key and value to the cache.
func (c *ristrettoCache) Set(key []byte, value any, ttl time.Duration) bool {
return c.cache.SetWithTTL(key, value, defaultCost, ttl)
Expand Down
4 changes: 3 additions & 1 deletion tools/walletextension/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import (
)

type Cache interface {
EvictShortLiving()
IsEvicted(key any, originalTTL time.Duration) bool
Set(key []byte, value any, ttl time.Duration) bool
Get(key []byte) (value any, ok bool)
Remove(key []byte)
}

func NewCache(logger log.Logger) (Cache, error) {
return NewRistrettoCache(logger)
return NewRistrettoCacheWithEviction(logger)
}
23 changes: 11 additions & 12 deletions tools/walletextension/rpcapi/blockchain_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package rpcapi
import (
"context"
"encoding/json"
"time"

"github.com/ethereum/go-ethereum/core/types"

Expand All @@ -23,12 +22,12 @@ func NewBlockChainAPI(we *Services) *BlockChainAPI {
}

func (api *BlockChainAPI) ChainId() *hexutil.Big { //nolint:stylecheck
chainID, _ := UnauthenticatedTenRPCCall[hexutil.Big](context.Background(), api.we, &CacheCfg{TTL: longCacheTTL}, "eth_chainId")
chainID, _ := UnauthenticatedTenRPCCall[hexutil.Big](context.Background(), api.we, &CacheCfg{CacheType: LongLiving}, "eth_chainId")
return chainID
}

func (api *BlockChainAPI) BlockNumber() hexutil.Uint64 {
nr, err := UnauthenticatedTenRPCCall[hexutil.Uint64](context.Background(), api.we, &CacheCfg{TTL: shortCacheTTL}, "eth_blockNumber")
nr, err := UnauthenticatedTenRPCCall[hexutil.Uint64](context.Background(), api.we, &CacheCfg{CacheType: LatestBatch}, "eth_blockNumber")
if err != nil {
return hexutil.Uint64(0)
}
Expand All @@ -41,7 +40,7 @@ func (api *BlockChainAPI) GetBalance(ctx context.Context, address common.Address
api.we,
&ExecCfg{
cacheCfg: &CacheCfg{
TTLCallback: func() time.Duration {
CacheTypeDynamic: func() CacheStrategy {
return cacheTTLBlockNumberOrHash(blockNrOrHash)
},
},
Expand Down Expand Up @@ -76,7 +75,7 @@ func (s *BlockChainAPI) GetProof(ctx context.Context, address common.Address, st
}

func (api *BlockChainAPI) GetHeaderByNumber(ctx context.Context, number rpc.BlockNumber) (map[string]interface{}, error) {
resp, err := UnauthenticatedTenRPCCall[map[string]interface{}](ctx, api.we, &CacheCfg{TTLCallback: func() time.Duration {
resp, err := UnauthenticatedTenRPCCall[map[string]interface{}](ctx, api.we, &CacheCfg{CacheTypeDynamic: func() CacheStrategy {
return cacheTTLBlockNumber(number)
}}, "eth_getHeaderByNumber", number)
if resp == nil {
Expand All @@ -86,7 +85,7 @@ func (api *BlockChainAPI) GetHeaderByNumber(ctx context.Context, number rpc.Bloc
}

func (api *BlockChainAPI) GetHeaderByHash(ctx context.Context, hash common.Hash) map[string]interface{} {
resp, _ := UnauthenticatedTenRPCCall[map[string]interface{}](ctx, api.we, &CacheCfg{TTL: longCacheTTL}, "eth_getHeaderByHash", hash)
resp, _ := UnauthenticatedTenRPCCall[map[string]interface{}](ctx, api.we, &CacheCfg{CacheType: LongLiving}, "eth_getHeaderByHash", hash)
if resp == nil {
return nil
}
Expand All @@ -98,7 +97,7 @@ func (api *BlockChainAPI) GetBlockByNumber(ctx context.Context, number rpc.Block
ctx,
api.we,
&CacheCfg{
TTLCallback: func() time.Duration {
CacheTypeDynamic: func() CacheStrategy {
return cacheTTLBlockNumber(number)
},
}, "eth_getBlockByNumber", number, fullTx)
Expand All @@ -109,7 +108,7 @@ func (api *BlockChainAPI) GetBlockByNumber(ctx context.Context, number rpc.Block
}

func (api *BlockChainAPI) GetBlockByHash(ctx context.Context, hash common.Hash, fullTx bool) (map[string]interface{}, error) {
resp, err := UnauthenticatedTenRPCCall[map[string]interface{}](ctx, api.we, &CacheCfg{TTL: longCacheTTL}, "eth_getBlockByHash", hash, fullTx)
resp, err := UnauthenticatedTenRPCCall[map[string]interface{}](ctx, api.we, &CacheCfg{CacheType: LongLiving}, "eth_getBlockByHash", hash, fullTx)
if resp == nil {
return nil, err
}
Expand All @@ -122,7 +121,7 @@ func (api *BlockChainAPI) GetCode(ctx context.Context, address common.Address, b
ctx,
api.we,
&CacheCfg{
TTLCallback: func() time.Duration {
CacheTypeDynamic: func() CacheStrategy {
return cacheTTLBlockNumberOrHash(blockNrOrHash)
},
},
Expand Down Expand Up @@ -185,7 +184,7 @@ type (
func (api *BlockChainAPI) Call(ctx context.Context, args gethapi.TransactionArgs, blockNrOrHash rpc.BlockNumberOrHash, overrides *StateOverride, blockOverrides *BlockOverrides) (hexutil.Bytes, error) {
resp, err := ExecAuthRPC[hexutil.Bytes](ctx, api.we, &ExecCfg{
cacheCfg: &CacheCfg{
TTLCallback: func() time.Duration {
CacheTypeDynamic: func() CacheStrategy {
return cacheTTLBlockNumberOrHash(blockNrOrHash)
},
},
Expand All @@ -207,11 +206,11 @@ func (api *BlockChainAPI) Call(ctx context.Context, args gethapi.TransactionArgs
func (api *BlockChainAPI) EstimateGas(ctx context.Context, args gethapi.TransactionArgs, blockNrOrHash *rpc.BlockNumberOrHash, overrides *StateOverride) (hexutil.Uint64, error) {
resp, err := ExecAuthRPC[hexutil.Uint64](ctx, api.we, &ExecCfg{
cacheCfg: &CacheCfg{
TTLCallback: func() time.Duration {
CacheTypeDynamic: func() CacheStrategy {
if blockNrOrHash != nil {
return cacheTTLBlockNumberOrHash(*blockNrOrHash)
}
return shortCacheTTL
return LatestBatch
},
},
computeFromCallback: func(user *GWUser) *common.Address {
Expand Down
7 changes: 3 additions & 4 deletions tools/walletextension/rpcapi/ethereum_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package rpcapi

import (
"context"
"time"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/common/math"
Expand All @@ -19,11 +18,11 @@ func NewEthereumAPI(we *Services,
}

func (api *EthereumAPI) GasPrice(ctx context.Context) (*hexutil.Big, error) {
return UnauthenticatedTenRPCCall[hexutil.Big](ctx, api.we, &CacheCfg{TTL: shortCacheTTL}, "eth_gasPrice")
return UnauthenticatedTenRPCCall[hexutil.Big](ctx, api.we, &CacheCfg{CacheType: LatestBatch}, "eth_gasPrice")
}

func (api *EthereumAPI) MaxPriorityFeePerGas(ctx context.Context) (*hexutil.Big, error) {
return UnauthenticatedTenRPCCall[hexutil.Big](ctx, api.we, &CacheCfg{TTL: shortCacheTTL}, "eth_maxPriorityFeePerGas")
return UnauthenticatedTenRPCCall[hexutil.Big](ctx, api.we, &CacheCfg{CacheType: LatestBatch}, "eth_maxPriorityFeePerGas")
}

type FeeHistoryResult struct {
Expand All @@ -37,7 +36,7 @@ func (api *EthereumAPI) FeeHistory(ctx context.Context, blockCount math.HexOrDec
return UnauthenticatedTenRPCCall[FeeHistoryResult](
ctx,
api.we,
&CacheCfg{TTLCallback: func() time.Duration {
&CacheCfg{CacheTypeDynamic: func() CacheStrategy {
return cacheTTLBlockNumber(lastBlock)
}},
"eth_feeHistory",
Expand Down
7 changes: 3 additions & 4 deletions tools/walletextension/rpcapi/filter_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"sync/atomic"
"time"

subscriptioncommon "github.com/ten-protocol/go-ten/go/common/subscription"

Expand Down Expand Up @@ -162,12 +161,12 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit common.FilterCriteria) (
api.we,
&ExecCfg{
cacheCfg: &CacheCfg{
TTLCallback: func() time.Duration {
CacheTypeDynamic: func() CacheStrategy {
// when the toBlock is not specified, the request is open-ended
if crit.ToBlock != nil && crit.ToBlock.Int64() > 0 {
return longCacheTTL
return LongLiving
}
return shortCacheTTL
return LatestBatch
},
},
tryUntilAuthorised: true,
Expand Down
2 changes: 1 addition & 1 deletion tools/walletextension/rpcapi/gw_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func userCacheKey(userID []byte) []byte {
}

func getUser(userID []byte, s *Services) (*GWUser, error) {
return withCache(s.Cache, &CacheCfg{TTL: longCacheTTL}, userCacheKey(userID), func() (*GWUser, error) {
return withCache(s.Cache, &CacheCfg{CacheType: LongLiving}, userCacheKey(userID), func() (*GWUser, error) {
result := GWUser{userID: userID, services: s, accounts: map[common.Address]*GWAccount{}}
userPrivateKey, err := s.Storage.GetUserPrivateKey(userID)
if err != nil {
Expand Down
5 changes: 2 additions & 3 deletions tools/walletextension/rpcapi/transaction_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
Expand All @@ -23,7 +22,7 @@ func NewTransactionAPI(we *Services) *TransactionAPI {
}

func (s *TransactionAPI) GetBlockTransactionCountByNumber(ctx context.Context, blockNr gethrpc.BlockNumber) *hexutil.Uint {
count, err := UnauthenticatedTenRPCCall[hexutil.Uint](ctx, s.we, &CacheCfg{TTLCallback: func() time.Duration {
count, err := UnauthenticatedTenRPCCall[hexutil.Uint](ctx, s.we, &CacheCfg{CacheTypeDynamic: func() CacheStrategy {
return cacheTTLBlockNumber(blockNr)
}}, "eth_getBlockTransactionCountByNumber", blockNr)
if err != nil {
Expand All @@ -33,7 +32,7 @@ func (s *TransactionAPI) GetBlockTransactionCountByNumber(ctx context.Context, b
}

func (s *TransactionAPI) GetBlockTransactionCountByHash(ctx context.Context, blockHash common.Hash) *hexutil.Uint {
count, err := UnauthenticatedTenRPCCall[hexutil.Uint](ctx, s.we, &CacheCfg{TTL: longCacheTTL}, "eth_getBlockTransactionCountByHash", blockHash)
count, err := UnauthenticatedTenRPCCall[hexutil.Uint](ctx, s.we, &CacheCfg{CacheType: LongLiving}, "eth_getBlockTransactionCountByHash", blockHash)
if err != nil {
return nil
}
Expand Down
65 changes: 38 additions & 27 deletions tools/walletextension/rpcapi/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
notAuthorised = "not authorised"

longCacheTTL = 5 * time.Hour
shortCacheTTL = 100 * time.Millisecond
shortCacheTTL = 1 * time.Minute
)

var rpcNotImplemented = fmt.Errorf("rpc endpoint not implemented")
Expand All @@ -44,12 +44,17 @@ type ExecCfg struct {
cacheCfg *CacheCfg
}

type CacheStrategy uint8

const (
NoCache CacheStrategy = iota
LatestBatch CacheStrategy = iota
LongLiving CacheStrategy = iota
)

type CacheCfg struct {
// ResetWhenNewBlock bool todo
TTL time.Duration
// logic based on block
// todo - handle block in the future
TTLCallback func() time.Duration
CacheType CacheStrategy
CacheTypeDynamic func() CacheStrategy
}

func UnauthenticatedTenRPCCall[R any](ctx context.Context, w *Services, cfg *CacheCfg, method string, args ...any) (*R, error) {
Expand Down Expand Up @@ -193,28 +198,34 @@ func withCache[R any](cache cache.Cache, cfg *CacheCfg, cacheKey []byte, onCache
return onCacheMiss()
}

cacheTTL := cfg.TTL
if cfg.TTLCallback != nil {
cacheTTL = cfg.TTLCallback()
cacheType := cfg.CacheType
if cfg.CacheTypeDynamic != nil {
cacheType = cfg.CacheTypeDynamic()
}
isCacheable := cacheTTL > 0

if isCacheable {
if cachedValue, ok := cache.Get(cacheKey); ok {
// cloning?
returnValue, ok := cachedValue.(*R)
if !ok {
return nil, fmt.Errorf("unexpected error. Invalid format cached. %v", cachedValue)
}
return returnValue, nil

if cacheType == NoCache {
return onCacheMiss()
}

ttl := longCacheTTL
if cacheType == LatestBatch {
ttl = shortCacheTTL
}

cachedValue, foundInCache := cache.Get(cacheKey)
if foundInCache && !cache.IsEvicted(cacheKey, ttl) {
returnValue, ok := cachedValue.(*R)
if !ok {
return nil, fmt.Errorf("unexpected error. Invalid format cached. %v", cachedValue)
}
return returnValue, nil
}

result, err := onCacheMiss()

// cache only non-nil values
if isCacheable && err == nil && result != nil {
cache.Set(cacheKey, result, cacheTTL)
if err == nil && result != nil {
cache.Set(cacheKey, result, ttl)
}

return result, err
Expand All @@ -226,18 +237,18 @@ func audit(services *Services, msg string, params ...any) {
}
}

func cacheTTLBlockNumberOrHash(blockNrOrHash rpc.BlockNumberOrHash) time.Duration {
func cacheTTLBlockNumberOrHash(blockNrOrHash rpc.BlockNumberOrHash) CacheStrategy {
if blockNrOrHash.BlockNumber != nil && blockNrOrHash.BlockNumber.Int64() <= 0 {
return shortCacheTTL
return LatestBatch
}
return longCacheTTL
return LongLiving
}

func cacheTTLBlockNumber(lastBlock rpc.BlockNumber) time.Duration {
func cacheTTLBlockNumber(lastBlock rpc.BlockNumber) CacheStrategy {
if lastBlock > 0 {
return longCacheTTL
return LongLiving
}
return shortCacheTTL
return LatestBatch
}

func connectWS(account *GWAccount, logger gethlog.Logger) (*tenrpc.EncRPCClient, error) {
Expand Down
Loading

0 comments on commit e994ee2

Please sign in to comment.