From d1b658a276ac3757c7f7551d593a4845756f8769 Mon Sep 17 00:00:00 2001 From: Tudor Malene Date: Mon, 4 Nov 2024 15:21:32 +0000 Subject: [PATCH] clear responsibilities (#2124) --- go.mod | 1 - go.sum | 9 +- tools/walletextension/cache/cache.go | 62 +++++++ tools/walletextension/httpapi/routes.go | 34 ++-- .../walletextension/rpcapi/blockchain_api.go | 54 +++--- tools/walletextension/rpcapi/debug_api.go | 12 +- tools/walletextension/rpcapi/ethereum_api.go | 14 +- tools/walletextension/rpcapi/filter_api.go | 36 ++-- tools/walletextension/rpcapi/net_api.go | 12 +- .../walletextension/rpcapi/transaction_api.go | 22 ++- tools/walletextension/rpcapi/txpool_api.go | 5 +- tools/walletextension/rpcapi/utils.go | 170 +++--------------- tools/walletextension/rpcapi/web3_api.go | 6 +- tools/walletextension/services/conn_utils.go | 61 +++++++ .../{rpcapi => services}/gw_user.go | 30 +--- tools/walletextension/services/utils.go | 11 ++ .../{rpcapi => services}/wallet_extension.go | 34 ++-- .../walletextension_container.go | 6 +- 18 files changed, 310 insertions(+), 269 deletions(-) create mode 100644 tools/walletextension/services/conn_utils.go rename tools/walletextension/{rpcapi => services}/gw_user.go (61%) create mode 100644 tools/walletextension/services/utils.go rename tools/walletextension/{rpcapi => services}/wallet_extension.go (91%) diff --git a/go.mod b/go.mod index e51164fef3..0787c0e2b6 100644 --- a/go.mod +++ b/go.mod @@ -106,7 +106,6 @@ require ( github.com/go-playground/validator/v10 v10.22.1 // indirect github.com/goccy/go-json v0.10.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/glog v1.2.2 // indirect github.com/golang/mock v1.6.0 // indirect github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect github.com/gorilla/mux v1.8.1 // indirect diff --git a/go.sum b/go.sum index 8599f67082..37593dcbe4 100644 --- a/go.sum +++ b/go.sum @@ -12,14 +12,12 @@ github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 h1:ywEEhmNahHBihViHepv3xP github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0/go.mod h1:iZDifYGJTIgIIkYRNWPENUnqx6bJ2xnSDFI2tjwZNuY= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= +github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 h1:XHOnouVk1mxXfQidrMEnLlPk9UMeRtyBTnEFtxkV0kU= +github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= github.com/DataDog/zstd v1.5.6 h1:LbEglqepa/ipmmQJUDnSsfvA8e8IStVcGaFWDuxvGOY= github.com/DataDog/zstd v1.5.6/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/FantasyJony/openzeppelin-merkle-tree-go v1.1.3 h1:KzMvCFet0baw6uJnxTE/His8YeRgaxlASd4/ISuTvzI= github.com/FantasyJony/openzeppelin-merkle-tree-go v1.1.3/go.mod h1:OiwyYqbtMkQH+VzA4b8lI+qHnExJy0fIdz+59/8nFes= -github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 h1:XHOnouVk1mxXfQidrMEnLlPk9UMeRtyBTnEFtxkV0kU= -github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= -github.com/DataDog/zstd v1.5.5 h1:oWf5W7GtOLgp6bciQYDmhHHjdhYkALu6S/5Ni9ZgSvQ= -github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI= @@ -175,9 +173,6 @@ github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOW github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/golang/glog v1.2.2 h1:1+mZ9upx1Dh6FmUTFR1naJ77miKiXgALjWOZ3NVFPmY= -github.com/golang/glog v1.2.2/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= diff --git a/tools/walletextension/cache/cache.go b/tools/walletextension/cache/cache.go index 5ac717541b..58df4489b8 100644 --- a/tools/walletextension/cache/cache.go +++ b/tools/walletextension/cache/cache.go @@ -1,6 +1,7 @@ package cache import ( + "fmt" "time" "github.com/ethereum/go-ethereum/log" @@ -21,3 +22,64 @@ type Cache interface { func NewCache(logger log.Logger) (Cache, error) { return NewRistrettoCacheWithEviction(logger) } + +type Strategy uint8 + +const ( + NoCache Strategy = iota + LatestBatch Strategy = iota + LongLiving Strategy = iota + + longCacheTTL = 5 * time.Hour + shortCacheTTL = 1 * time.Minute +) + +type Cfg struct { + Type Strategy + DynamicType func() Strategy +} + +func WithCache[R any](cache Cache, cfg *Cfg, cacheKey []byte, onCacheMiss func() (*R, error)) (*R, error) { + if cfg == nil { + return onCacheMiss() + } + + cacheType := cfg.Type + if cfg.DynamicType != nil { + cacheType = cfg.DynamicType() + } + + if cacheType == NoCache { + return onCacheMiss() + } + + // we implement a custom cache eviction logic for the cache strategy of type LatestBatch. + // when a new batch is created, all entries with "LatestBatch" are considered evicted. + // elements not cached for a specific batch are not evicted + isEvicted := false + ttl := longCacheTTL + if cacheType == LatestBatch { + ttl = shortCacheTTL + isEvicted = cache.IsEvicted(cacheKey, ttl) + } + + if !isEvicted { + cachedValue, foundInCache := cache.Get(cacheKey) + if foundInCache { + returnValue, ok := cachedValue.(*R) + if !ok { + return nil, fmt.Errorf("unexpected error. Invalid format cached. %v", cachedValue) + } + return returnValue, nil + } + } + + result, err := onCacheMiss() + + // cache only non-nil values + if err == nil && result != nil { + cache.Set(cacheKey, result, ttl) + } + + return result, err +} diff --git a/tools/walletextension/httpapi/routes.go b/tools/walletextension/httpapi/routes.go index cbdb5baf8e..1d1e935577 100644 --- a/tools/walletextension/httpapi/routes.go +++ b/tools/walletextension/httpapi/routes.go @@ -6,13 +6,13 @@ import ( "fmt" "net/http" + "github.com/ten-protocol/go-ten/tools/walletextension/services" + "github.com/status-im/keycard-go/hexutils" + "github.com/ten-protocol/go-ten/go/common/log" "github.com/ten-protocol/go-ten/go/common/viewingkey" "github.com/ten-protocol/go-ten/lib/gethfork/node" - "github.com/ten-protocol/go-ten/tools/walletextension/rpcapi" - - "github.com/ten-protocol/go-ten/go/common/log" "github.com/ten-protocol/go-ten/go/common/httputil" "github.com/ten-protocol/go-ten/tools/walletextension/common" @@ -20,7 +20,7 @@ import ( // NewHTTPRoutes returns the http specific routes // todo - move these to the rpc framework. -func NewHTTPRoutes(walletExt *rpcapi.Services) []node.Route { +func NewHTTPRoutes(walletExt *services.Services) []node.Route { return []node.Route{ { Name: common.APIVersion1 + common.PathReady, @@ -66,8 +66,8 @@ func NewHTTPRoutes(walletExt *rpcapi.Services) []node.Route { } func httpHandler( - walletExt *rpcapi.Services, - fun func(walletExt *rpcapi.Services, conn UserConn), + walletExt *services.Services, + fun func(walletExt *services.Services, conn UserConn), ) func(resp http.ResponseWriter, req *http.Request) { return func(resp http.ResponseWriter, req *http.Request) { httpRequestHandler(walletExt, resp, req, fun) @@ -75,7 +75,7 @@ func httpHandler( } // Overall request handler for http requests -func httpRequestHandler(walletExt *rpcapi.Services, resp http.ResponseWriter, req *http.Request, fun func(walletExt *rpcapi.Services, conn UserConn)) { +func httpRequestHandler(walletExt *services.Services, resp http.ResponseWriter, req *http.Request, fun func(walletExt *services.Services, conn UserConn)) { if walletExt.IsStopping() { return } @@ -87,10 +87,10 @@ func httpRequestHandler(walletExt *rpcapi.Services, resp http.ResponseWriter, re } // readyRequestHandler is used to check whether the server is ready -func readyRequestHandler(_ *rpcapi.Services, _ UserConn) {} +func readyRequestHandler(_ *services.Services, _ UserConn) {} // This function handles request to /join endpoint. It is responsible to create new user (new key-pair) and store it to the db -func joinRequestHandler(walletExt *rpcapi.Services, conn UserConn) { +func joinRequestHandler(walletExt *services.Services, conn UserConn) { // audit() // todo (@ziga) add protection against DDOS attacks _, err := conn.ReadRequest() @@ -116,7 +116,7 @@ func joinRequestHandler(walletExt *rpcapi.Services, conn UserConn) { // This function handles request to /authenticate endpoint. // In the request we receive message, signature and address in JSON as request body and userID and address as query parameters // We then check if message is in correct format and if signature is valid. If all checks pass we save address and signature against userID -func authenticateRequestHandler(walletExt *rpcapi.Services, conn UserConn) { +func authenticateRequestHandler(walletExt *services.Services, conn UserConn) { // read the request body, err := conn.ReadRequest() if err != nil { @@ -181,7 +181,7 @@ func authenticateRequestHandler(walletExt *rpcapi.Services, conn UserConn) { // This function handles request to /query endpoint. // In the query parameters address and userID are required. We check if provided address is registered for given userID // and return true/false in json response -func queryRequestHandler(walletExt *rpcapi.Services, conn UserConn) { +func queryRequestHandler(walletExt *services.Services, conn UserConn) { // read the request _, err := conn.ReadRequest() if err != nil { @@ -233,7 +233,7 @@ func queryRequestHandler(walletExt *rpcapi.Services, conn UserConn) { // This function handles request to /revoke endpoint. // It requires userID as query parameter and deletes given user and all associated viewing keys -func revokeRequestHandler(walletExt *rpcapi.Services, conn UserConn) { +func revokeRequestHandler(walletExt *services.Services, conn UserConn) { // read the request _, err := conn.ReadRequest() if err != nil { @@ -263,7 +263,7 @@ func revokeRequestHandler(walletExt *rpcapi.Services, conn UserConn) { } // Handles request to /health endpoint. -func healthRequestHandler(walletExt *rpcapi.Services, conn UserConn) { +func healthRequestHandler(walletExt *services.Services, conn UserConn) { // read the request _, err := conn.ReadRequest() if err != nil { @@ -279,7 +279,7 @@ func healthRequestHandler(walletExt *rpcapi.Services, conn UserConn) { } // Handles request to /network-health endpoint. -func networkHealthRequestHandler(walletExt *rpcapi.Services, userConn UserConn) { +func networkHealthRequestHandler(walletExt *services.Services, userConn UserConn) { // read the request _, err := userConn.ReadRequest() if err != nil { @@ -321,7 +321,7 @@ func networkHealthRequestHandler(walletExt *rpcapi.Services, userConn UserConn) } } -func networkConfigRequestHandler(walletExt *rpcapi.Services, userConn UserConn) { +func networkConfigRequestHandler(walletExt *services.Services, userConn UserConn) { // read the request _, err := userConn.ReadRequest() if err != nil { @@ -373,7 +373,7 @@ func networkConfigRequestHandler(walletExt *rpcapi.Services, userConn UserConn) } // Handles request to /version endpoint. -func versionRequestHandler(walletExt *rpcapi.Services, userConn UserConn) { +func versionRequestHandler(walletExt *services.Services, userConn UserConn) { // read the request _, err := userConn.ReadRequest() if err != nil { @@ -388,7 +388,7 @@ func versionRequestHandler(walletExt *rpcapi.Services, userConn UserConn) { } // getMessageRequestHandler handles request to /getmessage endpoint. -func getMessageRequestHandler(walletExt *rpcapi.Services, conn UserConn) { +func getMessageRequestHandler(walletExt *services.Services, conn UserConn) { // read the request body, err := conn.ReadRequest() if err != nil { diff --git a/tools/walletextension/rpcapi/blockchain_api.go b/tools/walletextension/rpcapi/blockchain_api.go index 3dc068b448..961b6ac614 100644 --- a/tools/walletextension/rpcapi/blockchain_api.go +++ b/tools/walletextension/rpcapi/blockchain_api.go @@ -6,6 +6,10 @@ import ( "encoding/json" "fmt" + "github.com/ten-protocol/go-ten/tools/walletextension/cache" + + "github.com/ten-protocol/go-ten/tools/walletextension/services" + gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" @@ -17,11 +21,11 @@ import ( ) type BlockChainAPI struct { - we *Services + we *services.Services storageWhitelist *privacy.Whitelist } -func NewBlockChainAPI(we *Services) *BlockChainAPI { +func NewBlockChainAPI(we *services.Services) *BlockChainAPI { whitelist := privacy.NewWhitelist() return &BlockChainAPI{ we: we, @@ -30,12 +34,12 @@ func NewBlockChainAPI(we *Services) *BlockChainAPI { } func (api *BlockChainAPI) ChainId() *hexutil.Big { //nolint:stylecheck - chainID, _ := UnauthenticatedTenRPCCall[hexutil.Big](context.Background(), api.we, &CacheCfg{CacheType: LongLiving}, "eth_chainId") + chainID, _ := UnauthenticatedTenRPCCall[hexutil.Big](context.Background(), api.we, &cache.Cfg{Type: cache.LongLiving}, "eth_chainId") return chainID } func (api *BlockChainAPI) BlockNumber() hexutil.Uint64 { - nr, err := UnauthenticatedTenRPCCall[hexutil.Uint64](context.Background(), api.we, &CacheCfg{CacheType: LatestBatch}, "eth_blockNumber") + nr, err := UnauthenticatedTenRPCCall[hexutil.Uint64](context.Background(), api.we, &cache.Cfg{Type: cache.LatestBatch}, "eth_blockNumber") if err != nil { return hexutil.Uint64(0) } @@ -47,8 +51,8 @@ func (api *BlockChainAPI) GetBalance(ctx context.Context, address gethcommon.Add ctx, api.we, &ExecCfg{ - cacheCfg: &CacheCfg{ - CacheTypeDynamic: func() CacheStrategy { + cacheCfg: &cache.Cfg{ + DynamicType: func() cache.Strategy { return cacheBlockNumberOrHash(blockNrOrHash) }, }, @@ -83,7 +87,7 @@ func (s *BlockChainAPI) GetProof(ctx context.Context, address gethcommon.Address } func (api *BlockChainAPI) GetHeaderByNumber(ctx context.Context, number rpc.BlockNumber) (map[string]interface{}, error) { - resp, err := UnauthenticatedTenRPCCall[map[string]interface{}](ctx, api.we, &CacheCfg{CacheTypeDynamic: func() CacheStrategy { + resp, err := UnauthenticatedTenRPCCall[map[string]interface{}](ctx, api.we, &cache.Cfg{DynamicType: func() cache.Strategy { return cacheBlockNumber(number) }}, "eth_getHeaderByNumber", number) if resp == nil { @@ -93,7 +97,7 @@ func (api *BlockChainAPI) GetHeaderByNumber(ctx context.Context, number rpc.Bloc } func (api *BlockChainAPI) GetHeaderByHash(ctx context.Context, hash gethcommon.Hash) map[string]interface{} { - resp, _ := UnauthenticatedTenRPCCall[map[string]interface{}](ctx, api.we, &CacheCfg{CacheType: LongLiving}, "eth_getHeaderByHash", hash) + resp, _ := UnauthenticatedTenRPCCall[map[string]interface{}](ctx, api.we, &cache.Cfg{Type: cache.LongLiving}, "eth_getHeaderByHash", hash) if resp == nil { return nil } @@ -104,8 +108,8 @@ func (api *BlockChainAPI) GetBlockByNumber(ctx context.Context, number rpc.Block resp, err := UnauthenticatedTenRPCCall[common.BatchHeader]( ctx, api.we, - &CacheCfg{ - CacheTypeDynamic: func() CacheStrategy { + &cache.Cfg{ + DynamicType: func() cache.Strategy { return cacheBlockNumber(number) }, }, "eth_getBlockByNumber", number, fullTx) @@ -126,7 +130,7 @@ func (api *BlockChainAPI) GetBlockByNumber(ctx context.Context, number rpc.Block } func (api *BlockChainAPI) GetBlockByHash(ctx context.Context, hash gethcommon.Hash, fullTx bool) (map[string]interface{}, error) { - resp, err := UnauthenticatedTenRPCCall[common.BatchHeader](ctx, api.we, &CacheCfg{CacheType: LongLiving}, "eth_getBlockByHash", hash, fullTx) + resp, err := UnauthenticatedTenRPCCall[common.BatchHeader](ctx, api.we, &cache.Cfg{Type: cache.LongLiving}, "eth_getBlockByHash", hash, fullTx) if resp == nil { return nil, err } @@ -148,8 +152,8 @@ func (api *BlockChainAPI) GetCode(ctx context.Context, address gethcommon.Addres resp, err := UnauthenticatedTenRPCCall[hexutil.Bytes]( ctx, api.we, - &CacheCfg{ - CacheTypeDynamic: func() CacheStrategy { + &cache.Cfg{ + DynamicType: func() cache.Strategy { return cacheBlockNumberOrHash(blockNrOrHash) }, }, @@ -181,7 +185,7 @@ func (api *BlockChainAPI) GetStorageAt(ctx context.Context, address gethcommon.A return nil, err } - _, err = getUser(userID, api.we) + _, err = api.we.GetUser(userID) if err != nil { return nil, err } @@ -246,15 +250,15 @@ type ( func (api *BlockChainAPI) Call(ctx context.Context, args gethapi.TransactionArgs, blockNrOrHash rpc.BlockNumberOrHash, overrides *StateOverride, blockOverrides *BlockOverrides) (hexutil.Bytes, error) { resp, err := ExecAuthRPC[hexutil.Bytes](ctx, api.we, &ExecCfg{ - cacheCfg: &CacheCfg{ - CacheTypeDynamic: func() CacheStrategy { + cacheCfg: &cache.Cfg{ + DynamicType: func() cache.Strategy { return cacheBlockNumberOrHash(blockNrOrHash) }, }, - computeFromCallback: func(user *GWUser) *gethcommon.Address { + computeFromCallback: func(user *services.GWUser) *gethcommon.Address { return searchFromAndData(user.GetAllAddresses(), args) }, - adjustArgs: func(acct *GWAccount) []any { + adjustArgs: func(acct *services.GWAccount) []any { argsClone := populateFrom(acct, args) return []any{argsClone, blockNrOrHash, overrides, blockOverrides} }, @@ -268,18 +272,18 @@ func (api *BlockChainAPI) Call(ctx context.Context, args gethapi.TransactionArgs func (api *BlockChainAPI) EstimateGas(ctx context.Context, args gethapi.TransactionArgs, blockNrOrHash *rpc.BlockNumberOrHash, overrides *StateOverride) (hexutil.Uint64, error) { resp, err := ExecAuthRPC[hexutil.Uint64](ctx, api.we, &ExecCfg{ - cacheCfg: &CacheCfg{ - CacheTypeDynamic: func() CacheStrategy { + cacheCfg: &cache.Cfg{ + DynamicType: func() cache.Strategy { if blockNrOrHash != nil { return cacheBlockNumberOrHash(*blockNrOrHash) } - return LatestBatch + return cache.LatestBatch }, }, - computeFromCallback: func(user *GWUser) *gethcommon.Address { + computeFromCallback: func(user *services.GWUser) *gethcommon.Address { return searchFromAndData(user.GetAllAddresses(), args) }, - adjustArgs: func(acct *GWAccount) []any { + adjustArgs: func(acct *services.GWAccount) []any { argsClone := populateFrom(acct, args) return []any{argsClone, blockNrOrHash, overrides} }, @@ -292,12 +296,12 @@ func (api *BlockChainAPI) EstimateGas(ctx context.Context, args gethapi.Transact return *resp, err } -func populateFrom(acct *GWAccount, args gethapi.TransactionArgs) gethapi.TransactionArgs { +func populateFrom(acct *services.GWAccount, args gethapi.TransactionArgs) gethapi.TransactionArgs { // clone the args argsClone := cloneArgs(args) // set the from if args.From == nil || args.From.Hex() == (gethcommon.Address{}).Hex() { - argsClone.From = acct.address + argsClone.From = acct.Address } return argsClone } diff --git a/tools/walletextension/rpcapi/debug_api.go b/tools/walletextension/rpcapi/debug_api.go index f26277c174..c3b1891d32 100644 --- a/tools/walletextension/rpcapi/debug_api.go +++ b/tools/walletextension/rpcapi/debug_api.go @@ -3,6 +3,10 @@ package rpcapi import ( "context" + "github.com/ten-protocol/go-ten/tools/walletextension/cache" + + "github.com/ten-protocol/go-ten/tools/walletextension/services" + gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ten-protocol/go-ten/go/common" @@ -10,10 +14,10 @@ import ( ) type DebugAPI struct { - we *Services + we *services.Services } -func NewDebugAPI(we *Services) *DebugAPI { +func NewDebugAPI(we *services.Services) *DebugAPI { return &DebugAPI{we} } @@ -58,8 +62,8 @@ func (api *DebugAPI) EventLogRelevancy(ctx context.Context, crit common.FilterCr ctx, api.we, &ExecCfg{ - cacheCfg: &CacheCfg{ - CacheType: NoCache, + cacheCfg: &cache.Cfg{ + Type: cache.NoCache, }, tryUntilAuthorised: true, }, diff --git a/tools/walletextension/rpcapi/ethereum_api.go b/tools/walletextension/rpcapi/ethereum_api.go index 214fa7912b..0a02b23573 100644 --- a/tools/walletextension/rpcapi/ethereum_api.go +++ b/tools/walletextension/rpcapi/ethereum_api.go @@ -3,26 +3,30 @@ package rpcapi import ( "context" + "github.com/ten-protocol/go-ten/tools/walletextension/cache" + + "github.com/ten-protocol/go-ten/tools/walletextension/services" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/math" "github.com/ten-protocol/go-ten/lib/gethfork/rpc" ) type EthereumAPI struct { - we *Services + we *services.Services } -func NewEthereumAPI(we *Services, +func NewEthereumAPI(we *services.Services, ) *EthereumAPI { return &EthereumAPI{we} } func (api *EthereumAPI) GasPrice(ctx context.Context) (*hexutil.Big, error) { - return UnauthenticatedTenRPCCall[hexutil.Big](ctx, api.we, &CacheCfg{CacheType: LatestBatch}, "eth_gasPrice") + return UnauthenticatedTenRPCCall[hexutil.Big](ctx, api.we, &cache.Cfg{Type: cache.LatestBatch}, "eth_gasPrice") } func (api *EthereumAPI) MaxPriorityFeePerGas(ctx context.Context) (*hexutil.Big, error) { - return UnauthenticatedTenRPCCall[hexutil.Big](ctx, api.we, &CacheCfg{CacheType: LatestBatch}, "eth_maxPriorityFeePerGas") + return UnauthenticatedTenRPCCall[hexutil.Big](ctx, api.we, &cache.Cfg{Type: cache.LatestBatch}, "eth_maxPriorityFeePerGas") } type FeeHistoryResult struct { @@ -36,7 +40,7 @@ func (api *EthereumAPI) FeeHistory(ctx context.Context, blockCount math.HexOrDec return UnauthenticatedTenRPCCall[FeeHistoryResult]( ctx, api.we, - &CacheCfg{CacheTypeDynamic: func() CacheStrategy { + &cache.Cfg{DynamicType: func() cache.Strategy { return cacheBlockNumber(lastBlock) }}, "eth_feeHistory", diff --git a/tools/walletextension/rpcapi/filter_api.go b/tools/walletextension/rpcapi/filter_api.go index af154b6793..de8fb1fa6c 100644 --- a/tools/walletextension/rpcapi/filter_api.go +++ b/tools/walletextension/rpcapi/filter_api.go @@ -7,6 +7,10 @@ import ( "sync/atomic" "time" + "github.com/ten-protocol/go-ten/tools/walletextension/cache" + + "github.com/ten-protocol/go-ten/tools/walletextension/services" + "github.com/status-im/keycard-go/hexutils" "github.com/ethereum/go-ethereum/log" @@ -25,14 +29,14 @@ import ( ) type FilterAPI struct { - we *Services + we *services.Services logger log.Logger } -func NewFilterAPI(we *Services) *FilterAPI { +func NewFilterAPI(we *services.Services) *FilterAPI { return &FilterAPI{ we: we, - logger: we.logger, + logger: we.Logger(), } } @@ -81,7 +85,7 @@ func (api *FilterAPI) Logs(ctx context.Context, crit common.FilterCriteria) (*rp errorChannels := make([]<-chan error, 0) backendSubscriptions := make([]*rpc.ClientSubscription, 0) for _, address := range candidateAddresses { - rpcWSClient, err := connectWS(ctx, user.accounts[*address], api.we.Logger()) + rpcWSClient, err := services.ConnectWS(ctx, user.Accounts[*address], api.we.Logger()) if err != nil { return nil, err } @@ -148,11 +152,11 @@ func (api *FilterAPI) closeConnections(backendSubscriptions []*rpc.ClientSubscri backendSub.Unsubscribe() } for _, connection := range backendWSConnections { - _ = returnConn(api.we.rpcWSConnPool, connection.BackingClient(), api.logger) + _ = services.ReturnConn(api.we.RpcWSConnPool, connection.BackingClient(), api.logger) } } -func getUserAndNotifier(ctx context.Context, api *FilterAPI) (*rpc.Notifier, *GWUser, error) { +func getUserAndNotifier(ctx context.Context, api *FilterAPI) (*rpc.Notifier, *services.GWUser, error) { subNotifier, supported := rpc.NotifierFromContext(ctx) if !supported { return nil, nil, fmt.Errorf("creation of subscriptions is not supported") @@ -163,7 +167,7 @@ func getUserAndNotifier(ctx context.Context, api *FilterAPI) (*rpc.Notifier, *GW return nil, nil, fmt.Errorf("illegal access") } - user, err := getUser(subNotifier.UserID, api.we) + user, err := api.we.GetUser(subNotifier.UserID) if err != nil { return nil, nil, fmt.Errorf("illegal access: %s, %w", subNotifier.UserID, err) } @@ -203,23 +207,23 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit common.FilterCriteria) ( return nil, fmt.Errorf("rate limit exceeded") } - res, err := withCache( + res, err := cache.WithCache( api.we.Cache, - &CacheCfg{ - CacheTypeDynamic: func() CacheStrategy { + &cache.Cfg{ + DynamicType: func() cache.Strategy { if crit.ToBlock != nil && crit.ToBlock.Int64() > 0 { - return LongLiving + return cache.LongLiving } if crit.BlockHash != nil { - return LongLiving + return cache.LongLiving } // when the toBlock or the block Hash are not specified, the request is open-ended - return LatestBatch + return cache.LatestBatch }, }, generateCacheKey([]any{userID, method, common.SerializableFilterCriteria(crit)}), func() (*[]*types.Log, error) { // called when there is no entry in the cache - user, err := getUser(userID, api.we) + user, err := api.we.GetUser(userID) if err != nil { return nil, err } @@ -228,8 +232,8 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit common.FilterCriteria) ( // for each account registered for the current user // execute the get_Logs function // dedupe and concatenate the results - for _, acct := range user.accounts { - eventLogs, err := withEncRPCConnection(ctx, api.we, acct, func(rpcClient *tenrpc.EncRPCClient) (*[]*types.Log, error) { + for _, acct := range user.Accounts { + eventLogs, err := services.WithEncRPCConnection(ctx, api.we, acct, func(rpcClient *tenrpc.EncRPCClient) (*[]*types.Log, error) { var result []*types.Log // wrap the context with a timeout to prevent long executions diff --git a/tools/walletextension/rpcapi/net_api.go b/tools/walletextension/rpcapi/net_api.go index a08c26a3c1..402b31fa1c 100644 --- a/tools/walletextension/rpcapi/net_api.go +++ b/tools/walletextension/rpcapi/net_api.go @@ -2,18 +2,22 @@ package rpcapi import ( "context" + + "github.com/ten-protocol/go-ten/tools/walletextension/cache" + + "github.com/ten-protocol/go-ten/tools/walletextension/services" ) type NetAPI struct { - we *Services + we *services.Services } -func NewNetAPI(we *Services) *NetAPI { +func NewNetAPI(we *services.Services) *NetAPI { return &NetAPI{we} } func (api *NetAPI) Version(ctx context.Context) (*string, error) { - return UnauthenticatedTenRPCCall[string](ctx, api.we, &CacheCfg{CacheType: LongLiving}, "net_version") + return UnauthenticatedTenRPCCall[string](ctx, api.we, &cache.Cfg{Type: cache.LongLiving}, "net_version") } type ConfigResponseJson struct { @@ -24,5 +28,5 @@ type ConfigResponseJson struct { } func (api *NetAPI) Config(ctx context.Context) (*ConfigResponseJson, error) { - return UnauthenticatedTenRPCCall[ConfigResponseJson](ctx, api.we, &CacheCfg{CacheType: LongLiving}, "obscuro_config") + return UnauthenticatedTenRPCCall[ConfigResponseJson](ctx, api.we, &cache.Cfg{Type: cache.LongLiving}, "obscuro_config") } diff --git a/tools/walletextension/rpcapi/transaction_api.go b/tools/walletextension/rpcapi/transaction_api.go index ffa978145e..73bd8288e3 100644 --- a/tools/walletextension/rpcapi/transaction_api.go +++ b/tools/walletextension/rpcapi/transaction_api.go @@ -3,6 +3,10 @@ package rpcapi import ( "context" + "github.com/ten-protocol/go-ten/tools/walletextension/cache" + + "github.com/ten-protocol/go-ten/tools/walletextension/services" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" @@ -12,15 +16,15 @@ import ( ) type TransactionAPI struct { - we *Services + we *services.Services } -func NewTransactionAPI(we *Services) *TransactionAPI { +func NewTransactionAPI(we *services.Services) *TransactionAPI { return &TransactionAPI{we} } func (s *TransactionAPI) GetBlockTransactionCountByNumber(ctx context.Context, blockNr gethrpc.BlockNumber) *hexutil.Uint { - count, err := UnauthenticatedTenRPCCall[hexutil.Uint](ctx, s.we, &CacheCfg{CacheTypeDynamic: func() CacheStrategy { + count, err := UnauthenticatedTenRPCCall[hexutil.Uint](ctx, s.we, &cache.Cfg{DynamicType: func() cache.Strategy { return cacheBlockNumber(blockNr) }}, "eth_getBlockTransactionCountByNumber", blockNr) if err != nil { @@ -30,7 +34,7 @@ func (s *TransactionAPI) GetBlockTransactionCountByNumber(ctx context.Context, b } func (s *TransactionAPI) GetBlockTransactionCountByHash(ctx context.Context, blockHash common.Hash) *hexutil.Uint { - count, err := UnauthenticatedTenRPCCall[hexutil.Uint](ctx, s.we, &CacheCfg{CacheType: LongLiving}, "eth_getBlockTransactionCountByHash", blockHash) + count, err := UnauthenticatedTenRPCCall[hexutil.Uint](ctx, s.we, &cache.Cfg{Type: cache.LongLiving}, "eth_getBlockTransactionCountByHash", blockHash) if err != nil { return nil } @@ -63,8 +67,8 @@ func (s *TransactionAPI) GetTransactionCount(ctx context.Context, address common s.we, &ExecCfg{ account: &address, - cacheCfg: &CacheCfg{ - CacheTypeDynamic: func() CacheStrategy { + cacheCfg: &cache.Cfg{ + DynamicType: func() cache.Strategy { return cacheBlockNumberOrHash(blockNrOrHash) }, }, @@ -76,11 +80,11 @@ func (s *TransactionAPI) GetTransactionCount(ctx context.Context, address common } func (s *TransactionAPI) GetTransactionByHash(ctx context.Context, hash common.Hash) (*rpc.RpcTransaction, error) { - return ExecAuthRPC[rpc.RpcTransaction](ctx, s.we, &ExecCfg{tryAll: true, cacheCfg: &CacheCfg{CacheType: LongLiving}}, "eth_getTransactionByHash", hash) + return ExecAuthRPC[rpc.RpcTransaction](ctx, s.we, &ExecCfg{tryAll: true, cacheCfg: &cache.Cfg{Type: cache.LongLiving}}, "eth_getTransactionByHash", hash) } func (s *TransactionAPI) GetRawTransactionByHash(ctx context.Context, hash common.Hash) (hexutil.Bytes, error) { - tx, err := ExecAuthRPC[hexutil.Bytes](ctx, s.we, &ExecCfg{tryAll: true, cacheCfg: &CacheCfg{CacheType: LongLiving}}, "eth_getRawTransactionByHash", hash) + tx, err := ExecAuthRPC[hexutil.Bytes](ctx, s.we, &ExecCfg{tryAll: true, cacheCfg: &cache.Cfg{Type: cache.LongLiving}}, "eth_getRawTransactionByHash", hash) if tx != nil { return *tx, err } @@ -88,7 +92,7 @@ func (s *TransactionAPI) GetRawTransactionByHash(ctx context.Context, hash commo } func (s *TransactionAPI) GetTransactionReceipt(ctx context.Context, hash common.Hash) (map[string]interface{}, error) { - txRec, err := ExecAuthRPC[map[string]interface{}](ctx, s.we, &ExecCfg{tryUntilAuthorised: true, cacheCfg: &CacheCfg{CacheType: LongLiving}}, "eth_getTransactionReceipt", hash) + txRec, err := ExecAuthRPC[map[string]interface{}](ctx, s.we, &ExecCfg{tryUntilAuthorised: true, cacheCfg: &cache.Cfg{Type: cache.LongLiving}}, "eth_getTransactionReceipt", hash) if err != nil { return nil, err } diff --git a/tools/walletextension/rpcapi/txpool_api.go b/tools/walletextension/rpcapi/txpool_api.go index 16ec3f3b76..7f19b1f494 100644 --- a/tools/walletextension/rpcapi/txpool_api.go +++ b/tools/walletextension/rpcapi/txpool_api.go @@ -4,13 +4,14 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" rpc2 "github.com/ten-protocol/go-ten/go/enclave/rpc" + "github.com/ten-protocol/go-ten/tools/walletextension/services" ) type TxPoolAPI struct { - we *Services + we *services.Services } -func NewTxPoolAPI(we *Services) *TxPoolAPI { +func NewTxPoolAPI(we *services.Services) *TxPoolAPI { return &TxPoolAPI{we} } diff --git a/tools/walletextension/rpcapi/utils.go b/tools/walletextension/rpcapi/utils.go index 3ea03623ca..82392b8ac2 100644 --- a/tools/walletextension/rpcapi/utils.go +++ b/tools/walletextension/rpcapi/utils.go @@ -10,24 +10,17 @@ import ( "strings" "time" - "github.com/status-im/keycard-go/hexutils" - - "github.com/ten-protocol/go-ten/go/common/measure" - "github.com/ten-protocol/go-ten/go/enclave/core" + "github.com/ten-protocol/go-ten/tools/walletextension/cache" - "github.com/ten-protocol/go-ten/go/common/log" + "github.com/ten-protocol/go-ten/tools/walletextension/services" - gethlog "github.com/ethereum/go-ethereum/log" - pool "github.com/jolestar/go-commons-pool/v2" - tenrpc "github.com/ten-protocol/go-ten/go/rpc" - wecommon "github.com/ten-protocol/go-ten/tools/walletextension/common" + "github.com/status-im/keycard-go/hexutils" "github.com/ten-protocol/go-ten/go/common/viewingkey" + tenrpc "github.com/ten-protocol/go-ten/go/rpc" "github.com/ten-protocol/go-ten/lib/gethfork/rpc" - "github.com/ten-protocol/go-ten/tools/walletextension/cache" - gethcommon "github.com/ethereum/go-ethereum/common" ) @@ -38,9 +31,6 @@ const ( notAuthorised = "not authorised" serverBusy = "server busy. please retry later" - longCacheTTL = 5 * time.Hour - shortCacheTTL = 1 * time.Minute - // hardcoding the maximum time for an RPC request // this value will be propagated to the node and enclave and all the operations maximumRPCCallDuration = 5 * time.Second @@ -52,29 +42,16 @@ var rpcNotImplemented = fmt.Errorf("rpc endpoint not implemented") type ExecCfg struct { // these 4 fields specify the account(s) that should make the backend call account *gethcommon.Address - computeFromCallback func(user *GWUser) *gethcommon.Address + computeFromCallback func(user *services.GWUser) *gethcommon.Address tryAll bool tryUntilAuthorised bool - adjustArgs func(acct *GWAccount) []any - cacheCfg *CacheCfg + adjustArgs func(acct *services.GWAccount) []any + cacheCfg *cache.Cfg timeout time.Duration } -type CacheStrategy uint8 - -const ( - NoCache CacheStrategy = iota - LatestBatch CacheStrategy = iota - LongLiving CacheStrategy = iota -) - -type CacheCfg struct { - CacheType CacheStrategy - CacheTypeDynamic func() CacheStrategy -} - -func UnauthenticatedTenRPCCall[R any](ctx context.Context, w *Services, cfg *CacheCfg, method string, args ...any) (*R, error) { +func UnauthenticatedTenRPCCall[R any](ctx context.Context, w *services.Services, cfg *cache.Cfg, method string, args ...any) (*R, error) { if ctx == nil { return nil, errors.New("invalid call. nil Context") } @@ -83,8 +60,8 @@ func UnauthenticatedTenRPCCall[R any](ctx context.Context, w *Services, cfg *Cac cacheArgs := []any{method} cacheArgs = append(cacheArgs, args...) - res, err := withCache(w.Cache, cfg, generateCacheKey(cacheArgs), func() (*R, error) { - return withPlainRPCConnection(ctx, w, func(client *rpc.Client) (*R, error) { + res, err := cache.WithCache(w.Cache, cfg, generateCacheKey(cacheArgs), func() (*R, error) { + return services.WithPlainRPCConnection(ctx, w, func(client *rpc.Client) (*R, error) { var resp *R var err error @@ -100,7 +77,7 @@ func UnauthenticatedTenRPCCall[R any](ctx context.Context, w *Services, cfg *Cac return res, err } -func ExecAuthRPC[R any](ctx context.Context, w *Services, cfg *ExecCfg, method string, args ...any) (*R, error) { +func ExecAuthRPC[R any](ctx context.Context, w *services.Services, cfg *ExecCfg, method string, args ...any) (*R, error) { audit(w, "RPC start method=%s args=%v", method, args) requestStartTime := time.Now() userID, err := extractUserID(ctx, w) @@ -117,8 +94,8 @@ func ExecAuthRPC[R any](ctx context.Context, w *Services, cfg *ExecCfg, method s cacheArgs := []any{userID, method} cacheArgs = append(cacheArgs, args...) - res, err := withCache(w.Cache, cfg.cacheCfg, generateCacheKey(cacheArgs), func() (*R, error) { - user, err := getUser(userID, w) + res, err := cache.WithCache(w.Cache, cfg.cacheCfg, generateCacheKey(cacheArgs), func() (*R, error) { + user, err := w.GetUser(userID) if err != nil { return nil, err } @@ -135,7 +112,7 @@ func ExecAuthRPC[R any](ctx context.Context, w *Services, cfg *ExecCfg, method s var rpcErr error for i := range candidateAccts { acct := candidateAccts[i] - result, err := withEncRPCConnection(ctx, w, acct, func(rpcClient *tenrpc.EncRPCClient) (*R, error) { + result, err := services.WithEncRPCConnection(ctx, w, acct, func(rpcClient *tenrpc.EncRPCClient) (*R, error) { var result *R adjustedArgs := args if cfg.adjustArgs != nil { @@ -174,12 +151,12 @@ func ExecAuthRPC[R any](ctx context.Context, w *Services, cfg *ExecCfg, method s return res, err } -func getCandidateAccounts(user *GWUser, _ *Services, cfg *ExecCfg) ([]*GWAccount, error) { - candidateAccts := make([]*GWAccount, 0) +func getCandidateAccounts(user *services.GWUser, _ *services.Services, cfg *ExecCfg) ([]*services.GWAccount, error) { + candidateAccts := make([]*services.GWAccount, 0) // for users with multiple accounts try to determine a candidate account based on the available information switch { case cfg.account != nil: - acc := user.accounts[*cfg.account] + acc := user.Accounts[*cfg.account] if acc != nil { candidateAccts = append(candidateAccts, acc) return candidateAccts, nil @@ -188,7 +165,7 @@ func getCandidateAccounts(user *GWUser, _ *Services, cfg *ExecCfg) ([]*GWAccount case cfg.computeFromCallback != nil: suggestedAddress := cfg.computeFromCallback(user) if suggestedAddress != nil { - acc := user.accounts[*suggestedAddress] + acc := user.Accounts[*suggestedAddress] if acc != nil { candidateAccts = append(candidateAccts, acc) return candidateAccts, nil @@ -197,7 +174,7 @@ func getCandidateAccounts(user *GWUser, _ *Services, cfg *ExecCfg) ([]*GWAccount } if cfg.tryAll || cfg.tryUntilAuthorised { - for _, acc := range user.accounts { + for _, acc := range user.Accounts { candidateAccts = append(candidateAccts, acc) } } @@ -205,7 +182,7 @@ func getCandidateAccounts(user *GWUser, _ *Services, cfg *ExecCfg) ([]*GWAccount return candidateAccts, nil } -func extractUserID(ctx context.Context, _ *Services) ([]byte, error) { +func extractUserID(ctx context.Context, _ *services.Services) ([]byte, error) { token, ok := ctx.Value(rpc.GWTokenKey{}).(string) if !ok { return nil, fmt.Errorf("invalid userid: %s", ctx.Value(rpc.GWTokenKey{})) @@ -233,115 +210,24 @@ func generateCacheKey(params []any) []byte { return hasher.Sum(nil) } -func withCache[R any](cache cache.Cache, cfg *CacheCfg, cacheKey []byte, onCacheMiss func() (*R, error)) (*R, error) { - if cfg == nil { - return onCacheMiss() - } - - cacheType := cfg.CacheType - if cfg.CacheTypeDynamic != nil { - cacheType = cfg.CacheTypeDynamic() - } - - if cacheType == NoCache { - return onCacheMiss() - } - - // we implement a custom cache eviction logic for the cache strategy of type LatestBatch. - // when a new batch is created, all entries with "LatestBatch" are considered evicted. - // elements not cached for a specific batch are not evicted - isEvicted := false - ttl := longCacheTTL - if cacheType == LatestBatch { - ttl = shortCacheTTL - isEvicted = cache.IsEvicted(cacheKey, ttl) - } - - if !isEvicted { - cachedValue, foundInCache := cache.Get(cacheKey) - if foundInCache { - returnValue, ok := cachedValue.(*R) - if !ok { - return nil, fmt.Errorf("unexpected error. Invalid format cached. %v", cachedValue) - } - return returnValue, nil - } - } - - result, err := onCacheMiss() - - // cache only non-nil values - if err == nil && result != nil { - cache.Set(cacheKey, result, ttl) - } - - return result, err -} - -func audit(services *Services, msg string, params ...any) { +func audit(services *services.Services, msg string, params ...any) { if services.Config.VerboseFlag { - services.logger.Info(fmt.Sprintf(msg, params...)) + services.Logger().Info(fmt.Sprintf(msg, params...)) } } -func cacheBlockNumberOrHash(blockNrOrHash rpc.BlockNumberOrHash) CacheStrategy { +func cacheBlockNumberOrHash(blockNrOrHash rpc.BlockNumberOrHash) cache.Strategy { if blockNrOrHash.BlockNumber != nil && blockNrOrHash.BlockNumber.Int64() <= 0 { - return LatestBatch + return cache.LatestBatch } - return LongLiving + return cache.LongLiving } -func cacheBlockNumber(lastBlock rpc.BlockNumber) CacheStrategy { +func cacheBlockNumber(lastBlock rpc.BlockNumber) cache.Strategy { if lastBlock > 0 { - return LongLiving - } - return LatestBatch -} - -func connectWS(ctx context.Context, account *GWAccount, logger gethlog.Logger) (*tenrpc.EncRPCClient, error) { - return conn(ctx, account.user.services.rpcWSConnPool, account, logger) -} - -func conn(ctx context.Context, p *pool.ObjectPool, account *GWAccount, logger gethlog.Logger) (*tenrpc.EncRPCClient, error) { - defer core.LogMethodDuration(logger, measure.NewStopwatch(), "get rpc connection") - connectionObj, err := p.BorrowObject(ctx) - if err != nil { - return nil, fmt.Errorf("cannot fetch rpc connection to backend node %w", err) - } - conn := connectionObj.(*rpc.Client) - encClient, err := wecommon.CreateEncClient(conn, account.address.Bytes(), account.user.userKey, account.signature, account.signatureType, logger) - if err != nil { - _ = returnConn(p, conn, logger) - return nil, fmt.Errorf("error creating new client, %w", err) - } - return encClient, nil -} - -func returnConn(p *pool.ObjectPool, conn tenrpc.Client, logger gethlog.Logger) error { - err := p.ReturnObject(context.Background(), conn) - if err != nil { - logger.Error("Error returning connection to pool", log.ErrKey, err) - } - return err -} - -func withEncRPCConnection[R any](ctx context.Context, w *Services, acct *GWAccount, execute func(*tenrpc.EncRPCClient) (*R, error)) (*R, error) { - rpcClient, err := conn(ctx, acct.user.services.rpcHTTPConnPool, acct, w.logger) - if err != nil { - return nil, fmt.Errorf("could not connect to backed. Cause: %w", err) - } - defer returnConn(w.rpcHTTPConnPool, rpcClient.BackingClient(), w.logger) - return execute(rpcClient) -} - -func withPlainRPCConnection[R any](ctx context.Context, w *Services, execute func(client *rpc.Client) (*R, error)) (*R, error) { - connectionObj, err := w.rpcHTTPConnPool.BorrowObject(ctx) - if err != nil { - return nil, fmt.Errorf("cannot fetch rpc connection to backend node %w", err) + return cache.LongLiving } - rpcClient := connectionObj.(*rpc.Client) - defer returnConn(w.rpcHTTPConnPool, rpcClient, w.logger) - return execute(rpcClient) + return cache.LatestBatch } func SafeGenericToString[R any](r *R) string { diff --git a/tools/walletextension/rpcapi/web3_api.go b/tools/walletextension/rpcapi/web3_api.go index 78059a01e1..8783baaa2f 100644 --- a/tools/walletextension/rpcapi/web3_api.go +++ b/tools/walletextension/rpcapi/web3_api.go @@ -2,15 +2,17 @@ package rpcapi import ( "context" + + "github.com/ten-protocol/go-ten/tools/walletextension/services" ) var _hardcodedClientVersion = "Geth/v10.0.0/ten" type Web3API struct { - we *Services + we *services.Services } -func NewWeb3API(we *Services) *Web3API { +func NewWeb3API(we *services.Services) *Web3API { return &Web3API{we} } diff --git a/tools/walletextension/services/conn_utils.go b/tools/walletextension/services/conn_utils.go new file mode 100644 index 0000000000..ee3a6ec87f --- /dev/null +++ b/tools/walletextension/services/conn_utils.go @@ -0,0 +1,61 @@ +package services + +import ( + "context" + "fmt" + + gethlog "github.com/ethereum/go-ethereum/log" + pool "github.com/jolestar/go-commons-pool/v2" + "github.com/ten-protocol/go-ten/go/common/log" + "github.com/ten-protocol/go-ten/go/common/measure" + "github.com/ten-protocol/go-ten/go/enclave/core" + tenrpc "github.com/ten-protocol/go-ten/go/rpc" + "github.com/ten-protocol/go-ten/lib/gethfork/rpc" + wecommon "github.com/ten-protocol/go-ten/tools/walletextension/common" +) + +func ConnectWS(ctx context.Context, account *GWAccount, logger gethlog.Logger) (*tenrpc.EncRPCClient, error) { + return connect(ctx, account.user.services.RpcWSConnPool, account, logger) +} + +func connect(ctx context.Context, p *pool.ObjectPool, account *GWAccount, logger gethlog.Logger) (*tenrpc.EncRPCClient, error) { + defer core.LogMethodDuration(logger, measure.NewStopwatch(), "get rpc connection") + connectionObj, err := p.BorrowObject(ctx) + if err != nil { + return nil, fmt.Errorf("cannot fetch rpc connection to backend node %w", err) + } + conn := connectionObj.(*rpc.Client) + encClient, err := wecommon.CreateEncClient(conn, account.Address.Bytes(), account.user.userKey, account.signature, account.signatureType, logger) + if err != nil { + _ = ReturnConn(p, conn, logger) + return nil, fmt.Errorf("error creating new client, %w", err) + } + return encClient, nil +} + +func ReturnConn(p *pool.ObjectPool, conn tenrpc.Client, logger gethlog.Logger) error { + err := p.ReturnObject(context.Background(), conn) + if err != nil { + logger.Error("Error returning connection to pool", log.ErrKey, err) + } + return err +} + +func WithEncRPCConnection[R any](ctx context.Context, w *Services, acct *GWAccount, execute func(*tenrpc.EncRPCClient) (*R, error)) (*R, error) { + rpcClient, err := connect(ctx, acct.user.services.RpcHTTPConnPool, acct, w.logger) + if err != nil { + return nil, fmt.Errorf("could not connect to backed. Cause: %w", err) + } + defer ReturnConn(w.RpcHTTPConnPool, rpcClient.BackingClient(), w.logger) + return execute(rpcClient) +} + +func WithPlainRPCConnection[R any](ctx context.Context, w *Services, execute func(client *rpc.Client) (*R, error)) (*R, error) { + connectionObj, err := w.RpcHTTPConnPool.BorrowObject(ctx) + if err != nil { + return nil, fmt.Errorf("cannot fetch rpc connection to backend node %w", err) + } + rpcClient := connectionObj.(*rpc.Client) + defer ReturnConn(w.RpcHTTPConnPool, rpcClient, w.logger) + return execute(rpcClient) +} diff --git a/tools/walletextension/rpcapi/gw_user.go b/tools/walletextension/services/gw_user.go similarity index 61% rename from tools/walletextension/rpcapi/gw_user.go rename to tools/walletextension/services/gw_user.go index 73117eb1f4..92a64dd476 100644 --- a/tools/walletextension/rpcapi/gw_user.go +++ b/tools/walletextension/services/gw_user.go @@ -1,9 +1,6 @@ -package rpcapi +package services import ( - "fmt" - - "github.com/status-im/keycard-go/hexutils" "github.com/ten-protocol/go-ten/go/common/viewingkey" "github.com/ethereum/go-ethereum/common" @@ -14,7 +11,7 @@ var userCacheKeyPrefix = []byte{0x0, 0x1, 0x2, 0x3} type GWAccount struct { user *GWUser - address *common.Address + Address *common.Address signature []byte signatureType viewingkey.SignatureType } @@ -22,14 +19,14 @@ type GWAccount struct { type GWUser struct { userID []byte services *Services - accounts map[common.Address]*GWAccount + Accounts map[common.Address]*GWAccount userKey []byte } func (u GWUser) GetAllAddresses() []*common.Address { accts := make([]*common.Address, 0) - for _, acc := range u.accounts { - accts = append(accts, acc.address) + for _, acc := range u.Accounts { + accts = append(accts, acc.Address) } return accts } @@ -38,7 +35,7 @@ func gwUserFromDB(userDB wecommon.GWUserDB, s *Services) (*GWUser, error) { result := &GWUser{ userID: userDB.UserId, services: s, - accounts: make(map[common.Address]*GWAccount), + Accounts: make(map[common.Address]*GWAccount), userKey: userDB.PrivateKey, } @@ -46,11 +43,11 @@ func gwUserFromDB(userDB wecommon.GWUserDB, s *Services) (*GWUser, error) { address := common.BytesToAddress(accountDB.AccountAddress) gwAccount := &GWAccount{ user: result, - address: &address, + Address: &address, signature: accountDB.Signature, signatureType: viewingkey.SignatureType(accountDB.SignatureType), } - result.accounts[address] = gwAccount + result.Accounts[address] = gwAccount } return result, nil @@ -62,14 +59,3 @@ func userCacheKey(userID []byte) []byte { key = append(key, userID...) return key } - -func getUser(userID []byte, s *Services) (*GWUser, error) { - return withCache(s.Cache, &CacheCfg{CacheType: LongLiving}, userCacheKey(userID), func() (*GWUser, error) { - user, err := s.Storage.GetUser(userID) - if err != nil { - return nil, fmt.Errorf("user %s not found. %w", hexutils.BytesToHex(userID), err) - } - result, err := gwUserFromDB(user, s) - return result, err - }) -} diff --git a/tools/walletextension/services/utils.go b/tools/walletextension/services/utils.go new file mode 100644 index 0000000000..7f0224fdb2 --- /dev/null +++ b/tools/walletextension/services/utils.go @@ -0,0 +1,11 @@ +package services + +import ( + "fmt" +) + +func audit(services *Services, msg string, params ...any) { + if services.Config.VerboseFlag { + services.logger.Info(fmt.Sprintf(msg, params...)) + } +} diff --git a/tools/walletextension/rpcapi/wallet_extension.go b/tools/walletextension/services/wallet_extension.go similarity index 91% rename from tools/walletextension/rpcapi/wallet_extension.go rename to tools/walletextension/services/wallet_extension.go index 9e93885369..aac8986d99 100644 --- a/tools/walletextension/rpcapi/wallet_extension.go +++ b/tools/walletextension/services/wallet_extension.go @@ -1,4 +1,4 @@ -package rpcapi +package services import ( "bytes" @@ -46,8 +46,8 @@ type Services struct { Cache cache.Cache RateLimiter *ratelimiter.RateLimiter // the OG maintains a connection pool of rpc connections to underlying nodes - rpcHTTPConnPool *pool.ObjectPool - rpcWSConnPool *pool.ObjectPool + RpcHTTPConnPool *pool.ObjectPool + RpcWSConnPool *pool.ObjectPool Config *common.Config NewHeadsService *subscriptioncommon.NewHeadsService } @@ -103,8 +103,8 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.Storage version: version, Cache: newGatewayCache, RateLimiter: rateLimiter, - rpcHTTPConnPool: pool.NewObjectPool(context.Background(), factoryHTTP, cfg), - rpcWSConnPool: pool.NewObjectPool(context.Background(), factoryWS, cfg), + RpcHTTPConnPool: pool.NewObjectPool(context.Background(), factoryHTTP, cfg), + RpcWSConnPool: pool.NewObjectPool(context.Background(), factoryWS, cfg), Config: config, } @@ -132,7 +132,7 @@ func subscribeToNewHeadsWithRetry(ch chan *tencommon.BatchHeader, services Servi var sub *gethrpc.ClientSubscription err := retry.Do( func() error { - connectionObj, err := services.rpcWSConnPool.BorrowObject(context.Background()) + connectionObj, err := services.RpcWSConnPool.BorrowObject(context.Background()) if err != nil { return fmt.Errorf("cannot fetch rpc connection to backend node %w", err) } @@ -140,7 +140,7 @@ func subscribeToNewHeadsWithRetry(ch chan *tencommon.BatchHeader, services Servi sub, err = rpcClient.Subscribe(context.Background(), rpc.SubscribeNamespace, ch, rpc.SubscriptionTypeNewHeads) if err != nil { logger.Info("could not subscribe for new head blocks", log.ErrKey, err) - _ = returnConn(services.rpcWSConnPool, rpcClient, logger) + _ = ReturnConn(services.RpcWSConnPool, rpcClient, logger) } return err }, @@ -277,7 +277,7 @@ func (w *Services) Version() string { func (w *Services) GetTenNodeHealthStatus() (bool, error) { audit(w, "Getting TEN node health status") - res, err := withPlainRPCConnection[bool](context.Background(), w, func(client *gethrpc.Client) (*bool, error) { + res, err := WithPlainRPCConnection[bool](context.Background(), w, func(client *gethrpc.Client) (*bool, error) { res, err := obsclient.NewObsClient(client).Health() return &res, err }) @@ -286,7 +286,7 @@ func (w *Services) GetTenNodeHealthStatus() (bool, error) { func (w *Services) GetTenNetworkConfig() (tencommon.TenNetworkInfo, error) { audit(w, "Getting TEN network config") - res, err := withPlainRPCConnection[tencommon.TenNetworkInfo](context.Background(), w, func(client *gethrpc.Client) (*tencommon.TenNetworkInfo, error) { + res, err := WithPlainRPCConnection[tencommon.TenNetworkInfo](context.Background(), w, func(client *gethrpc.Client) (*tencommon.TenNetworkInfo, error) { res, err := obsclient.NewObsClient(client).GetConfig() return res, err }) @@ -310,7 +310,19 @@ func (w *Services) GenerateUserMessageToSign(encryptionToken []byte, formatsSlic return string(message), nil } +func (w *Services) GetUser(userID []byte) (*GWUser, error) { + return cache.WithCache(w.Cache, &cache.Cfg{Type: cache.LongLiving}, userCacheKey(userID), func() (*GWUser, error) { + // todo - use storage with cache + user, err := w.Storage.GetUser(userID) + if err != nil { + return nil, fmt.Errorf("user %s not found. %w", hexutils.BytesToHex(userID), err) + } + result, err := gwUserFromDB(user, w) + return result, err + }) +} + func (w *Services) Stop() { - w.rpcHTTPConnPool.Close(context.Background()) - w.rpcWSConnPool.Close(context.Background()) + w.RpcHTTPConnPool.Close(context.Background()) + w.RpcWSConnPool.Close(context.Background()) } diff --git a/tools/walletextension/walletextension_container.go b/tools/walletextension/walletextension_container.go index a306e3a99a..64f55b7c7d 100644 --- a/tools/walletextension/walletextension_container.go +++ b/tools/walletextension/walletextension_container.go @@ -4,6 +4,8 @@ import ( "os" "time" + "github.com/ten-protocol/go-ten/tools/walletextension/services" + "github.com/ten-protocol/go-ten/go/common/subscription" "github.com/ten-protocol/go-ten/tools/walletextension/httpapi" @@ -24,7 +26,7 @@ type Container struct { stopControl *stopcontrol.StopControl logger gethlog.Logger rpcServer node.Server - services *rpcapi.Services + services *services.Services newHeadsService *subscription.NewHeadsService } @@ -58,7 +60,7 @@ func NewContainerFromConfig(config wecommon.Config, logger gethlog.Logger) *Cont } stopControl := stopcontrol.New() - walletExt := rpcapi.NewServices(hostRPCBindAddrHTTP, hostRPCBindAddrWS, databaseStorage, stopControl, version, logger, &config) + walletExt := services.NewServices(hostRPCBindAddrHTTP, hostRPCBindAddrWS, databaseStorage, stopControl, version, logger, &config) cfg := &node.RPCConfig{ EnableHTTP: true, HTTPPort: config.WalletExtensionPortHTTP,