Skip to content

Commit

Permalink
Merge branch 'main' into ziga/gateway_flag_store_cert_unencrypted
Browse files Browse the repository at this point in the history
  • Loading branch information
zkokelj authored Dec 20, 2024
2 parents 242a1fd + 8a8a464 commit 0b41c3a
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 44 deletions.
25 changes: 19 additions & 6 deletions tools/walletextension/cache/RistrettoCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cache

import (
"fmt"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/log"
Expand All @@ -15,9 +16,10 @@ const (
)

type ristrettoCache struct {
cache *ristretto.Cache
quit chan struct{}
lastEviction time.Time
cache *ristretto.Cache
quit chan struct{}
lastEviction time.Time
shortLivingEnabled *atomic.Bool
}

// NewRistrettoCacheWithEviction returns a new ristrettoCache.
Expand All @@ -33,10 +35,12 @@ func NewRistrettoCacheWithEviction(nrElems int, logger log.Logger) (Cache, error
}

c := &ristrettoCache{
cache: cache,
quit: make(chan struct{}),
lastEviction: time.Now(),
cache: cache,
quit: make(chan struct{}),
lastEviction: time.Now(),
shortLivingEnabled: &atomic.Bool{},
}
c.shortLivingEnabled.Store(true)

// Start the metrics logging
go c.startMetricsLogging(logger)
Expand All @@ -45,10 +49,19 @@ func NewRistrettoCacheWithEviction(nrElems int, logger log.Logger) (Cache, error
}

func (c *ristrettoCache) EvictShortLiving() {
// this event happens when a new batch is received, so the cache can be enabled
c.shortLivingEnabled.Store(true)
c.lastEviction = time.Now()
}

func (c *ristrettoCache) DisableShortLiving() {
c.shortLivingEnabled.Store(false)
}

func (c *ristrettoCache) IsEvicted(key any, originalTTL time.Duration) bool {
if !c.shortLivingEnabled.Load() {
return true
}
remainingTTL, notExpired := c.cache.GetTTL(key)
if !notExpired {
return true
Expand Down
3 changes: 3 additions & 0 deletions tools/walletextension/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ type Cache interface {
// EvictShortLiving - notify the cache that all short living elements cached before the events should be considered as evicted.
EvictShortLiving()

// DisableShortLiving disables the caching of short-living elements.
DisableShortLiving()

// IsEvicted - based on the eviction event and the time of caching, calculates whether the key was evicted
IsEvicted(key any, originalTTL time.Duration) bool

Expand Down
30 changes: 18 additions & 12 deletions tools/walletextension/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,27 @@ import "time"

// Config contains the configuration required by the WalletExtension.
type Config struct {
WalletExtensionHost string
WalletExtensionPortHTTP int
WalletExtensionPortWS int
NodeRPCHTTPAddress string
NodeRPCWebsocketAddress string
LogPath string
DBPathOverride string // Overrides the database file location. Used in tests.
VerboseFlag bool
DBType string
DBConnectionURL string
TenChainID int
StoreIncomingTxs bool
WalletExtensionHost string
WalletExtensionPortHTTP int
WalletExtensionPortWS int

NodeRPCHTTPAddress string
NodeRPCWebsocketAddress string

LogPath string
DBPathOverride string // Overrides the database file location. Used in tests.
VerboseFlag bool

DBType string
DBConnectionURL string

TenChainID int
StoreIncomingTxs bool

RateLimitUserComputeTime time.Duration
RateLimitWindow time.Duration
RateLimitMaxConcurrentRequests int

InsideEnclave bool // Indicates if the program is running inside an enclave

Check failure on line 28 in tools/walletextension/common/config.go

View workflow job for this annotation

GitHub Actions / lint

File is not `goimports`-ed (goimports)
KeyExchangeURL string
EnableTLS bool
Expand Down
87 changes: 61 additions & 26 deletions tools/walletextension/services/wallet_extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,19 @@ import (

// Services handles the various business logic for the api endpoints
type Services struct {
HostAddrHTTP string // The HTTP address on which the TEN host can be reached
HostAddrWS string // The WS address on which the TEN host can be reached
Storage storage.UserStorage
logger gethlog.Logger
stopControl *stopcontrol.StopControl
version string
RPCResponsesCache cache.Cache
BackendRPC *BackendRPC
RateLimiter *ratelimiter.RateLimiter
SKManager SKManager
Config *common.Config
NewHeadsService *subscriptioncommon.NewHeadsService
HostAddrHTTP string // The HTTP address on which the TEN host can be reached
HostAddrWS string // The WS address on which the TEN host can be reached
Storage storage.UserStorage
logger gethlog.Logger
stopControl *stopcontrol.StopControl
version string
RPCResponsesCache cache.Cache
BackendRPC *BackendRPC
RateLimiter *ratelimiter.RateLimiter
SKManager SKManager
Config *common.Config
NewHeadsService *subscriptioncommon.NewHeadsService
cacheInvalidationCh chan *tencommon.BatchHeader
}

type NewHeadNotifier interface {
Expand All @@ -67,17 +68,18 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.UserSto
rateLimiter := ratelimiter.NewRateLimiter(config.RateLimitUserComputeTime, config.RateLimitWindow, uint32(config.RateLimitMaxConcurrentRequests), logger)

services := Services{
HostAddrHTTP: hostAddrHTTP,
HostAddrWS: hostAddrWS,
Storage: storage,
logger: logger,
stopControl: stopControl,
version: version,
RPCResponsesCache: newGatewayCache,
BackendRPC: NewBackendRPC(hostAddrHTTP, hostAddrWS, logger),
SKManager: NewSKManager(storage, config, logger),
RateLimiter: rateLimiter,
Config: config,
HostAddrHTTP: hostAddrHTTP,
HostAddrWS: hostAddrWS,
Storage: storage,
logger: logger,
stopControl: stopControl,
version: version,
RPCResponsesCache: newGatewayCache,
BackendRPC: NewBackendRPC(hostAddrHTTP, hostAddrWS, logger),
SKManager: NewSKManager(storage, config, logger),
RateLimiter: rateLimiter,
Config: config,
cacheInvalidationCh: make(chan *tencommon.BatchHeader),
}

services.NewHeadsService = subscriptioncommon.NewNewHeadsService(
Expand All @@ -86,21 +88,50 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.UserSto
// clear the cache to avoid returning stale data during reconnecting.
services.RPCResponsesCache.EvictShortLiving()
ch := make(chan *tencommon.BatchHeader)
errCh, err := subscribeToNewHeadsWithRetry(ch, services, logger)
errCh, err := subscribeToNewHeadsWithRetry(ch, &services, logger)
logger.Info("Connected to new heads service.", log.ErrKey, err)
return ch, errCh, err
},
true,
logger,
func(newHead *tencommon.BatchHeader) error {
services.RPCResponsesCache.EvictShortLiving()
services.cacheInvalidationCh <- newHead
return nil
})

go _startCacheEviction(&services, logger)
return &services
}

func subscribeToNewHeadsWithRetry(ch chan *tencommon.BatchHeader, services Services, logger gethlog.Logger) (<-chan error, error) {
// this is a more robust cache eviction that handles delays in the new heads subscription by disabling caching of short living elements
// until a batch is received.
func _startCacheEviction(services *Services, logger gethlog.Logger) {
//- todo read the batch time from the config once we integrate the new config
// if we don't receive a new head after this interval we assume the connection is lost, and we disable caching
disableCacheDelay := 5 * time.Second
timer := time.NewTimer(disableCacheDelay)

for {
select {
case <-services.cacheInvalidationCh:
services.RPCResponsesCache.EvictShortLiving()

timer.Stop()
timer = time.NewTimer(disableCacheDelay)

case <-timer.C: // should only be fired when the normal subscription hasn't fired
logger.Warn("Disabling short living cache because NewHeads subscription is delayed")
services.RPCResponsesCache.DisableShortLiving()

case <-services.stopControl.Done():
logger.Info("Stopping cache eviction")
timer.Stop()
return
}
}
}

func subscribeToNewHeadsWithRetry(ch chan *tencommon.BatchHeader, services *Services, logger gethlog.Logger) (<-chan error, error) {
var sub *gethrpc.ClientSubscription
err := retry.Do(
func() error {
Expand Down Expand Up @@ -238,6 +269,9 @@ func (w *Services) GetTenNodeHealthStatus() (bool, error) {
res, err := obsclient.NewObsClient(client).Health()
return &res.OverallHealth, err
})
if res == nil {
return false, err
}
return *res, err
}

Expand Down Expand Up @@ -271,4 +305,5 @@ func (w *Services) GenerateUserMessageToSign(encryptionToken []byte, formatsSlic

func (w *Services) Stop() {
w.BackendRPC.Stop()
close(w.cacheInvalidationCh)
}

0 comments on commit 0b41c3a

Please sign in to comment.