Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add proxy metric representing failed connection attempts #136

Merged
merged 2 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions integration-tests/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ func checkMetrics(
require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusName(prefix, metrics.FailedWritesOnTarget)))
require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusName(prefix, metrics.FailedReadsTarget)))
require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusName(prefix, metrics.FailedReadsOrigin)))
require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusName(prefix, metrics.FailedConnectionsOrigin)))
require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusName(prefix, metrics.FailedConnectionsTarget)))

require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusName(prefix, metrics.InFlightWrites)))
require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusName(prefix, metrics.InFlightReadsOrigin)))
Expand Down
98 changes: 98 additions & 0 deletions integration-tests/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,24 @@ package integration_tests
import (
"context"
"fmt"
"github.com/datastax/go-cassandra-native-protocol/message"
"github.com/datastax/go-cassandra-native-protocol/primitive"
"github.com/datastax/zdm-proxy/integration-tests/client"
"github.com/datastax/zdm-proxy/integration-tests/setup"
"github.com/datastax/zdm-proxy/integration-tests/utils"
"github.com/datastax/zdm-proxy/proxy/pkg/config"
"github.com/datastax/zdm-proxy/proxy/pkg/health"
"github.com/datastax/zdm-proxy/proxy/pkg/httpzdmproxy"
"github.com/datastax/zdm-proxy/proxy/pkg/metrics"
"github.com/datastax/zdm-proxy/proxy/pkg/runner"
"github.com/datastax/zdm-proxy/proxy/pkg/zdmproxy"
"github.com/jpillora/backoff"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"net/http"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
)
Expand Down Expand Up @@ -42,6 +50,10 @@ func TestWithHttpHandlers(t *testing.T) {
t.Run("testHttpEndpointsWithUnavailableNode", func(t *testing.T) {
testHttpEndpointsWithUnavailableNode(t, metricsHandler, readinessHandler)
})

t.Run("testMetricsWithUnavailableNode", func(t *testing.T) {
testMetricsWithUnavailableNode(t, metricsHandler)
})
}

func testHttpEndpointsWithProxyNotInitialized(
Expand Down Expand Up @@ -137,6 +149,92 @@ func testHttpEndpointsWithProxyInitialized(
require.Equal(t, health.UP, report.Status)
}

// testMetricsWithUnavailableNode checks that a failed connection attempt to the
// origin cluster shows up in both the proxy-level and the node-level
// failed-connection metrics exposed over HTTP.
//
// Flow: start the metrics HTTP server and the proxy, wait until the metrics
// endpoint has been initialized, stop the origin cluster from accepting
// connections, send a single client request, then poll the metrics endpoint
// until both counters are reported with the value 1.
func testMetricsWithUnavailableNode(
	t *testing.T, metricsHandler *httpzdmproxy.HandlerWithFallback) {

	simulacronSetup, err := setup.NewSimulacronTestSetupWithSession(t, false, false)
	require.Nil(t, err)
	defer simulacronSetup.Cleanup()

	conf := setup.NewTestConfig(simulacronSetup.Origin.GetInitialContactPoint(), simulacronSetup.Target.GetInitialContactPoint())
	modifyConfForHealthTests(conf, 2)

	waitGroup := &sync.WaitGroup{}
	ctx, cancelFunc := context.WithCancel(context.Background())

	// Deferred calls run last-in-first-out: the HTTP server is shut down first,
	// then the context is cancelled (which lets the proxy goroutine proceed to
	// p.Shutdown()), and finally the wait group is awaited.
	defer waitGroup.Wait()
	defer cancelFunc()

	httpAddr := fmt.Sprintf("%s:%d", conf.MetricsAddress, conf.MetricsPort)

	srv := httpzdmproxy.StartHttpServer(httpAddr, waitGroup)
	defer func(srv *http.Server, ctx context.Context) {
		err := srv.Shutdown(ctx)
		if err != nil {
			log.Error("Failed to shutdown metrics server:", err.Error())
		}
	}(srv, ctx)

	b := &backoff.Backoff{
		Factor: 2,
		Jitter: false,
		Min:    100 * time.Millisecond,
		Max:    500 * time.Millisecond,
	}
	// The proxy reference is stored here but is not read back within this function.
	proxy := atomic.Value{}
	waitGroup.Add(1)
	go func() {
		defer waitGroup.Done()
		p, err := zdmproxy.RunWithRetries(conf, ctx, b)
		if err == nil {
			metricsHandler.SetHandler(p.GetMetricHandler().GetHttpHandler())
			proxy.Store(&p)
			<-ctx.Done()
			p.Shutdown()
		}
	}()

	// check that metrics endpoint has been initialized
	utils.RequireWithRetries(t, func() (err error, fatal bool) {
		return utils.CheckMetricsEndpointResult(httpAddr, true), false
	}, 10, 100*time.Millisecond)

	// stop origin cluster
	err = simulacronSetup.Origin.DisableConnectionListener()
	require.Nil(t, err, "failed to disable origin connection listener: %v", err)
	err = simulacronSetup.Origin.DropAllConnections()
	require.Nil(t, err, "failed to drop origin connections: %v", err)

	// send a request
	testClient, err := client.NewTestClient(context.Background(), "127.0.0.1:14002")
	require.Nil(t, err)
	queryMsg := &message.Query{
		Query: "SELECT * FROM table1",
	}
	// The outcome of the request is deliberately ignored: only the metrics
	// produced by the connection attempt are inspected below.
	_, _, _ = testClient.SendMessage(context.Background(), primitive.ProtocolVersion4, queryMsg)

	// Names are computed once; they do not change between polling attempts.
	originEndpoint := fmt.Sprintf("%v:9042", simulacronSetup.Origin.GetInitialContactPoint())
	// search for:
	// zdm_proxy_failed_connections_total{cluster="origin"} 1
	// zdm_origin_failed_connections_total{node="127.0.0.40:9042"} 1
	expectedProxyMetric := fmt.Sprintf("%v 1", getPrometheusName("zdm", metrics.FailedConnectionsOrigin))
	expectedNodeMetric := fmt.Sprintf("%v 1", getPrometheusNameWithNodeLabel("zdm", metrics.FailedOriginConnections, originEndpoint))

	utils.RequireWithRetries(t, func() (err error, fatal bool) {
		// expect connection failure to origin cluster
		//
		// Failures to fetch the metrics are returned as non-fatal errors so that
		// they are retried like a missing metric, instead of aborting the test
		// on the first attempt.
		statusCode, rspStr, err := utils.GetMetrics(httpAddr)
		if err != nil {
			return fmt.Errorf("could not fetch metrics: %w", err), false
		}
		if statusCode != http.StatusOK {
			return fmt.Errorf("unexpected metrics endpoint status code: %v", statusCode), false
		}

		switch {
		case !strings.Contains(rspStr, expectedProxyMetric):
			return fmt.Errorf("did not observe failed connection attempts at proxy metric"), false
		case !strings.Contains(rspStr, expectedNodeMetric):
			return fmt.Errorf("did not observe failed connection attempts at node metric"), false
		default:
			return nil, false
		}
	}, 10, 500*time.Millisecond)
}

func testHttpEndpointsWithUnavailableNode(
t *testing.T, metricsHandler *httpzdmproxy.HandlerWithFallback, healthHandler *httpzdmproxy.HandlerWithFallback) {

Expand Down
15 changes: 14 additions & 1 deletion proxy/pkg/metrics/node_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,14 +236,26 @@ var (
"origin_connections_total",
"Number of connections to Origin Cassandra currently open",
)
FailedOriginConnections = NewMetric(
"origin_failed_connections_total",
"Number of failed connection attempts to Origin Cassandra",
)
OpenTargetConnections = NewMetric(
"target_connections_total",
"Number of connections to Target Cassandra currently open",
)
FailedTargetConnections = NewMetric(
"target_failed_connections_total",
"Number of failed connection attempts to Target Cassandra",
)
OpenAsyncConnections = NewMetric(
"async_connections_total",
"Number of connections currently open for async requests",
)
FailedAsyncConnections = NewMetric(
"async_failed_connections_total",
"Number of failed connection attempts for async requests",
)

InFlightRequestsAsync = NewMetric(
"async_inflight_requests_total",
Expand Down Expand Up @@ -283,7 +295,8 @@ type NodeMetricsInstance struct {
ReadDurations Histogram
WriteDurations Histogram

OpenConnections Gauge
OpenConnections Gauge
FailedConnections Counter

InFlightRequests Gauge

Expand Down
30 changes: 25 additions & 5 deletions proxy/pkg/metrics/proxy_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ const (
failedRequestsClusterTarget = "target"
failedRequestsClusterBoth = "both"

failedConnectionsName = "proxy_failed_connections_total"
failedConnectionsDescription = "Running total of failed requests due to inability to connect to given cluster"
failedConnectionsClusterLabel = "cluster"

failedReadsName = "proxy_failed_reads_total"
failedReadsDescription = "Running total of failed reads"
failedReadsClusterLabel = "cluster"
Expand All @@ -28,6 +32,20 @@ const (
)

var (
FailedConnectionsOrigin = NewMetricWithLabels(
failedConnectionsName,
failedConnectionsDescription,
map[string]string{
failedConnectionsClusterLabel: failedRequestsClusterOrigin,
},
)
FailedConnectionsTarget = NewMetricWithLabels(
failedConnectionsName,
failedConnectionsDescription,
map[string]string{
failedConnectionsClusterLabel: failedRequestsClusterTarget,
},
)
FailedReadsOrigin = NewMetricWithLabels(
failedReadsName,
failedReadsDescription,
Expand Down Expand Up @@ -124,11 +142,13 @@ var (
)

type ProxyMetrics struct {
FailedReadsOrigin Counter
FailedReadsTarget Counter
FailedWritesOnOrigin Counter
FailedWritesOnTarget Counter
FailedWritesOnBoth Counter
FailedConnectionsOrigin Counter
FailedConnectionsTarget Counter
FailedReadsOrigin Counter
FailedReadsTarget Counter
FailedWritesOnOrigin Counter
FailedWritesOnTarget Counter
FailedWritesOnBoth Counter

PSCacheSize GaugeFunc
PSCacheMissCount Counter
Expand Down
2 changes: 2 additions & 0 deletions proxy/pkg/zdmproxy/clienthandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ func NewClientHandler(
clientHandlerContext, clientHandlerCancelFunc, respChannel, readScheduler, writeScheduler, requestsDoneCtx,
false, nil, handshakeDone, originFrameProcessor, originCCProtoVer)
if err != nil {
metricHandler.GetProxyMetrics().FailedConnectionsOrigin.Add(1)
clientHandlerCancelFunc()
return nil, err
}
Expand All @@ -218,6 +219,7 @@ func NewClientHandler(
clientHandlerContext, clientHandlerCancelFunc, respChannel, readScheduler, writeScheduler, requestsDoneCtx,
false, nil, handshakeDone, targetFrameProcessor, targetCCProtoVer)
if err != nil {
metricHandler.GetProxyMetrics().FailedConnectionsTarget.Add(1)
clientHandlerCancelFunc()
return nil, err
}
Expand Down
13 changes: 9 additions & 4 deletions proxy/pkg/zdmproxy/clusterconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,15 +196,20 @@ func (cc *ClusterConnector) run() {
func openConnectionToCluster(connInfo *ClusterConnectionInfo, context context.Context, connectorType ClusterConnectorType, nodeMetrics *metrics.NodeMetrics) (net.Conn, context.Context, error) {
clusterType := connInfo.connConfig.GetClusterType()
log.Infof("[%s] Opening request connection to %v (%v).", connectorType, clusterType, connInfo.endpoint.GetEndpointIdentifier())
nodeMetricsInstance, err := GetNodeMetricsByClusterConnector(nodeMetrics, connectorType)
if err != nil {
log.Errorf("Failed to track open connection metrics for endpoint %v: %v.", connInfo.endpoint.GetEndpointIdentifier(), err)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should probably be a warning

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was an error, so let us leave it like this for the time being.

}

conn, timeoutCtx, err := openConnection(connInfo.connConfig, connInfo.endpoint, context, true)
if err != nil {
if nodeMetricsInstance != nil {
nodeMetricsInstance.FailedConnections.Add(1)
}
return nil, timeoutCtx, err
}

nodeMetricsInstance, err := GetNodeMetricsByClusterConnector(nodeMetrics, connectorType)
if err != nil {
log.Errorf("Failed to track open connection metrics for conn %v: %v.", conn.RemoteAddr().String(), err)
} else {
if nodeMetricsInstance != nil {
nodeMetricsInstance.OpenConnections.Add(1)
}

Expand Down
30 changes: 30 additions & 0 deletions proxy/pkg/zdmproxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,16 @@ func sleepWithContext(d time.Duration, ctx context.Context, reconnectCh chan boo
}

func (p *ZdmProxy) CreateProxyMetrics(metricFactory metrics.MetricFactory) (*metrics.ProxyMetrics, error) {
failedConnectionsOrigin, err := metricFactory.GetOrCreateCounter(metrics.FailedConnectionsOrigin)
if err != nil {
return nil, err
}

failedConnectionsTarget, err := metricFactory.GetOrCreateCounter(metrics.FailedConnectionsTarget)
if err != nil {
return nil, err
}

failedReadsOrigin, err := metricFactory.GetOrCreateCounter(metrics.FailedReadsOrigin)
if err != nil {
return nil, err
Expand Down Expand Up @@ -760,6 +770,8 @@ func (p *ZdmProxy) CreateProxyMetrics(metricFactory metrics.MetricFactory) (*met
}

proxyMetrics := &metrics.ProxyMetrics{
FailedConnectionsOrigin: failedConnectionsOrigin,
FailedConnectionsTarget: failedConnectionsTarget,
FailedReadsOrigin: failedReadsOrigin,
FailedReadsTarget: failedReadsTarget,
FailedWritesOnOrigin: failedWritesOnOrigin,
Expand Down Expand Up @@ -841,6 +853,11 @@ func (p *ZdmProxy) CreateOriginNodeMetrics(
return nil, err
}

failedOriginConnections, err := metrics.CreateCounterNodeMetric(metricFactory, originNodeDescription, metrics.FailedOriginConnections)
if err != nil {
return nil, err
}

// inflight requests metric for non async requests are implemented as proxy level metrics (not node metrics)
inflightRequests, err := noopmetrics.NewNoopMetricFactory().GetOrCreateGauge(nil)
if err != nil {
Expand All @@ -865,6 +882,7 @@ func (p *ZdmProxy) CreateOriginNodeMetrics(
ReadDurations: originReadRequestDuration,
WriteDurations: originWriteRequestDuration,
OpenConnections: openOriginConnections,
FailedConnections: failedOriginConnections,
InFlightRequests: inflightRequests,
UsedStreamIds: originUsedStreamIds,
}, nil
Expand Down Expand Up @@ -932,6 +950,11 @@ func (p *ZdmProxy) CreateAsyncNodeMetrics(
return nil, err
}

failedAsyncConnections, err := metrics.CreateCounterNodeMetric(metricFactory, asyncNodeDescription, metrics.FailedAsyncConnections)
if err != nil {
return nil, err
}

inflightRequestsAsync, err := metrics.CreateGaugeNodeMetric(metricFactory, asyncNodeDescription, metrics.InFlightRequestsAsync)
if err != nil {
return nil, err
Expand All @@ -955,6 +978,7 @@ func (p *ZdmProxy) CreateAsyncNodeMetrics(
ReadDurations: asyncReadRequestDuration,
WriteDurations: asyncWriteRequestDuration,
OpenConnections: openAsyncConnections,
FailedConnections: failedAsyncConnections,
InFlightRequests: inflightRequestsAsync,
UsedStreamIds: asyncUsedStreamIds,
}, nil
Expand Down Expand Up @@ -1022,6 +1046,11 @@ func (p *ZdmProxy) CreateTargetNodeMetrics(
return nil, err
}

failedTargetConnections, err := metrics.CreateCounterNodeMetric(metricFactory, targetNodeDescription, metrics.FailedTargetConnections)
if err != nil {
return nil, err
}

// inflight requests metric for non async requests are implemented as proxy level metrics (not node metrics)
inflightRequests, err := noopmetrics.NewNoopMetricFactory().GetOrCreateGauge(nil)
if err != nil {
Expand All @@ -1046,6 +1075,7 @@ func (p *ZdmProxy) CreateTargetNodeMetrics(
ReadDurations: targetReadRequestDuration,
WriteDurations: targetWriteRequestDuration,
OpenConnections: openTargetConnections,
FailedConnections: failedTargetConnections,
InFlightRequests: inflightRequests,
UsedStreamIds: targetUsedStreamIds,
}, nil
Expand Down
Loading