Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Misc fixes after refactoring the gateway RPC #1873

Merged
merged 2 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions tools/walletextension/rpcapi/blockchain_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (api *BlockChainAPI) GetBalance(ctx context.Context, address common.Address
&ExecCfg{
cacheCfg: &CacheCfg{
CacheTypeDynamic: func() CacheStrategy {
return cacheTTLBlockNumberOrHash(blockNrOrHash)
return cacheBlockNumberOrHash(blockNrOrHash)
},
},
account: &address,
Expand Down Expand Up @@ -76,7 +76,7 @@ func (s *BlockChainAPI) GetProof(ctx context.Context, address common.Address, st

func (api *BlockChainAPI) GetHeaderByNumber(ctx context.Context, number rpc.BlockNumber) (map[string]interface{}, error) {
resp, err := UnauthenticatedTenRPCCall[map[string]interface{}](ctx, api.we, &CacheCfg{CacheTypeDynamic: func() CacheStrategy {
return cacheTTLBlockNumber(number)
return cacheBlockNumber(number)
}}, "eth_getHeaderByNumber", number)
if resp == nil {
return nil, err
Expand All @@ -98,7 +98,7 @@ func (api *BlockChainAPI) GetBlockByNumber(ctx context.Context, number rpc.Block
api.we,
&CacheCfg{
CacheTypeDynamic: func() CacheStrategy {
return cacheTTLBlockNumber(number)
return cacheBlockNumber(number)
},
}, "eth_getBlockByNumber", number, fullTx)
if resp == nil {
Expand All @@ -122,7 +122,7 @@ func (api *BlockChainAPI) GetCode(ctx context.Context, address common.Address, b
api.we,
&CacheCfg{
CacheTypeDynamic: func() CacheStrategy {
return cacheTTLBlockNumberOrHash(blockNrOrHash)
return cacheBlockNumberOrHash(blockNrOrHash)
},
},
"eth_getCode",
Expand Down Expand Up @@ -185,7 +185,7 @@ func (api *BlockChainAPI) Call(ctx context.Context, args gethapi.TransactionArgs
resp, err := ExecAuthRPC[hexutil.Bytes](ctx, api.we, &ExecCfg{
cacheCfg: &CacheCfg{
CacheTypeDynamic: func() CacheStrategy {
return cacheTTLBlockNumberOrHash(blockNrOrHash)
return cacheBlockNumberOrHash(blockNrOrHash)
},
},
computeFromCallback: func(user *GWUser) *common.Address {
Expand All @@ -208,7 +208,7 @@ func (api *BlockChainAPI) EstimateGas(ctx context.Context, args gethapi.Transact
cacheCfg: &CacheCfg{
CacheTypeDynamic: func() CacheStrategy {
if blockNrOrHash != nil {
return cacheTTLBlockNumberOrHash(*blockNrOrHash)
return cacheBlockNumberOrHash(*blockNrOrHash)
}
return LatestBatch
},
Expand Down
2 changes: 1 addition & 1 deletion tools/walletextension/rpcapi/ethereum_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (api *EthereumAPI) FeeHistory(ctx context.Context, blockCount math.HexOrDec
ctx,
api.we,
&CacheCfg{CacheTypeDynamic: func() CacheStrategy {
return cacheTTLBlockNumber(lastBlock)
return cacheBlockNumber(lastBlock)
}},
"eth_feeHistory",
blockCount,
Expand Down
24 changes: 19 additions & 5 deletions tools/walletextension/rpcapi/transaction_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func NewTransactionAPI(we *Services) *TransactionAPI {

func (s *TransactionAPI) GetBlockTransactionCountByNumber(ctx context.Context, blockNr gethrpc.BlockNumber) *hexutil.Uint {
count, err := UnauthenticatedTenRPCCall[hexutil.Uint](ctx, s.we, &CacheCfg{CacheTypeDynamic: func() CacheStrategy {
return cacheTTLBlockNumber(blockNr)
return cacheBlockNumber(blockNr)
}}, "eth_getBlockTransactionCountByNumber", blockNr)
if err != nil {
return nil
Expand Down Expand Up @@ -60,23 +60,37 @@ func (s *TransactionAPI) GetRawTransactionByBlockHashAndIndex(ctx context.Contex
}

func (s *TransactionAPI) GetTransactionCount(ctx context.Context, address common.Address, blockNrOrHash gethrpc.BlockNumberOrHash) (*hexutil.Uint64, error) {
return ExecAuthRPC[hexutil.Uint64](ctx, s.we, &ExecCfg{account: &address}, "eth_getTransactionCount", address, blockNrOrHash)
return ExecAuthRPC[hexutil.Uint64](
ctx,
s.we,
&ExecCfg{
account: &address,
cacheCfg: &CacheCfg{
CacheTypeDynamic: func() CacheStrategy {
return cacheBlockNumberOrHash(blockNrOrHash)
},
},
},
"eth_getTransactionCount",
address,
blockNrOrHash,
)
}

func (s *TransactionAPI) GetTransactionByHash(ctx context.Context, hash common.Hash) (*rpc.RpcTransaction, error) {
return ExecAuthRPC[rpc.RpcTransaction](ctx, s.we, &ExecCfg{tryAll: true}, "eth_getTransactionByHash", hash)
return ExecAuthRPC[rpc.RpcTransaction](ctx, s.we, &ExecCfg{tryAll: true, cacheCfg: &CacheCfg{CacheType: LongLiving}}, "eth_getTransactionByHash", hash)
}

func (s *TransactionAPI) GetRawTransactionByHash(ctx context.Context, hash common.Hash) (hexutil.Bytes, error) {
tx, err := ExecAuthRPC[hexutil.Bytes](ctx, s.we, &ExecCfg{tryAll: true}, "eth_getRawTransactionByHash", hash)
tx, err := ExecAuthRPC[hexutil.Bytes](ctx, s.we, &ExecCfg{tryAll: true, cacheCfg: &CacheCfg{CacheType: LongLiving}}, "eth_getRawTransactionByHash", hash)
if tx != nil {
return *tx, err
}
return nil, err
}

func (s *TransactionAPI) GetTransactionReceipt(ctx context.Context, hash common.Hash) (map[string]interface{}, error) {
txRec, err := ExecAuthRPC[map[string]interface{}](ctx, s.we, &ExecCfg{tryUntilAuthorised: true}, "eth_getTransactionReceipt", hash)
txRec, err := ExecAuthRPC[map[string]interface{}](ctx, s.we, &ExecCfg{tryUntilAuthorised: true, cacheCfg: &CacheCfg{CacheType: LongLiving}}, "eth_getTransactionReceipt", hash)
if err != nil {
return nil, err
}
Expand Down
12 changes: 9 additions & 3 deletions tools/walletextension/rpcapi/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,19 @@ func withCache[R any](cache cache.Cache, cfg *CacheCfg, cacheKey []byte, onCache
return onCacheMiss()
}

isEvictable := false
ttl := longCacheTTL
if cacheType == LatestBatch {
ttl = shortCacheTTL
isEvictable = true
}

cachedValue, foundInCache := cache.Get(cacheKey)
if foundInCache && !cache.IsEvicted(cacheKey, ttl) {

// only entries cached with `LatestBatch` are evicted
isEvicted := isEvictable && cache.IsEvicted(cacheKey, ttl)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this confusing

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll clarify the comment. Hopefully that works.


if foundInCache && !isEvicted {
returnValue, ok := cachedValue.(*R)
if !ok {
return nil, fmt.Errorf("unexpected error. Invalid format cached. %v", cachedValue)
Expand All @@ -237,14 +243,14 @@ func audit(services *Services, msg string, params ...any) {
}
}

func cacheTTLBlockNumberOrHash(blockNrOrHash rpc.BlockNumberOrHash) CacheStrategy {
func cacheBlockNumberOrHash(blockNrOrHash rpc.BlockNumberOrHash) CacheStrategy {
if blockNrOrHash.BlockNumber != nil && blockNrOrHash.BlockNumber.Int64() <= 0 {
return LatestBatch
}
return LongLiving
}

func cacheTTLBlockNumber(lastBlock rpc.BlockNumber) CacheStrategy {
func cacheBlockNumber(lastBlock rpc.BlockNumber) CacheStrategy {
if lastBlock > 0 {
return LongLiving
}
Expand Down
24 changes: 22 additions & 2 deletions tools/walletextension/rpcapi/wallet_extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"fmt"
"time"

"github.com/ten-protocol/go-ten/go/common/log"
"github.com/ten-protocol/go-ten/go/common/retry"

subscriptioncommon "github.com/ten-protocol/go-ten/go/common/subscription"

tencommon "github.com/ten-protocol/go-ten/go/common"
Expand Down Expand Up @@ -111,11 +114,10 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.Storage

rpcClient := connectionObj.(rpc.Client)
ch := make(chan *tencommon.BatchHeader)
clientSubscription, err := rpcClient.Subscribe(context.Background(), rpc.SubscribeNamespace, ch, rpc.SubscriptionTypeNewHeads)
clientSubscription, err := subscribeToNewHeadsWithRetry(rpcClient, ch, retry.NewTimeoutStrategy(10*time.Minute, 1*time.Second), logger)
if err != nil {
panic(fmt.Errorf("cannot subscribe to new heads to the backend %w", err))
}

services.backendNewHeadsSubscription = clientSubscription
services.NewHeadsService = subscriptioncommon.NewNewHeadsService(ch, true, logger, func(newHead *tencommon.BatchHeader) error {
services.Cache.EvictShortLiving()
Expand All @@ -125,6 +127,24 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.Storage
return &services
}

func subscribeToNewHeadsWithRetry(rpcClient rpc.Client, ch chan *tencommon.BatchHeader, retryStrategy retry.Strategy, logger gethlog.Logger) (*gethrpc.ClientSubscription, error) {
var sub *gethrpc.ClientSubscription

err := retry.Do(func() error {
var err error
sub, err = rpcClient.Subscribe(context.Background(), rpc.SubscribeNamespace, ch, rpc.SubscriptionTypeNewHeads)
if err != nil {
logger.Info("could not subscribe for new head blocks", log.ErrKey, err)
}
return err
}, retryStrategy)
if err != nil {
logger.Error("could not subscribe for new head blocks.", log.ErrKey, err)
}

return sub, err
}

// IsStopping returns whether the WE is stopping
func (w *Services) IsStopping() bool {
return w.stopControl.IsStopping()
Expand Down
Loading