Skip to content

Commit

Permalink
fix(api): remove test error by (#212)
Browse files Browse the repository at this point in the history
  • Loading branch information
r3inbowari authored May 15, 2024
1 parent 75c605f commit 52fca64
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 32 deletions.
4 changes: 2 additions & 2 deletions speedtest/data_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func (td *TestDirection) rateCapture() chan bool {
ticker := time.NewTicker(td.manager.rateCaptureFrequency)
var prevTotalDataVolume int64 = 0
stopCapture := make(chan bool)
td.welford = internal.NewWelford(int(5 * time.Second / td.manager.rateCaptureFrequency))
td.welford = internal.NewWelford(5*time.Second, td.manager.rateCaptureFrequency)
sTime := time.Now()
go func(t *time.Ticker) {
defer t.Stop()
Expand All @@ -263,7 +263,7 @@ func (td *TestDirection) rateCapture() chan bool {
}
// anyway we update the measuring instrument
globalAvg := (float64(td.totalDataVolume)) / float64(time.Since(sTime).Milliseconds()) * 1000
if td.welford.Update(globalAvg) {
if td.welford.Update(globalAvg, float64(deltaDataVolume)) {
go td.closeFunc()
}
// reports the current rate at the given rate
Expand Down
59 changes: 36 additions & 23 deletions speedtest/internal/welford.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,49 @@ package internal
import (
"fmt"
"math"
"time"
)

// Welford Fast standard deviation calculation with moving window
// ref Welford, B. P. (1962). Note on a Method for Calculating Corrected Sums of Squares and Products. Technometrics, 4(3), 419–420. https://doi.org/10.1080/00401706.1962.10490022
type Welford struct {
n int // data size
cap int // queue capacity
vector []float64 // data set
mean float64 // mean
sum float64 // sum
vector []float64 // data set
eraseIndex int // the value will be erased next time
cap int
currentStdDev float64
consecutiveStableIterations int
consecutiveStableIterationsThreshold int
cv float64
ewmaMean float64
steps int
minSteps int
beta float64
scale float64
movingVector []float64 // data set
movingAvg float64
}

// NewWelford recommended windowSize = moving time window / sampling frequency
func NewWelford(windowSize int) *Welford {
// NewWelford recommended windowSize = cycle / sampling frequency
func NewWelford(cycle, frequency time.Duration) *Welford {
windowSize := int(cycle / frequency)
return &Welford{
vector: make([]float64, windowSize),
movingVector: make([]float64, windowSize),
cap: windowSize,
consecutiveStableIterationsThreshold: 10,
consecutiveStableIterationsThreshold: windowSize / 3, // 33%
minSteps: windowSize * 2, // set minimum steps with 2x windowSize.
beta: 2 / (float64(windowSize) + 1), // ewma beta ratio
scale: float64(time.Second / frequency),
}
}

// Update Enter the given value into the measuring system.
// return bool stability evaluation
func (w *Welford) Update(value float64) bool {
func (w *Welford) Update(globalAvg, value float64) bool {
value = value * w.scale
if w.n == w.cap {
delta := w.vector[w.eraseIndex] - w.mean
w.mean -= delta / float64(w.n-1)
Expand All @@ -41,37 +54,37 @@ func (w *Welford) Update(value float64) bool {
if w.sum < 0 {
w.sum = 0
}
w.vector[w.eraseIndex] = value
w.vector[w.eraseIndex] = globalAvg
w.movingAvg -= w.movingVector[w.eraseIndex]
w.movingVector[w.eraseIndex] = value
w.movingAvg += value
w.eraseIndex++
if w.eraseIndex == w.cap {
w.eraseIndex = 0
}
} else {
w.vector[w.n] = value
w.vector[w.n] = globalAvg
w.movingVector[w.n] = value
w.movingAvg += value
w.n++
}
delta := value - w.mean
delta := globalAvg - w.mean
w.mean += delta / float64(w.n)
w.sum += delta * (value - w.mean)
w.sum += delta * (globalAvg - w.mean)
w.currentStdDev = math.Sqrt(w.Variance())
// update C.V
if w.mean == 0 {
w.cv = 1
} else {
if w.mean != 0 {
w.cv = w.currentStdDev / w.mean
if w.cv > 1 {
w.cv = 1
}
}
// ewma beta ratio
// TODO: w.cv needs normalization
beta := w.cv*0.381 + 0.618
w.ewmaMean = w.mean*beta + w.ewmaMean*(1-beta)
w.ewmaMean = value*w.beta + w.ewmaMean*(1-w.beta)
// acc consecutiveStableIterations
if w.cap/2 < w.n && w.cv < 0.03 {
if w.n == w.cap && w.cv < 0.03 {
w.consecutiveStableIterations++
} else if w.consecutiveStableIterations > 0 {
w.consecutiveStableIterations--
}
return w.consecutiveStableIterations >= w.consecutiveStableIterationsThreshold
w.steps++
return w.consecutiveStableIterations >= w.consecutiveStableIterationsThreshold && w.steps > w.minSteps
}

func (w *Welford) Mean() float64 {
Expand All @@ -94,7 +107,7 @@ func (w *Welford) StandardDeviation() float64 {
}

func (w *Welford) EWMA() float64 {
return w.ewmaMean
return w.ewmaMean*0.5 + w.movingAvg/float64(w.n)*0.5
}

func (w *Welford) String() string {
Expand Down
Loading

0 comments on commit 52fca64

Please sign in to comment.