Skip to content

Commit

Permalink
Merge pull request #10 from redis-performance/processed.entries
Browse files Browse the repository at this point in the history
Extended the tracked latencies to include p95 and p999. Only accounting XREADGROUP if we've received entries on the reply.
  • Loading branch information
filipecosta90 authored Sep 21, 2023
2 parents 222ef0c + 684919e commit 1e3484e
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 13 deletions.
17 changes: 11 additions & 6 deletions cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,20 @@ func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit uint64, loop b
select {
case dp = <-datapointsChan:
{
latencies.RecordValue(dp.durationMs)
latenciesTick.RecordValue(dp.durationMs)
// in case of blocking commands that did not processed entries we don't really account it
if dp.processedEntries > 0 {
latencies.RecordValue(dp.durationMs)
latenciesTick.RecordValue(dp.durationMs)
currentCount++
for _, cmdType := range dp.commandsIssued {
cmdRateTick[cmdType]++
}
}
// in all cases we check for errors
if !dp.success {
currentErr++
}
currentCount++
for _, cmdType := range dp.commandsIssued {
cmdRateTick[cmdType]++
}

}
case <-tick.C:
{
Expand Down
5 changes: 4 additions & 1 deletion cmd/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ func benchmarkConsumerRoutine(client rueidis.Client, c chan os.Signal, datapoint
default:
cmdsIssued := make([]int, 0, 1)
cmdsIssued = append(cmdsIssued, XREADGROUP)
processedEntries := 0
startT := time.Now()
xreadEntries, err := client.Do(ctx, client.B().Xreadgroup().Group(groupname, consumername).Count(readCount).Block(readBlockMs).Streams().Key(keyname).Id(">").Build()).AsXRead()
if err != nil {
Expand All @@ -186,6 +187,7 @@ func benchmarkConsumerRoutine(client rueidis.Client, c chan os.Signal, datapoint
cmdsIssued = append(cmdsIssued, XREADGROUP)
err = client.Do(ctx, client.B().XgroupCreate().Key(keyname).Group(groupname).Id("0").Mkstream().Build()).Error()
err = client.Do(ctx, client.B().XgroupCreateconsumer().Key(keyname).Group(groupname).Consumer(consumername).Build()).Error()
startT = time.Now()
xreadEntries, err = client.Do(ctx, client.B().Xreadgroup().Group(groupname, consumername).Count(readCount).Block(readBlockMs).Streams().Key(keyname).Id(">").Build()).AsXRead()
}
if err == nil {
Expand All @@ -194,12 +196,13 @@ func benchmarkConsumerRoutine(client rueidis.Client, c chan os.Signal, datapoint
for _, xrangeEntry := range xrangeEntries {
cmdsIssued = append(cmdsIssued, XACK)
err = client.Do(ctx, client.B().Xack().Key(keyname).Group(groupname).Id(xrangeEntry.ID).Build()).Error()
processedEntries++
}
}
}
endT := time.Now()
duration := endT.Sub(startT)
datapointsChan <- datapoint{!(err != nil), duration.Microseconds(), cmdsIssued}
datapointsChan <- datapoint{!(err != nil), duration.Microseconds(), cmdsIssued, processedEntries}
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions cmd/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ const Inf = rate.Limit(math.MaxFloat64)
const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

type datapoint struct {
success bool
durationMs int64
commandsIssued []int
success bool
durationMs int64
commandsIssued []int
processedEntries int
}

func stringWithCharset(length int, charset string) string {
Expand Down Expand Up @@ -216,7 +217,7 @@ func benchmarkRoutine(client rueidis.Client, streamPrefix, value string, datapoi
endT := time.Now()
duration := endT.Sub(startT)
streamMessages[streamId] = counter
datapointsChan <- datapoint{!(err != nil), duration.Microseconds(), cmdsIssued}
datapointsChan <- datapoint{!(err != nil), duration.Microseconds(), cmdsIssued, 1}
}
}

Expand Down
8 changes: 6 additions & 2 deletions cmd/test_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,22 @@ func calculateRateMetrics(current, prev int64, took time.Duration) (rate float64

func generateLatenciesMap(hist *hdrhistogram.Histogram, tick time.Duration) (int64, map[string]float64) {
ops := hist.TotalCount()
percentilesTrack := []float64{0.0, 50.0, 99.0, 100.0}
percentilesTrack := []float64{0.0, 50.0, 95.0, 99.0, 99.9, 100.0}
q0 := 0.0
q50 := 0.0
q95 := 0.0
q99 := 0.0
q999 := 0.0
q100 := 0.0
if ops > 0 {
percentilesMap := hist.ValueAtPercentiles(percentilesTrack)
q0 = float64(percentilesMap[0.0]) / 10e2
q50 = float64(percentilesMap[50.0]) / 10e2
q95 = float64(percentilesMap[95.0]) / 10e2
q99 = float64(percentilesMap[99.0]) / 10e2
q999 = float64(percentilesMap[99.9]) / 10e2
q100 = float64(percentilesMap[100.0]) / 10e2
}
mp := map[string]float64{"q0": q0, "q50": q50, "q99": q99, "q100": q100, "ops/sec": float64(ops) / tick.Seconds()}
mp := map[string]float64{"q0": q0, "q50": q50, "q95": q95, "q99": q99, "q999": q999, "q100": q100, "ops/sec": float64(ops) / tick.Seconds()}
return ops, mp
}

0 comments on commit 1e3484e

Please sign in to comment.