Expose read and write request latency in metrics
lukasz-antoniak authored Jul 22, 2024
1 parent 6e2fcb2 commit b68856d
Showing 5 changed files with 83 additions and 36 deletions.
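In short, this commit splits each per-node request latency histogram (origin, target, and async) into separate read and write series, distinguished by a `type` label with the values `reads` and `writes`; a request that would be forwarded to both clusters is counted as a write, everything else as a read. Below is a minimal, self-contained sketch of the same pattern using prometheus/client_golang directly. The metric name, label values, node names, and the `trackLatency` helper are illustrative only; the proxy builds its real metric names and series through its own MetricFactory, as shown in the diff that follows.

```go
package main

import (
	"net/http"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// One histogram vector, partitioned by node and by request type, mirrors the
// split introduced in this commit: separate "reads" and "writes" series
// instead of a single per-node duration series.
var requestDuration = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name:    "request_duration_seconds", // illustrative name, not the proxy's
		Help:    "Latency of proxied requests, split by request type",
		Buckets: prometheus.DefBuckets,
	},
	[]string{"node", "type"},
)

// trackLatency observes the "writes" series for requests that would be
// forwarded to both clusters and the "reads" series for everything else.
func trackLatency(node string, forwardedToBoth bool, start time.Time) {
	requestType := "reads"
	if forwardedToBoth {
		requestType = "writes"
	}
	requestDuration.WithLabelValues(node, requestType).Observe(time.Since(start).Seconds())
}

func main() {
	prometheus.MustRegister(requestDuration)

	start := time.Now()
	trackLatency("origin-1", true, start)  // counted as a write
	trackLatency("origin-1", false, start) // counted as a read

	// Scraping /metrics now yields per-type series such as (names illustrative):
	//   request_duration_seconds_count{node="origin-1",type="writes"} 1
	//   request_duration_seconds_count{node="origin-1",type="reads"} 1
	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe(":2112", nil)
}
```

Dashboards or assertions that previously read a single per-node duration series now need to sum the reads and writes series, which is what the updated test helper `sumAllRequestsFromNode` in the first file below does.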
47 changes: 29 additions & 18 deletions integration-tests/metrics_test.go
@@ -375,38 +375,34 @@ func checkMetrics(
require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusNameWithNodeLabel(prefix, metrics.OriginUnpreparedErrors, originHost)))
require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusNameWithNodeLabel(prefix, metrics.TargetUnpreparedErrors, targetHost)))
if (successTarget + successBoth) == 0 {
require.Contains(t, lines, fmt.Sprintf("%v{node=\"%v\"} 0", getPrometheusNameWithSuffix(prefix, metrics.TargetRequestDuration, "sum"), targetHost))
require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusNameWithSuffixAndNodeLabel(prefix, metrics.TargetRequestDuration, "sum", targetHost, "type", "writes")))
require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusNameWithSuffixAndNodeLabel(prefix, metrics.TargetRequestDuration, "sum", targetHost, "type", "reads")))
} else {
if successBoth != 0 || !handshakeOnlyTarget {
value, err := findMetricValue(lines, fmt.Sprintf("%v{node=\"%v\"} ", getPrometheusNameWithSuffix(prefix, metrics.TargetRequestDuration, "sum"), targetHost))
require.Nil(t, err)
value := sumAllRequestsFromNode(lines, prefix, metrics.TargetRequestDuration, "sum", targetHost)
require.Greater(t, value, 0.0)
}
}

if (successOrigin + successBoth) == 0 {
require.Contains(t, lines, fmt.Sprintf("%v{node=\"%v\"} 0", getPrometheusNameWithSuffix(prefix, metrics.OriginRequestDuration, "sum"), originHost))
require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusNameWithSuffixAndNodeLabel(prefix, metrics.OriginRequestDuration, "sum", originHost, "type", "reads")))
require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusNameWithSuffixAndNodeLabel(prefix, metrics.OriginRequestDuration, "sum", originHost, "type", "writes")))
} else {
if successBoth != 0 || !handshakeOnlyOrigin {
value, err := findMetricValue(lines, fmt.Sprintf("%v{node=\"%v\"} ", getPrometheusNameWithSuffix(prefix, metrics.OriginRequestDuration, "sum"), originHost))
require.Nil(t, err)
value := sumAllRequestsFromNode(lines, prefix, metrics.OriginRequestDuration, "sum", originHost)
require.Greater(t, value, 0.0)
}
}

if successAsync == 0 || !asyncEnabled {
if asyncEnabled {
requireEventuallyContainsLine(t, lines, fmt.Sprintf("%v{node=\"%v\"} 0", getPrometheusNameWithSuffix(prefix, metrics.AsyncRequestDuration, "sum"), asyncHost))
requireEventuallyContainsLine(t, lines, fmt.Sprintf("%v{node=\"%v\",type=\"writes\"} 0", getPrometheusNameWithSuffix(prefix, metrics.AsyncRequestDuration, "sum"), asyncHost))
} else {
require.NotContains(t, lines, fmt.Sprintf("%v", getPrometheusNameWithSuffix(prefix, metrics.AsyncRequestDuration, "sum")))
}
} else {
utils.RequireWithRetries(t, func() (err error, fatal bool) {
prefix := fmt.Sprintf("%v{node=\"%v\"} ", getPrometheusNameWithSuffix(prefix, metrics.AsyncRequestDuration, "sum"), asyncHost)
value, err := findMetricValue(lines, prefix)
if err != nil {
return err, false
}
value := sumAllRequestsFromNode(lines, prefix, metrics.AsyncRequestDuration, "sum", asyncHost)
if value <= 0.0 {
return fmt.Errorf("%v expected greater than 0.0 but was %v", prefix, value), false
}
@@ -415,10 +411,10 @@ func checkMetrics(
}, 25, 200*time.Millisecond)
}

require.Contains(t, lines, fmt.Sprintf("%v{node=\"%v\"} %v", getPrometheusNameWithSuffix(prefix, metrics.TargetRequestDuration, "count"), targetHost, successTarget+successBoth))
require.Contains(t, lines, fmt.Sprintf("%v{node=\"%v\"} %v", getPrometheusNameWithSuffix(prefix, metrics.OriginRequestDuration, "count"), originHost, successOrigin+successBoth))
require.Equal(t, float64(successTarget+successBoth), sumAllRequestsFromNode(lines, prefix, metrics.TargetRequestDuration, "count", targetHost))
require.Equal(t, float64(successOrigin+successBoth), sumAllRequestsFromNode(lines, prefix, metrics.OriginRequestDuration, "count", originHost))
if asyncEnabled {
requireEventuallyContainsLine(t, lines, fmt.Sprintf("%v{node=\"%v\"} %v", getPrometheusNameWithSuffix(prefix, metrics.AsyncRequestDuration, "count"), asyncHost, successAsync))
requireEventuallyContainsLine(t, lines, fmt.Sprintf("%v{node=\"%v\",type=\"reads\"} %v", getPrometheusNameWithSuffix(prefix, metrics.AsyncRequestDuration, "count"), asyncHost, successAsync))
} else {
require.NotContains(t, lines, fmt.Sprintf("%v", getPrometheusNameWithSuffix(prefix, metrics.OriginRequestDuration, "count")))
}
@@ -432,12 +428,16 @@ func checkMetrics(
}

func findMetricValue(lines []string, prefix string) (float64, error) {
return findMetricValueWithDefault(lines, prefix, -1)
}

func findMetricValueWithDefault(lines []string, prefix string, defaultValue float64) (float64, error) {
for _, line := range lines {
if strings.HasPrefix(line, prefix) {
return strconv.ParseFloat(strings.TrimPrefix(line, prefix), 64)
return strconv.ParseFloat(strings.Trim(strings.TrimPrefix(line, prefix), " "), 64)
}
}
return -1, fmt.Errorf("no line with prefix: %v", prefix)
return defaultValue, fmt.Errorf("no line with prefix: %v", prefix)
}

func handleReads(request *frame.Frame, _ *client.CqlServerConnection, _ client.RequestHandlerContext) (response *frame.Frame) {
@@ -477,7 +477,7 @@ func getPrometheusNameWithNodeLabel(prefix string, mn metrics.Metric, node strin
return getPrometheusNameWithSuffixAndNodeLabel(prefix, mn, "", node)
}

func getPrometheusNameWithSuffixAndNodeLabel(prefix string, mn metrics.Metric, suffix string, node string) string {
func getPrometheusNameWithSuffixAndNodeLabel(prefix string, mn metrics.Metric, suffix string, node string, customLabels ...string) string {
if suffix != "" {
suffix = "_" + suffix
}
@@ -487,6 +487,9 @@ func getPrometheusNameWithSuffixAndNodeLabel(prefix string, mn metrics.Metric, s
for key, value := range labels {
newLabels[key] = value
}
for i := 0; i < len(customLabels); i = i + 2 {
newLabels[customLabels[i]] = customLabels[i+1]
}
if node != "" {
newLabels["node"] = node
}
Expand Down Expand Up @@ -519,3 +522,11 @@ func getPrometheusNameWithSuffixAndNodeLabel(prefix string, mn metrics.Metric, s
func getPrometheusNameWithSuffix(prefix string, mn metrics.Metric, suffix string) string {
return getPrometheusNameWithSuffixAndNodeLabel(prefix, mn, suffix, "")
}

func sumAllRequestsFromNode(lines []string, prefix string, metric metrics.Metric, suffix string, host string) float64 {
writesMetric := getPrometheusNameWithSuffixAndNodeLabel(prefix, metric, suffix, host, "type", "writes")
valueWrites, _ := findMetricValueWithDefault(lines, writesMetric, 0)
readsMetric := getPrometheusNameWithSuffixAndNodeLabel(prefix, metric, suffix, host, "type", "reads")
valueReads, _ := findMetricValueWithDefault(lines, readsMetric, 0)
return valueWrites + valueReads
}
7 changes: 4 additions & 3 deletions proxy/pkg/metrics/node_metrics.go
@@ -280,7 +280,8 @@ type NodeMetricsInstance struct {
UnavailableErrors Counter
OtherErrors Counter

RequestDuration Histogram
ReadDurations Histogram
WriteDurations Histogram

OpenConnections Gauge

@@ -298,9 +299,9 @@ func CreateCounterNodeMetric(metricFactory MetricFactory, nodeDescription string
return m, nil
}

func CreateHistogramNodeMetric(metricFactory MetricFactory, nodeDescription string, mn Metric, buckets []float64) (Histogram, error) {
func CreateHistogramNodeMetric(metricFactory MetricFactory, nodeDescription string, mn Metric, buckets []float64, labels map[string]string) (Histogram, error) {
m, err := metricFactory.GetOrCreateHistogram(
mn.WithLabels(map[string]string{nodeLabel: nodeDescription}), buckets)
mn.WithLabels(map[string]string{nodeLabel: nodeDescription}).WithLabels(labels), buckets)
if err != nil {
return nil, err
}
13 changes: 7 additions & 6 deletions proxy/pkg/metrics/proxy_metrics.go
@@ -3,7 +3,8 @@ package metrics
const (
typeReadsOrigin = "reads_origin"
typeReadsTarget = "reads_target"
typeWrites = "writes"
TypeWrites = "writes"
TypeReads = "reads"

failedRequestsClusterOrigin = "origin"
failedRequestsClusterTarget = "target"
@@ -18,7 +19,7 @@ const (
failedWritesFailedOnClusterTypeLabel = "failed_on"

requestDurationName = "proxy_request_duration_seconds"
requestDurationTypeLabel = "type"
RequestDurationTypeLabel = "type"
requestDurationDescription = "Histogram that tracks the latency of requests at proxy entry point"

inFlightRequestsName = "proxy_inflight_requests_total"
@@ -76,21 +77,21 @@ var (
requestDurationName,
requestDurationDescription,
map[string]string{
requestDurationTypeLabel: typeReadsOrigin,
RequestDurationTypeLabel: typeReadsOrigin,
},
)
ProxyReadsTargetDuration = NewMetricWithLabels(
requestDurationName,
requestDurationDescription,
map[string]string{
requestDurationTypeLabel: typeReadsTarget,
RequestDurationTypeLabel: typeReadsTarget,
},
)
ProxyWritesDuration = NewMetricWithLabels(
requestDurationName,
requestDurationDescription,
map[string]string{
requestDurationTypeLabel: typeWrites,
RequestDurationTypeLabel: TypeWrites,
},
)

@@ -112,7 +113,7 @@ var (
inFlightRequestsName,
inFlightRequestsDescription,
map[string]string{
inFlightRequestsTypeLabel: typeWrites,
inFlightRequestsTypeLabel: TypeWrites,
},
)

30 changes: 24 additions & 6 deletions proxy/pkg/zdmproxy/proxy.go
@@ -826,7 +826,12 @@ func (p *ZdmProxy) CreateOriginNodeMetrics(
return nil, err
}

originRequestDuration, err := metrics.CreateHistogramNodeMetric(metricFactory, originNodeDescription, metrics.OriginRequestDuration, originBuckets)
originReadRequestDuration, err := metrics.CreateHistogramNodeMetric(metricFactory, originNodeDescription, metrics.OriginRequestDuration, originBuckets, map[string]string{metrics.RequestDurationTypeLabel: metrics.TypeReads})
if err != nil {
return nil, err
}

originWriteRequestDuration, err := metrics.CreateHistogramNodeMetric(metricFactory, originNodeDescription, metrics.OriginRequestDuration, originBuckets, map[string]string{metrics.RequestDurationTypeLabel: metrics.TypeWrites})
if err != nil {
return nil, err
}
@@ -857,7 +862,8 @@ func (p *ZdmProxy) CreateOriginNodeMetrics(
OverloadedErrors: originOverloadedErrors,
UnavailableErrors: originUnavailableErrors,
OtherErrors: originOtherErrors,
RequestDuration: originRequestDuration,
ReadDurations: originReadRequestDuration,
WriteDurations: originWriteRequestDuration,
OpenConnections: openOriginConnections,
InFlightRequests: inflightRequests,
UsedStreamIds: originUsedStreamIds,
@@ -911,7 +917,12 @@ func (p *ZdmProxy) CreateAsyncNodeMetrics(
return nil, err
}

asyncRequestDuration, err := metrics.CreateHistogramNodeMetric(metricFactory, asyncNodeDescription, metrics.AsyncRequestDuration, asyncBuckets)
asyncReadRequestDuration, err := metrics.CreateHistogramNodeMetric(metricFactory, asyncNodeDescription, metrics.AsyncRequestDuration, asyncBuckets, map[string]string{metrics.RequestDurationTypeLabel: metrics.TypeReads})
if err != nil {
return nil, err
}

asyncWriteRequestDuration, err := metrics.CreateHistogramNodeMetric(metricFactory, asyncNodeDescription, metrics.AsyncRequestDuration, asyncBuckets, map[string]string{metrics.RequestDurationTypeLabel: metrics.TypeWrites})
if err != nil {
return nil, err
}
@@ -941,7 +952,8 @@ func (p *ZdmProxy) CreateAsyncNodeMetrics(
OverloadedErrors: asyncOverloadedErrors,
UnavailableErrors: asyncUnavailableErrors,
OtherErrors: asyncOtherErrors,
RequestDuration: asyncRequestDuration,
ReadDurations: asyncReadRequestDuration,
WriteDurations: asyncWriteRequestDuration,
OpenConnections: openAsyncConnections,
InFlightRequests: inflightRequestsAsync,
UsedStreamIds: asyncUsedStreamIds,
@@ -995,7 +1007,12 @@ func (p *ZdmProxy) CreateTargetNodeMetrics(
return nil, err
}

targetRequestDuration, err := metrics.CreateHistogramNodeMetric(metricFactory, targetNodeDescription, metrics.TargetRequestDuration, targetBuckets)
targetReadRequestDuration, err := metrics.CreateHistogramNodeMetric(metricFactory, targetNodeDescription, metrics.TargetRequestDuration, targetBuckets, map[string]string{metrics.RequestDurationTypeLabel: metrics.TypeReads})
if err != nil {
return nil, err
}

targetWriteRequestDuration, err := metrics.CreateHistogramNodeMetric(metricFactory, targetNodeDescription, metrics.TargetRequestDuration, targetBuckets, map[string]string{metrics.RequestDurationTypeLabel: metrics.TypeWrites})
if err != nil {
return nil, err
}
@@ -1026,7 +1043,8 @@ func (p *ZdmProxy) CreateTargetNodeMetrics(
OverloadedErrors: targetOverloadedErrors,
UnavailableErrors: targetUnavailableErrors,
OtherErrors: targetOtherErrors,
RequestDuration: targetRequestDuration,
ReadDurations: targetReadRequestDuration,
WriteDurations: targetWriteRequestDuration,
OpenConnections: openTargetConnections,
InFlightRequests: inflightRequests,
UsedStreamIds: targetUsedStreamIds,
22 changes: 19 additions & 3 deletions proxy/pkg/zdmproxy/requestcontext.go
@@ -193,9 +193,17 @@ func (recv *requestContextImpl) SetResponse(nodeMetrics *metrics.NodeMetrics, f
if recv.GetRequestInfo().ShouldBeTrackedInMetrics() {
switch connectorType {
case ClusterConnectorTypeOrigin:
nodeMetrics.OriginMetrics.RequestDuration.Track(recv.startTime)
if isWriteStatement(recv.GetRequestInfo()) {
nodeMetrics.OriginMetrics.WriteDurations.Track(recv.startTime)
} else {
nodeMetrics.OriginMetrics.ReadDurations.Track(recv.startTime)
}
case ClusterConnectorTypeTarget:
nodeMetrics.TargetMetrics.RequestDuration.Track(recv.startTime)
if isWriteStatement(recv.GetRequestInfo()) {
nodeMetrics.TargetMetrics.WriteDurations.Track(recv.startTime)
} else {
nodeMetrics.TargetMetrics.ReadDurations.Track(recv.startTime)
}
case ClusterConnectorTypeAsync:
default:
log.Errorf("could not recognize connector type %v", connectorType)
@@ -205,6 +213,10 @@ func (recv *requestContextImpl) SetResponse(nodeMetrics *metrics.NodeMetrics, f
return finished
}

func isWriteStatement(req RequestInfo) bool {
return req.GetForwardDecision() == forwardToBoth
}

func (recv *requestContextImpl) updateInternalState(f *frame.RawFrame, cluster common.ClusterType) (state int, updated bool) {
recv.lock.Lock()
defer recv.lock.Unlock()
@@ -331,7 +343,11 @@ func (recv *asyncRequestContextImpl) SetResponse(
}

if recv.GetRequestInfo().ShouldBeTrackedInMetrics() {
nodeMetrics.AsyncMetrics.RequestDuration.Track(recv.startTime)
if isWriteStatement(recv.GetRequestInfo()) {
nodeMetrics.AsyncMetrics.WriteDurations.Track(recv.startTime)
} else {
nodeMetrics.AsyncMetrics.ReadDurations.Track(recv.startTime)
}
nodeMetrics.AsyncMetrics.InFlightRequests.Subtract(1)
}
