From efdc00c1626a3b169b301b705ee5a0f62f6751dc Mon Sep 17 00:00:00 2001 From: Lukasz Antoniak Date: Tue, 26 Nov 2024 09:57:14 +0100 Subject: [PATCH 1/2] Add proxy metric representing failed connection attempts to downstream clusters --- integration-tests/metrics_test.go | 2 + integration-tests/runner_test.go | 92 +++++++++++++++++++++++++++++ proxy/pkg/metrics/proxy_metrics.go | 30 ++++++++-- proxy/pkg/zdmproxy/clienthandler.go | 2 + proxy/pkg/zdmproxy/proxy.go | 12 ++++ 5 files changed, 133 insertions(+), 5 deletions(-) diff --git a/integration-tests/metrics_test.go b/integration-tests/metrics_test.go index 32b72232..3da06989 100644 --- a/integration-tests/metrics_test.go +++ b/integration-tests/metrics_test.go @@ -283,6 +283,8 @@ func checkMetrics( require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusName(prefix, metrics.FailedWritesOnTarget))) require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusName(prefix, metrics.FailedReadsTarget))) require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusName(prefix, metrics.FailedReadsOrigin))) + require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusName(prefix, metrics.FailedConnectionsOrigin))) + require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusName(prefix, metrics.FailedConnectionsTarget))) require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusName(prefix, metrics.InFlightWrites))) require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusName(prefix, metrics.InFlightReadsOrigin))) diff --git a/integration-tests/runner_test.go b/integration-tests/runner_test.go index ff4b23f0..947ea7f9 100644 --- a/integration-tests/runner_test.go +++ b/integration-tests/runner_test.go @@ -3,6 +3,9 @@ package integration_tests import ( "context" "fmt" + "github.com/datastax/go-cassandra-native-protocol/message" + "github.com/datastax/go-cassandra-native-protocol/primitive" + "github.com/datastax/zdm-proxy/integration-tests/client" "github.com/datastax/zdm-proxy/integration-tests/setup" "github.com/datastax/zdm-proxy/integration-tests/utils" "github.com/datastax/zdm-proxy/proxy/pkg/config" @@ -10,9 +13,14 @@ import ( "github.com/datastax/zdm-proxy/proxy/pkg/httpzdmproxy" "github.com/datastax/zdm-proxy/proxy/pkg/metrics" "github.com/datastax/zdm-proxy/proxy/pkg/runner" + "github.com/datastax/zdm-proxy/proxy/pkg/zdmproxy" + "github.com/jpillora/backoff" + log "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "net/http" + "strings" "sync" + "sync/atomic" "testing" "time" ) @@ -42,6 +50,10 @@ func TestWithHttpHandlers(t *testing.T) { t.Run("testHttpEndpointsWithUnavailableNode", func(t *testing.T) { testHttpEndpointsWithUnavailableNode(t, metricsHandler, readinessHandler) }) + + t.Run("testMetricsWithUnavailableNode", func(t *testing.T) { + testMetricsWithUnavailableNode(t, metricsHandler) + }) } func testHttpEndpointsWithProxyNotInitialized( @@ -137,6 +149,86 @@ func testHttpEndpointsWithProxyInitialized( require.Equal(t, health.UP, report.Status) } +func testMetricsWithUnavailableNode( + t *testing.T, metricsHandler *httpzdmproxy.HandlerWithFallback) { + + simulacronSetup, err := setup.NewSimulacronTestSetupWithSession(t, false, false) + require.Nil(t, err) + defer simulacronSetup.Cleanup() + + conf := setup.NewTestConfig(simulacronSetup.Origin.GetInitialContactPoint(), simulacronSetup.Target.GetInitialContactPoint()) + modifyConfForHealthTests(conf, 2) + + waitGroup := &sync.WaitGroup{} + ctx, cancelFunc := context.WithCancel(context.Background()) + + defer waitGroup.Wait() + defer cancelFunc() + + srv := httpzdmproxy.StartHttpServer(fmt.Sprintf("%s:%d", conf.MetricsAddress, conf.MetricsPort), waitGroup) + defer func(srv *http.Server, ctx context.Context) { + err := srv.Shutdown(ctx) + if err != nil { + log.Error("Failed to shutdown metrics server:", err.Error()) + } + }(srv, ctx) + + b := &backoff.Backoff{ + Factor: 2, + Jitter: false, + Min: 100 * time.Millisecond, + Max: 500 * time.Millisecond, + } + proxy := atomic.Value{} + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + p, err := zdmproxy.RunWithRetries(conf, ctx, b) + if err == nil { + metricsHandler.SetHandler(p.GetMetricHandler().GetHttpHandler()) + proxy.Store(&p) + <-ctx.Done() + p.Shutdown() + } + }() + + httpAddr := fmt.Sprintf("%s:%d", conf.MetricsAddress, conf.MetricsPort) + + // check that metrics endpoint has been initialized + utils.RequireWithRetries(t, func() (err error, fatal bool) { + fatal = false + err = utils.CheckMetricsEndpointResult(httpAddr, true) + return + }, 10, 100*time.Millisecond) + + // stop origin cluster + err = simulacronSetup.Origin.DisableConnectionListener() + require.Nil(t, err, "failed to disable origin connection listener: %v", err) + err = simulacronSetup.Origin.DropAllConnections() + require.Nil(t, err, "failed to drop origin connections: %v", err) + + // send a request + testClient, err := client.NewTestClient(context.Background(), "127.0.0.1:14002") + require.Nil(t, err) + queryMsg := &message.Query{ + Query: "SELECT * FROM table1", + } + _, _, _ = testClient.SendMessage(context.Background(), primitive.ProtocolVersion4, queryMsg) + + utils.RequireWithRetries(t, func() (err error, fatal bool) { + // expect connection failure to origin cluster + statusCode, rspStr, err := utils.GetMetrics(httpAddr) + require.Nil(t, err) + require.Equal(t, http.StatusOK, statusCode) + if !strings.Contains(rspStr, fmt.Sprintf("%v 1", getPrometheusName("zdm", metrics.FailedConnectionsOrigin))) { + err = fmt.Errorf("did not observe failed connection attempts") + } else { + err = nil + } + return + }, 10, 500*time.Millisecond) +} + func testHttpEndpointsWithUnavailableNode( t *testing.T, metricsHandler *httpzdmproxy.HandlerWithFallback, healthHandler *httpzdmproxy.HandlerWithFallback) { diff --git a/proxy/pkg/metrics/proxy_metrics.go b/proxy/pkg/metrics/proxy_metrics.go index 9c3eb5f1..ee13ac9a 100644 --- a/proxy/pkg/metrics/proxy_metrics.go +++ b/proxy/pkg/metrics/proxy_metrics.go @@ -10,6 +10,10 @@ const ( failedRequestsClusterTarget = "target" failedRequestsClusterBoth = "both" + failedConnectionsName = "proxy_failed_connections_total" + failedConnectionsDescription = "Running total of failed requests due to inability to connect to given cluster" + failedConnectionsClusterLabel = "cluster" + failedReadsName = "proxy_failed_reads_total" failedReadsDescription = "Running total of failed reads" failedReadsClusterLabel = "cluster" @@ -28,6 +32,20 @@ const ( ) var ( + FailedConnectionsOrigin = NewMetricWithLabels( + failedConnectionsName, + failedConnectionsDescription, + map[string]string{ + failedConnectionsClusterLabel: failedRequestsClusterOrigin, + }, + ) + FailedConnectionsTarget = NewMetricWithLabels( + failedConnectionsName, + failedConnectionsDescription, + map[string]string{ + failedConnectionsClusterLabel: failedRequestsClusterTarget, + }, + ) FailedReadsOrigin = NewMetricWithLabels( failedReadsName, failedReadsDescription, @@ -124,11 +142,13 @@ var ( ) type ProxyMetrics struct { - FailedReadsOrigin Counter - FailedReadsTarget Counter - FailedWritesOnOrigin Counter - FailedWritesOnTarget Counter - FailedWritesOnBoth Counter + FailedConnectionsOrigin Counter + FailedConnectionsTarget Counter + FailedReadsOrigin Counter + FailedReadsTarget Counter + FailedWritesOnOrigin Counter + FailedWritesOnTarget Counter + FailedWritesOnBoth Counter PSCacheSize GaugeFunc PSCacheMissCount Counter diff --git a/proxy/pkg/zdmproxy/clienthandler.go b/proxy/pkg/zdmproxy/clienthandler.go index 066acf0a..43af28fb 100644 --- a/proxy/pkg/zdmproxy/clienthandler.go +++ b/proxy/pkg/zdmproxy/clienthandler.go @@ -209,6 +209,7 @@ func NewClientHandler( clientHandlerContext, clientHandlerCancelFunc, respChannel, readScheduler, writeScheduler, requestsDoneCtx, false, nil, handshakeDone, originFrameProcessor, originCCProtoVer) if err != nil { + metricHandler.GetProxyMetrics().FailedConnectionsOrigin.Add(1) clientHandlerCancelFunc() return nil, err } @@ -218,6 +219,7 @@ func NewClientHandler( clientHandlerContext, clientHandlerCancelFunc, respChannel, readScheduler, writeScheduler, requestsDoneCtx, false, nil, handshakeDone, targetFrameProcessor, targetCCProtoVer) if err != nil { + metricHandler.GetProxyMetrics().FailedConnectionsTarget.Add(1) clientHandlerCancelFunc() return nil, err } diff --git a/proxy/pkg/zdmproxy/proxy.go b/proxy/pkg/zdmproxy/proxy.go index f1ba5afc..419e086c 100644 --- a/proxy/pkg/zdmproxy/proxy.go +++ b/proxy/pkg/zdmproxy/proxy.go @@ -687,6 +687,16 @@ func sleepWithContext(d time.Duration, ctx context.Context, reconnectCh chan boo } func (p *ZdmProxy) CreateProxyMetrics(metricFactory metrics.MetricFactory) (*metrics.ProxyMetrics, error) { + failedConnectionsOrigin, err := metricFactory.GetOrCreateCounter(metrics.FailedConnectionsOrigin) + if err != nil { + return nil, err + } + + failedConnectionsTarget, err := metricFactory.GetOrCreateCounter(metrics.FailedConnectionsTarget) + if err != nil { + return nil, err + } + failedReadsOrigin, err := metricFactory.GetOrCreateCounter(metrics.FailedReadsOrigin) if err != nil { return nil, err @@ -760,6 +770,8 @@ func (p *ZdmProxy) CreateProxyMetrics(metricFactory metrics.MetricFactory) (*met } proxyMetrics := &metrics.ProxyMetrics{ + FailedConnectionsOrigin: failedConnectionsOrigin, + FailedConnectionsTarget: failedConnectionsTarget, FailedReadsOrigin: failedReadsOrigin, FailedReadsTarget: failedReadsTarget, FailedWritesOnOrigin: failedWritesOnOrigin, From d3e4925dddba4a939dd40d0e4b6e84d3aeb1c996 Mon Sep 17 00:00:00 2001 From: Lukasz Antoniak Date: Tue, 26 Nov 2024 11:09:06 +0100 Subject: [PATCH 2/2] Add node metric representing failed connection attempts to downstream clusters --- integration-tests/runner_test.go | 8 +++++++- proxy/pkg/metrics/node_metrics.go | 15 ++++++++++++++- proxy/pkg/zdmproxy/clusterconn.go | 13 +++++++++---- proxy/pkg/zdmproxy/proxy.go | 18 ++++++++++++++++++ 4 files changed, 48 insertions(+), 6 deletions(-) diff --git a/integration-tests/runner_test.go b/integration-tests/runner_test.go index 947ea7f9..0cdbb5d6 100644 --- a/integration-tests/runner_test.go +++ b/integration-tests/runner_test.go @@ -220,8 +220,14 @@ func testMetricsWithUnavailableNode( statusCode, rspStr, err := utils.GetMetrics(httpAddr) require.Nil(t, err) require.Equal(t, http.StatusOK, statusCode) + originEndpoint := fmt.Sprintf("%v:9042", simulacronSetup.Origin.GetInitialContactPoint()) + // search for: + // zdm_proxy_failed_connections_total{cluster="origin"} 1 + // zdm_origin_failed_connections_total{node="127.0.0.40:9042"} 1 if !strings.Contains(rspStr, fmt.Sprintf("%v 1", getPrometheusName("zdm", metrics.FailedConnectionsOrigin))) { - err = fmt.Errorf("did not observe failed connection attempts") + err = fmt.Errorf("did not observe failed connection attempts at proxy metric") + } else if !strings.Contains(rspStr, fmt.Sprintf("%v 1", getPrometheusNameWithNodeLabel("zdm", metrics.FailedOriginConnections, originEndpoint))) { + err = fmt.Errorf("did not observe failed connection attempts at node metric") } else { err = nil } diff --git a/proxy/pkg/metrics/node_metrics.go b/proxy/pkg/metrics/node_metrics.go index 7bcf3fde..968a558b 100644 --- a/proxy/pkg/metrics/node_metrics.go +++ b/proxy/pkg/metrics/node_metrics.go @@ -236,14 +236,26 @@ var ( "origin_connections_total", "Number of connections to Origin Cassandra currently open", ) + FailedOriginConnections = NewMetric( + "origin_failed_connections_total", + "Number of failed connection attempts to Origin Cassandra", + ) OpenTargetConnections = NewMetric( "target_connections_total", "Number of connections to Target Cassandra currently open", ) + FailedTargetConnections = NewMetric( + "target_failed_connections_total", + "Number of failed connection attempts to Target Cassandra", + ) OpenAsyncConnections = NewMetric( "async_connections_total", "Number of connections currently open for async requests", ) + FailedAsyncConnections = NewMetric( + "async_failed_connections_total", + "Number of failed connection attempts for async requests", + ) InFlightRequestsAsync = NewMetric( "async_inflight_requests_total", @@ -283,7 +295,8 @@ type NodeMetricsInstance struct { ReadDurations Histogram WriteDurations Histogram - OpenConnections Gauge + OpenConnections Gauge + FailedConnections Counter InFlightRequests Gauge diff --git a/proxy/pkg/zdmproxy/clusterconn.go b/proxy/pkg/zdmproxy/clusterconn.go index deeeaa45..ddc2ef67 100644 --- a/proxy/pkg/zdmproxy/clusterconn.go +++ b/proxy/pkg/zdmproxy/clusterconn.go @@ -196,15 +196,20 @@ func (cc *ClusterConnector) run() { func openConnectionToCluster(connInfo *ClusterConnectionInfo, context context.Context, connectorType ClusterConnectorType, nodeMetrics *metrics.NodeMetrics) (net.Conn, context.Context, error) { clusterType := connInfo.connConfig.GetClusterType() log.Infof("[%s] Opening request connection to %v (%v).", connectorType, clusterType, connInfo.endpoint.GetEndpointIdentifier()) + nodeMetricsInstance, err := GetNodeMetricsByClusterConnector(nodeMetrics, connectorType) + if err != nil { + log.Errorf("Failed to track open connection metrics for endpoint %v: %v.", connInfo.endpoint.GetEndpointIdentifier(), err) + } + conn, timeoutCtx, err := openConnection(connInfo.connConfig, connInfo.endpoint, context, true) if err != nil { + if nodeMetricsInstance != nil { + nodeMetricsInstance.FailedConnections.Add(1) + } return nil, timeoutCtx, err } - nodeMetricsInstance, err := GetNodeMetricsByClusterConnector(nodeMetrics, connectorType) - if err != nil { - log.Errorf("Failed to track open connection metrics for conn %v: %v.", conn.RemoteAddr().String(), err) - } else { + if nodeMetricsInstance != nil { nodeMetricsInstance.OpenConnections.Add(1) } diff --git a/proxy/pkg/zdmproxy/proxy.go b/proxy/pkg/zdmproxy/proxy.go index 419e086c..027d698a 100644 --- a/proxy/pkg/zdmproxy/proxy.go +++ b/proxy/pkg/zdmproxy/proxy.go @@ -853,6 +853,11 @@ func (p *ZdmProxy) CreateOriginNodeMetrics( return nil, err } + failedOriginConnections, err := metrics.CreateCounterNodeMetric(metricFactory, originNodeDescription, metrics.FailedOriginConnections) + if err != nil { + return nil, err + } + // inflight requests metric for non async requests are implemented as proxy level metrics (not node metrics) inflightRequests, err := noopmetrics.NewNoopMetricFactory().GetOrCreateGauge(nil) if err != nil { @@ -877,6 +882,7 @@ func (p *ZdmProxy) CreateOriginNodeMetrics( ReadDurations: originReadRequestDuration, WriteDurations: originWriteRequestDuration, OpenConnections: openOriginConnections, + FailedConnections: failedOriginConnections, InFlightRequests: inflightRequests, UsedStreamIds: originUsedStreamIds, }, nil @@ -944,6 +950,11 @@ func (p *ZdmProxy) CreateAsyncNodeMetrics( return nil, err } + failedAsyncConnections, err := metrics.CreateCounterNodeMetric(metricFactory, asyncNodeDescription, metrics.FailedAsyncConnections) + if err != nil { + return nil, err + } + inflightRequestsAsync, err := metrics.CreateGaugeNodeMetric(metricFactory, asyncNodeDescription, metrics.InFlightRequestsAsync) if err != nil { return nil, err @@ -967,6 +978,7 @@ func (p *ZdmProxy) CreateAsyncNodeMetrics( ReadDurations: asyncReadRequestDuration, WriteDurations: asyncWriteRequestDuration, OpenConnections: openAsyncConnections, + FailedConnections: failedAsyncConnections, InFlightRequests: inflightRequestsAsync, UsedStreamIds: asyncUsedStreamIds, }, nil @@ -1034,6 +1046,11 @@ func (p *ZdmProxy) CreateTargetNodeMetrics( return nil, err } + failedTargetConnections, err := metrics.CreateCounterNodeMetric(metricFactory, targetNodeDescription, metrics.FailedTargetConnections) + if err != nil { + return nil, err + } + // inflight requests metric for non async requests are implemented as proxy level metrics (not node metrics) inflightRequests, err := noopmetrics.NewNoopMetricFactory().GetOrCreateGauge(nil) if err != nil { @@ -1058,6 +1075,7 @@ func (p *ZdmProxy) CreateTargetNodeMetrics( ReadDurations: targetReadRequestDuration, WriteDurations: targetWriteRequestDuration, OpenConnections: openTargetConnections, + FailedConnections: failedTargetConnections, InFlightRequests: inflightRequests, UsedStreamIds: targetUsedStreamIds, }, nil