From 5cd07786565b00fffaa0466f0697ffd85e51c83f Mon Sep 17 00:00:00 2001 From: Tudor Malene Date: Thu, 19 Dec 2024 14:19:23 +0200 Subject: [PATCH] introduce a more robust cache eviction mechanism --- tools/walletextension/common/config.go | 39 ++++---- .../services/wallet_extension.go | 92 +++++++++++++------ 2 files changed, 89 insertions(+), 42 deletions(-) diff --git a/tools/walletextension/common/config.go b/tools/walletextension/common/config.go index 6f2eb11ed1..cd0ee6a050 100644 --- a/tools/walletextension/common/config.go +++ b/tools/walletextension/common/config.go @@ -4,23 +4,30 @@ import "time" // Config contains the configuration required by the WalletExtension. type Config struct { - WalletExtensionHost string - WalletExtensionPortHTTP int - WalletExtensionPortWS int - NodeRPCHTTPAddress string - NodeRPCWebsocketAddress string - LogPath string - DBPathOverride string // Overrides the database file location. Used in tests. - VerboseFlag bool - DBType string - DBConnectionURL string - TenChainID int - StoreIncomingTxs bool + WalletExtensionHost string + WalletExtensionPortHTTP int + WalletExtensionPortWS int + + NodeRPCHTTPAddress string + NodeRPCWebsocketAddress string + + LogPath string + DBPathOverride string // Overrides the database file location. Used in tests. + VerboseFlag bool + + DBType string + DBConnectionURL string + + TenChainID int + StoreIncomingTxs bool + RateLimitUserComputeTime time.Duration RateLimitWindow time.Duration RateLimitMaxConcurrentRequests int - InsideEnclave bool // Indicates if the program is running inside an enclave - KeyExchangeURL string - EnableTLS bool - TLSDomain string + + InsideEnclave bool // Indicates if the program is running inside an enclave + KeyExchangeURL string + + EnableTLS bool + TLSDomain string } diff --git a/tools/walletextension/services/wallet_extension.go b/tools/walletextension/services/wallet_extension.go index dc79c30664..1ef7ff9bb9 100644 --- a/tools/walletextension/services/wallet_extension.go +++ b/tools/walletextension/services/wallet_extension.go @@ -36,18 +36,19 @@ import ( // Services handles the various business logic for the api endpoints type Services struct { - HostAddrHTTP string // The HTTP address on which the TEN host can be reached - HostAddrWS string // The WS address on which the TEN host can be reached - Storage storage.UserStorage - logger gethlog.Logger - stopControl *stopcontrol.StopControl - version string - RPCResponsesCache cache.Cache - BackendRPC *BackendRPC - RateLimiter *ratelimiter.RateLimiter - SKManager SKManager - Config *common.Config - NewHeadsService *subscriptioncommon.NewHeadsService + HostAddrHTTP string // The HTTP address on which the TEN host can be reached + HostAddrWS string // The WS address on which the TEN host can be reached + Storage storage.UserStorage + logger gethlog.Logger + stopControl *stopcontrol.StopControl + version string + RPCResponsesCache cache.Cache + BackendRPC *BackendRPC + RateLimiter *ratelimiter.RateLimiter + SKManager SKManager + Config *common.Config + NewHeadsService *subscriptioncommon.NewHeadsService + cacheInvalidationCh chan *tencommon.BatchHeader } type NewHeadNotifier interface { @@ -67,17 +68,18 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.UserSto rateLimiter := ratelimiter.NewRateLimiter(config.RateLimitUserComputeTime, config.RateLimitWindow, uint32(config.RateLimitMaxConcurrentRequests), logger) services := Services{ - HostAddrHTTP: hostAddrHTTP, - HostAddrWS: hostAddrWS, - Storage: storage, - logger: logger, - stopControl: stopControl, - version: version, - RPCResponsesCache: newGatewayCache, - BackendRPC: NewBackendRPC(hostAddrHTTP, hostAddrWS, logger), - SKManager: NewSKManager(storage, config, logger), - RateLimiter: rateLimiter, - Config: config, + HostAddrHTTP: hostAddrHTTP, + HostAddrWS: hostAddrWS, + Storage: storage, + logger: logger, + stopControl: stopControl, + version: version, + RPCResponsesCache: newGatewayCache, + BackendRPC: NewBackendRPC(hostAddrHTTP, hostAddrWS, logger), + SKManager: NewSKManager(storage, config, logger), + RateLimiter: rateLimiter, + Config: config, + cacheInvalidationCh: make(chan *tencommon.BatchHeader), } services.NewHeadsService = subscriptioncommon.NewNewHeadsService( @@ -86,21 +88,55 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.UserSto // clear the cache to avoid returning stale data during reconnecting. services.RPCResponsesCache.EvictShortLiving() ch := make(chan *tencommon.BatchHeader) - errCh, err := subscribeToNewHeadsWithRetry(ch, services, logger) + errCh, err := subscribeToNewHeadsWithRetry(ch, &services, logger) logger.Info("Connected to new heads service.", log.ErrKey, err) return ch, errCh, err }, true, logger, func(newHead *tencommon.BatchHeader) error { - services.RPCResponsesCache.EvictShortLiving() + services.cacheInvalidationCh <- newHead return nil }) + go _startCacheEviction(&services, logger) return &services } -func subscribeToNewHeadsWithRetry(ch chan *tencommon.BatchHeader, services Services, logger gethlog.Logger) (<-chan error, error) { +// this is a more cache eviction that handles delays in the new heads subscription +// if the delay was temporary, the subscription will catch up +func _startCacheEviction(services *Services, logger gethlog.Logger) { + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + + lastEvictionHeight := uint64(0) + + for { + select { + case batch := <-services.cacheInvalidationCh: + // when the batch was delayed the ticker has already fired + if batch.Number.Uint64() > lastEvictionHeight { + services.RPCResponsesCache.EvictShortLiving() + lastEvictionHeight = batch.Number.Uint64() + // start a ticker with a slight delay - todo read the batch time from the config once we integrate the new config + ticker.Reset(1100 * time.Millisecond) + } + + case <-ticker.C: // should only be fired when the normal subscription hasn't fired + services.RPCResponsesCache.EvictShortLiving() + lastEvictionHeight++ + // we assume this ticker takes over + ticker.Reset(1 * time.Second) + logger.Info("Evicting cache from ticker. Head Batch was delayed") + + case <-services.stopControl.Done(): + logger.Info("Stopping cache eviction") + return + } + } +} + +func subscribeToNewHeadsWithRetry(ch chan *tencommon.BatchHeader, services *Services, logger gethlog.Logger) (<-chan error, error) { var sub *gethrpc.ClientSubscription err := retry.Do( func() error { @@ -238,6 +274,9 @@ func (w *Services) GetTenNodeHealthStatus() (bool, error) { res, err := obsclient.NewObsClient(client).Health() return &res.OverallHealth, err }) + if res == nil { + return false, err + } return *res, err } @@ -271,4 +310,5 @@ func (w *Services) GenerateUserMessageToSign(encryptionToken []byte, formatsSlic func (w *Services) Stop() { w.BackendRPC.Stop() + close(w.cacheInvalidationCh) }