From 69ea787572c37dcd0a1d83f275c4905e74fd67e0 Mon Sep 17 00:00:00 2001 From: Tudor Malene Date: Wed, 10 Apr 2024 13:51:27 +0100 Subject: [PATCH 1/2] misc fixes --- .../walletextension/rpcapi/blockchain_api.go | 12 +++++----- tools/walletextension/rpcapi/ethereum_api.go | 2 +- .../walletextension/rpcapi/transaction_api.go | 24 +++++++++++++++---- tools/walletextension/rpcapi/utils.go | 12 +++++++--- .../rpcapi/wallet_extension.go | 24 +++++++++++++++++-- 5 files changed, 57 insertions(+), 17 deletions(-) diff --git a/tools/walletextension/rpcapi/blockchain_api.go b/tools/walletextension/rpcapi/blockchain_api.go index c970aa1538..7993231108 100644 --- a/tools/walletextension/rpcapi/blockchain_api.go +++ b/tools/walletextension/rpcapi/blockchain_api.go @@ -41,7 +41,7 @@ func (api *BlockChainAPI) GetBalance(ctx context.Context, address common.Address &ExecCfg{ cacheCfg: &CacheCfg{ CacheTypeDynamic: func() CacheStrategy { - return cacheTTLBlockNumberOrHash(blockNrOrHash) + return cacheBlockNumberOrHash(blockNrOrHash) }, }, account: &address, @@ -76,7 +76,7 @@ func (s *BlockChainAPI) GetProof(ctx context.Context, address common.Address, st func (api *BlockChainAPI) GetHeaderByNumber(ctx context.Context, number rpc.BlockNumber) (map[string]interface{}, error) { resp, err := UnauthenticatedTenRPCCall[map[string]interface{}](ctx, api.we, &CacheCfg{CacheTypeDynamic: func() CacheStrategy { - return cacheTTLBlockNumber(number) + return cacheBlockNumber(number) }}, "eth_getHeaderByNumber", number) if resp == nil { return nil, err @@ -98,7 +98,7 @@ func (api *BlockChainAPI) GetBlockByNumber(ctx context.Context, number rpc.Block api.we, &CacheCfg{ CacheTypeDynamic: func() CacheStrategy { - return cacheTTLBlockNumber(number) + return cacheBlockNumber(number) }, }, "eth_getBlockByNumber", number, fullTx) if resp == nil { @@ -122,7 +122,7 @@ func (api *BlockChainAPI) GetCode(ctx context.Context, address common.Address, b api.we, &CacheCfg{ CacheTypeDynamic: func() CacheStrategy { - return cacheTTLBlockNumberOrHash(blockNrOrHash) + return cacheBlockNumberOrHash(blockNrOrHash) }, }, "eth_getCode", @@ -185,7 +185,7 @@ func (api *BlockChainAPI) Call(ctx context.Context, args gethapi.TransactionArgs resp, err := ExecAuthRPC[hexutil.Bytes](ctx, api.we, &ExecCfg{ cacheCfg: &CacheCfg{ CacheTypeDynamic: func() CacheStrategy { - return cacheTTLBlockNumberOrHash(blockNrOrHash) + return cacheBlockNumberOrHash(blockNrOrHash) }, }, computeFromCallback: func(user *GWUser) *common.Address { @@ -208,7 +208,7 @@ func (api *BlockChainAPI) EstimateGas(ctx context.Context, args gethapi.Transact cacheCfg: &CacheCfg{ CacheTypeDynamic: func() CacheStrategy { if blockNrOrHash != nil { - return cacheTTLBlockNumberOrHash(*blockNrOrHash) + return cacheBlockNumberOrHash(*blockNrOrHash) } return LatestBatch }, diff --git a/tools/walletextension/rpcapi/ethereum_api.go b/tools/walletextension/rpcapi/ethereum_api.go index b304a405f4..214fa7912b 100644 --- a/tools/walletextension/rpcapi/ethereum_api.go +++ b/tools/walletextension/rpcapi/ethereum_api.go @@ -37,7 +37,7 @@ func (api *EthereumAPI) FeeHistory(ctx context.Context, blockCount math.HexOrDec ctx, api.we, &CacheCfg{CacheTypeDynamic: func() CacheStrategy { - return cacheTTLBlockNumber(lastBlock) + return cacheBlockNumber(lastBlock) }}, "eth_feeHistory", blockCount, diff --git a/tools/walletextension/rpcapi/transaction_api.go b/tools/walletextension/rpcapi/transaction_api.go index 1b1f17dc69..3833db0e77 100644 --- a/tools/walletextension/rpcapi/transaction_api.go +++ b/tools/walletextension/rpcapi/transaction_api.go @@ -23,7 +23,7 @@ func NewTransactionAPI(we *Services) *TransactionAPI { func (s *TransactionAPI) GetBlockTransactionCountByNumber(ctx context.Context, blockNr gethrpc.BlockNumber) *hexutil.Uint { count, err := UnauthenticatedTenRPCCall[hexutil.Uint](ctx, s.we, &CacheCfg{CacheTypeDynamic: func() CacheStrategy { - return cacheTTLBlockNumber(blockNr) + return cacheBlockNumber(blockNr) }}, "eth_getBlockTransactionCountByNumber", blockNr) if err != nil { return nil @@ -60,15 +60,29 @@ func (s *TransactionAPI) GetRawTransactionByBlockHashAndIndex(ctx context.Contex } func (s *TransactionAPI) GetTransactionCount(ctx context.Context, address common.Address, blockNrOrHash gethrpc.BlockNumberOrHash) (*hexutil.Uint64, error) { - return ExecAuthRPC[hexutil.Uint64](ctx, s.we, &ExecCfg{account: &address}, "eth_getTransactionCount", address, blockNrOrHash) + return ExecAuthRPC[hexutil.Uint64]( + ctx, + s.we, + &ExecCfg{ + account: &address, + cacheCfg: &CacheCfg{ + CacheTypeDynamic: func() CacheStrategy { + return cacheBlockNumberOrHash(blockNrOrHash) + }, + }, + }, + "eth_getTransactionCount", + address, + blockNrOrHash, + ) } func (s *TransactionAPI) GetTransactionByHash(ctx context.Context, hash common.Hash) (*rpc.RpcTransaction, error) { - return ExecAuthRPC[rpc.RpcTransaction](ctx, s.we, &ExecCfg{tryAll: true}, "eth_getTransactionByHash", hash) + return ExecAuthRPC[rpc.RpcTransaction](ctx, s.we, &ExecCfg{tryAll: true, cacheCfg: &CacheCfg{CacheType: LongLiving}}, "eth_getTransactionByHash", hash) } func (s *TransactionAPI) GetRawTransactionByHash(ctx context.Context, hash common.Hash) (hexutil.Bytes, error) { - tx, err := ExecAuthRPC[hexutil.Bytes](ctx, s.we, &ExecCfg{tryAll: true}, "eth_getRawTransactionByHash", hash) + tx, err := ExecAuthRPC[hexutil.Bytes](ctx, s.we, &ExecCfg{tryAll: true, cacheCfg: &CacheCfg{CacheType: LongLiving}}, "eth_getRawTransactionByHash", hash) if tx != nil { return *tx, err } @@ -76,7 +90,7 @@ func (s *TransactionAPI) GetRawTransactionByHash(ctx context.Context, hash commo } func (s *TransactionAPI) GetTransactionReceipt(ctx context.Context, hash common.Hash) (map[string]interface{}, error) { - txRec, err := ExecAuthRPC[map[string]interface{}](ctx, s.we, &ExecCfg{tryUntilAuthorised: true}, "eth_getTransactionReceipt", hash) + txRec, err := ExecAuthRPC[map[string]interface{}](ctx, s.we, &ExecCfg{tryUntilAuthorised: true, cacheCfg: &CacheCfg{CacheType: LongLiving}}, "eth_getTransactionReceipt", hash) if err != nil { return nil, err } diff --git a/tools/walletextension/rpcapi/utils.go b/tools/walletextension/rpcapi/utils.go index 20a05a7d8f..9ab34e119d 100644 --- a/tools/walletextension/rpcapi/utils.go +++ b/tools/walletextension/rpcapi/utils.go @@ -207,13 +207,19 @@ func withCache[R any](cache cache.Cache, cfg *CacheCfg, cacheKey []byte, onCache return onCacheMiss() } + isEvictable := false ttl := longCacheTTL if cacheType == LatestBatch { ttl = shortCacheTTL + isEvictable = true } cachedValue, foundInCache := cache.Get(cacheKey) - if foundInCache && !cache.IsEvicted(cacheKey, ttl) { + + // only entries cached with `LatestBatch` are evicted + isEvicted := isEvictable && cache.IsEvicted(cacheKey, ttl) + + if foundInCache && !isEvicted { returnValue, ok := cachedValue.(*R) if !ok { return nil, fmt.Errorf("unexpected error. Invalid format cached. %v", cachedValue) @@ -237,14 +243,14 @@ func audit(services *Services, msg string, params ...any) { } } -func cacheTTLBlockNumberOrHash(blockNrOrHash rpc.BlockNumberOrHash) CacheStrategy { +func cacheBlockNumberOrHash(blockNrOrHash rpc.BlockNumberOrHash) CacheStrategy { if blockNrOrHash.BlockNumber != nil && blockNrOrHash.BlockNumber.Int64() <= 0 { return LatestBatch } return LongLiving } -func cacheTTLBlockNumber(lastBlock rpc.BlockNumber) CacheStrategy { +func cacheBlockNumber(lastBlock rpc.BlockNumber) CacheStrategy { if lastBlock > 0 { return LongLiving } diff --git a/tools/walletextension/rpcapi/wallet_extension.go b/tools/walletextension/rpcapi/wallet_extension.go index 02cc194bf6..eb967d6d8e 100644 --- a/tools/walletextension/rpcapi/wallet_extension.go +++ b/tools/walletextension/rpcapi/wallet_extension.go @@ -7,6 +7,9 @@ import ( "fmt" "time" + "github.com/ten-protocol/go-ten/go/common/log" + "github.com/ten-protocol/go-ten/go/common/retry" + subscriptioncommon "github.com/ten-protocol/go-ten/go/common/subscription" tencommon "github.com/ten-protocol/go-ten/go/common" @@ -111,11 +114,10 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.Storage rpcClient := connectionObj.(rpc.Client) ch := make(chan *tencommon.BatchHeader) - clientSubscription, err := rpcClient.Subscribe(context.Background(), rpc.SubscribeNamespace, ch, rpc.SubscriptionTypeNewHeads) + clientSubscription, err := subscribeToNewHeadsWithRetry(rpcClient, ch, retry.NewTimeoutStrategy(10*time.Minute, 1*time.Second), logger) if err != nil { panic(fmt.Errorf("cannot subscribe to new heads to the backend %w", err)) } - services.backendNewHeadsSubscription = clientSubscription services.NewHeadsService = subscriptioncommon.NewNewHeadsService(ch, true, logger, func(newHead *tencommon.BatchHeader) error { services.Cache.EvictShortLiving() @@ -125,6 +127,24 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.Storage return &services } +func subscribeToNewHeadsWithRetry(rpcClient rpc.Client, ch chan *tencommon.BatchHeader, retryStrategy retry.Strategy, logger gethlog.Logger) (*gethrpc.ClientSubscription, error) { + var sub *gethrpc.ClientSubscription + + err := retry.Do(func() error { + var err error + sub, err = rpcClient.Subscribe(context.Background(), rpc.SubscribeNamespace, ch, rpc.SubscriptionTypeNewHeads) + if err != nil { + logger.Info("could not subscribe for new head blocks", log.ErrKey, err) + } + return err + }, retryStrategy) + if err != nil { + logger.Error("could not subscribe for new head blocks.", log.ErrKey, err) + } + + return sub, err +} + // IsStopping returns whether the WE is stopping func (w *Services) IsStopping() bool { return w.stopControl.IsStopping() From 843440d72d01d161c218fd3cd3849401c091a7a7 Mon Sep 17 00:00:00 2001 From: Tudor Malene Date: Wed, 10 Apr 2024 14:15:45 +0100 Subject: [PATCH 2/2] clarification --- tools/walletextension/cache/cache.go | 4 ++++ tools/walletextension/rpcapi/utils.go | 25 +++++++++++++------------ 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/tools/walletextension/cache/cache.go b/tools/walletextension/cache/cache.go index 108e1c1fed..5ac717541b 100644 --- a/tools/walletextension/cache/cache.go +++ b/tools/walletextension/cache/cache.go @@ -7,8 +7,12 @@ import ( ) type Cache interface { + // EvictShortLiving - notify the cache that all short living elements cached before the events should be considered as evicted. EvictShortLiving() + + // IsEvicted - based on the eviction event and the time of caching, calculates whether the key was evicted IsEvicted(key any, originalTTL time.Duration) bool + Set(key []byte, value any, ttl time.Duration) bool Get(key []byte) (value any, ok bool) Remove(key []byte) diff --git a/tools/walletextension/rpcapi/utils.go b/tools/walletextension/rpcapi/utils.go index 9ab34e119d..eeb2fb9408 100644 --- a/tools/walletextension/rpcapi/utils.go +++ b/tools/walletextension/rpcapi/utils.go @@ -207,24 +207,25 @@ func withCache[R any](cache cache.Cache, cfg *CacheCfg, cacheKey []byte, onCache return onCacheMiss() } - isEvictable := false + // we implement a custom cache eviction logic for the cache strategy of type LatestBatch. + // when a new batch is created, all entries with "LatestBatch" are considered evicted. + // elements not cached for a specific batch are not evicted + isEvicted := false ttl := longCacheTTL if cacheType == LatestBatch { ttl = shortCacheTTL - isEvictable = true + isEvicted = cache.IsEvicted(cacheKey, ttl) } - cachedValue, foundInCache := cache.Get(cacheKey) - - // only entries cached with `LatestBatch` are evicted - isEvicted := isEvictable && cache.IsEvicted(cacheKey, ttl) - - if foundInCache && !isEvicted { - returnValue, ok := cachedValue.(*R) - if !ok { - return nil, fmt.Errorf("unexpected error. Invalid format cached. %v", cachedValue) + if !isEvicted { + cachedValue, foundInCache := cache.Get(cacheKey) + if foundInCache { + returnValue, ok := cachedValue.(*R) + if !ok { + return nil, fmt.Errorf("unexpected error. Invalid format cached. %v", cachedValue) + } + return returnValue, nil } - return returnValue, nil } result, err := onCacheMiss()