From 8a8a4643346ad448154ff25fee7f900e3d6ada41 Mon Sep 17 00:00:00 2001 From: Tudor Malene Date: Fri, 20 Dec 2024 10:37:29 +0000 Subject: [PATCH] introduce a more robust cache eviction mechanism (#2216) * introduce a more robust cache eviction mechanism * fix * change logic to disable cache * lint --- tools/walletextension/cache/RistrettoCache.go | 25 ++++-- tools/walletextension/cache/cache.go | 3 + tools/walletextension/common/config.go | 39 +++++---- .../services/wallet_extension.go | 87 +++++++++++++------ 4 files changed, 106 insertions(+), 48 deletions(-) diff --git a/tools/walletextension/cache/RistrettoCache.go b/tools/walletextension/cache/RistrettoCache.go index 979965570b..ab1a492cdc 100644 --- a/tools/walletextension/cache/RistrettoCache.go +++ b/tools/walletextension/cache/RistrettoCache.go @@ -2,6 +2,7 @@ package cache import ( "fmt" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/log" @@ -15,9 +16,10 @@ const ( ) type ristrettoCache struct { - cache *ristretto.Cache - quit chan struct{} - lastEviction time.Time + cache *ristretto.Cache + quit chan struct{} + lastEviction time.Time + shortLivingEnabled *atomic.Bool } // NewRistrettoCacheWithEviction returns a new ristrettoCache. @@ -33,10 +35,12 @@ func NewRistrettoCacheWithEviction(nrElems int, logger log.Logger) (Cache, error } c := &ristrettoCache{ - cache: cache, - quit: make(chan struct{}), - lastEviction: time.Now(), + cache: cache, + quit: make(chan struct{}), + lastEviction: time.Now(), + shortLivingEnabled: &atomic.Bool{}, } + c.shortLivingEnabled.Store(true) // Start the metrics logging go c.startMetricsLogging(logger) @@ -45,10 +49,19 @@ func NewRistrettoCacheWithEviction(nrElems int, logger log.Logger) (Cache, error } func (c *ristrettoCache) EvictShortLiving() { + // this event happens when a new batch is received, so the cache can be enabled + c.shortLivingEnabled.Store(true) c.lastEviction = time.Now() } +func (c *ristrettoCache) DisableShortLiving() { + c.shortLivingEnabled.Store(false) +} + func (c *ristrettoCache) IsEvicted(key any, originalTTL time.Duration) bool { + if !c.shortLivingEnabled.Load() { + return true + } remainingTTL, notExpired := c.cache.GetTTL(key) if !notExpired { return true diff --git a/tools/walletextension/cache/cache.go b/tools/walletextension/cache/cache.go index 722a5e60bf..9043351fa7 100644 --- a/tools/walletextension/cache/cache.go +++ b/tools/walletextension/cache/cache.go @@ -11,6 +11,9 @@ type Cache interface { // EvictShortLiving - notify the cache that all short living elements cached before the events should be considered as evicted. EvictShortLiving() + // DisableShortLiving disables the caching of short-living elements. + DisableShortLiving() + // IsEvicted - based on the eviction event and the time of caching, calculates whether the key was evicted IsEvicted(key any, originalTTL time.Duration) bool diff --git a/tools/walletextension/common/config.go b/tools/walletextension/common/config.go index 6f2eb11ed1..cd0ee6a050 100644 --- a/tools/walletextension/common/config.go +++ b/tools/walletextension/common/config.go @@ -4,23 +4,30 @@ import "time" // Config contains the configuration required by the WalletExtension. type Config struct { - WalletExtensionHost string - WalletExtensionPortHTTP int - WalletExtensionPortWS int - NodeRPCHTTPAddress string - NodeRPCWebsocketAddress string - LogPath string - DBPathOverride string // Overrides the database file location. Used in tests. - VerboseFlag bool - DBType string - DBConnectionURL string - TenChainID int - StoreIncomingTxs bool + WalletExtensionHost string + WalletExtensionPortHTTP int + WalletExtensionPortWS int + + NodeRPCHTTPAddress string + NodeRPCWebsocketAddress string + + LogPath string + DBPathOverride string // Overrides the database file location. Used in tests. + VerboseFlag bool + + DBType string + DBConnectionURL string + + TenChainID int + StoreIncomingTxs bool + RateLimitUserComputeTime time.Duration RateLimitWindow time.Duration RateLimitMaxConcurrentRequests int - InsideEnclave bool // Indicates if the program is running inside an enclave - KeyExchangeURL string - EnableTLS bool - TLSDomain string + + InsideEnclave bool // Indicates if the program is running inside an enclave + KeyExchangeURL string + + EnableTLS bool + TLSDomain string } diff --git a/tools/walletextension/services/wallet_extension.go b/tools/walletextension/services/wallet_extension.go index dc79c30664..98981963c5 100644 --- a/tools/walletextension/services/wallet_extension.go +++ b/tools/walletextension/services/wallet_extension.go @@ -36,18 +36,19 @@ import ( // Services handles the various business logic for the api endpoints type Services struct { - HostAddrHTTP string // The HTTP address on which the TEN host can be reached - HostAddrWS string // The WS address on which the TEN host can be reached - Storage storage.UserStorage - logger gethlog.Logger - stopControl *stopcontrol.StopControl - version string - RPCResponsesCache cache.Cache - BackendRPC *BackendRPC - RateLimiter *ratelimiter.RateLimiter - SKManager SKManager - Config *common.Config - NewHeadsService *subscriptioncommon.NewHeadsService + HostAddrHTTP string // The HTTP address on which the TEN host can be reached + HostAddrWS string // The WS address on which the TEN host can be reached + Storage storage.UserStorage + logger gethlog.Logger + stopControl *stopcontrol.StopControl + version string + RPCResponsesCache cache.Cache + BackendRPC *BackendRPC + RateLimiter *ratelimiter.RateLimiter + SKManager SKManager + Config *common.Config + NewHeadsService *subscriptioncommon.NewHeadsService + cacheInvalidationCh chan *tencommon.BatchHeader } type NewHeadNotifier interface { @@ -67,17 +68,18 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.UserSto rateLimiter := ratelimiter.NewRateLimiter(config.RateLimitUserComputeTime, config.RateLimitWindow, uint32(config.RateLimitMaxConcurrentRequests), logger) services := Services{ - HostAddrHTTP: hostAddrHTTP, - HostAddrWS: hostAddrWS, - Storage: storage, - logger: logger, - stopControl: stopControl, - version: version, - RPCResponsesCache: newGatewayCache, - BackendRPC: NewBackendRPC(hostAddrHTTP, hostAddrWS, logger), - SKManager: NewSKManager(storage, config, logger), - RateLimiter: rateLimiter, - Config: config, + HostAddrHTTP: hostAddrHTTP, + HostAddrWS: hostAddrWS, + Storage: storage, + logger: logger, + stopControl: stopControl, + version: version, + RPCResponsesCache: newGatewayCache, + BackendRPC: NewBackendRPC(hostAddrHTTP, hostAddrWS, logger), + SKManager: NewSKManager(storage, config, logger), + RateLimiter: rateLimiter, + Config: config, + cacheInvalidationCh: make(chan *tencommon.BatchHeader), } services.NewHeadsService = subscriptioncommon.NewNewHeadsService( @@ -86,21 +88,50 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.UserSto // clear the cache to avoid returning stale data during reconnecting. services.RPCResponsesCache.EvictShortLiving() ch := make(chan *tencommon.BatchHeader) - errCh, err := subscribeToNewHeadsWithRetry(ch, services, logger) + errCh, err := subscribeToNewHeadsWithRetry(ch, &services, logger) logger.Info("Connected to new heads service.", log.ErrKey, err) return ch, errCh, err }, true, logger, func(newHead *tencommon.BatchHeader) error { - services.RPCResponsesCache.EvictShortLiving() + services.cacheInvalidationCh <- newHead return nil }) + go _startCacheEviction(&services, logger) return &services } -func subscribeToNewHeadsWithRetry(ch chan *tencommon.BatchHeader, services Services, logger gethlog.Logger) (<-chan error, error) { +// this is a more robust cache eviction that handles delays in the new heads subscription by disabling caching of short living elements +// until a batch is received. +func _startCacheEviction(services *Services, logger gethlog.Logger) { + //- todo read the batch time from the config once we integrate the new config + // if we don't receive a new head after this interval we assume the connection is lost, and we disable caching + disableCacheDelay := 5 * time.Second + timer := time.NewTimer(disableCacheDelay) + + for { + select { + case <-services.cacheInvalidationCh: + services.RPCResponsesCache.EvictShortLiving() + + timer.Stop() + timer = time.NewTimer(disableCacheDelay) + + case <-timer.C: // should only be fired when the normal subscription hasn't fired + logger.Warn("Disabling short living cache because NewHeads subscription is delayed") + services.RPCResponsesCache.DisableShortLiving() + + case <-services.stopControl.Done(): + logger.Info("Stopping cache eviction") + timer.Stop() + return + } + } +} + +func subscribeToNewHeadsWithRetry(ch chan *tencommon.BatchHeader, services *Services, logger gethlog.Logger) (<-chan error, error) { var sub *gethrpc.ClientSubscription err := retry.Do( func() error { @@ -238,6 +269,9 @@ func (w *Services) GetTenNodeHealthStatus() (bool, error) { res, err := obsclient.NewObsClient(client).Health() return &res.OverallHealth, err }) + if res == nil { + return false, err + } return *res, err } @@ -271,4 +305,5 @@ func (w *Services) GenerateUserMessageToSign(encryptionToken []byte, formatsSlic func (w *Services) Stop() { w.BackendRPC.Stop() + close(w.cacheInvalidationCh) }