Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose read and write request latency in metrics #125

Merged
merged 7 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 29 additions & 18 deletions integration-tests/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,38 +375,34 @@ func checkMetrics(
require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusNameWithNodeLabel(prefix, metrics.OriginUnpreparedErrors, originHost)))
require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusNameWithNodeLabel(prefix, metrics.TargetUnpreparedErrors, targetHost)))
if (successTarget + successBoth) == 0 {
require.Contains(t, lines, fmt.Sprintf("%v{node=\"%v\"} 0", getPrometheusNameWithSuffix(prefix, metrics.TargetRequestDuration, "sum"), targetHost))
require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusNameWithSuffixAndNodeLabel(prefix, metrics.TargetRequestDuration, "sum", targetHost, "type", "writes")))
require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusNameWithSuffixAndNodeLabel(prefix, metrics.TargetRequestDuration, "sum", targetHost, "type", "reads")))
} else {
if successBoth != 0 || !handshakeOnlyTarget {
value, err := findMetricValue(lines, fmt.Sprintf("%v{node=\"%v\"} ", getPrometheusNameWithSuffix(prefix, metrics.TargetRequestDuration, "sum"), targetHost))
require.Nil(t, err)
value := sumAllRequestsFromNode(lines, prefix, metrics.TargetRequestDuration, "sum", targetHost)
require.Greater(t, value, 0.0)
}
}

if (successOrigin + successBoth) == 0 {
require.Contains(t, lines, fmt.Sprintf("%v{node=\"%v\"} 0", getPrometheusNameWithSuffix(prefix, metrics.OriginRequestDuration, "sum"), originHost))
require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusNameWithSuffixAndNodeLabel(prefix, metrics.OriginRequestDuration, "sum", originHost, "type", "reads")))
require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusNameWithSuffixAndNodeLabel(prefix, metrics.OriginRequestDuration, "sum", originHost, "type", "writes")))
} else {
if successBoth != 0 || !handshakeOnlyOrigin {
value, err := findMetricValue(lines, fmt.Sprintf("%v{node=\"%v\"} ", getPrometheusNameWithSuffix(prefix, metrics.OriginRequestDuration, "sum"), originHost))
require.Nil(t, err)
value := sumAllRequestsFromNode(lines, prefix, metrics.OriginRequestDuration, "sum", originHost)
require.Greater(t, value, 0.0)
}
}

if successAsync == 0 || !asyncEnabled {
if asyncEnabled {
requireEventuallyContainsLine(t, lines, fmt.Sprintf("%v{node=\"%v\"} 0", getPrometheusNameWithSuffix(prefix, metrics.AsyncRequestDuration, "sum"), asyncHost))
requireEventuallyContainsLine(t, lines, fmt.Sprintf("%v{node=\"%v\",type=\"writes\"} 0", getPrometheusNameWithSuffix(prefix, metrics.AsyncRequestDuration, "sum"), asyncHost))
} else {
require.NotContains(t, lines, fmt.Sprintf("%v", getPrometheusNameWithSuffix(prefix, metrics.AsyncRequestDuration, "sum")))
}
} else {
utils.RequireWithRetries(t, func() (err error, fatal bool) {
prefix := fmt.Sprintf("%v{node=\"%v\"} ", getPrometheusNameWithSuffix(prefix, metrics.AsyncRequestDuration, "sum"), asyncHost)
value, err := findMetricValue(lines, prefix)
if err != nil {
return err, false
}
value := sumAllRequestsFromNode(lines, prefix, metrics.AsyncRequestDuration, "sum", asyncHost)
if value <= 0.0 {
return fmt.Errorf("%v expected greater than 0.0 but was %v", prefix, value), false
}
Expand All @@ -415,10 +411,10 @@ func checkMetrics(
}, 25, 200*time.Millisecond)
}

require.Contains(t, lines, fmt.Sprintf("%v{node=\"%v\"} %v", getPrometheusNameWithSuffix(prefix, metrics.TargetRequestDuration, "count"), targetHost, successTarget+successBoth))
require.Contains(t, lines, fmt.Sprintf("%v{node=\"%v\"} %v", getPrometheusNameWithSuffix(prefix, metrics.OriginRequestDuration, "count"), originHost, successOrigin+successBoth))
require.Equal(t, float64(successTarget+successBoth), sumAllRequestsFromNode(lines, prefix, metrics.TargetRequestDuration, "count", targetHost))
require.Equal(t, float64(successOrigin+successBoth), sumAllRequestsFromNode(lines, prefix, metrics.OriginRequestDuration, "count", originHost))
if asyncEnabled {
requireEventuallyContainsLine(t, lines, fmt.Sprintf("%v{node=\"%v\"} %v", getPrometheusNameWithSuffix(prefix, metrics.AsyncRequestDuration, "count"), asyncHost, successAsync))
requireEventuallyContainsLine(t, lines, fmt.Sprintf("%v{node=\"%v\",type=\"reads\"} %v", getPrometheusNameWithSuffix(prefix, metrics.AsyncRequestDuration, "count"), asyncHost, successAsync))
} else {
require.NotContains(t, lines, fmt.Sprintf("%v", getPrometheusNameWithSuffix(prefix, metrics.OriginRequestDuration, "count")))
}
Expand All @@ -432,12 +428,16 @@ func checkMetrics(
}

func findMetricValue(lines []string, prefix string) (float64, error) {
return findMetricValueWithDefault(lines, prefix, -1)
}

func findMetricValueWithDefault(lines []string, prefix string, defaultValue float64) (float64, error) {
for _, line := range lines {
if strings.HasPrefix(line, prefix) {
return strconv.ParseFloat(strings.TrimPrefix(line, prefix), 64)
return strconv.ParseFloat(strings.Trim(strings.TrimPrefix(line, prefix), " "), 64)
}
}
return -1, fmt.Errorf("no line with prefix: %v", prefix)
return defaultValue, fmt.Errorf("no line with prefix: %v", prefix)
}

func handleReads(request *frame.Frame, _ *client.CqlServerConnection, _ client.RequestHandlerContext) (response *frame.Frame) {
Expand Down Expand Up @@ -477,7 +477,7 @@ func getPrometheusNameWithNodeLabel(prefix string, mn metrics.Metric, node strin
return getPrometheusNameWithSuffixAndNodeLabel(prefix, mn, "", node)
}

func getPrometheusNameWithSuffixAndNodeLabel(prefix string, mn metrics.Metric, suffix string, node string) string {
func getPrometheusNameWithSuffixAndNodeLabel(prefix string, mn metrics.Metric, suffix string, node string, customLabels ...string) string {
if suffix != "" {
suffix = "_" + suffix
}
Expand All @@ -487,6 +487,9 @@ func getPrometheusNameWithSuffixAndNodeLabel(prefix string, mn metrics.Metric, s
for key, value := range labels {
newLabels[key] = value
}
for i := 0; i < len(customLabels); i = i + 2 {
newLabels[customLabels[i]] = customLabels[i+1]
}
if node != "" {
newLabels["node"] = node
}
Expand Down Expand Up @@ -519,3 +522,11 @@ func getPrometheusNameWithSuffixAndNodeLabel(prefix string, mn metrics.Metric, s
func getPrometheusNameWithSuffix(prefix string, mn metrics.Metric, suffix string) string {
return getPrometheusNameWithSuffixAndNodeLabel(prefix, mn, suffix, "")
}

func sumAllRequestsFromNode(lines []string, prefix string, metric metrics.Metric, suffix string, host string) float64 {
writesMetric := getPrometheusNameWithSuffixAndNodeLabel(prefix, metric, suffix, host, "type", "writes")
valueWrites, _ := findMetricValueWithDefault(lines, writesMetric, 0)
readsMetric := getPrometheusNameWithSuffixAndNodeLabel(prefix, metric, suffix, host, "type", "reads")
valueReads, _ := findMetricValueWithDefault(lines, readsMetric, 0)
return valueWrites + valueReads
}
11 changes: 8 additions & 3 deletions proxy/pkg/metrics/node_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,8 @@ type NodeMetricsInstance struct {
UnavailableErrors Counter
OtherErrors Counter

RequestDuration Histogram
ReadDurations Histogram
WriteDurations Histogram

OpenConnections Gauge

Expand All @@ -298,9 +299,13 @@ func CreateCounterNodeMetric(metricFactory MetricFactory, nodeDescription string
return m, nil
}

func CreateHistogramNodeMetric(metricFactory MetricFactory, nodeDescription string, mn Metric, buckets []float64) (Histogram, error) {
func CreateHistogramNodeMetric(metricFactory MetricFactory, nodeDescription string, mn Metric, buckets []float64, labels ...string) (Histogram, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the labels ...string is not very obvious, it's a list where evens are keys and odds are values for those keys... Can we make it a simple map[string]string instead? Or create a new label type:

1:

type MetricLabel struct {
   Key string
   Value string
}
func CreateHistogramNodeMetric(metricFactory MetricFactory, nodeDescription string, mn Metric, buckets []float64, labels ...MetricLabel) (Histogram, error) { }

metrics.CreateHistogramNodeMetric(metricFactory, asyncNodeDescription, metrics.AsyncRequestDuration, asyncBuckets, metrics.MetricLabel{Key: metrics.RequestDurationTypeLabel, Value: metrics.TypeReads})

2:

func CreateHistogramNodeMetric(metricFactory MetricFactory, nodeDescription string, mn Metric, buckets []float64, labels map[string]string) (Histogram, error) { }

metrics.CreateHistogramNodeMetric(metricFactory, asyncNodeDescription, metrics.AsyncRequestDuration, asyncBuckets, map[string]string{metrics.RequestDurationTypeLabel: metrics.TypeReads})

customLabels := make(map[string]string)
for i := 0; i < len(labels); i = i + 2 {
customLabels[labels[i]] = labels[i+1]
}
m, err := metricFactory.GetOrCreateHistogram(
mn.WithLabels(map[string]string{nodeLabel: nodeDescription}), buckets)
mn.WithLabels(map[string]string{nodeLabel: nodeDescription}).WithLabels(customLabels), buckets)
if err != nil {
return nil, err
}
Expand Down
13 changes: 7 additions & 6 deletions proxy/pkg/metrics/proxy_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package metrics
const (
typeReadsOrigin = "reads_origin"
typeReadsTarget = "reads_target"
typeWrites = "writes"
TypeWrites = "writes"
TypeReads = "reads"

failedRequestsClusterOrigin = "origin"
failedRequestsClusterTarget = "target"
Expand All @@ -18,7 +19,7 @@ const (
failedWritesFailedOnClusterTypeLabel = "failed_on"

requestDurationName = "proxy_request_duration_seconds"
requestDurationTypeLabel = "type"
RequestDurationTypeLabel = "type"
joao-r-reis marked this conversation as resolved.
Show resolved Hide resolved
requestDurationDescription = "Histogram that tracks the latency of requests at proxy entry point"

inFlightRequestsName = "proxy_inflight_requests_total"
Expand Down Expand Up @@ -76,21 +77,21 @@ var (
requestDurationName,
requestDurationDescription,
map[string]string{
requestDurationTypeLabel: typeReadsOrigin,
RequestDurationTypeLabel: typeReadsOrigin,
},
)
ProxyReadsTargetDuration = NewMetricWithLabels(
requestDurationName,
requestDurationDescription,
map[string]string{
requestDurationTypeLabel: typeReadsTarget,
RequestDurationTypeLabel: typeReadsTarget,
},
)
ProxyWritesDuration = NewMetricWithLabels(
requestDurationName,
requestDurationDescription,
map[string]string{
requestDurationTypeLabel: typeWrites,
RequestDurationTypeLabel: TypeWrites,
},
)

Expand All @@ -112,7 +113,7 @@ var (
inFlightRequestsName,
inFlightRequestsDescription,
map[string]string{
inFlightRequestsTypeLabel: typeWrites,
inFlightRequestsTypeLabel: TypeWrites,
},
)

Expand Down
30 changes: 24 additions & 6 deletions proxy/pkg/zdmproxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,12 @@ func (p *ZdmProxy) CreateOriginNodeMetrics(
return nil, err
}

originRequestDuration, err := metrics.CreateHistogramNodeMetric(metricFactory, originNodeDescription, metrics.OriginRequestDuration, originBuckets)
originReadRequestDuration, err := metrics.CreateHistogramNodeMetric(metricFactory, originNodeDescription, metrics.OriginRequestDuration, originBuckets, metrics.RequestDurationTypeLabel, metrics.TypeReads)
if err != nil {
return nil, err
}

originWriteRequestDuration, err := metrics.CreateHistogramNodeMetric(metricFactory, originNodeDescription, metrics.OriginRequestDuration, originBuckets, metrics.RequestDurationTypeLabel, metrics.TypeWrites)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -857,7 +862,8 @@ func (p *ZdmProxy) CreateOriginNodeMetrics(
OverloadedErrors: originOverloadedErrors,
UnavailableErrors: originUnavailableErrors,
OtherErrors: originOtherErrors,
RequestDuration: originRequestDuration,
ReadDurations: originReadRequestDuration,
WriteDurations: originWriteRequestDuration,
OpenConnections: openOriginConnections,
InFlightRequests: inflightRequests,
UsedStreamIds: originUsedStreamIds,
Expand Down Expand Up @@ -911,7 +917,12 @@ func (p *ZdmProxy) CreateAsyncNodeMetrics(
return nil, err
}

asyncRequestDuration, err := metrics.CreateHistogramNodeMetric(metricFactory, asyncNodeDescription, metrics.AsyncRequestDuration, asyncBuckets)
asyncReadRequestDuration, err := metrics.CreateHistogramNodeMetric(metricFactory, asyncNodeDescription, metrics.AsyncRequestDuration, asyncBuckets, metrics.RequestDurationTypeLabel, metrics.TypeReads)
if err != nil {
return nil, err
}

asyncWriteRequestDuration, err := metrics.CreateHistogramNodeMetric(metricFactory, asyncNodeDescription, metrics.AsyncRequestDuration, asyncBuckets, metrics.RequestDurationTypeLabel, metrics.TypeWrites)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -941,7 +952,8 @@ func (p *ZdmProxy) CreateAsyncNodeMetrics(
OverloadedErrors: asyncOverloadedErrors,
UnavailableErrors: asyncUnavailableErrors,
OtherErrors: asyncOtherErrors,
RequestDuration: asyncRequestDuration,
ReadDurations: asyncReadRequestDuration,
WriteDurations: asyncWriteRequestDuration,
OpenConnections: openAsyncConnections,
InFlightRequests: inflightRequestsAsync,
UsedStreamIds: asyncUsedStreamIds,
Expand Down Expand Up @@ -995,7 +1007,12 @@ func (p *ZdmProxy) CreateTargetNodeMetrics(
return nil, err
}

targetRequestDuration, err := metrics.CreateHistogramNodeMetric(metricFactory, targetNodeDescription, metrics.TargetRequestDuration, targetBuckets)
targetReadRequestDuration, err := metrics.CreateHistogramNodeMetric(metricFactory, targetNodeDescription, metrics.TargetRequestDuration, targetBuckets, metrics.RequestDurationTypeLabel, metrics.TypeReads)
if err != nil {
return nil, err
}

targetWriteRequestDuration, err := metrics.CreateHistogramNodeMetric(metricFactory, targetNodeDescription, metrics.TargetRequestDuration, targetBuckets, metrics.RequestDurationTypeLabel, metrics.TypeWrites)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1026,7 +1043,8 @@ func (p *ZdmProxy) CreateTargetNodeMetrics(
OverloadedErrors: targetOverloadedErrors,
UnavailableErrors: targetUnavailableErrors,
OtherErrors: targetOtherErrors,
RequestDuration: targetRequestDuration,
ReadDurations: targetReadRequestDuration,
WriteDurations: targetWriteRequestDuration,
OpenConnections: openTargetConnections,
InFlightRequests: inflightRequests,
UsedStreamIds: targetUsedStreamIds,
Expand Down
22 changes: 19 additions & 3 deletions proxy/pkg/zdmproxy/requestcontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,17 @@ func (recv *requestContextImpl) SetResponse(nodeMetrics *metrics.NodeMetrics, f
if recv.GetRequestInfo().ShouldBeTrackedInMetrics() {
switch connectorType {
case ClusterConnectorTypeOrigin:
nodeMetrics.OriginMetrics.RequestDuration.Track(recv.startTime)
if isWriteStatement(recv.GetRequestInfo()) {
nodeMetrics.OriginMetrics.WriteDurations.Track(recv.startTime)
} else {
nodeMetrics.OriginMetrics.ReadDurations.Track(recv.startTime)
}
case ClusterConnectorTypeTarget:
nodeMetrics.TargetMetrics.RequestDuration.Track(recv.startTime)
if isWriteStatement(recv.GetRequestInfo()) {
nodeMetrics.TargetMetrics.WriteDurations.Track(recv.startTime)
} else {
nodeMetrics.TargetMetrics.ReadDurations.Track(recv.startTime)
}
case ClusterConnectorTypeAsync:
default:
log.Errorf("could not recognize connector type %v", connectorType)
Expand All @@ -205,6 +213,10 @@ func (recv *requestContextImpl) SetResponse(nodeMetrics *metrics.NodeMetrics, f
return finished
}

func isWriteStatement(req RequestInfo) bool {
return req.GetForwardDecision() == forwardToBoth
}

func (recv *requestContextImpl) updateInternalState(f *frame.RawFrame, cluster common.ClusterType) (state int, updated bool) {
recv.lock.Lock()
defer recv.lock.Unlock()
Expand Down Expand Up @@ -331,7 +343,11 @@ func (recv *asyncRequestContextImpl) SetResponse(
}

if recv.GetRequestInfo().ShouldBeTrackedInMetrics() {
nodeMetrics.AsyncMetrics.RequestDuration.Track(recv.startTime)
if isWriteStatement(recv.GetRequestInfo()) {
nodeMetrics.AsyncMetrics.WriteDurations.Track(recv.startTime)
} else {
nodeMetrics.AsyncMetrics.ReadDurations.Track(recv.startTime)
}
nodeMetrics.AsyncMetrics.InFlightRequests.Subtract(1)
}

Expand Down
Loading