diff --git a/go/common/subscription/new_heads_manager.go b/go/common/subscription/new_heads_manager.go index 7c82708147..eded54d1b2 100644 --- a/go/common/subscription/new_heads_manager.go +++ b/go/common/subscription/new_heads_manager.go @@ -42,9 +42,6 @@ func NewNewHeadsService(inputCh chan *common.BatchHeader, convertToEthHeader boo func (nhs *NewHeadsService) Start() error { go ForwardFromChannels([]chan *common.BatchHeader{nhs.inputCh}, nhs.stopped, func(head *common.BatchHeader) error { - nhs.notifiersMutex.RLock() - defer nhs.notifiersMutex.RUnlock() - if nhs.onMessage != nil { err := nhs.onMessage(head) if err != nil { @@ -57,6 +54,9 @@ func (nhs *NewHeadsService) Start() error { msg = convertBatchHeader(head) } + nhs.notifiersMutex.RLock() + defer nhs.notifiersMutex.RUnlock() + // for each new head, notify all registered subscriptions for id, notifier := range nhs.newHeadNotifiers { if nhs.stopped.Load() { diff --git a/tools/walletextension/cache/RistrettoCache.go b/tools/walletextension/cache/RistrettoCache.go index a72ca5f606..233d49f64b 100644 --- a/tools/walletextension/cache/RistrettoCache.go +++ b/tools/walletextension/cache/RistrettoCache.go @@ -16,12 +16,13 @@ const ( ) type ristrettoCache struct { - cache *ristretto.Cache - quit chan struct{} + cache *ristretto.Cache + quit chan struct{} + lastEviction time.Time } -// NewRistrettoCache returns a new ristrettoCache. -func NewRistrettoCache(logger log.Logger) (Cache, error) { +// NewRistrettoCacheWithEviction returns a new ristrettoCache. +func NewRistrettoCacheWithEviction(logger log.Logger) (Cache, error) { cache, err := ristretto.NewCache(&ristretto.Config{ NumCounters: numCounters, MaxCost: maxCost, @@ -33,8 +34,9 @@ func NewRistrettoCache(logger log.Logger) (Cache, error) { } c := &ristrettoCache{ - cache: cache, - quit: make(chan struct{}), + cache: cache, + quit: make(chan struct{}), + lastEviction: time.Now(), } // Start the metrics logging @@ -43,6 +45,21 @@ func NewRistrettoCache(logger log.Logger) (Cache, error) { return c, nil } +func (c *ristrettoCache) EvictShortLiving() { + c.lastEviction = time.Now() +} + +func (c *ristrettoCache) IsEvicted(key any, originalTTL time.Duration) bool { + remainingTTL, notExpired := c.cache.GetTTL(key) + if !notExpired { + return true + } + cachedTime := time.Now().Add(remainingTTL).Add(-originalTTL) + // ... LE...Cached...Now - Valid + // ... Cached...LE...Now - Evicted + return c.lastEviction.After(cachedTime) +} + // Set adds the key and value to the cache. func (c *ristrettoCache) Set(key []byte, value any, ttl time.Duration) bool { return c.cache.SetWithTTL(key, value, defaultCost, ttl) diff --git a/tools/walletextension/cache/cache.go b/tools/walletextension/cache/cache.go index c080779305..108e1c1fed 100644 --- a/tools/walletextension/cache/cache.go +++ b/tools/walletextension/cache/cache.go @@ -7,11 +7,13 @@ import ( ) type Cache interface { + EvictShortLiving() + IsEvicted(key any, originalTTL time.Duration) bool Set(key []byte, value any, ttl time.Duration) bool Get(key []byte) (value any, ok bool) Remove(key []byte) } func NewCache(logger log.Logger) (Cache, error) { - return NewRistrettoCache(logger) + return NewRistrettoCacheWithEviction(logger) } diff --git a/tools/walletextension/rpcapi/blockchain_api.go b/tools/walletextension/rpcapi/blockchain_api.go index e9e1d66e3f..c970aa1538 100644 --- a/tools/walletextension/rpcapi/blockchain_api.go +++ b/tools/walletextension/rpcapi/blockchain_api.go @@ -3,7 +3,6 @@ package rpcapi import ( "context" "encoding/json" - "time" "github.com/ethereum/go-ethereum/core/types" @@ -23,12 +22,12 @@ func NewBlockChainAPI(we *Services) *BlockChainAPI { } func (api *BlockChainAPI) ChainId() *hexutil.Big { //nolint:stylecheck - chainID, _ := UnauthenticatedTenRPCCall[hexutil.Big](context.Background(), api.we, &CacheCfg{TTL: longCacheTTL}, "eth_chainId") + chainID, _ := UnauthenticatedTenRPCCall[hexutil.Big](context.Background(), api.we, &CacheCfg{CacheType: LongLiving}, "eth_chainId") return chainID } func (api *BlockChainAPI) BlockNumber() hexutil.Uint64 { - nr, err := UnauthenticatedTenRPCCall[hexutil.Uint64](context.Background(), api.we, &CacheCfg{TTL: shortCacheTTL}, "eth_blockNumber") + nr, err := UnauthenticatedTenRPCCall[hexutil.Uint64](context.Background(), api.we, &CacheCfg{CacheType: LatestBatch}, "eth_blockNumber") if err != nil { return hexutil.Uint64(0) } @@ -41,7 +40,7 @@ func (api *BlockChainAPI) GetBalance(ctx context.Context, address common.Address api.we, &ExecCfg{ cacheCfg: &CacheCfg{ - TTLCallback: func() time.Duration { + CacheTypeDynamic: func() CacheStrategy { return cacheTTLBlockNumberOrHash(blockNrOrHash) }, }, @@ -76,7 +75,7 @@ func (s *BlockChainAPI) GetProof(ctx context.Context, address common.Address, st } func (api *BlockChainAPI) GetHeaderByNumber(ctx context.Context, number rpc.BlockNumber) (map[string]interface{}, error) { - resp, err := UnauthenticatedTenRPCCall[map[string]interface{}](ctx, api.we, &CacheCfg{TTLCallback: func() time.Duration { + resp, err := UnauthenticatedTenRPCCall[map[string]interface{}](ctx, api.we, &CacheCfg{CacheTypeDynamic: func() CacheStrategy { return cacheTTLBlockNumber(number) }}, "eth_getHeaderByNumber", number) if resp == nil { @@ -86,7 +85,7 @@ func (api *BlockChainAPI) GetHeaderByNumber(ctx context.Context, number rpc.Bloc } func (api *BlockChainAPI) GetHeaderByHash(ctx context.Context, hash common.Hash) map[string]interface{} { - resp, _ := UnauthenticatedTenRPCCall[map[string]interface{}](ctx, api.we, &CacheCfg{TTL: longCacheTTL}, "eth_getHeaderByHash", hash) + resp, _ := UnauthenticatedTenRPCCall[map[string]interface{}](ctx, api.we, &CacheCfg{CacheType: LongLiving}, "eth_getHeaderByHash", hash) if resp == nil { return nil } @@ -98,7 +97,7 @@ func (api *BlockChainAPI) GetBlockByNumber(ctx context.Context, number rpc.Block ctx, api.we, &CacheCfg{ - TTLCallback: func() time.Duration { + CacheTypeDynamic: func() CacheStrategy { return cacheTTLBlockNumber(number) }, }, "eth_getBlockByNumber", number, fullTx) @@ -109,7 +108,7 @@ func (api *BlockChainAPI) GetBlockByNumber(ctx context.Context, number rpc.Block } func (api *BlockChainAPI) GetBlockByHash(ctx context.Context, hash common.Hash, fullTx bool) (map[string]interface{}, error) { - resp, err := UnauthenticatedTenRPCCall[map[string]interface{}](ctx, api.we, &CacheCfg{TTL: longCacheTTL}, "eth_getBlockByHash", hash, fullTx) + resp, err := UnauthenticatedTenRPCCall[map[string]interface{}](ctx, api.we, &CacheCfg{CacheType: LongLiving}, "eth_getBlockByHash", hash, fullTx) if resp == nil { return nil, err } @@ -122,7 +121,7 @@ func (api *BlockChainAPI) GetCode(ctx context.Context, address common.Address, b ctx, api.we, &CacheCfg{ - TTLCallback: func() time.Duration { + CacheTypeDynamic: func() CacheStrategy { return cacheTTLBlockNumberOrHash(blockNrOrHash) }, }, @@ -185,7 +184,7 @@ type ( func (api *BlockChainAPI) Call(ctx context.Context, args gethapi.TransactionArgs, blockNrOrHash rpc.BlockNumberOrHash, overrides *StateOverride, blockOverrides *BlockOverrides) (hexutil.Bytes, error) { resp, err := ExecAuthRPC[hexutil.Bytes](ctx, api.we, &ExecCfg{ cacheCfg: &CacheCfg{ - TTLCallback: func() time.Duration { + CacheTypeDynamic: func() CacheStrategy { return cacheTTLBlockNumberOrHash(blockNrOrHash) }, }, @@ -207,11 +206,11 @@ func (api *BlockChainAPI) Call(ctx context.Context, args gethapi.TransactionArgs func (api *BlockChainAPI) EstimateGas(ctx context.Context, args gethapi.TransactionArgs, blockNrOrHash *rpc.BlockNumberOrHash, overrides *StateOverride) (hexutil.Uint64, error) { resp, err := ExecAuthRPC[hexutil.Uint64](ctx, api.we, &ExecCfg{ cacheCfg: &CacheCfg{ - TTLCallback: func() time.Duration { + CacheTypeDynamic: func() CacheStrategy { if blockNrOrHash != nil { return cacheTTLBlockNumberOrHash(*blockNrOrHash) } - return shortCacheTTL + return LatestBatch }, }, computeFromCallback: func(user *GWUser) *common.Address { diff --git a/tools/walletextension/rpcapi/ethereum_api.go b/tools/walletextension/rpcapi/ethereum_api.go index 742c5c71c9..b304a405f4 100644 --- a/tools/walletextension/rpcapi/ethereum_api.go +++ b/tools/walletextension/rpcapi/ethereum_api.go @@ -2,7 +2,6 @@ package rpcapi import ( "context" - "time" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/math" @@ -19,11 +18,11 @@ func NewEthereumAPI(we *Services, } func (api *EthereumAPI) GasPrice(ctx context.Context) (*hexutil.Big, error) { - return UnauthenticatedTenRPCCall[hexutil.Big](ctx, api.we, &CacheCfg{TTL: shortCacheTTL}, "eth_gasPrice") + return UnauthenticatedTenRPCCall[hexutil.Big](ctx, api.we, &CacheCfg{CacheType: LatestBatch}, "eth_gasPrice") } func (api *EthereumAPI) MaxPriorityFeePerGas(ctx context.Context) (*hexutil.Big, error) { - return UnauthenticatedTenRPCCall[hexutil.Big](ctx, api.we, &CacheCfg{TTL: shortCacheTTL}, "eth_maxPriorityFeePerGas") + return UnauthenticatedTenRPCCall[hexutil.Big](ctx, api.we, &CacheCfg{CacheType: LatestBatch}, "eth_maxPriorityFeePerGas") } type FeeHistoryResult struct { @@ -37,7 +36,7 @@ func (api *EthereumAPI) FeeHistory(ctx context.Context, blockCount math.HexOrDec return UnauthenticatedTenRPCCall[FeeHistoryResult]( ctx, api.we, - &CacheCfg{TTLCallback: func() time.Duration { + &CacheCfg{CacheTypeDynamic: func() CacheStrategy { return cacheTTLBlockNumber(lastBlock) }}, "eth_feeHistory", diff --git a/tools/walletextension/rpcapi/filter_api.go b/tools/walletextension/rpcapi/filter_api.go index 7d7e4f85ce..471985e670 100644 --- a/tools/walletextension/rpcapi/filter_api.go +++ b/tools/walletextension/rpcapi/filter_api.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "sync/atomic" - "time" subscriptioncommon "github.com/ten-protocol/go-ten/go/common/subscription" @@ -162,12 +161,12 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit common.FilterCriteria) ( api.we, &ExecCfg{ cacheCfg: &CacheCfg{ - TTLCallback: func() time.Duration { + CacheTypeDynamic: func() CacheStrategy { // when the toBlock is not specified, the request is open-ended if crit.ToBlock != nil && crit.ToBlock.Int64() > 0 { - return longCacheTTL + return LongLiving } - return shortCacheTTL + return LatestBatch }, }, tryUntilAuthorised: true, diff --git a/tools/walletextension/rpcapi/gw_user.go b/tools/walletextension/rpcapi/gw_user.go index abd65df0fa..302a488557 100644 --- a/tools/walletextension/rpcapi/gw_user.go +++ b/tools/walletextension/rpcapi/gw_user.go @@ -41,7 +41,7 @@ func userCacheKey(userID []byte) []byte { } func getUser(userID []byte, s *Services) (*GWUser, error) { - return withCache(s.Cache, &CacheCfg{TTL: longCacheTTL}, userCacheKey(userID), func() (*GWUser, error) { + return withCache(s.Cache, &CacheCfg{CacheType: LongLiving}, userCacheKey(userID), func() (*GWUser, error) { result := GWUser{userID: userID, services: s, accounts: map[common.Address]*GWAccount{}} userPrivateKey, err := s.Storage.GetUserPrivateKey(userID) if err != nil { diff --git a/tools/walletextension/rpcapi/transaction_api.go b/tools/walletextension/rpcapi/transaction_api.go index 537bdbc8f4..1b1f17dc69 100644 --- a/tools/walletextension/rpcapi/transaction_api.go +++ b/tools/walletextension/rpcapi/transaction_api.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -23,7 +22,7 @@ func NewTransactionAPI(we *Services) *TransactionAPI { } func (s *TransactionAPI) GetBlockTransactionCountByNumber(ctx context.Context, blockNr gethrpc.BlockNumber) *hexutil.Uint { - count, err := UnauthenticatedTenRPCCall[hexutil.Uint](ctx, s.we, &CacheCfg{TTLCallback: func() time.Duration { + count, err := UnauthenticatedTenRPCCall[hexutil.Uint](ctx, s.we, &CacheCfg{CacheTypeDynamic: func() CacheStrategy { return cacheTTLBlockNumber(blockNr) }}, "eth_getBlockTransactionCountByNumber", blockNr) if err != nil { @@ -33,7 +32,7 @@ func (s *TransactionAPI) GetBlockTransactionCountByNumber(ctx context.Context, b } func (s *TransactionAPI) GetBlockTransactionCountByHash(ctx context.Context, blockHash common.Hash) *hexutil.Uint { - count, err := UnauthenticatedTenRPCCall[hexutil.Uint](ctx, s.we, &CacheCfg{TTL: longCacheTTL}, "eth_getBlockTransactionCountByHash", blockHash) + count, err := UnauthenticatedTenRPCCall[hexutil.Uint](ctx, s.we, &CacheCfg{CacheType: LongLiving}, "eth_getBlockTransactionCountByHash", blockHash) if err != nil { return nil } diff --git a/tools/walletextension/rpcapi/utils.go b/tools/walletextension/rpcapi/utils.go index 009c31f88a..20a05a7d8f 100644 --- a/tools/walletextension/rpcapi/utils.go +++ b/tools/walletextension/rpcapi/utils.go @@ -30,7 +30,7 @@ const ( notAuthorised = "not authorised" longCacheTTL = 5 * time.Hour - shortCacheTTL = 100 * time.Millisecond + shortCacheTTL = 1 * time.Minute ) var rpcNotImplemented = fmt.Errorf("rpc endpoint not implemented") @@ -44,12 +44,17 @@ type ExecCfg struct { cacheCfg *CacheCfg } +type CacheStrategy uint8 + +const ( + NoCache CacheStrategy = iota + LatestBatch CacheStrategy = iota + LongLiving CacheStrategy = iota +) + type CacheCfg struct { - // ResetWhenNewBlock bool todo - TTL time.Duration - // logic based on block - // todo - handle block in the future - TTLCallback func() time.Duration + CacheType CacheStrategy + CacheTypeDynamic func() CacheStrategy } func UnauthenticatedTenRPCCall[R any](ctx context.Context, w *Services, cfg *CacheCfg, method string, args ...any) (*R, error) { @@ -193,28 +198,34 @@ func withCache[R any](cache cache.Cache, cfg *CacheCfg, cacheKey []byte, onCache return onCacheMiss() } - cacheTTL := cfg.TTL - if cfg.TTLCallback != nil { - cacheTTL = cfg.TTLCallback() + cacheType := cfg.CacheType + if cfg.CacheTypeDynamic != nil { + cacheType = cfg.CacheTypeDynamic() } - isCacheable := cacheTTL > 0 - - if isCacheable { - if cachedValue, ok := cache.Get(cacheKey); ok { - // cloning? - returnValue, ok := cachedValue.(*R) - if !ok { - return nil, fmt.Errorf("unexpected error. Invalid format cached. %v", cachedValue) - } - return returnValue, nil + + if cacheType == NoCache { + return onCacheMiss() + } + + ttl := longCacheTTL + if cacheType == LatestBatch { + ttl = shortCacheTTL + } + + cachedValue, foundInCache := cache.Get(cacheKey) + if foundInCache && !cache.IsEvicted(cacheKey, ttl) { + returnValue, ok := cachedValue.(*R) + if !ok { + return nil, fmt.Errorf("unexpected error. Invalid format cached. %v", cachedValue) } + return returnValue, nil } result, err := onCacheMiss() // cache only non-nil values - if isCacheable && err == nil && result != nil { - cache.Set(cacheKey, result, cacheTTL) + if err == nil && result != nil { + cache.Set(cacheKey, result, ttl) } return result, err @@ -226,18 +237,18 @@ func audit(services *Services, msg string, params ...any) { } } -func cacheTTLBlockNumberOrHash(blockNrOrHash rpc.BlockNumberOrHash) time.Duration { +func cacheTTLBlockNumberOrHash(blockNrOrHash rpc.BlockNumberOrHash) CacheStrategy { if blockNrOrHash.BlockNumber != nil && blockNrOrHash.BlockNumber.Int64() <= 0 { - return shortCacheTTL + return LatestBatch } - return longCacheTTL + return LongLiving } -func cacheTTLBlockNumber(lastBlock rpc.BlockNumber) time.Duration { +func cacheTTLBlockNumber(lastBlock rpc.BlockNumber) CacheStrategy { if lastBlock > 0 { - return longCacheTTL + return LongLiving } - return shortCacheTTL + return LatestBatch } func connectWS(account *GWAccount, logger gethlog.Logger) (*tenrpc.EncRPCClient, error) { diff --git a/tools/walletextension/rpcapi/wallet_extension.go b/tools/walletextension/rpcapi/wallet_extension.go index 5d9e6abf4a..02cc194bf6 100644 --- a/tools/walletextension/rpcapi/wallet_extension.go +++ b/tools/walletextension/rpcapi/wallet_extension.go @@ -9,7 +9,7 @@ import ( subscriptioncommon "github.com/ten-protocol/go-ten/go/common/subscription" - common2 "github.com/ten-protocol/go-ten/go/common" + tencommon "github.com/ten-protocol/go-ten/go/common" "github.com/ten-protocol/go-ten/go/rpc" "github.com/ten-protocol/go-ten/go/obsclient" @@ -50,7 +50,7 @@ type Services struct { } type NewHeadNotifier interface { - onNewHead(header *common2.BatchHeader) + onNewHead(header *tencommon.BatchHeader) } func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.Storage, stopControl *stopcontrol.StopControl, version string, logger gethlog.Logger, config *common.Config) *Services { @@ -110,15 +110,15 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.Storage } rpcClient := connectionObj.(rpc.Client) - ch := make(chan *common2.BatchHeader) + ch := make(chan *tencommon.BatchHeader) clientSubscription, err := rpcClient.Subscribe(context.Background(), rpc.SubscribeNamespace, ch, rpc.SubscriptionTypeNewHeads) if err != nil { panic(fmt.Errorf("cannot subscribe to new heads to the backend %w", err)) } services.backendNewHeadsSubscription = clientSubscription - services.NewHeadsService = subscriptioncommon.NewNewHeadsService(ch, true, logger, func(newHead *common2.BatchHeader) error { - // todo - in a followup PR, invalidate cache entries marked as "latest" + services.NewHeadsService = subscriptioncommon.NewNewHeadsService(ch, true, logger, func(newHead *tencommon.BatchHeader) error { + services.Cache.EvictShortLiving() return nil })