Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce a more robust cache eviction mechanism #2216

Merged
merged 5 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions tools/walletextension/cache/RistrettoCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cache

import (
"fmt"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/log"
Expand All @@ -15,9 +16,10 @@ const (
)

type ristrettoCache struct {
cache *ristretto.Cache
quit chan struct{}
lastEviction time.Time
cache *ristretto.Cache
quit chan struct{}
lastEviction time.Time
shortLivingEnabled *atomic.Bool
}

// NewRistrettoCacheWithEviction returns a new ristrettoCache.
Expand All @@ -33,10 +35,12 @@ func NewRistrettoCacheWithEviction(nrElems int, logger log.Logger) (Cache, error
}

c := &ristrettoCache{
cache: cache,
quit: make(chan struct{}),
lastEviction: time.Now(),
cache: cache,
quit: make(chan struct{}),
lastEviction: time.Now(),
shortLivingEnabled: &atomic.Bool{},
}
c.shortLivingEnabled.Store(true)

// Start the metrics logging
go c.startMetricsLogging(logger)
Expand All @@ -45,10 +49,19 @@ func NewRistrettoCacheWithEviction(nrElems int, logger log.Logger) (Cache, error
}

func (c *ristrettoCache) EvictShortLiving() {
// this event happens when a new batch is received, so the cache can be enabled
c.shortLivingEnabled.Store(true)
c.lastEviction = time.Now()
}

func (c *ristrettoCache) DisableShortLiving() {
c.shortLivingEnabled.Store(false)
}

func (c *ristrettoCache) IsEvicted(key any, originalTTL time.Duration) bool {
if !c.shortLivingEnabled.Load() {
return true
}
remainingTTL, notExpired := c.cache.GetTTL(key)
if !notExpired {
return true
Expand Down
3 changes: 3 additions & 0 deletions tools/walletextension/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ type Cache interface {
// EvictShortLiving - notify the cache that all short living elements cached before the events should be considered as evicted.
EvictShortLiving()

// DisableShortLiving disables the caching of short-living elements.
DisableShortLiving()

// IsEvicted - based on the eviction event and the time of caching, calculates whether the key was evicted
IsEvicted(key any, originalTTL time.Duration) bool

Expand Down
39 changes: 23 additions & 16 deletions tools/walletextension/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,30 @@ import "time"

// Config contains the configuration required by the WalletExtension.
type Config struct {
WalletExtensionHost string
WalletExtensionPortHTTP int
WalletExtensionPortWS int
NodeRPCHTTPAddress string
NodeRPCWebsocketAddress string
LogPath string
DBPathOverride string // Overrides the database file location. Used in tests.
VerboseFlag bool
DBType string
DBConnectionURL string
TenChainID int
StoreIncomingTxs bool
WalletExtensionHost string
WalletExtensionPortHTTP int
WalletExtensionPortWS int

NodeRPCHTTPAddress string
NodeRPCWebsocketAddress string

LogPath string
DBPathOverride string // Overrides the database file location. Used in tests.
VerboseFlag bool

DBType string
DBConnectionURL string

TenChainID int
StoreIncomingTxs bool

RateLimitUserComputeTime time.Duration
RateLimitWindow time.Duration
RateLimitMaxConcurrentRequests int
InsideEnclave bool // Indicates if the program is running inside an enclave
KeyExchangeURL string
EnableTLS bool
TLSDomain string

InsideEnclave bool // Indicates if the program is running inside an enclave
KeyExchangeURL string

EnableTLS bool
TLSDomain string
}
87 changes: 61 additions & 26 deletions tools/walletextension/services/wallet_extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,19 @@ import (

// Services handles the various business logic for the api endpoints
type Services struct {
HostAddrHTTP string // The HTTP address on which the TEN host can be reached
HostAddrWS string // The WS address on which the TEN host can be reached
Storage storage.UserStorage
logger gethlog.Logger
stopControl *stopcontrol.StopControl
version string
RPCResponsesCache cache.Cache
BackendRPC *BackendRPC
RateLimiter *ratelimiter.RateLimiter
SKManager SKManager
Config *common.Config
NewHeadsService *subscriptioncommon.NewHeadsService
HostAddrHTTP string // The HTTP address on which the TEN host can be reached
HostAddrWS string // The WS address on which the TEN host can be reached
Storage storage.UserStorage
logger gethlog.Logger
stopControl *stopcontrol.StopControl
version string
RPCResponsesCache cache.Cache
BackendRPC *BackendRPC
RateLimiter *ratelimiter.RateLimiter
SKManager SKManager
Config *common.Config
NewHeadsService *subscriptioncommon.NewHeadsService
cacheInvalidationCh chan *tencommon.BatchHeader
}

type NewHeadNotifier interface {
Expand All @@ -67,17 +68,18 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.UserSto
rateLimiter := ratelimiter.NewRateLimiter(config.RateLimitUserComputeTime, config.RateLimitWindow, uint32(config.RateLimitMaxConcurrentRequests), logger)

services := Services{
HostAddrHTTP: hostAddrHTTP,
HostAddrWS: hostAddrWS,
Storage: storage,
logger: logger,
stopControl: stopControl,
version: version,
RPCResponsesCache: newGatewayCache,
BackendRPC: NewBackendRPC(hostAddrHTTP, hostAddrWS, logger),
SKManager: NewSKManager(storage, config, logger),
RateLimiter: rateLimiter,
Config: config,
HostAddrHTTP: hostAddrHTTP,
HostAddrWS: hostAddrWS,
Storage: storage,
logger: logger,
stopControl: stopControl,
version: version,
RPCResponsesCache: newGatewayCache,
BackendRPC: NewBackendRPC(hostAddrHTTP, hostAddrWS, logger),
SKManager: NewSKManager(storage, config, logger),
RateLimiter: rateLimiter,
Config: config,
cacheInvalidationCh: make(chan *tencommon.BatchHeader),
}

services.NewHeadsService = subscriptioncommon.NewNewHeadsService(
Expand All @@ -86,21 +88,50 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.UserSto
// clear the cache to avoid returning stale data during reconnecting.
services.RPCResponsesCache.EvictShortLiving()
ch := make(chan *tencommon.BatchHeader)
errCh, err := subscribeToNewHeadsWithRetry(ch, services, logger)
errCh, err := subscribeToNewHeadsWithRetry(ch, &services, logger)
logger.Info("Connected to new heads service.", log.ErrKey, err)
return ch, errCh, err
},
true,
logger,
func(newHead *tencommon.BatchHeader) error {
services.RPCResponsesCache.EvictShortLiving()
services.cacheInvalidationCh <- newHead
return nil
})

go _startCacheEviction(&services, logger)
return &services
}

func subscribeToNewHeadsWithRetry(ch chan *tencommon.BatchHeader, services Services, logger gethlog.Logger) (<-chan error, error) {
// this is a more robust cache eviction that handles delays in the new heads subscription by disabling caching of short living elements
// until a batch is received.
func _startCacheEviction(services *Services, logger gethlog.Logger) {
//- todo read the batch time from the config once we integrate the new config
// if we don't receive a new head after this interval we assume the connection is lost, and we disable caching
disableCacheDelay := 5 * time.Second
timer := time.NewTimer(disableCacheDelay)

for {
select {
case <-services.cacheInvalidationCh:
services.RPCResponsesCache.EvictShortLiving()

timer.Stop()
timer = time.NewTimer(disableCacheDelay)

case <-timer.C: // should only be fired when the normal subscription hasn't fired
logger.Warn("Disabling short living cache because NewHeads subscription is delayed")
services.RPCResponsesCache.DisableShortLiving()

case <-services.stopControl.Done():
logger.Info("Stopping cache eviction")
timer.Stop()
return
}
}
}

func subscribeToNewHeadsWithRetry(ch chan *tencommon.BatchHeader, services *Services, logger gethlog.Logger) (<-chan error, error) {
var sub *gethrpc.ClientSubscription
err := retry.Do(
func() error {
Expand Down Expand Up @@ -238,6 +269,9 @@ func (w *Services) GetTenNodeHealthStatus() (bool, error) {
res, err := obsclient.NewObsClient(client).Health()
return &res.OverallHealth, err
})
if res == nil {
return false, err
}
return *res, err
}

Expand Down Expand Up @@ -271,4 +305,5 @@ func (w *Services) GenerateUserMessageToSign(encryptionToken []byte, formatsSlic

func (w *Services) Stop() {
w.BackendRPC.Stop()
close(w.cacheInvalidationCh)
}
Loading