Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Node write and read latency metrics #121

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 31 additions & 18 deletions integration-tests/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,38 +375,34 @@ func checkMetrics(
require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusNameWithNodeLabel(prefix, metrics.OriginUnpreparedErrors, originHost)))
require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusNameWithNodeLabel(prefix, metrics.TargetUnpreparedErrors, targetHost)))
if (successTarget + successBoth) == 0 {
require.Contains(t, lines, fmt.Sprintf("%v{node=\"%v\"} 0", getPrometheusNameWithSuffix(prefix, metrics.TargetRequestDuration, "sum"), targetHost))
require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusNameWithSuffixAndNodeLabel(prefix, metrics.TargetRequestDuration, "sum", targetHost, "type", "writes")))
require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusNameWithSuffixAndNodeLabel(prefix, metrics.TargetRequestDuration, "sum", targetHost, "type", "reads")))
} else {
if successBoth != 0 || !handshakeOnlyTarget {
value, err := findMetricValue(lines, fmt.Sprintf("%v{node=\"%v\"} ", getPrometheusNameWithSuffix(prefix, metrics.TargetRequestDuration, "sum"), targetHost))
require.Nil(t, err)
value := sumAllRequestsFromNode(lines, prefix, metrics.TargetRequestDuration, "sum", targetHost)
require.Greater(t, value, 0.0)
}
}

if (successOrigin + successBoth) == 0 {
require.Contains(t, lines, fmt.Sprintf("%v{node=\"%v\"} 0", getPrometheusNameWithSuffix(prefix, metrics.OriginRequestDuration, "sum"), originHost))
require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusNameWithSuffixAndNodeLabel(prefix, metrics.OriginRequestDuration, "sum", originHost, "type", "reads")))
require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusNameWithSuffixAndNodeLabel(prefix, metrics.OriginRequestDuration, "sum", originHost, "type", "writes")))
} else {
if successBoth != 0 || !handshakeOnlyOrigin {
value, err := findMetricValue(lines, fmt.Sprintf("%v{node=\"%v\"} ", getPrometheusNameWithSuffix(prefix, metrics.OriginRequestDuration, "sum"), originHost))
require.Nil(t, err)
value := sumAllRequestsFromNode(lines, prefix, metrics.OriginRequestDuration, "sum", originHost)
require.Greater(t, value, 0.0)
}
}

if successAsync == 0 || !asyncEnabled {
if asyncEnabled {
requireEventuallyContainsLine(t, lines, fmt.Sprintf("%v{node=\"%v\"} 0", getPrometheusNameWithSuffix(prefix, metrics.AsyncRequestDuration, "sum"), asyncHost))
requireEventuallyContainsLine(t, lines, fmt.Sprintf("%v{node=\"%v\",type=\"writes\"} 0", getPrometheusNameWithSuffix(prefix, metrics.AsyncRequestDuration, "sum"), asyncHost))
} else {
require.NotContains(t, lines, fmt.Sprintf("%v", getPrometheusNameWithSuffix(prefix, metrics.AsyncRequestDuration, "sum")))
}
} else {
utils.RequireWithRetries(t, func() (err error, fatal bool) {
prefix := fmt.Sprintf("%v{node=\"%v\"} ", getPrometheusNameWithSuffix(prefix, metrics.AsyncRequestDuration, "sum"), asyncHost)
value, err := findMetricValue(lines, prefix)
if err != nil {
return err, false
}
value := sumAllRequestsFromNode(lines, prefix, metrics.AsyncRequestDuration, "sum", asyncHost)
if value <= 0.0 {
return fmt.Errorf("%v expected greater than 0.0 but was %v", prefix, value), false
}
Expand All @@ -415,10 +411,10 @@ func checkMetrics(
}, 25, 200*time.Millisecond)
}

require.Contains(t, lines, fmt.Sprintf("%v{node=\"%v\"} %v", getPrometheusNameWithSuffix(prefix, metrics.TargetRequestDuration, "count"), targetHost, successTarget+successBoth))
require.Contains(t, lines, fmt.Sprintf("%v{node=\"%v\"} %v", getPrometheusNameWithSuffix(prefix, metrics.OriginRequestDuration, "count"), originHost, successOrigin+successBoth))
require.Equal(t, float64(successTarget+successBoth), sumAllRequestsFromNode(lines, prefix, metrics.TargetRequestDuration, "count", targetHost))
require.Equal(t, float64(successOrigin+successBoth), sumAllRequestsFromNode(lines, prefix, metrics.OriginRequestDuration, "count", originHost))
if asyncEnabled {
requireEventuallyContainsLine(t, lines, fmt.Sprintf("%v{node=\"%v\"} %v", getPrometheusNameWithSuffix(prefix, metrics.AsyncRequestDuration, "count"), asyncHost, successAsync))
requireEventuallyContainsLine(t, lines, fmt.Sprintf("%v{node=\"%v\",type=\"reads\"} %v", getPrometheusNameWithSuffix(prefix, metrics.AsyncRequestDuration, "count"), asyncHost, successAsync))
} else {
require.NotContains(t, lines, fmt.Sprintf("%v", getPrometheusNameWithSuffix(prefix, metrics.OriginRequestDuration, "count")))
}
Expand All @@ -432,12 +428,16 @@ func checkMetrics(
}

// findMetricValue looks up the value of the first entry in lines that starts
// with prefix. It delegates to findMetricValueWithDefault, passing -1 as the
// value to report when no entry matches.
func findMetricValue(lines []string, prefix string) (float64, error) {
	const missingValue = -1
	return findMetricValueWithDefault(lines, prefix, missingValue)
}

// findMetricValueWithDefault scans lines for the first entry that starts with
// prefix and parses the remainder of that entry as a float64.
//
// Spaces surrounding the remainder are stripped before parsing, so prefix may
// be given with or without the separator space that precedes the value.
//
// When no entry matches, defaultValue is returned together with a non-nil
// error; callers that treat an absent metric as a legitimate case can use the
// returned value and ignore the error. When an entry matches but its remainder
// is not a valid float, the result of strconv.ParseFloat (including its error)
// is returned unchanged.
func findMetricValueWithDefault(lines []string, prefix string, defaultValue float64) (float64, error) {
	for _, line := range lines {
		if !strings.HasPrefix(line, prefix) {
			continue
		}
		// Only the first matching entry is considered.
		raw := strings.Trim(strings.TrimPrefix(line, prefix), " ")
		return strconv.ParseFloat(raw, 64)
	}
	return defaultValue, fmt.Errorf("no line with prefix: %v", prefix)
}

func handleReads(request *frame.Frame, _ *client.CqlServerConnection, _ client.RequestHandlerContext) (response *frame.Frame) {
Expand Down Expand Up @@ -477,7 +477,7 @@ func getPrometheusNameWithNodeLabel(prefix string, mn metrics.Metric, node strin
return getPrometheusNameWithSuffixAndNodeLabel(prefix, mn, "", node)
}

func getPrometheusNameWithSuffixAndNodeLabel(prefix string, mn metrics.Metric, suffix string, node string) string {
func getPrometheusNameWithSuffixAndNodeLabel(prefix string, mn metrics.Metric, suffix string, node string, customLabels ...string) string {
if suffix != "" {
suffix = "_" + suffix
}
Expand All @@ -487,6 +487,9 @@ func getPrometheusNameWithSuffixAndNodeLabel(prefix string, mn metrics.Metric, s
for key, value := range labels {
newLabels[key] = value
}
for i := 0; i < len(customLabels); i = i + 2 {
newLabels[customLabels[i]] = customLabels[i+1]
}
if node != "" {
newLabels["node"] = node
}
Expand Down Expand Up @@ -519,3 +522,13 @@ func getPrometheusNameWithSuffixAndNodeLabel(prefix string, mn metrics.Metric, s
// getPrometheusNameWithSuffix builds the name for metric mn with the given
// prefix and suffix. It delegates to getPrometheusNameWithSuffixAndNodeLabel
// with an empty node and no custom labels.
func getPrometheusNameWithSuffix(prefix string, mn metrics.Metric, suffix string) string {
	const noNode = ""
	return getPrometheusNameWithSuffixAndNodeLabel(prefix, mn, suffix, noNode)
}

// sumAllRequestsFromNode adds up the values found in lines for the given
// metric, suffix and host across the "writes", "reads" and "other" values of
// the "type" label, in that order.
//
// Each series is looked up with findMetricValueWithDefault using a default of
// 0; the lookup error is not inspected and whatever value the lookup returns
// is added to the total.
func sumAllRequestsFromNode(lines []string, prefix string, metric metrics.Metric, suffix string, host string) float64 {
	requestTypes := [3]string{"writes", "reads", "other"}
	var values [3]float64
	for i, requestType := range requestTypes {
		series := getPrometheusNameWithSuffixAndNodeLabel(prefix, metric, suffix, host, "type", requestType)
		// The error is dropped on purpose: a series may legitimately be
		// absent, in which case the value returned by the lookup is used.
		values[i], _ = findMetricValueWithDefault(lines, series, 0)
	}
	return values[0] + values[1] + values[2]
}
22 changes: 19 additions & 3 deletions proxy/pkg/metrics/node_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ type NodeMetricsInstance struct {
UnavailableErrors Counter
OtherErrors Counter

RequestDuration Histogram
RequestDuration map[string]Histogram

OpenConnections Gauge

Expand All @@ -298,15 +298,31 @@ func CreateCounterNodeMetric(metricFactory MetricFactory, nodeDescription string
return m, nil
}

func CreateHistogramNodeMetric(metricFactory MetricFactory, nodeDescription string, mn Metric, buckets []float64) (Histogram, error) {
func CreateHistogramNodeMetric(metricFactory MetricFactory, nodeDescription string, mn Metric, buckets []float64, labels ...string) (Histogram, error) {
customLabels := make(map[string]string)
for i := 0; i < len(labels); i = i + 2 {
customLabels[labels[i]] = labels[i+1]
}
m, err := metricFactory.GetOrCreateHistogram(
mn.WithLabels(map[string]string{nodeLabel: nodeDescription}), buckets)
mn.WithLabels(map[string]string{nodeLabel: nodeDescription}).WithLabels(customLabels), buckets)
if err != nil {
return nil, err
}
return m, nil
}

// CreateHistogramNodeRequestDurationMetrics creates one histogram per entry
// of StatementCategories for the given node and metric, and returns them in a
// map keyed by category.
//
// Every histogram is created through CreateHistogramNodeMetric with the
// RequestDurationTypeLabel label set to its category. The first creation
// failure stops the loop and is returned along with a nil map.
func CreateHistogramNodeRequestDurationMetrics(metricFactory MetricFactory, nodeDescription string, mn Metric, buckets []float64) (map[string]Histogram, error) {
	histograms := make(map[string]Histogram, len(StatementCategories))
	for i := range StatementCategories {
		category := StatementCategories[i]
		histogram, err := CreateHistogramNodeMetric(
			metricFactory, nodeDescription, mn, buckets, RequestDurationTypeLabel, category)
		if err != nil {
			return nil, err
		}
		histograms[category] = histogram
	}
	return histograms, nil
}

func CreateGaugeNodeMetric(metricFactory MetricFactory, nodeDescription string, mn Metric) (Gauge, error) {
m, err := metricFactory.GetOrCreateGauge(
mn.WithLabels(map[string]string{nodeLabel: nodeDescription}))
Expand Down
18 changes: 12 additions & 6 deletions proxy/pkg/metrics/proxy_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package metrics
const (
typeReadsOrigin = "reads_origin"
typeReadsTarget = "reads_target"
typeWrites = "writes"
TypeWrites = "writes"
TypeReads = "reads"
TypeOther = "other"

failedRequestsClusterOrigin = "origin"
failedRequestsClusterTarget = "target"
Expand All @@ -18,14 +20,18 @@ const (
failedWritesFailedOnClusterTypeLabel = "failed_on"

requestDurationName = "proxy_request_duration_seconds"
requestDurationTypeLabel = "type"
RequestDurationTypeLabel = "type"
requestDurationDescription = "Histogram that tracks the latency of requests at proxy entry point"

inFlightRequestsName = "proxy_inflight_requests_total"
inFlightRequestsTypeLabel = "type"
inFlightRequestsDescription = "Number of requests currently in flight in the proxy"
)

var (
// StatementCategories lists the values used for the request duration "type"
// label on node metrics: TypeWrites, TypeReads and TypeOther.
// CreateHistogramNodeRequestDurationMetrics creates one histogram per entry.
StatementCategories = []string{TypeWrites, TypeReads, TypeOther}
)

var (
FailedReadsOrigin = NewMetricWithLabels(
failedReadsName,
Expand Down Expand Up @@ -76,21 +82,21 @@ var (
requestDurationName,
requestDurationDescription,
map[string]string{
requestDurationTypeLabel: typeReadsOrigin,
RequestDurationTypeLabel: typeReadsOrigin,
},
)
ProxyReadsTargetDuration = NewMetricWithLabels(
requestDurationName,
requestDurationDescription,
map[string]string{
requestDurationTypeLabel: typeReadsTarget,
RequestDurationTypeLabel: typeReadsTarget,
},
)
ProxyWritesDuration = NewMetricWithLabels(
requestDurationName,
requestDurationDescription,
map[string]string{
requestDurationTypeLabel: typeWrites,
RequestDurationTypeLabel: TypeWrites,
},
)

Expand All @@ -112,7 +118,7 @@ var (
inFlightRequestsName,
inFlightRequestsDescription,
map[string]string{
inFlightRequestsTypeLabel: typeWrites,
inFlightRequestsTypeLabel: TypeWrites,
},
)

Expand Down
17 changes: 9 additions & 8 deletions proxy/pkg/zdmproxy/cqlparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func buildRequestInfo(
}
return getRequestInfoFromQueryInfo(
frameContext.GetRawFrame(), primaryCluster,
forwardSystemQueriesToTarget, virtualizationEnabled, stmtQueryData.queryData), nil
forwardSystemQueriesToTarget, virtualizationEnabled, stmtQueryData.queryData, f.Header.OpCode), nil
case primitive.OpCodePrepare:
stmtQueryData, err := frameContext.GetOrInspectStatement(currentKeyspaceName, timeUuidGenerator)
if err != nil {
Expand All @@ -96,7 +96,7 @@ func buildRequestInfo(
}
baseRequestInfo := getRequestInfoFromQueryInfo(
frameContext.GetRawFrame(), primaryCluster,
forwardSystemQueriesToTarget, virtualizationEnabled, stmtQueryData.queryData)
forwardSystemQueriesToTarget, virtualizationEnabled, stmtQueryData.queryData, f.Header.OpCode)
replacedTerms := make([]*term, 0)
if len(stmtsReplacedTerms) > 1 {
return nil, fmt.Errorf("expected single list of replaced terms for prepare message but got %v", len(stmtsReplacedTerms))
Expand Down Expand Up @@ -144,14 +144,14 @@ func buildRequestInfo(
}
case primitive.OpCodeAuthResponse:
if forwardAuthToTarget {
return NewGenericRequestInfo(forwardToTarget, false, false), nil
return NewGenericRequestInfo(forwardToTarget, false, false, f.Header.OpCode), nil
} else {
return NewGenericRequestInfo(forwardToOrigin, false, false), nil
return NewGenericRequestInfo(forwardToOrigin, false, false, f.Header.OpCode), nil
}
case primitive.OpCodeRegister, primitive.OpCodeStartup:
return NewGenericRequestInfo(forwardToBoth, false, false), nil
return NewGenericRequestInfo(forwardToBoth, false, false, f.Header.OpCode), nil
default:
return NewGenericRequestInfo(forwardToBoth, true, false), nil
return NewGenericRequestInfo(forwardToBoth, true, false, f.Header.OpCode), nil
}
}

Expand All @@ -178,7 +178,8 @@ func getRequestInfoFromQueryInfo(
primaryCluster common.ClusterType,
forwardSystemQueriesToTarget bool,
virtualizationEnabled bool,
queryInfo QueryInfo) RequestInfo {
queryInfo QueryInfo,
opCode primitive.OpCode) RequestInfo {

var sendAlsoToAsync bool
forwardDecision := forwardToBoth
Expand Down Expand Up @@ -221,7 +222,7 @@ func getRequestInfoFromQueryInfo(

log.Tracef("Forward decision: %s", forwardDecision)

return NewGenericRequestInfo(forwardDecision, sendAlsoToAsync, true)
return NewGenericRequestInfo(forwardDecision, sendAlsoToAsync, true, opCode)
}

func isSystemQuery(info QueryInfo) bool {
Expand Down
16 changes: 8 additions & 8 deletions proxy/pkg/zdmproxy/cqlparser_graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestParseAndInspect_GraphRequests_Core_StringAPI(t *testing.T) {
{"Create graph",
false,
"system.graph('friendship').ifNotExists().create()",
NewGenericRequestInfo(forwardToBoth, false, true),
NewGenericRequestInfo(forwardToBoth, false, true, primitive.OpCodeExecute),
},
{"Create graph schema",
true,
Expand All @@ -32,7 +32,7 @@ func TestParseAndInspect_GraphRequests_Core_StringAPI(t *testing.T) {
".by('hometown').asText().by('age').create();" +
"schema.edgeLabel('is_friend_of').from('person').to('person')" +
".materializedView('person__is_parent_of__person_by_in_id').ifNotExists().inverse().create()}} ",
NewGenericRequestInfo(forwardToBoth, false, true),
NewGenericRequestInfo(forwardToBoth, false, true, primitive.OpCodeExecute),
},
{"Write data to graph",
true,
Expand All @@ -41,17 +41,17 @@ func TestParseAndInspect_GraphRequests_Core_StringAPI(t *testing.T) {
".property('firstname', p2_firstname).property('surname', p2_surname)" +
".property('hometown', p2_hometown).property('age', p2_age).as('p2')" +
".addE('is_friend_of').from('p1').to('p2').property('friendshipStartDate', fsd);",
NewGenericRequestInfo(forwardToBoth, false, true),
NewGenericRequestInfo(forwardToBoth, false, true, primitive.OpCodeExecute),
},
{"Select person vertex by id (Brenda_Peterson)",
true,
"g.V().has('person','id', p1_id).elementMap()",
NewGenericRequestInfo(forwardToBoth, false, true),
NewGenericRequestInfo(forwardToBoth, false, true, primitive.OpCodeQuery),
},
{"Select person vertices by age range (70-90)",
true,
"g.V().has('person','age', gt(lower_end)).has('person','age', lt(upper_end)).elementMap()",
NewGenericRequestInfo(forwardToBoth, false, true),
NewGenericRequestInfo(forwardToBoth, false, true, primitive.OpCodeQuery),
},
}

Expand Down Expand Up @@ -96,20 +96,20 @@ func TestParseAndInspect_GraphRequests_Core_FluentAPI(t *testing.T) {
0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 112, 49, 0, 0, 0, 2, 116, 111, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 112, 50, 0, 0, 0, 8, 112, 114, 111, 112, 101, 114, 116,
121, 0, 0, 0, 2, 3, 0, 0, 0, 0, 19, 102, 114, 105, 101, 110, 100, 115, 104, 105, 112, 83, 116, 97, 114, 116, 68, 97, 116, 101, 132, 0, 0,
0, 7, 169, 6, 16, 0, 0, 0, 0},
NewGenericRequestInfo(forwardToBoth, false, true),
NewGenericRequestInfo(forwardToBoth, false, true, primitive.OpCodeExecute),
},
{"Select person vertex by id (Brenda_Peterson)",
[]byte{21, 0, 0, 0, 0, 3, 0, 0, 0, 1, 86, 0, 0, 0, 0, 0, 0, 0, 3, 104, 97, 115, 0, 0, 0, 3, 3, 0, 0, 0, 0, 6, 112, 101, 114, 115, 111,
110, 3, 0, 0, 0, 0, 2, 105, 100, 3, 0, 0, 0, 0, 15, 66, 114, 101, 110, 100, 97, 95, 80, 101, 116, 101, 114, 115, 111, 110, 0, 0, 0, 10, 101,
108, 101, 109, 101, 110, 116, 77, 97, 112, 0, 0, 0, 0, 0, 0, 0, 0},
NewGenericRequestInfo(forwardToBoth, false, true),
NewGenericRequestInfo(forwardToBoth, false, true, primitive.OpCodeQuery),
},
{"Select person vertices by age range (70-90)",
[]byte{21, 0, 0, 0, 0, 4, 0, 0, 0, 1, 86, 0, 0, 0, 0, 0, 0, 0, 3, 104, 97, 115, 0, 0, 0, 3, 3, 0, 0, 0, 0, 6, 112, 101, 114, 115, 111,
110, 3, 0, 0, 0, 0, 3, 97, 103, 101, 30, 0, 0, 0, 0, 2, 103, 116, 0, 0, 0, 1, 1, 0, 0, 0, 0, 70, 0, 0, 0, 3, 104, 97, 115, 0, 0, 0, 3, 3, 0, 0, 0,
0, 6, 112, 101, 114, 115, 111, 110, 3, 0, 0, 0, 0, 3, 97, 103, 101, 30, 0, 0, 0, 0, 2, 108, 116, 0, 0, 0, 1, 1, 0, 0, 0, 0, 90, 0, 0, 0, 10,
101, 108, 101, 109, 101, 110, 116, 77, 97, 112, 0, 0, 0, 0, 0, 0, 0, 0},
NewGenericRequestInfo(forwardToBoth, false, true),
NewGenericRequestInfo(forwardToBoth, false, true, primitive.OpCodeQuery),
},
}

Expand Down
Loading
Loading