From 684919ebcdf91757b1733b82988c8b795ded5272 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Thu, 21 Sep 2023 16:40:40 +0100 Subject: [PATCH] Extended the tracked latencies to include p95 and p999. Only accounting XREADGROUP if we've received entries on the reply. --- cmd/common.go | 17 +++++++++++------ cmd/consumer.go | 5 ++++- cmd/producer.go | 9 +++++---- cmd/test_result.go | 8 ++++++-- 4 files changed, 26 insertions(+), 13 deletions(-) diff --git a/cmd/common.go b/cmd/common.go index ec38d43..4da6a0f 100644 --- a/cmd/common.go +++ b/cmd/common.go @@ -112,15 +112,20 @@ func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit uint64, loop b select { case dp = <-datapointsChan: { - latencies.RecordValue(dp.durationMs) - latenciesTick.RecordValue(dp.durationMs) + // in case of blocking commands that did not processed entries we don't really account it + if dp.processedEntries > 0 { + latencies.RecordValue(dp.durationMs) + latenciesTick.RecordValue(dp.durationMs) + currentCount++ + for _, cmdType := range dp.commandsIssued { + cmdRateTick[cmdType]++ + } + } + // in all cases we check for errors if !dp.success { currentErr++ } - currentCount++ - for _, cmdType := range dp.commandsIssued { - cmdRateTick[cmdType]++ - } + } case <-tick.C: { diff --git a/cmd/consumer.go b/cmd/consumer.go index 61a3e95..d900750 100644 --- a/cmd/consumer.go +++ b/cmd/consumer.go @@ -178,6 +178,7 @@ func benchmarkConsumerRoutine(client rueidis.Client, c chan os.Signal, datapoint default: cmdsIssued := make([]int, 0, 1) cmdsIssued = append(cmdsIssued, XREADGROUP) + processedEntries := 0 startT := time.Now() xreadEntries, err := client.Do(ctx, client.B().Xreadgroup().Group(groupname, consumername).Count(readCount).Block(readBlockMs).Streams().Key(keyname).Id(">").Build()).AsXRead() if err != nil { @@ -186,6 +187,7 @@ func benchmarkConsumerRoutine(client rueidis.Client, c chan os.Signal, datapoint cmdsIssued = append(cmdsIssued, XREADGROUP) err = client.Do(ctx, client.B().XgroupCreate().Key(keyname).Group(groupname).Id("0").Mkstream().Build()).Error() err = client.Do(ctx, client.B().XgroupCreateconsumer().Key(keyname).Group(groupname).Consumer(consumername).Build()).Error() + startT = time.Now() xreadEntries, err = client.Do(ctx, client.B().Xreadgroup().Group(groupname, consumername).Count(readCount).Block(readBlockMs).Streams().Key(keyname).Id(">").Build()).AsXRead() } if err == nil { @@ -194,12 +196,13 @@ func benchmarkConsumerRoutine(client rueidis.Client, c chan os.Signal, datapoint for _, xrangeEntry := range xrangeEntries { cmdsIssued = append(cmdsIssued, XACK) err = client.Do(ctx, client.B().Xack().Key(keyname).Group(groupname).Id(xrangeEntry.ID).Build()).Error() + processedEntries++ } } } endT := time.Now() duration := endT.Sub(startT) - datapointsChan <- datapoint{!(err != nil), duration.Microseconds(), cmdsIssued} + datapointsChan <- datapoint{!(err != nil), duration.Microseconds(), cmdsIssued, processedEntries} } } } diff --git a/cmd/producer.go b/cmd/producer.go index 850880a..fdde0ad 100644 --- a/cmd/producer.go +++ b/cmd/producer.go @@ -29,9 +29,10 @@ const Inf = rate.Limit(math.MaxFloat64) const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" type datapoint struct { - success bool - durationMs int64 - commandsIssued []int + success bool + durationMs int64 + commandsIssued []int + processedEntries int } func stringWithCharset(length int, charset string) string { @@ -216,7 +217,7 @@ func benchmarkRoutine(client rueidis.Client, streamPrefix, value string, datapoi endT := time.Now() duration := endT.Sub(startT) streamMessages[streamId] = counter - datapointsChan <- datapoint{!(err != nil), duration.Microseconds(), cmdsIssued} + datapointsChan <- datapoint{!(err != nil), duration.Microseconds(), cmdsIssued, 1} } } diff --git a/cmd/test_result.go b/cmd/test_result.go index f6b732e..12cbf14 100644 --- a/cmd/test_result.go +++ b/cmd/test_result.go @@ -83,18 +83,22 @@ func calculateRateMetrics(current, prev int64, took time.Duration) (rate float64 func generateLatenciesMap(hist *hdrhistogram.Histogram, tick time.Duration) (int64, map[string]float64) { ops := hist.TotalCount() - percentilesTrack := []float64{0.0, 50.0, 99.0, 100.0} + percentilesTrack := []float64{0.0, 50.0, 95.0, 99.0, 99.9, 100.0} q0 := 0.0 q50 := 0.0 + q95 := 0.0 q99 := 0.0 + q999 := 0.0 q100 := 0.0 if ops > 0 { percentilesMap := hist.ValueAtPercentiles(percentilesTrack) q0 = float64(percentilesMap[0.0]) / 10e2 q50 = float64(percentilesMap[50.0]) / 10e2 + q95 = float64(percentilesMap[95.0]) / 10e2 q99 = float64(percentilesMap[99.0]) / 10e2 + q999 = float64(percentilesMap[99.9]) / 10e2 q100 = float64(percentilesMap[100.0]) / 10e2 } - mp := map[string]float64{"q0": q0, "q50": q50, "q99": q99, "q100": q100, "ops/sec": float64(ops) / tick.Seconds()} + mp := map[string]float64{"q0": q0, "q50": q50, "q95": q95, "q99": q99, "q999": q999, "q100": q100, "ops/sec": float64(ops) / tick.Seconds()} return ops, mp }