Skip to content

Commit

Permalink
introduce a more robust cache eviction mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
tudor-malene committed Dec 19, 2024
1 parent 05b9147 commit 5cd0778
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 42 deletions.
39 changes: 23 additions & 16 deletions tools/walletextension/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,30 @@ import "time"

// Config contains the configuration required by the WalletExtension.
type Config struct {
WalletExtensionHost string
WalletExtensionPortHTTP int
WalletExtensionPortWS int
NodeRPCHTTPAddress string
NodeRPCWebsocketAddress string
LogPath string
DBPathOverride string // Overrides the database file location. Used in tests.
VerboseFlag bool
DBType string
DBConnectionURL string
TenChainID int
StoreIncomingTxs bool
WalletExtensionHost string
WalletExtensionPortHTTP int
WalletExtensionPortWS int

NodeRPCHTTPAddress string
NodeRPCWebsocketAddress string

LogPath string
DBPathOverride string // Overrides the database file location. Used in tests.
VerboseFlag bool

DBType string
DBConnectionURL string

TenChainID int
StoreIncomingTxs bool

RateLimitUserComputeTime time.Duration
RateLimitWindow time.Duration
RateLimitMaxConcurrentRequests int
InsideEnclave bool // Indicates if the program is running inside an enclave
KeyExchangeURL string
EnableTLS bool
TLSDomain string

InsideEnclave bool // Indicates if the program is running inside an enclave
KeyExchangeURL string

EnableTLS bool
TLSDomain string
}
92 changes: 66 additions & 26 deletions tools/walletextension/services/wallet_extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,19 @@ import (

// Services handles the various business logic for the api endpoints
type Services struct {
HostAddrHTTP string // The HTTP address on which the TEN host can be reached
HostAddrWS string // The WS address on which the TEN host can be reached
Storage storage.UserStorage
logger gethlog.Logger
stopControl *stopcontrol.StopControl
version string
RPCResponsesCache cache.Cache
BackendRPC *BackendRPC
RateLimiter *ratelimiter.RateLimiter
SKManager SKManager
Config *common.Config
NewHeadsService *subscriptioncommon.NewHeadsService
HostAddrHTTP string // The HTTP address on which the TEN host can be reached
HostAddrWS string // The WS address on which the TEN host can be reached
Storage storage.UserStorage
logger gethlog.Logger
stopControl *stopcontrol.StopControl
version string
RPCResponsesCache cache.Cache
BackendRPC *BackendRPC
RateLimiter *ratelimiter.RateLimiter
SKManager SKManager
Config *common.Config
NewHeadsService *subscriptioncommon.NewHeadsService
cacheInvalidationCh chan *tencommon.BatchHeader
}

type NewHeadNotifier interface {
Expand All @@ -67,17 +68,18 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.UserSto
rateLimiter := ratelimiter.NewRateLimiter(config.RateLimitUserComputeTime, config.RateLimitWindow, uint32(config.RateLimitMaxConcurrentRequests), logger)

services := Services{
HostAddrHTTP: hostAddrHTTP,
HostAddrWS: hostAddrWS,
Storage: storage,
logger: logger,
stopControl: stopControl,
version: version,
RPCResponsesCache: newGatewayCache,
BackendRPC: NewBackendRPC(hostAddrHTTP, hostAddrWS, logger),
SKManager: NewSKManager(storage, config, logger),
RateLimiter: rateLimiter,
Config: config,
HostAddrHTTP: hostAddrHTTP,
HostAddrWS: hostAddrWS,
Storage: storage,
logger: logger,
stopControl: stopControl,
version: version,
RPCResponsesCache: newGatewayCache,
BackendRPC: NewBackendRPC(hostAddrHTTP, hostAddrWS, logger),
SKManager: NewSKManager(storage, config, logger),
RateLimiter: rateLimiter,
Config: config,
cacheInvalidationCh: make(chan *tencommon.BatchHeader),
}

services.NewHeadsService = subscriptioncommon.NewNewHeadsService(
Expand All @@ -86,21 +88,55 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.UserSto
// clear the cache to avoid returning stale data during reconnecting.
services.RPCResponsesCache.EvictShortLiving()
ch := make(chan *tencommon.BatchHeader)
errCh, err := subscribeToNewHeadsWithRetry(ch, services, logger)
errCh, err := subscribeToNewHeadsWithRetry(ch, &services, logger)
logger.Info("Connected to new heads service.", log.ErrKey, err)
return ch, errCh, err
},
true,
logger,
func(newHead *tencommon.BatchHeader) error {
services.RPCResponsesCache.EvictShortLiving()
services.cacheInvalidationCh <- newHead
return nil
})

go _startCacheEviction(&services, logger)
return &services
}

func subscribeToNewHeadsWithRetry(ch chan *tencommon.BatchHeader, services Services, logger gethlog.Logger) (<-chan error, error) {
// this is a more cache eviction that handles delays in the new heads subscription
// if the delay was temporary, the subscription will catch up
func _startCacheEviction(services *Services, logger gethlog.Logger) {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()

lastEvictionHeight := uint64(0)

for {
select {
case batch := <-services.cacheInvalidationCh:
// when the batch was delayed the ticker has already fired
if batch.Number.Uint64() > lastEvictionHeight {
services.RPCResponsesCache.EvictShortLiving()
lastEvictionHeight = batch.Number.Uint64()
// start a ticker with a slight delay - todo read the batch time from the config once we integrate the new config
ticker.Reset(1100 * time.Millisecond)
}

case <-ticker.C: // should only be fired when the normal subscription hasn't fired
services.RPCResponsesCache.EvictShortLiving()
lastEvictionHeight++
// we assume this ticker takes over
ticker.Reset(1 * time.Second)
logger.Info("Evicting cache from ticker. Head Batch was delayed")

case <-services.stopControl.Done():
logger.Info("Stopping cache eviction")
return
}
}
}

func subscribeToNewHeadsWithRetry(ch chan *tencommon.BatchHeader, services *Services, logger gethlog.Logger) (<-chan error, error) {
var sub *gethrpc.ClientSubscription
err := retry.Do(
func() error {
Expand Down Expand Up @@ -238,6 +274,9 @@ func (w *Services) GetTenNodeHealthStatus() (bool, error) {
res, err := obsclient.NewObsClient(client).Health()
return &res.OverallHealth, err
})
if res == nil {
return false, err
}
return *res, err
}

Expand Down Expand Up @@ -271,4 +310,5 @@ func (w *Services) GenerateUserMessageToSign(encryptionToken []byte, formatsSlic

func (w *Services) Stop() {
w.BackendRPC.Stop()
close(w.cacheInvalidationCh)
}

0 comments on commit 5cd0778

Please sign in to comment.