Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve cache #1866

Merged
merged 5 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go/common/subscription/new_heads_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ func NewNewHeadsService(inputCh chan *common.BatchHeader, convertToEthHeader boo

func (nhs *NewHeadsService) Start() error {
go ForwardFromChannels([]chan *common.BatchHeader{nhs.inputCh}, nhs.stopped, func(head *common.BatchHeader) error {
nhs.notifiersMutex.RLock()
defer nhs.notifiersMutex.RUnlock()

if nhs.onMessage != nil {
err := nhs.onMessage(head)
if err != nil {
Expand All @@ -57,6 +54,9 @@ func (nhs *NewHeadsService) Start() error {
msg = convertBatchHeader(head)
}

nhs.notifiersMutex.RLock()
defer nhs.notifiersMutex.RUnlock()

// for each new head, notify all registered subscriptions
for id, notifier := range nhs.newHeadNotifiers {
if nhs.stopped.Load() {
Expand Down
29 changes: 23 additions & 6 deletions tools/walletextension/cache/RistrettoCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ const (
)

type ristrettoCache struct {
cache *ristretto.Cache
quit chan struct{}
cache *ristretto.Cache
quit chan struct{}
lastEviction time.Time
}

// NewRistrettoCache returns a new ristrettoCache.
func NewRistrettoCache(logger log.Logger) (Cache, error) {
// NewRistrettoCacheWithEviction returns a new ristrettoCache.
func NewRistrettoCacheWithEviction(logger log.Logger) (Cache, error) {
cache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: numCounters,
MaxCost: maxCost,
Expand All @@ -33,8 +34,9 @@ func NewRistrettoCache(logger log.Logger) (Cache, error) {
}

c := &ristrettoCache{
cache: cache,
quit: make(chan struct{}),
cache: cache,
quit: make(chan struct{}),
lastEviction: time.Now(),
}

// Start the metrics logging
Expand All @@ -43,6 +45,21 @@ func NewRistrettoCache(logger log.Logger) (Cache, error) {
return c, nil
}

func (c *ristrettoCache) EvictShortLiving() {
c.lastEviction = time.Now()
}

func (c *ristrettoCache) IsEvicted(key any) bool {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This here is the fiddly bit.
The rest is relatively straight forward

remainingTTL, notExpired := c.cache.GetTTL(key)
if !notExpired {
return true
}
cachedTime := time.Now().Add(-remainingTTL)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like the maths is a bit off here because TTL remaining is not time since cached. Not sure there's enough information to work out the cached time as this is at the moment.

Concrete example:

t+0s ---- evictShortLiving() is called for new batch
t+1s ----  item A is put in cache with TTL 60s
t+2s ---- get() is called and we check IsEvicted:

lastEviction = t+0s
remainingTTL = 59s
cachedTime = t+2-59 = t-57s

LastEviction is now after cached time, even though we cached it after last eviction.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're right. I got confused.

// ... LE...Cached...Now - Valid
// ... Cached...LE...Now - Evicted
return c.lastEviction.After(cachedTime)
}

// Set adds the key and value to the cache.
func (c *ristrettoCache) Set(key []byte, value any, ttl time.Duration) bool {
return c.cache.SetWithTTL(key, value, defaultCost, ttl)
Expand Down
4 changes: 3 additions & 1 deletion tools/walletextension/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import (
)

type Cache interface {
EvictShortLiving()
IsEvicted(key any) bool
Set(key []byte, value any, ttl time.Duration) bool
Get(key []byte) (value any, ok bool)
Remove(key []byte)
}

func NewCache(logger log.Logger) (Cache, error) {
return NewRistrettoCache(logger)
return NewRistrettoCacheWithEviction(logger)
}
23 changes: 11 additions & 12 deletions tools/walletextension/rpcapi/blockchain_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package rpcapi
import (
"context"
"encoding/json"
"time"

"github.com/ethereum/go-ethereum/core/types"

Expand All @@ -23,12 +22,12 @@ func NewBlockChainAPI(we *Services) *BlockChainAPI {
}

func (api *BlockChainAPI) ChainId() *hexutil.Big { //nolint:stylecheck
chainID, _ := UnauthenticatedTenRPCCall[hexutil.Big](context.Background(), api.we, &CacheCfg{TTL: longCacheTTL}, "eth_chainId")
chainID, _ := UnauthenticatedTenRPCCall[hexutil.Big](context.Background(), api.we, &CacheCfg{CacheType: LongLiving}, "eth_chainId")
return chainID
}

func (api *BlockChainAPI) BlockNumber() hexutil.Uint64 {
nr, err := UnauthenticatedTenRPCCall[hexutil.Uint64](context.Background(), api.we, &CacheCfg{TTL: shortCacheTTL}, "eth_blockNumber")
nr, err := UnauthenticatedTenRPCCall[hexutil.Uint64](context.Background(), api.we, &CacheCfg{CacheType: LatestBatch}, "eth_blockNumber")
if err != nil {
return hexutil.Uint64(0)
}
Expand All @@ -41,7 +40,7 @@ func (api *BlockChainAPI) GetBalance(ctx context.Context, address common.Address
api.we,
&ExecCfg{
cacheCfg: &CacheCfg{
TTLCallback: func() time.Duration {
CacheTypeDynamic: func() CacheStrategy {
return cacheTTLBlockNumberOrHash(blockNrOrHash)
},
},
Expand Down Expand Up @@ -76,7 +75,7 @@ func (s *BlockChainAPI) GetProof(ctx context.Context, address common.Address, st
}

func (api *BlockChainAPI) GetHeaderByNumber(ctx context.Context, number rpc.BlockNumber) (map[string]interface{}, error) {
resp, err := UnauthenticatedTenRPCCall[map[string]interface{}](ctx, api.we, &CacheCfg{TTLCallback: func() time.Duration {
resp, err := UnauthenticatedTenRPCCall[map[string]interface{}](ctx, api.we, &CacheCfg{CacheTypeDynamic: func() CacheStrategy {
return cacheTTLBlockNumber(number)
}}, "eth_getHeaderByNumber", number)
if resp == nil {
Expand All @@ -86,7 +85,7 @@ func (api *BlockChainAPI) GetHeaderByNumber(ctx context.Context, number rpc.Bloc
}

func (api *BlockChainAPI) GetHeaderByHash(ctx context.Context, hash common.Hash) map[string]interface{} {
resp, _ := UnauthenticatedTenRPCCall[map[string]interface{}](ctx, api.we, &CacheCfg{TTL: longCacheTTL}, "eth_getHeaderByHash", hash)
resp, _ := UnauthenticatedTenRPCCall[map[string]interface{}](ctx, api.we, &CacheCfg{CacheType: LongLiving}, "eth_getHeaderByHash", hash)
if resp == nil {
return nil
}
Expand All @@ -98,7 +97,7 @@ func (api *BlockChainAPI) GetBlockByNumber(ctx context.Context, number rpc.Block
ctx,
api.we,
&CacheCfg{
TTLCallback: func() time.Duration {
CacheTypeDynamic: func() CacheStrategy {
return cacheTTLBlockNumber(number)
},
}, "eth_getBlockByNumber", number, fullTx)
Expand All @@ -109,7 +108,7 @@ func (api *BlockChainAPI) GetBlockByNumber(ctx context.Context, number rpc.Block
}

func (api *BlockChainAPI) GetBlockByHash(ctx context.Context, hash common.Hash, fullTx bool) (map[string]interface{}, error) {
resp, err := UnauthenticatedTenRPCCall[map[string]interface{}](ctx, api.we, &CacheCfg{TTL: longCacheTTL}, "eth_getBlockByHash", hash, fullTx)
resp, err := UnauthenticatedTenRPCCall[map[string]interface{}](ctx, api.we, &CacheCfg{CacheType: LongLiving}, "eth_getBlockByHash", hash, fullTx)
if resp == nil {
return nil, err
}
Expand All @@ -122,7 +121,7 @@ func (api *BlockChainAPI) GetCode(ctx context.Context, address common.Address, b
ctx,
api.we,
&CacheCfg{
TTLCallback: func() time.Duration {
CacheTypeDynamic: func() CacheStrategy {
return cacheTTLBlockNumberOrHash(blockNrOrHash)
},
},
Expand Down Expand Up @@ -185,7 +184,7 @@ type (
func (api *BlockChainAPI) Call(ctx context.Context, args gethapi.TransactionArgs, blockNrOrHash rpc.BlockNumberOrHash, overrides *StateOverride, blockOverrides *BlockOverrides) (hexutil.Bytes, error) {
resp, err := ExecAuthRPC[hexutil.Bytes](ctx, api.we, &ExecCfg{
cacheCfg: &CacheCfg{
TTLCallback: func() time.Duration {
CacheTypeDynamic: func() CacheStrategy {
return cacheTTLBlockNumberOrHash(blockNrOrHash)
},
},
Expand All @@ -207,11 +206,11 @@ func (api *BlockChainAPI) Call(ctx context.Context, args gethapi.TransactionArgs
func (api *BlockChainAPI) EstimateGas(ctx context.Context, args gethapi.TransactionArgs, blockNrOrHash *rpc.BlockNumberOrHash, overrides *StateOverride) (hexutil.Uint64, error) {
resp, err := ExecAuthRPC[hexutil.Uint64](ctx, api.we, &ExecCfg{
cacheCfg: &CacheCfg{
TTLCallback: func() time.Duration {
CacheTypeDynamic: func() CacheStrategy {
if blockNrOrHash != nil {
return cacheTTLBlockNumberOrHash(*blockNrOrHash)
}
return shortCacheTTL
return LatestBatch
},
},
computeFromCallback: func(user *GWUser) *common.Address {
Expand Down
7 changes: 3 additions & 4 deletions tools/walletextension/rpcapi/ethereum_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package rpcapi

import (
"context"
"time"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/common/math"
Expand All @@ -19,11 +18,11 @@ func NewEthereumAPI(we *Services,
}

func (api *EthereumAPI) GasPrice(ctx context.Context) (*hexutil.Big, error) {
return UnauthenticatedTenRPCCall[hexutil.Big](ctx, api.we, &CacheCfg{TTL: shortCacheTTL}, "eth_gasPrice")
return UnauthenticatedTenRPCCall[hexutil.Big](ctx, api.we, &CacheCfg{CacheType: LatestBatch}, "eth_gasPrice")
}

func (api *EthereumAPI) MaxPriorityFeePerGas(ctx context.Context) (*hexutil.Big, error) {
return UnauthenticatedTenRPCCall[hexutil.Big](ctx, api.we, &CacheCfg{TTL: shortCacheTTL}, "eth_maxPriorityFeePerGas")
return UnauthenticatedTenRPCCall[hexutil.Big](ctx, api.we, &CacheCfg{CacheType: LatestBatch}, "eth_maxPriorityFeePerGas")
}

type FeeHistoryResult struct {
Expand All @@ -37,7 +36,7 @@ func (api *EthereumAPI) FeeHistory(ctx context.Context, blockCount math.HexOrDec
return UnauthenticatedTenRPCCall[FeeHistoryResult](
ctx,
api.we,
&CacheCfg{TTLCallback: func() time.Duration {
&CacheCfg{CacheTypeDynamic: func() CacheStrategy {
return cacheTTLBlockNumber(lastBlock)
}},
"eth_feeHistory",
Expand Down
7 changes: 3 additions & 4 deletions tools/walletextension/rpcapi/filter_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"sync/atomic"
"time"

subscriptioncommon "github.com/ten-protocol/go-ten/go/common/subscription"

Expand Down Expand Up @@ -162,12 +161,12 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit common.FilterCriteria) (
api.we,
&ExecCfg{
cacheCfg: &CacheCfg{
TTLCallback: func() time.Duration {
CacheTypeDynamic: func() CacheStrategy {
// when the toBlock is not specified, the request is open-ended
if crit.ToBlock != nil && crit.ToBlock.Int64() > 0 {
return longCacheTTL
return LongLiving
}
return shortCacheTTL
return LatestBatch
},
},
tryUntilAuthorised: true,
Expand Down
2 changes: 1 addition & 1 deletion tools/walletextension/rpcapi/gw_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func userCacheKey(userID []byte) []byte {
}

func getUser(userID []byte, s *Services) (*GWUser, error) {
return withCache(s.Cache, &CacheCfg{TTL: longCacheTTL}, userCacheKey(userID), func() (*GWUser, error) {
return withCache(s.Cache, &CacheCfg{CacheType: LongLiving}, userCacheKey(userID), func() (*GWUser, error) {
result := GWUser{userID: userID, services: s, accounts: map[common.Address]*GWAccount{}}
userPrivateKey, err := s.Storage.GetUserPrivateKey(userID)
if err != nil {
Expand Down
5 changes: 2 additions & 3 deletions tools/walletextension/rpcapi/transaction_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
Expand All @@ -23,7 +22,7 @@ func NewTransactionAPI(we *Services) *TransactionAPI {
}

func (s *TransactionAPI) GetBlockTransactionCountByNumber(ctx context.Context, blockNr gethrpc.BlockNumber) *hexutil.Uint {
count, err := UnauthenticatedTenRPCCall[hexutil.Uint](ctx, s.we, &CacheCfg{TTLCallback: func() time.Duration {
count, err := UnauthenticatedTenRPCCall[hexutil.Uint](ctx, s.we, &CacheCfg{CacheTypeDynamic: func() CacheStrategy {
return cacheTTLBlockNumber(blockNr)
}}, "eth_getBlockTransactionCountByNumber", blockNr)
if err != nil {
Expand All @@ -33,7 +32,7 @@ func (s *TransactionAPI) GetBlockTransactionCountByNumber(ctx context.Context, b
}

func (s *TransactionAPI) GetBlockTransactionCountByHash(ctx context.Context, blockHash common.Hash) *hexutil.Uint {
count, err := UnauthenticatedTenRPCCall[hexutil.Uint](ctx, s.we, &CacheCfg{TTL: longCacheTTL}, "eth_getBlockTransactionCountByHash", blockHash)
count, err := UnauthenticatedTenRPCCall[hexutil.Uint](ctx, s.we, &CacheCfg{CacheType: LongLiving}, "eth_getBlockTransactionCountByHash", blockHash)
if err != nil {
return nil
}
Expand Down
65 changes: 38 additions & 27 deletions tools/walletextension/rpcapi/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
notAuthorised = "not authorised"

longCacheTTL = 5 * time.Hour
shortCacheTTL = 100 * time.Millisecond
shortCacheTTL = 1 * time.Minute
)

var rpcNotImplemented = fmt.Errorf("rpc endpoint not implemented")
Expand All @@ -44,12 +44,17 @@ type ExecCfg struct {
cacheCfg *CacheCfg
}

type CacheStrategy uint8

const (
NoCache CacheStrategy = iota
LatestBatch CacheStrategy = iota
LongLiving CacheStrategy = iota
)

type CacheCfg struct {
// ResetWhenNewBlock bool todo
TTL time.Duration
// logic based on block
// todo - handle block in the future
TTLCallback func() time.Duration
CacheType CacheStrategy
CacheTypeDynamic func() CacheStrategy
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personal preference but I'd just have the func property instead of supporting either, like GetCacheType func() CacheStrategy
and then just have like func LongLivedStrategy() CacheStrategy { return LongLived } available to be referenced when configuring them.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to keep things straight-forward and compact where you know exactly

}

func UnauthenticatedTenRPCCall[R any](ctx context.Context, w *Services, cfg *CacheCfg, method string, args ...any) (*R, error) {
Expand Down Expand Up @@ -193,28 +198,34 @@ func withCache[R any](cache cache.Cache, cfg *CacheCfg, cacheKey []byte, onCache
return onCacheMiss()
}

cacheTTL := cfg.TTL
if cfg.TTLCallback != nil {
cacheTTL = cfg.TTLCallback()
cacheType := cfg.CacheType
if cfg.CacheTypeDynamic != nil {
cacheType = cfg.CacheTypeDynamic()
}
isCacheable := cacheTTL > 0

if isCacheable {
if cachedValue, ok := cache.Get(cacheKey); ok {
// cloning?
returnValue, ok := cachedValue.(*R)
if !ok {
return nil, fmt.Errorf("unexpected error. Invalid format cached. %v", cachedValue)
}
return returnValue, nil

if cacheType == NoCache {
return onCacheMiss()
}

ttl := longCacheTTL
if cacheType == LatestBatch {
ttl = shortCacheTTL
}

cachedValue, foundInCache := cache.Get(cacheKey)
if foundInCache && !cache.IsEvicted(cacheKey) {
returnValue, ok := cachedValue.(*R)
if !ok {
return nil, fmt.Errorf("unexpected error. Invalid format cached. %v", cachedValue)
}
return returnValue, nil
}

result, err := onCacheMiss()

// cache only non-nil values
if isCacheable && err == nil && result != nil {
cache.Set(cacheKey, result, cacheTTL)
if err == nil && result != nil {
cache.Set(cacheKey, result, ttl)
}

return result, err
Expand All @@ -226,18 +237,18 @@ func audit(services *Services, msg string, params ...any) {
}
}

func cacheTTLBlockNumberOrHash(blockNrOrHash rpc.BlockNumberOrHash) time.Duration {
func cacheTTLBlockNumberOrHash(blockNrOrHash rpc.BlockNumberOrHash) CacheStrategy {
if blockNrOrHash.BlockNumber != nil && blockNrOrHash.BlockNumber.Int64() <= 0 {
return shortCacheTTL
return LatestBatch
}
return longCacheTTL
return LongLiving
}

func cacheTTLBlockNumber(lastBlock rpc.BlockNumber) time.Duration {
func cacheTTLBlockNumber(lastBlock rpc.BlockNumber) CacheStrategy {
if lastBlock > 0 {
return longCacheTTL
return LongLiving
}
return shortCacheTTL
return LatestBatch
}

func connectWS(account *GWAccount, logger gethlog.Logger) (*tenrpc.EncRPCClient, error) {
Expand Down
2 changes: 1 addition & 1 deletion tools/walletextension/rpcapi/wallet_extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.Storage

services.backendNewHeadsSubscription = clientSubscription
services.NewHeadsService = subscriptioncommon.NewNewHeadsService(ch, true, logger, func(newHead *common2.BatchHeader) error {
// todo - in a followup PR, invalidate cache entries marked as "latest"
services.Cache.EvictShortLiving()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missed this on a previous review but could you alias that common2 import while you're here?

return nil
})

Expand Down
Loading