From d6854479365f0307560fa28e18e2bd0634b05229 Mon Sep 17 00:00:00 2001
From: dmitri-lerko <dmitri-lerko@users.noreply.github.com>
Date: Wed, 6 Jul 2022 12:30:13 +0100
Subject: [PATCH] feat: add support for latency command parsing (#614)

Add support for LATENCY LATEST, LATEST HISTORY command parsing.
---
 redis/redis.go      | 24 ++++++++++++
 redis/reply.go      | 88 ++++++++++++++++++++++++++++++++++++++++++++
 redis/reply_test.go | 89 +++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 201 insertions(+)

diff --git a/redis/redis.go b/redis/redis.go
index faf574e0..e3a3968c 100644
--- a/redis/redis.go
+++ b/redis/redis.go
@@ -187,3 +187,27 @@ type SlowLog struct {
 	// ClientName is the name set via the CLIENT SETNAME command (4.0 only).
 	ClientName string
 }
+
+// Latency represents a redis LATENCY LATEST.
+type Latency struct {
+	// Name of the latest latency spike event.
+	Name string
+
+	// Time of the latest latency spike for the event.
+	Time time.Time
+
+	// Latest is the latest recorded latency for the named event.
+	Latest time.Duration
+
+	// Max is the maximum latency for the named event.
+	Max time.Duration
+}
+
+// LatencyHistory represents a redis LATENCY HISTORY.
+type LatencyHistory struct {
+	// Time is the unix timestamp at which the event was processed.
+	Time time.Time
+
+	// ExecutationTime is the amount of time needed for the command execution.
+	ExecutionTime time.Duration
+}
diff --git a/redis/reply.go b/redis/reply.go
index 7a2b3fef..aabf5989 100644
--- a/redis/reply.go
+++ b/redis/reply.go
@@ -645,3 +645,91 @@ func SlowLogs(result interface{}, err error) ([]SlowLog, error) {
 	}
 	return logs, nil
 }
+
+// Latencies is a helper that parses the LATENCY LATEST command output and
+// return the slice of Latency values.
+func Latencies(result interface{}, err error) ([]Latency, error) {
+	rawLatencies, err := Values(result, err)
+	if err != nil {
+		return nil, err
+	}
+
+	latencies := make([]Latency, len(rawLatencies))
+	for i, e := range rawLatencies {
+		rawLatency, ok := e.([]interface{})
+		if !ok {
+			return nil, fmt.Errorf("redigo: latencies element is not slice, got %T", e)
+		}
+
+		var event Latency
+		if len(rawLatency) != 4 {
+			return nil, fmt.Errorf("redigo: latencies element has %d elements, expected 4", len(rawLatency))
+		}
+
+		event.Name, err = String(rawLatency[0], nil)
+		if err != nil {
+			return nil, fmt.Errorf("redigo: latencies element[0] is not a string: %w", err)
+		}
+
+		timestamp, ok := rawLatency[1].(int64)
+		if !ok {
+			return nil, fmt.Errorf("redigo: latencies element[1] not an int64, got %T", rawLatency[1])
+		}
+
+		event.Time = time.Unix(timestamp, 0)
+
+		latestDuration, ok := rawLatency[2].(int64)
+		if !ok {
+			return nil, fmt.Errorf("redigo: latencies element[2] not an int64, got %T", rawLatency[2])
+		}
+
+		event.Latest = time.Duration(latestDuration) * time.Millisecond
+
+		maxDuration, ok := rawLatency[3].(int64)
+		if !ok {
+			return nil, fmt.Errorf("redigo: latencies element[3] not an int64, got %T", rawLatency[3])
+		}
+
+		event.Max = time.Duration(maxDuration) * time.Millisecond
+
+		latencies[i] = event
+	}
+
+	return latencies, nil
+}
+
+// LatencyHistories is a helper that parse the LATENCY HISTORY command output and
+// returns a LatencyHistory slice.
+func LatencyHistories(result interface{}, err error) ([]LatencyHistory, error) {
+	rawLogs, err := Values(result, err)
+	if err != nil {
+		return nil, err
+	}
+
+	latencyHistories := make([]LatencyHistory, len(rawLogs))
+	for i, e := range rawLogs {
+		rawLog, ok := e.([]interface{})
+		if !ok {
+			return nil, fmt.Errorf("redigo: latency history element is not an slice, got %T", e)
+		}
+
+		var event LatencyHistory
+		timestamp, ok := rawLog[0].(int64)
+		if !ok {
+			return nil, fmt.Errorf("redigo: latency history element[0] not an int64, got %T", rawLog[0])
+		}
+
+		event.Time = time.Unix(timestamp, 0)
+
+		duration, ok := rawLog[1].(int64)
+		if !ok {
+			return nil, fmt.Errorf("redigo: latency history element[1] not an int64, got %T", rawLog[1])
+		}
+
+		event.ExecutionTime = time.Duration(duration) * time.Millisecond
+
+		latencyHistories[i] = event
+	}
+
+	return latencyHistories, nil
+}
diff --git a/redis/reply_test.go b/redis/reply_test.go
index 8f43828c..7362a36c 100644
--- a/redis/reply_test.go
+++ b/redis/reply_test.go
@@ -16,6 +16,7 @@ package redis_test
 
 import (
 	"fmt"
+	"github.com/stretchr/testify/require"
 	"math"
 	"reflect"
 	"strconv"
@@ -225,6 +226,94 @@ func TestSlowLog(t *testing.T) {
 	}
 }
 
+func TestLatency(t *testing.T) {
+	c, err := dial()
+	require.NoError(t, err)
+	defer c.Close()
+
+	resultStr, err := redis.Strings(c.Do("CONFIG", "GET", "latency-monitor-threshold"))
+	require.NoError(t, err)
+	// LATENCY commands were added in 2.8.13 so might not be supported.
+	if len(resultStr) == 0 {
+		t.Skip("Latency commands not supported")
+	}
+	latencyMonitorThresholdOldCfg, err := strconv.Atoi(resultStr[1])
+	require.NoError(t, err)
+	// Enable latency monitoring for events that take 1ms or longer.
+	result, err := c.Do("CONFIG", "SET", "latency-monitor-threshold", "1")
+	// reset the old configuration after test.
+	defer func() {
+		res, err := c.Do("CONFIG", "SET", "latency-monitor-threshold", latencyMonitorThresholdOldCfg)
+		require.NoError(t, err)
+		require.Equal(t, "OK", res)
+	}()
+
+	require.NoError(t, err)
+	require.Equal(t, "OK", result)
+
+	// Sleep for 1ms to register a slow event.
+	_, err = c.Do("DEBUG", "SLEEP", 0.001)
+	require.NoError(t, err)
+
+	result, err = c.Do("LATENCY", "LATEST")
+	require.NoError(t, err)
+
+	latestLatencies, err := redis.Latencies(result, err)
+	require.NoError(t, err)
+
+	require.Equal(t, 1, len(latestLatencies))
+
+	latencyEvent := latestLatencies[0]
+	expected := redis.Latency{
+		Name:   "command",
+		Latest: time.Millisecond,
+		Max:    time.Millisecond,
+		Time:   latencyEvent.Time,
+	}
+	require.Equal(t, latencyEvent, expected)
+}
+
+func TestLatencyHistories(t *testing.T) {
+	c, err := dial()
+	require.NoError(t, err)
+	defer c.Close()
+
+	res, err := redis.Strings(c.Do("CONFIG", "GET", "latency-monitor-threshold"))
+	require.NoError(t, err)
+
+	// LATENCY commands were added in 2.8.13 so might not be supported.
+	if len(res) == 0 {
+		t.Skip("Latency commands not supported")
+	}
+	latencyMonitorThresholdOldCfg, err := strconv.Atoi(res[1])
+	require.NoError(t, err)
+
+	// Enable latency monitoring for events that take 1ms or longer
+	result, err := c.Do("CONFIG", "SET", "latency-monitor-threshold", "1")
+	// reset the old configuration after test.
+	defer func() {
+		res, err := c.Do("CONFIG", "SET", "latency-monitor-threshold", latencyMonitorThresholdOldCfg)
+		require.NoError(t, err)
+		require.Equal(t, "OK", res)
+	}()
+	require.NoError(t, err)
+	require.Equal(t, "OK", result)
+
+	// Sleep for 1ms to register a slow event
+	_, err = c.Do("DEBUG", "SLEEP", 0.001)
+	require.NoError(t, err)
+
+	result, err = c.Do("LATENCY", "HISTORY", "command")
+	require.NoError(t, err)
+
+	latencyHistory, err := redis.LatencyHistories(result, err)
+	require.NoError(t, err)
+
+	require.Len(t, latencyHistory, 1)
+	latencyEvent := latencyHistory[0]
+	require.Equal(t, time.Millisecond, latencyEvent.ExecutionTime)
+}
+
 // dial wraps DialDefaultServer() with a more suitable function name for examples.
 func dial() (redis.Conn, error) {
 	return redis.DialDefaultServer()