diff --git a/integration-tests/metrics_test.go b/integration-tests/metrics_test.go index 32b7223..3da0698 100644 --- a/integration-tests/metrics_test.go +++ b/integration-tests/metrics_test.go @@ -283,6 +283,8 @@ func checkMetrics( require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusName(prefix, metrics.FailedWritesOnTarget))) require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusName(prefix, metrics.FailedReadsTarget))) require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusName(prefix, metrics.FailedReadsOrigin))) + require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusName(prefix, metrics.FailedConnectionsOrigin))) + require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusName(prefix, metrics.FailedConnectionsTarget))) require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusName(prefix, metrics.InFlightWrites))) require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusName(prefix, metrics.InFlightReadsOrigin))) diff --git a/integration-tests/runner_test.go b/integration-tests/runner_test.go index ff4b23f..947ea7f 100644 --- a/integration-tests/runner_test.go +++ b/integration-tests/runner_test.go @@ -3,6 +3,9 @@ package integration_tests import ( "context" "fmt" + "github.com/datastax/go-cassandra-native-protocol/message" + "github.com/datastax/go-cassandra-native-protocol/primitive" + "github.com/datastax/zdm-proxy/integration-tests/client" "github.com/datastax/zdm-proxy/integration-tests/setup" "github.com/datastax/zdm-proxy/integration-tests/utils" "github.com/datastax/zdm-proxy/proxy/pkg/config" @@ -10,9 +13,14 @@ import ( "github.com/datastax/zdm-proxy/proxy/pkg/httpzdmproxy" "github.com/datastax/zdm-proxy/proxy/pkg/metrics" "github.com/datastax/zdm-proxy/proxy/pkg/runner" + "github.com/datastax/zdm-proxy/proxy/pkg/zdmproxy" + "github.com/jpillora/backoff" + log "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "net/http" + "strings" "sync" + "sync/atomic" "testing" "time" ) @@ -42,6 +50,10 @@ func TestWithHttpHandlers(t *testing.T) { t.Run("testHttpEndpointsWithUnavailableNode", func(t *testing.T) { testHttpEndpointsWithUnavailableNode(t, metricsHandler, readinessHandler) }) + + t.Run("testMetricsWithUnavailableNode", func(t *testing.T) { + testMetricsWithUnavailableNode(t, metricsHandler) + }) } func testHttpEndpointsWithProxyNotInitialized( @@ -137,6 +149,86 @@ func testHttpEndpointsWithProxyInitialized( require.Equal(t, health.UP, report.Status) } +func testMetricsWithUnavailableNode( + t *testing.T, metricsHandler *httpzdmproxy.HandlerWithFallback) { + + simulacronSetup, err := setup.NewSimulacronTestSetupWithSession(t, false, false) + require.Nil(t, err) + defer simulacronSetup.Cleanup() + + conf := setup.NewTestConfig(simulacronSetup.Origin.GetInitialContactPoint(), simulacronSetup.Target.GetInitialContactPoint()) + modifyConfForHealthTests(conf, 2) + + waitGroup := &sync.WaitGroup{} + ctx, cancelFunc := context.WithCancel(context.Background()) + + defer waitGroup.Wait() + defer cancelFunc() + + srv := httpzdmproxy.StartHttpServer(fmt.Sprintf("%s:%d", conf.MetricsAddress, conf.MetricsPort), waitGroup) + defer func(srv *http.Server, ctx context.Context) { + err := srv.Shutdown(ctx) + if err != nil { + log.Error("Failed to shutdown metrics server:", err.Error()) + } + }(srv, ctx) + + b := &backoff.Backoff{ + Factor: 2, + Jitter: false, + Min: 100 * time.Millisecond, + Max: 500 * time.Millisecond, + } + proxy := atomic.Value{} + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + p, err := zdmproxy.RunWithRetries(conf, ctx, b) + if err == nil { + metricsHandler.SetHandler(p.GetMetricHandler().GetHttpHandler()) + proxy.Store(&p) + <-ctx.Done() + p.Shutdown() + } + }() + + httpAddr := fmt.Sprintf("%s:%d", conf.MetricsAddress, conf.MetricsPort) + + // check that metrics endpoint has been initialized + utils.RequireWithRetries(t, func() (err error, fatal bool) { + fatal = false + err = utils.CheckMetricsEndpointResult(httpAddr, true) + return + }, 10, 100*time.Millisecond) + + // stop origin cluster + err = simulacronSetup.Origin.DisableConnectionListener() + require.Nil(t, err, "failed to disable origin connection listener: %v", err) + err = simulacronSetup.Origin.DropAllConnections() + require.Nil(t, err, "failed to drop origin connections: %v", err) + + // send a request + testClient, err := client.NewTestClient(context.Background(), "127.0.0.1:14002") + require.Nil(t, err) + queryMsg := &message.Query{ + Query: "SELECT * FROM table1", + } + _, _, _ = testClient.SendMessage(context.Background(), primitive.ProtocolVersion4, queryMsg) + + utils.RequireWithRetries(t, func() (err error, fatal bool) { + // expect connection failure to origin cluster + statusCode, rspStr, err := utils.GetMetrics(httpAddr) + require.Nil(t, err) + require.Equal(t, http.StatusOK, statusCode) + if !strings.Contains(rspStr, fmt.Sprintf("%v 1", getPrometheusName("zdm", metrics.FailedConnectionsOrigin))) { + err = fmt.Errorf("did not observe failed connection attempts") + } else { + err = nil + } + return + }, 10, 500*time.Millisecond) +} + func testHttpEndpointsWithUnavailableNode( t *testing.T, metricsHandler *httpzdmproxy.HandlerWithFallback, healthHandler *httpzdmproxy.HandlerWithFallback) { diff --git a/proxy/pkg/metrics/proxy_metrics.go b/proxy/pkg/metrics/proxy_metrics.go index 9c3eb5f..ee13ac9 100644 --- a/proxy/pkg/metrics/proxy_metrics.go +++ b/proxy/pkg/metrics/proxy_metrics.go @@ -10,6 +10,10 @@ const ( failedRequestsClusterTarget = "target" failedRequestsClusterBoth = "both" + failedConnectionsName = "proxy_failed_connections_total" + failedConnectionsDescription = "Running total of failed requests due to inability to connect to given cluster" + failedConnectionsClusterLabel = "cluster" + failedReadsName = "proxy_failed_reads_total" failedReadsDescription = "Running total of failed reads" failedReadsClusterLabel = "cluster" @@ -28,6 +32,20 @@ const ( ) var ( + FailedConnectionsOrigin = NewMetricWithLabels( + failedConnectionsName, + failedConnectionsDescription, + map[string]string{ + failedConnectionsClusterLabel: failedRequestsClusterOrigin, + }, + ) + FailedConnectionsTarget = NewMetricWithLabels( + failedConnectionsName, + failedConnectionsDescription, + map[string]string{ + failedConnectionsClusterLabel: failedRequestsClusterTarget, + }, + ) FailedReadsOrigin = NewMetricWithLabels( failedReadsName, failedReadsDescription, @@ -124,11 +142,13 @@ var ( ) type ProxyMetrics struct { - FailedReadsOrigin Counter - FailedReadsTarget Counter - FailedWritesOnOrigin Counter - FailedWritesOnTarget Counter - FailedWritesOnBoth Counter + FailedConnectionsOrigin Counter + FailedConnectionsTarget Counter + FailedReadsOrigin Counter + FailedReadsTarget Counter + FailedWritesOnOrigin Counter + FailedWritesOnTarget Counter + FailedWritesOnBoth Counter PSCacheSize GaugeFunc PSCacheMissCount Counter diff --git a/proxy/pkg/zdmproxy/clienthandler.go b/proxy/pkg/zdmproxy/clienthandler.go index 066acf0..43af28f 100644 --- a/proxy/pkg/zdmproxy/clienthandler.go +++ b/proxy/pkg/zdmproxy/clienthandler.go @@ -209,6 +209,7 @@ func NewClientHandler( clientHandlerContext, clientHandlerCancelFunc, respChannel, readScheduler, writeScheduler, requestsDoneCtx, false, nil, handshakeDone, originFrameProcessor, originCCProtoVer) if err != nil { + metricHandler.GetProxyMetrics().FailedConnectionsOrigin.Add(1) clientHandlerCancelFunc() return nil, err } @@ -218,6 +219,7 @@ func NewClientHandler( clientHandlerContext, clientHandlerCancelFunc, respChannel, readScheduler, writeScheduler, requestsDoneCtx, false, nil, handshakeDone, targetFrameProcessor, targetCCProtoVer) if err != nil { + metricHandler.GetProxyMetrics().FailedConnectionsTarget.Add(1) clientHandlerCancelFunc() return nil, err } diff --git a/proxy/pkg/zdmproxy/proxy.go b/proxy/pkg/zdmproxy/proxy.go index f1ba5af..419e086 100644 --- a/proxy/pkg/zdmproxy/proxy.go +++ b/proxy/pkg/zdmproxy/proxy.go @@ -687,6 +687,16 @@ func sleepWithContext(d time.Duration, ctx context.Context, reconnectCh chan boo } func (p *ZdmProxy) CreateProxyMetrics(metricFactory metrics.MetricFactory) (*metrics.ProxyMetrics, error) { + failedConnectionsOrigin, err := metricFactory.GetOrCreateCounter(metrics.FailedConnectionsOrigin) + if err != nil { + return nil, err + } + + failedConnectionsTarget, err := metricFactory.GetOrCreateCounter(metrics.FailedConnectionsTarget) + if err != nil { + return nil, err + } + failedReadsOrigin, err := metricFactory.GetOrCreateCounter(metrics.FailedReadsOrigin) if err != nil { return nil, err @@ -760,6 +770,8 @@ func (p *ZdmProxy) CreateProxyMetrics(metricFactory metrics.MetricFactory) (*met } proxyMetrics := &metrics.ProxyMetrics{ + FailedConnectionsOrigin: failedConnectionsOrigin, + FailedConnectionsTarget: failedConnectionsTarget, FailedReadsOrigin: failedReadsOrigin, FailedReadsTarget: failedReadsTarget, FailedWritesOnOrigin: failedWritesOnOrigin,