From df21b962eb518d313806d41a0e93210f8811f3bc Mon Sep 17 00:00:00 2001
From: Andrew Gouin
Date: Fri, 1 Dec 2023 11:44:38 -0700
Subject: [PATCH] Fine tune nonce consumption greater than get nonces interval

---
 signer/cosigner_nonce_cache.go      |  95 ++++++++++++++++----
 signer/cosigner_nonce_cache_test.go | 134 ++++++++++++++++++++++++++--
 2 files changed, 205 insertions(+), 24 deletions(-)

diff --git a/signer/cosigner_nonce_cache.go b/signer/cosigner_nonce_cache.go
index 81da856d..cde98aae 100644
--- a/signer/cosigner_nonce_cache.go
+++ b/signer/cosigner_nonce_cache.go
@@ -14,7 +14,6 @@ const (
 	defaultGetNoncesInterval = 3 * time.Second
 	defaultGetNoncesTimeout  = 4 * time.Second
 	defaultNonceExpiration   = 10 * time.Second // half of the local cosigner cache expiration
-	targetTrim               = 10
 )
 
 type CosignerNonceCache struct {
@@ -25,7 +24,6 @@ type CosignerNonceCache struct {
 
 	lastReconcileNonces lastCount
 	lastReconcileTime   time.Time
-	noncesPerMinute     float64
 
 	getNoncesInterval time.Duration
 	getNoncesTimeout  time.Duration
@@ -36,6 +34,53 @@ type CosignerNonceCache struct {
 	cache NonceCache
 
 	pruner NonceCachePruner
+
+	movingAverage *movingAverage
+
+	empty chan struct{}
+}
+
+type movingAverageItem struct {
+	timeSinceLastReconcile time.Duration
+	noncesPerMinute        float64
+}
+
+type movingAverage struct {
+	items  []movingAverageItem
+	period time.Duration
+}
+
+func newMovingAverage(period time.Duration) *movingAverage {
+	return &movingAverage{period: period}
+}
+
+func (m *movingAverage) add(
+	timeSinceLastReconcile time.Duration,
+	noncesPerMinute float64,
+) {
+	duration := timeSinceLastReconcile
+	keep := len(m.items) - 1
+	for i, e := range m.items {
+		duration += e.timeSinceLastReconcile
+		if duration >= m.period {
+			keep = i
+			break
+		}
+	}
+	m.items = append([]movingAverageItem{{timeSinceLastReconcile: timeSinceLastReconcile, noncesPerMinute: noncesPerMinute}}, m.items[:keep+1]...)
+}
+
+func (m *movingAverage) average() float64 {
+	weightedSum := float64(0)
+	duration := float64(0)
+
+	for _, e := range m.items {
+		d := float64(e.timeSinceLastReconcile)
+		weightedSum += e.noncesPerMinute * d
+		duration += d
+	}
+
+	return weightedSum / duration
 }
 
 type lastCount struct {
@@ -126,6 +171,8 @@ func NewCosignerNonceCache(
 		nonceExpiration:   nonceExpiration,
 		threshold:         threshold,
 		pruner:            pruner,
+		empty:             make(chan struct{}, 1),
+		movingAverage:     newMovingAverage(4 * getNoncesInterval), // weighted average over 4 intervals
 	}
 	// the only time pruner is expected to be non-nil is during tests, otherwise we use the cache logic.
 	if pruner == nil {
@@ -143,8 +190,12 @@ func (cnc *CosignerNonceCache) getUuids(n int) []uuid.UUID {
 	return uuids
 }
 
-func (cnc *CosignerNonceCache) target() int {
-	return int((cnc.noncesPerMinute/60)*cnc.getNoncesInterval.Seconds()*1.2) + int(cnc.noncesPerMinute/30) + targetTrim
+func (cnc *CosignerNonceCache) target(noncesPerMinute float64) int {
+	t := int((noncesPerMinute / 60) * ((cnc.getNoncesInterval.Seconds() * 1.2) + 0.5))
+	if t <= 0 {
+		return 1 // always target at least one nonce ready
+	}
+	return t
 }
 
 func (cnc *CosignerNonceCache) reconcile(ctx context.Context) {
@@ -164,33 +215,31 @@ func (cnc *CosignerNonceCache) reconcile(ctx context.Context) {
 		noncesPerMin = 0
 	}
 
-	if cnc.noncesPerMinute == 0 {
-		// initialize nonces per minute for weighted average
-		cnc.noncesPerMinute = noncesPerMin
-	} else {
-		// weighted average over last 4 intervals
-		cnc.noncesPerMinute = (cnc.noncesPerMinute*3 + noncesPerMin) / 4
-	}
-
-	defer func() {
-		cnc.lastReconcileNonces.Set(cnc.cache.Size())
-		cnc.lastReconcileTime = time.Now()
-	}()
+	cnc.movingAverage.add(timeSinceLastReconcile, noncesPerMin)
 
 	// calculate how many nonces we need to load to keep up with demand
 	// load 120% the number of nonces we need to keep up with demand,
 	// plus a couple seconds worth of nonces to account for nonce consumption during LoadN
 	// plus 10 for padding
-	t := cnc.target()
+	avgNoncesPerMin := cnc.movingAverage.average()
+	t := cnc.target(avgNoncesPerMin)
 	additional := t - remainingNonces
+
+	defer func() {
+		cnc.lastReconcileNonces.Set(remainingNonces + additional)
+		cnc.lastReconcileTime = time.Now()
+	}()
+
 	if additional <= 0 {
+		additional = 0 // we're ahead of demand, don't load any more
 		cnc.logger.Debug(
 			"Cosigner nonce cache ahead of demand",
 			"target", t,
 			"remaining", remainingNonces,
-			"noncesPerMin", cnc.noncesPerMinute,
+			"nonces_per_min", noncesPerMin,
+			"avg_nonces_per_min", avgNoncesPerMin,
 		)
 		return
 	}
@@ -200,7 +249,8 @@ func (cnc *CosignerNonceCache) reconcile(ctx context.Context) {
 		"target", t,
 		"remaining", remainingNonces,
 		"additional", additional,
-		"noncesPerMin", cnc.noncesPerMinute,
+		"nonces_per_min", noncesPerMin,
+		"avg_nonces_per_min", avgNoncesPerMin,
 	)
 
 	cnc.LoadN(ctx, additional)
@@ -282,6 +332,8 @@ func (cnc *CosignerNonceCache) Start(ctx context.Context) {
 			return
 		case <-ticker.C:
 			cnc.reconcile(ctx)
+		case <-cnc.empty:
+			cnc.reconcile(ctx)
 		}
 	}
 }
@@ -310,6 +362,11 @@ CheckNoncesLoop:
 		// remove this set of nonces from the cache
 		cnc.cache.Delete(i)
 
+		if len(cnc.cache.cache) == 0 {
+			cnc.logger.Debug("Nonce cache is empty, triggering reload")
+			cnc.empty <- struct{}{}
+		}
+
 		// all peers found
 		return &CosignerUUIDNonces{
 			UUID: cn.UUID,
diff --git a/signer/cosigner_nonce_cache_test.go b/signer/cosigner_nonce_cache_test.go
index e06f2445..8188ce2d 100644
--- a/signer/cosigner_nonce_cache_test.go
+++ b/signer/cosigner_nonce_cache_test.go
@@ -22,6 +22,39 @@ func TestNonceCache(_ *testing.T) {
 	nc.Delete(0)
 }
 
+func TestMovingAverage(t *testing.T) {
+	ma := newMovingAverage(12 * time.Second)
+
+	ma.add(3*time.Second, 500)
+	require.Len(t, ma.items, 1)
+	require.Equal(t, float64(500), ma.average())
+
+	ma.add(3*time.Second, 100)
+	require.Len(t, ma.items, 2)
+	require.Equal(t, float64(300), ma.average())
+
+	ma.add(6*time.Second, 600)
+	require.Len(t, ma.items, 3)
+	require.Equal(t, float64(450), ma.average())
+
+	// should kick out the first one
+	ma.add(3*time.Second, 500)
+	require.Len(t, ma.items, 3)
+	require.Equal(t, float64(450), ma.average())
+
+	// should kick out the second one
+	ma.add(6*time.Second, 500)
+	require.Len(t, ma.items, 3)
+	require.Equal(t, float64(540), ma.average())
+
+	for i := 0; i < 5; i++ {
+		ma.add(2500*time.Millisecond, 1000)
+	}
+
+	require.Len(t, ma.items, 5)
+	require.Equal(t, float64(1000), ma.average())
+}
+
 func TestClearNonces(t *testing.T) {
 	lcs, _ := getTestLocalCosigners(t, 2, 3)
 	cosigners := make([]Cosigner, len(lcs))
 	for i, lc := range lcs {
 		cosigners[i] = lc
 	}
@@ -131,10 +164,12 @@ func TestNonceCacheDemand(t *testing.T) {
 
 	cancel()
 
-	require.LessOrEqual(t, size, nonceCache.target())
+	require.LessOrEqual(t, size, nonceCache.target(nonceCache.movingAverage.average()))
+
+	count, pruned := mp.Result()
 
-	require.Greater(t, mp.count, 0)
-	require.Equal(t, 0, mp.pruned)
+	require.Greater(t, count, 0)
+	require.Equal(t, 0, pruned)
 }
 
 func TestNonceCacheExpiration(t *testing.T) {
@@ -180,6 +215,95 @@ func TestNonceCacheExpiration(t *testing.T) {
 
 	cancel()
 
-	// the cache should have at most the trim padding since no nonces are being consumed.
-	require.LessOrEqual(t, nonceCache.cache.Size(), targetTrim)
+	// the cache should be empty since no nonces are being consumed.
+	require.Equal(t, nonceCache.cache.Size(), 0)
+}
+
+func TestNonceCacheDemandSlow(t *testing.T) {
+	lcs, _ := getTestLocalCosigners(t, 2, 3)
+	cosigners := make([]Cosigner, len(lcs))
+	for i, lc := range lcs {
+		cosigners[i] = lc
+	}
+
+	mp := &mockPruner{}
+
+	nonceCache := NewCosignerNonceCache(
+		cometlog.NewTMLogger(cometlog.NewSyncWriter(os.Stdout)),
+		cosigners,
+		&MockLeader{id: 1, leader: &ThresholdValidator{myCosigner: lcs[0]}},
+		90*time.Millisecond,
+		100*time.Millisecond,
+		500*time.Millisecond,
+		2,
+		mp,
+	)
+
+	mp.cnc = nonceCache
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	go nonceCache.Start(ctx)
+
+	for i := 0; i < 10; i++ {
+		time.Sleep(200 * time.Millisecond)
+		require.Greater(t, nonceCache.cache.Size(), 0)
+		_, err := nonceCache.GetNonces([]Cosigner{cosigners[0], cosigners[1]})
+		require.NoError(t, err)
+	}
+
+	cancel()
+
+	require.LessOrEqual(t, nonceCache.cache.Size(), nonceCache.target(300))
+
+	count, pruned := mp.Result()
+
+	require.Greater(t, count, 0)
+	require.Greater(t, pruned, 0)
+}
+
+func TestNonceCacheDemandSlowDefault(t *testing.T) {
+	if testing.Short() {
+		t.Skip()
+	}
+	lcs, _ := getTestLocalCosigners(t, 2, 3)
+	cosigners := make([]Cosigner, len(lcs))
+	for i, lc := range lcs {
+		cosigners[i] = lc
+	}
+
+	mp := &mockPruner{}
+
+	nonceCache := NewCosignerNonceCache(
+		cometlog.NewTMLogger(cometlog.NewSyncWriter(os.Stdout)),
+		cosigners,
+		&MockLeader{id: 1, leader: &ThresholdValidator{myCosigner: lcs[0]}},
+		defaultGetNoncesInterval,
+		defaultGetNoncesTimeout,
+		defaultNonceExpiration,
+		2,
+		mp,
+	)
+
+	mp.cnc = nonceCache
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	go nonceCache.Start(ctx)
+
+	for i := 0; i < 10; i++ {
+		time.Sleep(7 * time.Second)
+		require.Greater(t, nonceCache.cache.Size(), 0)
+		_, err := nonceCache.GetNonces([]Cosigner{cosigners[0], cosigners[1]})
+		require.NoError(t, err)
+	}
+
+	cancel()
+
+	require.LessOrEqual(t, nonceCache.cache.Size(), nonceCache.target(nonceCache.movingAverage.average()))
+
+	count, pruned := mp.Result()
+
+	require.Greater(t, count, 0)
+	require.Equal(t, 0, pruned)
 }
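
A standalone sketch (not part of the patch) of how the new target sizing behaves. It copies the formula from target() above, which sizes the cache for 120% of one getNoncesInterval plus half a second of consumption at the given rate, with a floor of one nonce. The consumption rates below are hypothetical; the 3-second interval matches defaultGetNoncesInterval.

```go
package main

import (
	"fmt"
	"time"
)

// target mirrors CosignerNonceCache.target from the patch: enough nonces for
// 120% of one getNoncesInterval plus half a second of consumption at the
// given rate, never less than 1.
func target(noncesPerMinute float64, getNoncesInterval time.Duration) int {
	t := int((noncesPerMinute / 60) * ((getNoncesInterval.Seconds() * 1.2) + 0.5))
	if t <= 0 {
		return 1 // always target at least one nonce ready
	}
	return t
}

func main() {
	// Hypothetical consumption rates; 3s matches defaultGetNoncesInterval.
	for _, rate := range []float64{0, 100, 500, 1000} {
		fmt.Printf("%6.0f nonces/min -> target %d\n", rate, target(rate, 3*time.Second))
	}
}
```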