Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Misc fixes after refactoring the gateway RPC #1873

Merged
merged 2 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions tools/walletextension/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ import (
)

type Cache interface {
// EvictShortLiving - notify the cache that all short living elements cached before the events should be considered as evicted.
EvictShortLiving()

// IsEvicted - based on the eviction event and the time of caching, calculates whether the key was evicted
IsEvicted(key any, originalTTL time.Duration) bool

Set(key []byte, value any, ttl time.Duration) bool
Get(key []byte) (value any, ok bool)
Remove(key []byte)
Expand Down
12 changes: 6 additions & 6 deletions tools/walletextension/rpcapi/blockchain_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (api *BlockChainAPI) GetBalance(ctx context.Context, address common.Address
&ExecCfg{
cacheCfg: &CacheCfg{
CacheTypeDynamic: func() CacheStrategy {
return cacheTTLBlockNumberOrHash(blockNrOrHash)
return cacheBlockNumberOrHash(blockNrOrHash)
},
},
account: &address,
Expand Down Expand Up @@ -76,7 +76,7 @@ func (s *BlockChainAPI) GetProof(ctx context.Context, address common.Address, st

func (api *BlockChainAPI) GetHeaderByNumber(ctx context.Context, number rpc.BlockNumber) (map[string]interface{}, error) {
resp, err := UnauthenticatedTenRPCCall[map[string]interface{}](ctx, api.we, &CacheCfg{CacheTypeDynamic: func() CacheStrategy {
return cacheTTLBlockNumber(number)
return cacheBlockNumber(number)
}}, "eth_getHeaderByNumber", number)
if resp == nil {
return nil, err
Expand All @@ -98,7 +98,7 @@ func (api *BlockChainAPI) GetBlockByNumber(ctx context.Context, number rpc.Block
api.we,
&CacheCfg{
CacheTypeDynamic: func() CacheStrategy {
return cacheTTLBlockNumber(number)
return cacheBlockNumber(number)
},
}, "eth_getBlockByNumber", number, fullTx)
if resp == nil {
Expand All @@ -122,7 +122,7 @@ func (api *BlockChainAPI) GetCode(ctx context.Context, address common.Address, b
api.we,
&CacheCfg{
CacheTypeDynamic: func() CacheStrategy {
return cacheTTLBlockNumberOrHash(blockNrOrHash)
return cacheBlockNumberOrHash(blockNrOrHash)
},
},
"eth_getCode",
Expand Down Expand Up @@ -185,7 +185,7 @@ func (api *BlockChainAPI) Call(ctx context.Context, args gethapi.TransactionArgs
resp, err := ExecAuthRPC[hexutil.Bytes](ctx, api.we, &ExecCfg{
cacheCfg: &CacheCfg{
CacheTypeDynamic: func() CacheStrategy {
return cacheTTLBlockNumberOrHash(blockNrOrHash)
return cacheBlockNumberOrHash(blockNrOrHash)
},
},
computeFromCallback: func(user *GWUser) *common.Address {
Expand All @@ -208,7 +208,7 @@ func (api *BlockChainAPI) EstimateGas(ctx context.Context, args gethapi.Transact
cacheCfg: &CacheCfg{
CacheTypeDynamic: func() CacheStrategy {
if blockNrOrHash != nil {
return cacheTTLBlockNumberOrHash(*blockNrOrHash)
return cacheBlockNumberOrHash(*blockNrOrHash)
}
return LatestBatch
},
Expand Down
2 changes: 1 addition & 1 deletion tools/walletextension/rpcapi/ethereum_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (api *EthereumAPI) FeeHistory(ctx context.Context, blockCount math.HexOrDec
ctx,
api.we,
&CacheCfg{CacheTypeDynamic: func() CacheStrategy {
return cacheTTLBlockNumber(lastBlock)
return cacheBlockNumber(lastBlock)
}},
"eth_feeHistory",
blockCount,
Expand Down
24 changes: 19 additions & 5 deletions tools/walletextension/rpcapi/transaction_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func NewTransactionAPI(we *Services) *TransactionAPI {

func (s *TransactionAPI) GetBlockTransactionCountByNumber(ctx context.Context, blockNr gethrpc.BlockNumber) *hexutil.Uint {
count, err := UnauthenticatedTenRPCCall[hexutil.Uint](ctx, s.we, &CacheCfg{CacheTypeDynamic: func() CacheStrategy {
return cacheTTLBlockNumber(blockNr)
return cacheBlockNumber(blockNr)
}}, "eth_getBlockTransactionCountByNumber", blockNr)
if err != nil {
return nil
Expand Down Expand Up @@ -60,23 +60,37 @@ func (s *TransactionAPI) GetRawTransactionByBlockHashAndIndex(ctx context.Contex
}

func (s *TransactionAPI) GetTransactionCount(ctx context.Context, address common.Address, blockNrOrHash gethrpc.BlockNumberOrHash) (*hexutil.Uint64, error) {
return ExecAuthRPC[hexutil.Uint64](ctx, s.we, &ExecCfg{account: &address}, "eth_getTransactionCount", address, blockNrOrHash)
return ExecAuthRPC[hexutil.Uint64](
ctx,
s.we,
&ExecCfg{
account: &address,
cacheCfg: &CacheCfg{
CacheTypeDynamic: func() CacheStrategy {
return cacheBlockNumberOrHash(blockNrOrHash)
},
},
},
"eth_getTransactionCount",
address,
blockNrOrHash,
)
}

func (s *TransactionAPI) GetTransactionByHash(ctx context.Context, hash common.Hash) (*rpc.RpcTransaction, error) {
return ExecAuthRPC[rpc.RpcTransaction](ctx, s.we, &ExecCfg{tryAll: true}, "eth_getTransactionByHash", hash)
return ExecAuthRPC[rpc.RpcTransaction](ctx, s.we, &ExecCfg{tryAll: true, cacheCfg: &CacheCfg{CacheType: LongLiving}}, "eth_getTransactionByHash", hash)
}

func (s *TransactionAPI) GetRawTransactionByHash(ctx context.Context, hash common.Hash) (hexutil.Bytes, error) {
tx, err := ExecAuthRPC[hexutil.Bytes](ctx, s.we, &ExecCfg{tryAll: true}, "eth_getRawTransactionByHash", hash)
tx, err := ExecAuthRPC[hexutil.Bytes](ctx, s.we, &ExecCfg{tryAll: true, cacheCfg: &CacheCfg{CacheType: LongLiving}}, "eth_getRawTransactionByHash", hash)
if tx != nil {
return *tx, err
}
return nil, err
}

func (s *TransactionAPI) GetTransactionReceipt(ctx context.Context, hash common.Hash) (map[string]interface{}, error) {
txRec, err := ExecAuthRPC[map[string]interface{}](ctx, s.we, &ExecCfg{tryUntilAuthorised: true}, "eth_getTransactionReceipt", hash)
txRec, err := ExecAuthRPC[map[string]interface{}](ctx, s.we, &ExecCfg{tryUntilAuthorised: true, cacheCfg: &CacheCfg{CacheType: LongLiving}}, "eth_getTransactionReceipt", hash)
if err != nil {
return nil, err
}
Expand Down
23 changes: 15 additions & 8 deletions tools/walletextension/rpcapi/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,18 +207,25 @@ func withCache[R any](cache cache.Cache, cfg *CacheCfg, cacheKey []byte, onCache
return onCacheMiss()
}

// we implement a custom cache eviction logic for the cache strategy of type LatestBatch.
// when a new batch is created, all entries with "LatestBatch" are considered evicted.
// elements not cached for a specific batch are not evicted
isEvicted := false
ttl := longCacheTTL
if cacheType == LatestBatch {
ttl = shortCacheTTL
isEvicted = cache.IsEvicted(cacheKey, ttl)
}

cachedValue, foundInCache := cache.Get(cacheKey)
if foundInCache && !cache.IsEvicted(cacheKey, ttl) {
returnValue, ok := cachedValue.(*R)
if !ok {
return nil, fmt.Errorf("unexpected error. Invalid format cached. %v", cachedValue)
if !isEvicted {
cachedValue, foundInCache := cache.Get(cacheKey)
if foundInCache {
returnValue, ok := cachedValue.(*R)
if !ok {
return nil, fmt.Errorf("unexpected error. Invalid format cached. %v", cachedValue)
}
return returnValue, nil
}
return returnValue, nil
}

result, err := onCacheMiss()
Expand All @@ -237,14 +244,14 @@ func audit(services *Services, msg string, params ...any) {
}
}

func cacheTTLBlockNumberOrHash(blockNrOrHash rpc.BlockNumberOrHash) CacheStrategy {
func cacheBlockNumberOrHash(blockNrOrHash rpc.BlockNumberOrHash) CacheStrategy {
if blockNrOrHash.BlockNumber != nil && blockNrOrHash.BlockNumber.Int64() <= 0 {
return LatestBatch
}
return LongLiving
}

func cacheTTLBlockNumber(lastBlock rpc.BlockNumber) CacheStrategy {
func cacheBlockNumber(lastBlock rpc.BlockNumber) CacheStrategy {
if lastBlock > 0 {
return LongLiving
}
Expand Down
24 changes: 22 additions & 2 deletions tools/walletextension/rpcapi/wallet_extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"fmt"
"time"

"github.com/ten-protocol/go-ten/go/common/log"
"github.com/ten-protocol/go-ten/go/common/retry"

subscriptioncommon "github.com/ten-protocol/go-ten/go/common/subscription"

tencommon "github.com/ten-protocol/go-ten/go/common"
Expand Down Expand Up @@ -111,11 +114,10 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.Storage

rpcClient := connectionObj.(rpc.Client)
ch := make(chan *tencommon.BatchHeader)
clientSubscription, err := rpcClient.Subscribe(context.Background(), rpc.SubscribeNamespace, ch, rpc.SubscriptionTypeNewHeads)
clientSubscription, err := subscribeToNewHeadsWithRetry(rpcClient, ch, retry.NewTimeoutStrategy(10*time.Minute, 1*time.Second), logger)
if err != nil {
panic(fmt.Errorf("cannot subscribe to new heads to the backend %w", err))
}

services.backendNewHeadsSubscription = clientSubscription
services.NewHeadsService = subscriptioncommon.NewNewHeadsService(ch, true, logger, func(newHead *tencommon.BatchHeader) error {
services.Cache.EvictShortLiving()
Expand All @@ -125,6 +127,24 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.Storage
return &services
}

func subscribeToNewHeadsWithRetry(rpcClient rpc.Client, ch chan *tencommon.BatchHeader, retryStrategy retry.Strategy, logger gethlog.Logger) (*gethrpc.ClientSubscription, error) {
var sub *gethrpc.ClientSubscription

err := retry.Do(func() error {
var err error
sub, err = rpcClient.Subscribe(context.Background(), rpc.SubscribeNamespace, ch, rpc.SubscriptionTypeNewHeads)
if err != nil {
logger.Info("could not subscribe for new head blocks", log.ErrKey, err)
}
return err
}, retryStrategy)
if err != nil {
logger.Error("could not subscribe for new head blocks.", log.ErrKey, err)
}

return sub, err
}

// IsStopping returns whether the WE is stopping
func (w *Services) IsStopping() bool {
return w.stopControl.IsStopping()
Expand Down
Loading