From 106f376663c904e438621052a3bbc2975256fef6 Mon Sep 17 00:00:00 2001 From: Michael Penick Date: Fri, 17 Dec 2021 15:07:52 -0500 Subject: [PATCH 1/5] Add support for retries --- proxy/proxy.go | 4 + proxy/proxy_retries_test.go | 227 ++++++++++++++++++++++++++++++++++++ proxy/proxy_test.go | 212 +++++++++++---------------------- proxy/request.go | 116 +++++++++++++++--- proxy/retrypolicy.go | 88 ++++++++++++++ proxy/retrypolicy_test.go | 96 +++++++++++++++ 6 files changed, 585 insertions(+), 158 deletions(-) create mode 100644 proxy/proxy_retries_test.go create mode 100644 proxy/retrypolicy.go create mode 100644 proxy/retrypolicy_test.go diff --git a/proxy/proxy.go b/proxy/proxy.go index f5513e4..355b619 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -48,6 +48,7 @@ type Config struct { Auth proxycore.Authenticator Resolver proxycore.EndpointResolver ReconnectPolicy proxycore.ReconnectPolicy + RetryPolicy RetryPolicy NumConns int Logger *zap.Logger HeartBeatInterval time.Duration @@ -101,6 +102,9 @@ func NewProxy(ctx context.Context, config Config) *Proxy { if config.MaxVersion == 0 { config.MaxVersion = primitive.ProtocolVersion4 } + if config.RetryPolicy == nil { + config.RetryPolicy = NewDefaultRetryPolicy() + } return &Proxy{ ctx: ctx, config: config, diff --git a/proxy/proxy_retries_test.go b/proxy/proxy_retries_test.go new file mode 100644 index 0000000..5f9cac6 --- /dev/null +++ b/proxy/proxy_retries_test.go @@ -0,0 +1,227 @@ +// Copyright (c) DataStax, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package proxy + +import ( + "context" + "sync" + "testing" + + "github.com/datastax/cql-proxy/proxycore" + "github.com/datastax/go-cassandra-native-protocol/frame" + "github.com/datastax/go-cassandra-native-protocol/message" + "github.com/datastax/go-cassandra-native-protocol/primitive" + "github.com/stretchr/testify/assert" +) + +func TestProxy_Retries(t *testing.T) { + const idempotentQuery = "SELECT * FROM test.test" + const nonIdempotentQuery = "INSERT INTO test.test (k, v) VALUES ('a', uuid())" + + var tests = []struct { + query string + response message.Error + errString string + numNodesTried int + retryCount int + }{ + { // Bootstrapping error + idempotentQuery, + &message.IsBootstrapping{ErrorMessage: "Bootstrapping"}, + "cql error: ERROR UNAVAILABLE (code=ErrorCode Unavailable [0x00001000], msg=No more hosts available (exhausted query plan), cl=ConsistencyLevel ANY [0x0000], required=0, alive=0)", + 3, + 2, + }, + { // Bootstrapping error w/ non-idempotent query + nonIdempotentQuery, + &message.IsBootstrapping{ErrorMessage: "Bootstrapping"}, + "cql error: ERROR UNAVAILABLE (code=ErrorCode Unavailable [0x00001000], msg=No more hosts available (exhausted query plan), cl=ConsistencyLevel ANY [0x0000], required=0, alive=0)", + 3, + 2, + }, + { // Error response (truncate), retry until succeeds or exhausts query plan + idempotentQuery, + &message.TruncateError{ErrorMessage: "Truncate"}, + "cql error: ERROR UNAVAILABLE (code=ErrorCode Unavailable [0x00001000], msg=No more hosts available (exhausted query plan), cl=ConsistencyLevel ANY [0x0000], required=0, alive=0)", + 3, + 2, + }, + { // Error response (truncate) w/ non-idempotent query, retry until succeeds or exhausts query plan + nonIdempotentQuery, + &message.TruncateError{ErrorMessage: "Truncate"}, + "cql error: ERROR TRUNCATE ERROR (code=ErrorCode TruncateError [0x00001003], msg=Truncate)", + 1, + 0, + }, + { // Error response (read failure), don't retry + idempotentQuery, + &message.ReadFailure{ + ErrorMessage: "", + Consistency: primitive.ConsistencyLevelQuorum, + Received: 2, + BlockFor: 2, + NumFailures: 1, + }, + "cql error: ERROR READ FAILURE (code=ErrorCode ReadFailure [0x00001300], msg=, cl=ConsistencyLevel QUORUM [0x0004], received=2, blockfor=2, data=false)", + 1, + 0, + }, + { // Error response (write failure), don't retry + idempotentQuery, + &message.WriteFailure{ + ErrorMessage: "", + Consistency: primitive.ConsistencyLevelQuorum, + Received: 2, + BlockFor: 2, + NumFailures: 1, + WriteType: primitive.WriteTypeSimple, + }, + "cql error: ERROR WRITE FAILURE (code=ErrorCode WriteFailure [0x00001500], msg=, cl=ConsistencyLevel QUORUM [0x0004], received=2, blockfor=2, type=SIMPLE)", + 1, + 0, + }, + { // Unavailable error, retry on the next node + idempotentQuery, + &message.Unavailable{ + ErrorMessage: "Unavailable", + Consistency: primitive.ConsistencyLevelQuorum, + Required: 2, + Alive: 1, + }, + "cql error: ERROR UNAVAILABLE (code=ErrorCode Unavailable [0x00001000], msg=Unavailable, cl=ConsistencyLevel QUORUM [0x0004], required=2, alive=1)", + 2, + 1, + }, + { // Unavailable error w/ non-idempotent query, retry on the next node (same) + nonIdempotentQuery, + &message.Unavailable{ + ErrorMessage: "Unavailable", + Consistency: primitive.ConsistencyLevelQuorum, + Required: 2, + Alive: 1, + }, + "cql error: ERROR UNAVAILABLE (code=ErrorCode Unavailable [0x00001000], msg=Unavailable, cl=ConsistencyLevel QUORUM [0x0004], required=2, alive=1)", + 2, + 1, + }, + { // Read timeout error, retry once on the same node + 
idempotentQuery, + &message.ReadTimeout{ + ErrorMessage: "ReadTimeout", + Consistency: primitive.ConsistencyLevelQuorum, + Received: 3, + BlockFor: 2, + DataPresent: false, // Data wasn't present, read repair, retry + }, + "cql error: ERROR READ TIMEOUT (code=ErrorCode ReadTimeout [0x00001200], msg=ReadTimeout, cl=ConsistencyLevel QUORUM [0x0004], received=3, blockfor=2, data=false)", + 1, + 1, + }, + { // Read timeout error w/ non-idempotent query, retry once on the same node (same) + nonIdempotentQuery, + &message.ReadTimeout{ + ErrorMessage: "ReadTimeout", + Consistency: primitive.ConsistencyLevelQuorum, + Received: 3, + BlockFor: 2, + DataPresent: false, // Data wasn't present, read repair, retry + }, + "cql error: ERROR READ TIMEOUT (code=ErrorCode ReadTimeout [0x00001200], msg=ReadTimeout, cl=ConsistencyLevel QUORUM [0x0004], received=3, blockfor=2, data=false)", + 1, + 1, + }, + { // Read timeout error w/ unmet conditions, don't retry + idempotentQuery, + &message.ReadTimeout{ + ErrorMessage: "ReadTimeout", + Consistency: primitive.ConsistencyLevelQuorum, + Received: 2, + BlockFor: 2, + DataPresent: true, // Data was present don't retry + }, + "cql error: ERROR READ TIMEOUT (code=ErrorCode ReadTimeout [0x00001200], msg=ReadTimeout, cl=ConsistencyLevel QUORUM [0x0004], received=2, blockfor=2, data=true)", + 1, + 0, + }, + { // Write timeout error, retry once if logged batch + idempotentQuery, // Not a logged batch, but it doesn't matter for this test + &message.WriteTimeout{ + ErrorMessage: "WriteTimeout", + Consistency: primitive.ConsistencyLevelQuorum, + Received: 1, + BlockFor: 2, + WriteType: primitive.WriteTypeBatchLog, // Retry if a logged batch + }, + "cql error: ERROR WRITE TIMEOUT (code=ErrorCode WriteTimeout [0x00001100], msg=WriteTimeout, cl=ConsistencyLevel QUORUM [0x0004], received=1, blockfor=2, type=BATCH_LOG, contentions=0)", + 1, + 1, + }, + { // Write timeout error w/ unmet conditions, don't retry + idempotentQuery, + &message.WriteTimeout{ + ErrorMessage: "WriteTimeout", + Consistency: primitive.ConsistencyLevelQuorum, + Received: 1, + BlockFor: 2, + WriteType: primitive.WriteTypeSimple, // Don't retry for anything other than logged batches + }, + "cql error: ERROR WRITE TIMEOUT (code=ErrorCode WriteTimeout [0x00001100], msg=WriteTimeout, cl=ConsistencyLevel QUORUM [0x0004], received=1, blockfor=2, type=SIMPLE, contentions=0)", + 1, + 0, + }, + } + + for _, tt := range tests { + numNodesTried, retryCount, err := testProxyRetry(t, tt.query, tt.response) + assert.EqualError(t, err, tt.errString) + assert.Equal(t, tt.numNodesTried, numNodesTried) + assert.Equal(t, tt.retryCount, retryCount) + } +} + +func testProxyRetry(t *testing.T, query string, response message.Error) (numNodesTried, retryCount int, responseError error) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var mu sync.Mutex + tried := make(map[string]int) + + cluster, proxy := setupProxyTest(t, ctx, 3, proxycore.MockRequestHandlers{ + primitive.OpCodeQuery: func(cl *proxycore.MockClient, frm *frame.Frame) message.Message { + if msg := cl.InterceptQuery(frm.Header, frm.Body.Message.(*message.Query)); msg != nil { + return msg + } else { + mu.Lock() + tried[cl.Local().IP]++ + mu.Unlock() + return response + } + }, + }) + defer func() { + cluster.Shutdown() + _ = proxy.Shutdown() + }() + + cl := connectTestClient(t, ctx) + + _, err := cl.Query(ctx, primitive.ProtocolVersion4, &message.Query{Query: query}) + + retryCount = 0 + for _, v := range tried { + retryCount += v + } + 
return len(tried), retryCount - 1, err +} diff --git a/proxy/proxy_test.go b/proxy/proxy_test.go index 2f52697..92e2dd6 100644 --- a/proxy/proxy_test.go +++ b/proxy/proxy_test.go @@ -33,19 +33,18 @@ import ( "github.com/stretchr/testify/require" ) +const ( + testClusterContactPoint = "127.0.0.1:8000" + testClusterPort = 8000 + testClusterStartIP = "127.0.0.0" + testProxyContactPoint = "127.0.0.1:9042" +) + func TestProxy_ListenAndServe(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - const clusterContactPoint = "127.0.0.1:8000" - const clusterPort = 8000 - - const proxyContactPoint = "127.0.0.1:9042" - - cluster := proxycore.NewMockCluster(net.ParseIP("127.0.0.0"), clusterPort) - defer cluster.Shutdown() - - cluster.Handlers = proxycore.NewMockRequestHandlers(proxycore.MockRequestHandlers{ + cluster, proxy := setupProxyTest(t, ctx, 3, proxycore.MockRequestHandlers{ primitive.OpCodeQuery: func(cl *proxycore.MockClient, frm *frame.Frame) message.Message { if msg := cl.InterceptQuery(frm.Header, frm.Body.Message.(*message.Query)); msg != nil { return msg @@ -73,50 +72,21 @@ func TestProxy_ListenAndServe(t *testing.T) { } }, }) - - err := cluster.Add(ctx, 1) - require.NoError(t, err) - - err = cluster.Add(ctx, 2) - require.NoError(t, err) - - err = cluster.Add(ctx, 3) - require.NoError(t, err) - - proxy := NewProxy(ctx, Config{ - Version: primitive.ProtocolVersion4, - Resolver: proxycore.NewResolverWithDefaultPort([]string{clusterContactPoint}, clusterPort), - ReconnectPolicy: proxycore.NewReconnectPolicyWithDelays(200*time.Millisecond, time.Second), - NumConns: 2, - HeartBeatInterval: 30 * time.Second, - IdleTimeout: 60 * time.Second, - }) - - err = proxy.Listen(proxyContactPoint) - defer func(proxy *Proxy) { + defer func() { + cluster.Shutdown() _ = proxy.Shutdown() - }(proxy) - require.NoError(t, err) - - go func() { - _ = proxy.Serve() }() - cl, err := proxycore.ConnectClient(ctx, proxycore.NewEndpoint(proxyContactPoint), proxycore.ClientConnConfig{}) - require.NoError(t, err) + cl := connectTestClient(t, ctx) - version, err := cl.Handshake(ctx, primitive.ProtocolVersion4, nil) - require.NoError(t, err) - assert.Equal(t, primitive.ProtocolVersion4, version) - - hosts, err := testQueryHosts(ctx, cl) + hosts, err := queryTestHosts(ctx, cl) require.NoError(t, err) assert.Equal(t, 3, len(hosts)) cluster.Stop(1) removed := waitUntil(10*time.Second, func() bool { - hosts, err := testQueryHosts(ctx, cl) + hosts, err := queryTestHosts(ctx, cl) require.NoError(t, err) return len(hosts) == 2 }) @@ -126,7 +96,7 @@ func TestProxy_ListenAndServe(t *testing.T) { require.NoError(t, err) added := waitUntil(10*time.Second, func() bool { - hosts, err := testQueryHosts(ctx, cl) + hosts, err := queryTestHosts(ctx, cl) require.NoError(t, err) return len(hosts) == 3 }) @@ -139,20 +109,12 @@ func TestProxy_Unprepared(t *testing.T) { const numNodes = 3 - const clusterContactPoint = "127.0.0.1:8000" - const clusterPort = 8000 - - const proxyContactPoint = "127.0.0.1:9042" const version = primitive.ProtocolVersion4 preparedId := []byte("abc") - - cluster := proxycore.NewMockCluster(net.ParseIP("127.0.0.0"), clusterPort) - defer cluster.Shutdown() - var prepared sync.Map - cluster.Handlers = proxycore.NewMockRequestHandlers(proxycore.MockRequestHandlers{ + cluster, proxy := setupProxyTest(t, ctx, numNodes, proxycore.MockRequestHandlers{ primitive.OpCodePrepare: func(cl *proxycore.MockClient, frm *frame.Frame) message.Message { prepared.Store(cl.Local().IP, true) return 
&message.PreparedResult{ @@ -174,37 +136,12 @@ func TestProxy_Unprepared(t *testing.T) { } }, }) - - for i := 1; i <= numNodes; i++ { - err := cluster.Add(ctx, i) - require.NoError(t, err) - } - - proxy := NewProxy(ctx, Config{ - Version: version, - Resolver: proxycore.NewResolverWithDefaultPort([]string{clusterContactPoint}, clusterPort), - ReconnectPolicy: proxycore.NewReconnectPolicyWithDelays(200*time.Millisecond, time.Second), - NumConns: 2, - HeartBeatInterval: 30 * time.Second, - IdleTimeout: 60 * time.Second, - }) - - err := proxy.Listen(proxyContactPoint) - defer func(proxy *Proxy) { + defer func() { + cluster.Shutdown() _ = proxy.Shutdown() - }(proxy) - require.NoError(t, err) - - go func() { - _ = proxy.Serve() }() - cl, err := proxycore.ConnectClient(ctx, proxycore.NewEndpoint(proxyContactPoint), proxycore.ClientConnConfig{}) - require.NoError(t, err) - - negotiated, err := cl.Handshake(ctx, version, nil) - require.NoError(t, err) - assert.Equal(t, version, negotiated) + cl := connectTestClient(t, ctx) // Only prepare on a single node resp, err := cl.SendAndReceive(ctx, frame.NewFrame(version, 0, &message.Prepare{Query: "SELECT * FROM test.test"})) @@ -234,44 +171,15 @@ func TestProxy_UseKeyspace(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - const clusterContactPoint = "127.0.0.1:8000" - const clusterPort = 8000 - - const proxyContactPoint = "127.0.0.1:9042" - - cluster := proxycore.NewMockCluster(net.ParseIP("127.0.0.0"), clusterPort) - defer cluster.Shutdown() - - err := cluster.Add(ctx, 1) - require.NoError(t, err) - - proxy := NewProxy(ctx, Config{ - Version: primitive.ProtocolVersion4, - Resolver: proxycore.NewResolverWithDefaultPort([]string{clusterContactPoint}, clusterPort), - ReconnectPolicy: proxycore.NewReconnectPolicyWithDelays(200*time.Millisecond, time.Second), - NumConns: 2, - HeartBeatInterval: 30 * time.Second, - IdleTimeout: 60 * time.Second, - }) - - err = proxy.Listen(proxyContactPoint) - defer func(proxy *Proxy) { + cluster, proxy := setupProxyTest(t, ctx, 1, nil) + defer func() { + cluster.Shutdown() _ = proxy.Shutdown() - }(proxy) - require.NoError(t, err) - - go func() { - _ = proxy.Serve() }() - cl, err := proxycore.ConnectClient(ctx, proxycore.NewEndpoint(proxyContactPoint), proxycore.ClientConnConfig{}) - require.NoError(t, err) + cl := connectTestClient(t, ctx) - version, err := cl.Handshake(ctx, primitive.ProtocolVersion4, nil) - require.NoError(t, err) - assert.Equal(t, primitive.ProtocolVersion4, version) - - resp, err := cl.SendAndReceive(ctx, frame.NewFrame(version, 0, &message.Query{Query: "USE system"})) + resp, err := cl.SendAndReceive(ctx, frame.NewFrame(primitive.ProtocolVersion4, 0, &message.Query{Query: "USE system"})) require.NoError(t, err) assert.Equal(t, primitive.OpCodeResult, resp.Header.OpCode) @@ -284,37 +192,13 @@ func TestProxy_NegotiateProtocolV5(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - const clusterContactPoint = "127.0.0.1:8000" - const clusterPort = 8000 - - const proxyContactPoint = "127.0.0.1:9042" - - cluster := proxycore.NewMockCluster(net.ParseIP("127.0.0.0"), clusterPort) - defer cluster.Shutdown() - - err := cluster.Add(ctx, 1) - require.NoError(t, err) - - proxy := NewProxy(ctx, Config{ - Version: primitive.ProtocolVersion4, - Resolver: proxycore.NewResolverWithDefaultPort([]string{clusterContactPoint}, clusterPort), - ReconnectPolicy: proxycore.NewReconnectPolicyWithDelays(200*time.Millisecond, time.Second), - NumConns: 2, - 
HeartBeatInterval: 30 * time.Second, - IdleTimeout: 60 * time.Second, - }) - - err = proxy.Listen(proxyContactPoint) - defer func(proxy *Proxy) { + cluster, proxy := setupProxyTest(t, ctx, 1, nil) + defer func() { + cluster.Shutdown() _ = proxy.Shutdown() - }(proxy) - require.NoError(t, err) - - go func() { - _ = proxy.Serve() }() - cl, err := proxycore.ConnectClient(ctx, proxycore.NewEndpoint(proxyContactPoint), proxycore.ClientConnConfig{}) + cl, err := proxycore.ConnectClient(ctx, proxycore.NewEndpoint(testProxyContactPoint), proxycore.ClientConnConfig{}) require.NoError(t, err) version, err := cl.Handshake(ctx, primitive.ProtocolVersion5, nil) @@ -322,7 +206,7 @@ func TestProxy_NegotiateProtocolV5(t *testing.T) { assert.Equal(t, primitive.ProtocolVersion4, version) // Expected to be negotiated to v4 } -func testQueryHosts(ctx context.Context, cl *proxycore.ClientConn) (map[string]struct{}, error) { +func queryTestHosts(ctx context.Context, cl *proxycore.ClientConn) (map[string]struct{}, error) { hosts := make(map[string]struct{}) for i := 0; i < 3; i++ { rs, err := cl.Query(ctx, primitive.ProtocolVersion4, &message.Query{Query: "SELECT * FROM test.test"}) @@ -341,6 +225,46 @@ func testQueryHosts(ctx context.Context, cl *proxycore.ClientConn) (map[string]s return hosts, nil } +func setupProxyTest(t *testing.T, ctx context.Context, numNodes int, handlers proxycore.MockRequestHandlers) (*proxycore.MockCluster, *Proxy) { + cluster := proxycore.NewMockCluster(net.ParseIP(testClusterStartIP), testClusterPort) + if handlers != nil { + cluster.Handlers = proxycore.NewMockRequestHandlers(handlers) + } + for i := 1; i <= numNodes; i++ { + err := cluster.Add(ctx, i) + require.NoError(t, err) + } + + proxy := NewProxy(ctx, Config{ + Version: primitive.ProtocolVersion4, + Resolver: proxycore.NewResolverWithDefaultPort([]string{testClusterContactPoint}, testClusterPort), + ReconnectPolicy: proxycore.NewReconnectPolicyWithDelays(200*time.Millisecond, time.Second), + NumConns: 2, + HeartBeatInterval: 30 * time.Second, + IdleTimeout: 60 * time.Second, + }) + + err := proxy.Listen(testProxyContactPoint) + require.NoError(t, err) + + go func() { + _ = proxy.Serve() + }() + + return cluster, proxy +} + +func connectTestClient(t *testing.T, ctx context.Context) *proxycore.ClientConn { + cl, err := proxycore.ConnectClient(ctx, proxycore.NewEndpoint(testProxyContactPoint), proxycore.ClientConnConfig{}) + require.NoError(t, err) + + version, err := cl.Handshake(ctx, primitive.ProtocolVersion4, nil) + require.NoError(t, err) + assert.Equal(t, primitive.ProtocolVersion4, version) + + return cl +} + func waitUntil(d time.Duration, check func() bool) bool { iterations := int(d / (100 * time.Millisecond)) for i := 0; i < iterations; i++ { diff --git a/proxy/request.go b/proxy/request.go index d695378..4065483 100644 --- a/proxy/request.go +++ b/proxy/request.go @@ -122,25 +122,113 @@ func (r *request) OnClose(_ error) { func (r *request) OnResult(raw *frame.RawFrame) { r.mu.Lock() + defer r.mu.Unlock() if !r.done { - r.done = true - if r.raw.Header.OpCode == primitive.OpCodePrepare && raw.Header.OpCode == primitive.OpCodeResult { // Prepared result - frm, err := codec.ConvertFromRawFrame(raw) - if err != nil { - r.client.proxy.logger.Error("error attempting to decode prepared result message") - } else if _, ok := frm.Body.Message.(*message.PreparedResult); !ok { // TODO: Use prepared type data to disambiguate idempotency - r.client.proxy.logger.Error("expected prepared result message, but got something else") - } 
else { - idempotent, err := parser.IsQueryIdempotent(r.query) + if raw.Header.OpCode != primitive.OpCodeError || + !r.handleErrorResult(raw) { // If the error result is retried then we don't send back this response + if r.raw.Header.OpCode == primitive.OpCodePrepare && raw.Header.OpCode == primitive.OpCodeResult { // Prepared result + frm, err := codec.ConvertFromRawFrame(raw) if err != nil { - r.client.proxy.logger.Error("error parsing query for idempotence", zap.Error(err)) + r.client.proxy.logger.Error("error attempting to decode prepared result message") + } else if _, ok := frm.Body.Message.(*message.PreparedResult); !ok { // TODO: Use prepared type data to disambiguate idempotency + r.client.proxy.logger.Error("expected prepared result message, but got something else") } else { - // TODO: Make sure this hash matches server-side impl. - r.client.preparedIdempotence.Store(md5.Sum([]byte(r.keyspace+r.query)), idempotent) + idempotent, err := parser.IsQueryIdempotent(r.query) + if err != nil { + r.client.proxy.logger.Error("error parsing query for idempotence", zap.Error(err)) + } else { + // TODO: Make sure this hash matches server-side impl. + r.client.preparedIdempotence.Store(md5.Sum([]byte(r.keyspace+r.query)), idempotent) + } } } + r.done = true + r.sendRaw(raw) } - r.sendRaw(raw) } - r.mu.Unlock() +} + +func (r *request) handleErrorResult(raw *frame.RawFrame) (retried bool) { + retried = false + logger := r.client.proxy.logger + decision := ReturnError + + frm, err := codec.ConvertFromRawFrame(raw) + if err != nil { + logger.Error("unable to decode error frame for retry decision", zap.Error(err)) + } else { + idempotent := r.checkIdempotent() + + errMsg := frm.Body.Message.(message.Error) + logger.Debug("received error response", + zap.Stringer("host", r.host), + zap.Stringer("errorCode", errMsg.GetErrorCode()), + zap.String("error", errMsg.GetErrorMessage()), + ) + switch msg := frm.Body.Message.(type) { + case *message.ReadTimeout: + decision = r.client.proxy.config.RetryPolicy.OnReadTimeout(msg, r.retryCount) + if decision != ReturnError { + logger.Debug("retrying read timeout", + zap.Stringer("decision", decision), + zap.Stringer("response", msg), + zap.Int("retryCount", r.retryCount), + ) + } + case *message.WriteTimeout: + if idempotent { + decision = r.client.proxy.config.RetryPolicy.OnWriteTimeout(msg, r.retryCount) + if decision != ReturnError { + logger.Debug("retrying write timeout", + zap.Stringer("decision", decision), + zap.Stringer("response", msg), + zap.Int("retryCount", r.retryCount), + ) + } + } + case *message.Unavailable: + decision = r.client.proxy.config.RetryPolicy.OnUnavailable(msg, r.retryCount) + if decision != ReturnError { + logger.Debug("retrying on unavailable error", + zap.Stringer("decision", decision), + zap.Stringer("response", msg), + zap.Int("retryCount", r.retryCount), + ) + } + case *message.IsBootstrapping: + decision = RetryNext + logger.Debug("retrying on bootstrapping error", + zap.Stringer("decision", decision), + zap.Int("retryCount", r.retryCount), + ) + case *message.ServerError, *message.Overloaded, *message.TruncateError, + *message.ReadFailure, *message.WriteFailure: + if idempotent { + decision = r.client.proxy.config.RetryPolicy.OnErrorResponse(errMsg, r.retryCount) + if decision != ReturnError { + logger.Debug("retrying on error response", + zap.Stringer("decision", decision), + zap.Int("retryCount", r.retryCount), + ) + } + } + default: + // Do nothing, return the error + } + + switch decision { + case RetryNext: + 
r.retryCount++ + r.executeInternal(true) + retried = true + case RetrySame: + r.retryCount++ + r.executeInternal(false) + retried = true + default: + // Do nothing, return the error + } + } + + return retried } diff --git a/proxy/retrypolicy.go b/proxy/retrypolicy.go new file mode 100644 index 0000000..b0de359 --- /dev/null +++ b/proxy/retrypolicy.go @@ -0,0 +1,88 @@ +// Copyright (c) DataStax, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proxy + +import ( + "github.com/datastax/go-cassandra-native-protocol/message" + "github.com/datastax/go-cassandra-native-protocol/primitive" +) + +type RetryDecision int + +func (r RetryDecision) String() string { + switch r { + case RetrySame: + return "retry same node" + case RetryNext: + return "retry next node" + case ReturnError: + return "returning error" + } + return "unknown" +} + +const ( + RetrySame RetryDecision = iota + RetryNext + ReturnError +) + +type RetryPolicy interface { + OnReadTimeout(msg *message.ReadTimeout, retryCount int) RetryDecision + OnWriteTimeout(msg *message.WriteTimeout, retryCount int) RetryDecision + OnUnavailable(msg *message.Unavailable, retryCount int) RetryDecision + OnErrorResponse(msg message.Error, retryCount int) RetryDecision +} + +type defaultRetryPolicy struct{} + +var defaultRetryPolicyInstance defaultRetryPolicy + +func NewDefaultRetryPolicy() RetryPolicy { + return &defaultRetryPolicyInstance +} + +func (d defaultRetryPolicy) OnReadTimeout(msg *message.ReadTimeout, retryCount int) RetryDecision { + if retryCount == 0 && msg.Received >= msg.BlockFor && !msg.DataPresent { + return RetrySame + } else { + return ReturnError + } +} + +func (d defaultRetryPolicy) OnWriteTimeout(msg *message.WriteTimeout, retryCount int) RetryDecision { + if retryCount == 0 && msg.WriteType == primitive.WriteTypeBatchLog { + return RetrySame + } else { + return ReturnError + } +} + +func (d defaultRetryPolicy) OnUnavailable(_ *message.Unavailable, retryCount int) RetryDecision { + if retryCount == 0 { + return RetryNext + } else { + return ReturnError + } +} + +func (d defaultRetryPolicy) OnErrorResponse(msg message.Error, retryCount int) RetryDecision { + code := msg.GetErrorCode() + if code == primitive.ErrorCodeReadFailure || code == primitive.ErrorCodeWriteFailure { + return ReturnError + } else { + return RetryNext + } +} diff --git a/proxy/retrypolicy_test.go b/proxy/retrypolicy_test.go new file mode 100644 index 0000000..943daed --- /dev/null +++ b/proxy/retrypolicy_test.go @@ -0,0 +1,96 @@ +// Copyright (c) DataStax, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package proxy + +import ( + "testing" + + "github.com/datastax/go-cassandra-native-protocol/message" + "github.com/datastax/go-cassandra-native-protocol/primitive" + "github.com/stretchr/testify/assert" +) + +func TestDefaultRetryPolicy_OnUnavailable(t *testing.T) { + var tests = []struct { + msg *message.Unavailable + decision RetryDecision + retryCount int + }{ + {&message.Unavailable{Consistency: 0, Required: 0, Alive: 0}, RetryNext, 0}, // Never retried + {&message.Unavailable{Consistency: 0, Required: 0, Alive: 0}, ReturnError, 1}, // Already retried once + } + + policy := NewDefaultRetryPolicy() + for _, tt := range tests { + decision := policy.OnUnavailable(tt.msg, tt.retryCount) + assert.Equal(t, tt.decision, decision) + } +} + +func TestDefaultRetryPolicy_OnReadTimeout(t *testing.T) { + var tests = []struct { + msg *message.ReadTimeout + decision RetryDecision + retryCount int + }{ + {&message.ReadTimeout{Consistency: 0, Received: 2, BlockFor: 2, DataPresent: false}, RetrySame, 0}, // Enough received with no data + {&message.ReadTimeout{Consistency: 0, Received: 3, BlockFor: 2, DataPresent: false}, ReturnError, 1}, // Already retried once + {&message.ReadTimeout{Consistency: 0, Received: 2, BlockFor: 2, DataPresent: true}, ReturnError, 0}, // Data was present + } + + policy := NewDefaultRetryPolicy() + for _, tt := range tests { + decision := policy.OnReadTimeout(tt.msg, tt.retryCount) + assert.Equal(t, tt.decision, decision) + } +} + +func TestDefaultRetryPolicy_OnWriteTimeout(t *testing.T) { + var tests = []struct { + msg *message.WriteTimeout + decision RetryDecision + retryCount int + }{ + {&message.WriteTimeout{Consistency: 0, Received: 0, BlockFor: 0, WriteType: primitive.WriteTypeBatchLog, Contentions: 0}, RetrySame, 0}, // Logged batch + {&message.WriteTimeout{Consistency: 0, Received: 0, BlockFor: 0, WriteType: primitive.WriteTypeBatchLog, Contentions: 0}, ReturnError, 1}, // Logged batch, already retried once + {&message.WriteTimeout{Consistency: 0, Received: 0, BlockFor: 0, WriteType: primitive.WriteTypeSimple, Contentions: 0}, ReturnError, 0}, // Not a logged batch + } + + policy := NewDefaultRetryPolicy() + for _, tt := range tests { + decision := policy.OnWriteTimeout(tt.msg, tt.retryCount) + assert.Equal(t, tt.decision, decision) + } +} + +func TestDefaultRetryPolicy_OnErrorResponse(t *testing.T) { + var tests = []struct { + msg message.Error + decision RetryDecision + retryCount int + }{ + {&message.WriteFailure{}, ReturnError, 0}, // Write failure + {&message.ReadFailure{}, ReturnError, 0}, // Read failure + {&message.TruncateError{}, RetryNext, 0}, // Truncate failure + {&message.ServerError{}, RetryNext, 0}, // Server failure + {&message.Overloaded{}, RetryNext, 0}, // Overloaded failure + } + + policy := NewDefaultRetryPolicy() + for _, tt := range tests { + decision := policy.OnErrorResponse(tt.msg, tt.retryCount) + assert.Equal(t, tt.decision, decision) + } +} From e6fa62f6c52c2120cd042e46a1cfb2c71e534ee7 Mon Sep 17 00:00:00 2001 From: Michael Penick Date: Wed, 12 Jan 2022 16:29:01 -0500 Subject: [PATCH 2/5] Fix error types and remove asserting error strings --- proxy/proxy_retries_test.go | 61 +++++++++++++++++++------------------ proxy/request.go | 4 +-- 2 files changed, 33 insertions(+), 32 deletions(-) diff --git a/proxy/proxy_retries_test.go b/proxy/proxy_retries_test.go index 5f9cac6..e8a381d 100644 --- a/proxy/proxy_retries_test.go +++ 
b/proxy/proxy_retries_test.go @@ -31,41 +31,42 @@ func TestProxy_Retries(t *testing.T) { const nonIdempotentQuery = "INSERT INTO test.test (k, v) VALUES ('a', uuid())" var tests = []struct { + msg string query string response message.Error - errString string numNodesTried int retryCount int }{ - { // Bootstrapping error + { + "bootstrapping error", idempotentQuery, &message.IsBootstrapping{ErrorMessage: "Bootstrapping"}, - "cql error: ERROR UNAVAILABLE (code=ErrorCode Unavailable [0x00001000], msg=No more hosts available (exhausted query plan), cl=ConsistencyLevel ANY [0x0000], required=0, alive=0)", 3, 2, }, - { // Bootstrapping error w/ non-idempotent query + { + "bootstrapping w/ non-idempotent query", nonIdempotentQuery, &message.IsBootstrapping{ErrorMessage: "Bootstrapping"}, - "cql error: ERROR UNAVAILABLE (code=ErrorCode Unavailable [0x00001000], msg=No more hosts available (exhausted query plan), cl=ConsistencyLevel ANY [0x0000], required=0, alive=0)", 3, 2, }, - { // Error response (truncate), retry until succeeds or exhausts query plan + { + "error response (truncate), retry until succeeds or exhausts query plan", idempotentQuery, &message.TruncateError{ErrorMessage: "Truncate"}, - "cql error: ERROR UNAVAILABLE (code=ErrorCode Unavailable [0x00001000], msg=No more hosts available (exhausted query plan), cl=ConsistencyLevel ANY [0x0000], required=0, alive=0)", 3, 2, }, - { // Error response (truncate) w/ non-idempotent query, retry until succeeds or exhausts query plan + { + "error response (truncate) w/ non-idempotent query, retry until succeeds or exhausts query plan", nonIdempotentQuery, &message.TruncateError{ErrorMessage: "Truncate"}, - "cql error: ERROR TRUNCATE ERROR (code=ErrorCode TruncateError [0x00001003], msg=Truncate)", 1, 0, }, - { // Error response (read failure), don't retry + { + "error response (read failure), don't retry", idempotentQuery, &message.ReadFailure{ ErrorMessage: "", @@ -74,11 +75,11 @@ func TestProxy_Retries(t *testing.T) { BlockFor: 2, NumFailures: 1, }, - "cql error: ERROR READ FAILURE (code=ErrorCode ReadFailure [0x00001300], msg=, cl=ConsistencyLevel QUORUM [0x0004], received=2, blockfor=2, data=false)", 1, 0, }, - { // Error response (write failure), don't retry + { + "error response (write failure), don't retry", idempotentQuery, &message.WriteFailure{ ErrorMessage: "", @@ -88,11 +89,11 @@ func TestProxy_Retries(t *testing.T) { NumFailures: 1, WriteType: primitive.WriteTypeSimple, }, - "cql error: ERROR WRITE FAILURE (code=ErrorCode WriteFailure [0x00001500], msg=, cl=ConsistencyLevel QUORUM [0x0004], received=2, blockfor=2, type=SIMPLE)", 1, 0, }, - { // Unavailable error, retry on the next node + { + "unavailable error, retry on the next node", idempotentQuery, &message.Unavailable{ ErrorMessage: "Unavailable", @@ -100,11 +101,11 @@ func TestProxy_Retries(t *testing.T) { Required: 2, Alive: 1, }, - "cql error: ERROR UNAVAILABLE (code=ErrorCode Unavailable [0x00001000], msg=Unavailable, cl=ConsistencyLevel QUORUM [0x0004], required=2, alive=1)", 2, 1, }, - { // Unavailable error w/ non-idempotent query, retry on the next node (same) + { + "unavailable error w/ non-idempotent query, retry on the next node (same)", nonIdempotentQuery, &message.Unavailable{ ErrorMessage: "Unavailable", @@ -112,11 +113,11 @@ func TestProxy_Retries(t *testing.T) { Required: 2, Alive: 1, }, - "cql error: ERROR UNAVAILABLE (code=ErrorCode Unavailable [0x00001000], msg=Unavailable, cl=ConsistencyLevel QUORUM [0x0004], required=2, alive=1)", 2, 1, }, - { // Read 
timeout error, retry once on the same node + { + "read timeout error, retry once on the same node", idempotentQuery, &message.ReadTimeout{ ErrorMessage: "ReadTimeout", @@ -125,11 +126,11 @@ func TestProxy_Retries(t *testing.T) { BlockFor: 2, DataPresent: false, // Data wasn't present, read repair, retry }, - "cql error: ERROR READ TIMEOUT (code=ErrorCode ReadTimeout [0x00001200], msg=ReadTimeout, cl=ConsistencyLevel QUORUM [0x0004], received=3, blockfor=2, data=false)", 1, 1, }, - { // Read timeout error w/ non-idempotent query, retry once on the same node (same) + { + "read timeout error w/ non-idempotent query, retry once on the same node (same)", nonIdempotentQuery, &message.ReadTimeout{ ErrorMessage: "ReadTimeout", @@ -138,11 +139,11 @@ func TestProxy_Retries(t *testing.T) { BlockFor: 2, DataPresent: false, // Data wasn't present, read repair, retry }, - "cql error: ERROR READ TIMEOUT (code=ErrorCode ReadTimeout [0x00001200], msg=ReadTimeout, cl=ConsistencyLevel QUORUM [0x0004], received=3, blockfor=2, data=false)", 1, 1, }, - { // Read timeout error w/ unmet conditions, don't retry + { + "read timeout error w/ unmet conditions, don't retry", idempotentQuery, &message.ReadTimeout{ ErrorMessage: "ReadTimeout", @@ -151,11 +152,11 @@ func TestProxy_Retries(t *testing.T) { BlockFor: 2, DataPresent: true, // Data was present don't retry }, - "cql error: ERROR READ TIMEOUT (code=ErrorCode ReadTimeout [0x00001200], msg=ReadTimeout, cl=ConsistencyLevel QUORUM [0x0004], received=2, blockfor=2, data=true)", 1, 0, }, - { // Write timeout error, retry once if logged batch + { + "write timeout error, retry once if logged batch", idempotentQuery, // Not a logged batch, but it doesn't matter for this test &message.WriteTimeout{ ErrorMessage: "WriteTimeout", @@ -164,11 +165,11 @@ func TestProxy_Retries(t *testing.T) { BlockFor: 2, WriteType: primitive.WriteTypeBatchLog, // Retry if a logged batch }, - "cql error: ERROR WRITE TIMEOUT (code=ErrorCode WriteTimeout [0x00001100], msg=WriteTimeout, cl=ConsistencyLevel QUORUM [0x0004], received=1, blockfor=2, type=BATCH_LOG, contentions=0)", 1, 1, }, - { // Write timeout error w/ unmet conditions, don't retry + { + "write timeout error w/ unmet conditions, don't retry", idempotentQuery, &message.WriteTimeout{ ErrorMessage: "WriteTimeout", @@ -177,7 +178,6 @@ func TestProxy_Retries(t *testing.T) { BlockFor: 2, WriteType: primitive.WriteTypeSimple, // Don't retry for anything other than logged batches }, - "cql error: ERROR WRITE TIMEOUT (code=ErrorCode WriteTimeout [0x00001100], msg=WriteTimeout, cl=ConsistencyLevel QUORUM [0x0004], received=1, blockfor=2, type=SIMPLE, contentions=0)", 1, 0, }, @@ -185,9 +185,10 @@ func TestProxy_Retries(t *testing.T) { for _, tt := range tests { numNodesTried, retryCount, err := testProxyRetry(t, tt.query, tt.response) - assert.EqualError(t, err, tt.errString) - assert.Equal(t, tt.numNodesTried, numNodesTried) - assert.Equal(t, tt.retryCount, retryCount) + assert.Error(t, err, tt.msg) + assert.IsType(t, &proxycore.CqlError{}, err, tt.msg) + assert.Equal(t, tt.numNodesTried, numNodesTried, tt.msg) + assert.Equal(t, tt.retryCount, retryCount, tt.msg) } } diff --git a/proxy/request.go b/proxy/request.go index f1b5709..b5591f8 100644 --- a/proxy/request.go +++ b/proxy/request.go @@ -65,7 +65,7 @@ func (r *request) executeInternal(next bool) { } if r.host == nil { r.done = true - r.send(&message.Unavailable{ErrorMessage: "No more hosts available (exhausted query plan)"}) + r.send(&message.ServerError{ErrorMessage: "Proxy
exhausted query plan and there are no more hosts available to try"}) } else { err := r.session.Send(r.host, r) if err == nil { @@ -118,7 +118,7 @@ func (r *request) OnClose(_ error) { } else { if !r.done { r.done = true - r.send(&message.Unavailable{ErrorMessage: "No more hosts available (cluster connection closed and request is not idempotent)"}) + r.send(&message.ServerError{ErrorMessage: "Proxy is unable to retry non-idempotent query after connection to backend cluster closed"}) } } } From 5b290ed9e4cbbcbe1c8db8245f834f78f5658cb8 Mon Sep 17 00:00:00 2001 From: Michael Penick Date: Wed, 19 Jan 2022 10:45:24 -0500 Subject: [PATCH 3/5] Add docs to retry policy --- proxy/retrypolicy.go | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/proxy/retrypolicy.go b/proxy/retrypolicy.go index b0de359..124a191 100644 --- a/proxy/retrypolicy.go +++ b/proxy/retrypolicy.go @@ -19,6 +19,7 @@ import ( "github.com/datastax/go-cassandra-native-protocol/primitive" ) +// RetryDecision is a type used for deciding what to do when a request has failed. type RetryDecision int func (r RetryDecision) String() string { @@ -34,15 +35,34 @@ func (r RetryDecision) String() string { } const ( + // RetrySame should be returned when a request should be retried on the same host. RetrySame RetryDecision = iota + // RetryNext should be returned when a request should be retried on the next host according to the request's query + // plan. RetryNext + // ReturnError should be returned when a request's original error should be forwarded along to the client. ReturnError ) +// RetryPolicy is an interface for defining retry behavior when a server-side error occurs. type RetryPolicy interface { + // OnReadTimeout handles the retry decision for a server-side read timeout error (Read_timeout = 0x1200). + // This occurs when a replica read request times out during a read query. OnReadTimeout(msg *message.ReadTimeout, retryCount int) RetryDecision + + // OnWriteTimeout handles the retry decision for a server-side write timeout error (Write_timeout = 0x1100). + // This occurs when a replica write request times out during a write query. OnWriteTimeout(msg *message.WriteTimeout, retryCount int) RetryDecision + + // OnUnavailable handles the retry decision for a server-side unavailable exception (Unavailable = 0x1000). + // This occurs when a coordinator determines that there are not enough replicas to handle a query at the requested + // consistency level. OnUnavailable(msg *message.Unavailable, retryCount int) RetryDecision + + // OnErrorResponse handles the retry decision for other potentially recoverable errors. + // This can be called for the following error types: server error (ServerError = 0x0000), + // overloaded (Overloaded = 0x1001), truncate error (Truncate_error = 0x1003), read failure (Read_failure = 0x1300), + // and write failure (Write_failure = 0x1500). OnErrorResponse(msg message.Error, retryCount int) RetryDecision } @@ -50,10 +70,18 @@ type defaultRetryPolicy struct{} var defaultRetryPolicyInstance defaultRetryPolicy +// NewDefaultRetryPolicy creates a new default retry policy. +// The default retry policy takes a conservative approach to retrying requests. In most cases it retries only once in +// cases where a retry is likely to succeed. func NewDefaultRetryPolicy() RetryPolicy { return &defaultRetryPolicyInstance } +// OnReadTimeout retries in the case where there were enough replicas to satisfy the request, but one of the replicas +// didn't respond with data and timed out. 
It's likely that a single retry to the same coordinator will succeed because +// it will have recognized the replica as dead before the retry is attempted. +// +// In all other cases it will forward the original error to the client. func (d defaultRetryPolicy) OnReadTimeout(msg *message.ReadTimeout, retryCount int) RetryDecision { if retryCount == 0 && msg.Received >= msg.BlockFor && !msg.DataPresent { return RetrySame @@ -62,6 +90,11 @@ func (d defaultRetryPolicy) OnReadTimeout(msg *message.ReadTimeout, retryCount i } } +// OnWriteTimeout retries in the case where a coordinator failed to write its batch log to a set of datacenter local +// nodes. It's likely that a single retry to the same coordinator will succeed because it will have recognized the +// dead nodes and will use a different set of nodes. +// +// In all other cases it will forward the original error to the client. func (d defaultRetryPolicy) OnWriteTimeout(msg *message.WriteTimeout, retryCount int) RetryDecision { if retryCount == 0 && msg.WriteType == primitive.WriteTypeBatchLog { return RetrySame @@ -70,6 +103,8 @@ func (d defaultRetryPolicy) OnWriteTimeout(msg *message.WriteTimeout, retryCount } } +// OnUnavailable retries, once, on the next coordinator in the query plan. This is to handle the case where a +// coordinator is failing because it was partitioned from a set of its replicas. func (d defaultRetryPolicy) OnUnavailable(_ *message.Unavailable, retryCount int) RetryDecision { if retryCount == 0 { return RetryNext @@ -78,6 +113,7 @@ func (d defaultRetryPolicy) OnUnavailable(_ *message.Unavailable, retryCount int } } +// OnErrorResponse retries on the next coordinator for all error types except for read and write failures. func (d defaultRetryPolicy) OnErrorResponse(msg message.Error, retryCount int) RetryDecision { code := msg.GetErrorCode() if code == primitive.ErrorCodeReadFailure || code == primitive.ErrorCodeWriteFailure { From 1593a03659f370b448c96818faeab6e3c8b0363c Mon Sep 17 00:00:00 2001 From: Michael Penick Date: Wed, 19 Jan 2022 14:28:20 -0500 Subject: [PATCH 4/5] Add comments to make retry tests clearer --- proxy/proxy_retries_test.go | 60 +++++++++++++++++++------------------ 1 file changed, 31 insertions(+), 29 deletions(-) diff --git a/proxy/proxy_retries_test.go b/proxy/proxy_retries_test.go index e8a381d..e95e7e0 100644 --- a/proxy/proxy_retries_test.go +++ b/proxy/proxy_retries_test.go @@ -41,29 +41,29 @@ func TestProxy_Retries(t *testing.T) { "bootstrapping error", idempotentQuery, &message.IsBootstrapping{ErrorMessage: "Bootstrapping"}, - 3, - 2, + 3, // Ran the query on all nodes + 2, // Retried on all remaining nodes }, { "bootstrapping w/ non-idempotent query", nonIdempotentQuery, &message.IsBootstrapping{ErrorMessage: "Bootstrapping"}, - 3, - 2, + 3, // Ran the query on all nodes + 2, // Retried on all remaining nodes }, { "error response (truncate), retry until succeeds or exhausts query plan", idempotentQuery, &message.TruncateError{ErrorMessage: "Truncate"}, - 3, - 2, + 3, // Ran the query on all nodes + 2, // Retried on all remaining nodes }, { "error response (truncate) w/ non-idempotent query, retry until succeeds or exhausts query plan", nonIdempotentQuery, &message.TruncateError{ErrorMessage: "Truncate"}, - 1, - 0, + 1, // Tried the query on the first node and it failed + 0, // Did not retry }, { "error response (read failure), don't retry", @@ -75,8 +75,8 @@ func TestProxy_Retries(t *testing.T) { BlockFor: 2, NumFailures: 1, }, - 1, - 0, + 1, // Tried the query on the first node and it
failed + 0, // Did not retry }, { "error response (write failure), don't retry", @@ -89,8 +89,8 @@ func TestProxy_Retries(t *testing.T) { NumFailures: 1, WriteType: primitive.WriteTypeSimple, }, - 1, - 0, + 1, // Tried the query on the first node and it failed + 0, // Did not retry }, { "unavailable error, retry on the next node", @@ -101,8 +101,8 @@ func TestProxy_Retries(t *testing.T) { Required: 2, Alive: 1, }, - 2, - 1, + 2, // Tried and failed on the first node, then retried on the next node + 1, // Retried on the next node }, { "unavailable error w/ non-idempotent query, retry on the next node (same)", @@ -113,8 +113,8 @@ func TestProxy_Retries(t *testing.T) { Required: 2, Alive: 1, }, - 2, - 1, + 2, // Tried and failed on the first node, then retried on the next node + 1, // Retried on the next node }, { "read timeout error, retry once on the same node", @@ -126,8 +126,8 @@ func TestProxy_Retries(t *testing.T) { BlockFor: 2, DataPresent: false, // Data wasn't present, read repair, retry }, - 1, - 1, + 1, // Tried and retried on a single node + 1, // Retried on the same node }, { "read timeout error w/ non-idempotent query, retry once on the same node (same)", @@ -139,8 +139,8 @@ func TestProxy_Retries(t *testing.T) { BlockFor: 2, DataPresent: false, // Data wasn't present, read repair, retry }, - 1, - 1, + 1, // Tried and retried on a single node + 1, // Retried on the same node }, { "read timeout error w/ unmet conditions, don't retry", @@ -152,12 +152,14 @@ func TestProxy_Retries(t *testing.T) { BlockFor: 2, DataPresent: true, // Data was present don't retry }, - 1, - 0, + 1, // Tried the query on the first node and it failed + 0, // Did not retry }, { "write timeout error, retry once if logged batch", - idempotentQuery, // Not a logged batch, but it doesn't matter for this test + // Not actually a logged batch query, but this is opaque to the proxy and mock cluster. It's considered a + // logged batch because the error returned by the server says it is. 
+ idempotentQuery, &message.WriteTimeout{ ErrorMessage: "WriteTimeout", Consistency: primitive.ConsistencyLevelQuorum, @@ -165,12 +167,12 @@ func TestProxy_Retries(t *testing.T) { BlockFor: 2, WriteType: primitive.WriteTypeBatchLog, // Retry if a logged batch }, - 1, - 1, + 1, // Tried and retried on a single node + 1, // Retried on the same node }, { - "write timeout error w/ unmet conditions, don't retry", - idempotentQuery, + "write timeout error w/ not a logged batch, don't retry", + idempotentQuery, // Opaque idempotent query (see reason it's not an actual batch query above) &message.WriteTimeout{ ErrorMessage: "WriteTimeout", Consistency: primitive.ConsistencyLevelQuorum, @@ -178,8 +180,8 @@ func TestProxy_Retries(t *testing.T) { BlockFor: 2, WriteType: primitive.WriteTypeSimple, // Don't retry for anything other than logged batches }, - 1, - 0, + 1, // Tried the query on the first node and it failed + 0, // Did not retry }, } From 31e1dd4885d398dcd0637aafa452e5f7a75d8363 Mon Sep 17 00:00:00 2001 From: Michael Penick Date: Wed, 19 Jan 2022 14:47:17 -0500 Subject: [PATCH 5/5] Use the prepared id from the response --- proxy/proxy.go | 32 +++++++++++++++++++------------- proxy/request.go | 8 +++++--- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/proxy/proxy.go b/proxy/proxy.go index 67b9dc2..94cb874 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -42,6 +42,8 @@ var ( encodedZeroValue, _ = proxycore.EncodeType(datatype.Int, primitive.ProtocolVersion4, 0) ) +const preparedIdSize = 16 + type Config struct { Version primitive.ProtocolVersion MaxVersion primitive.ProtocolVersion @@ -211,7 +213,7 @@ func (p *Proxy) handle(conn *net.TCPConn) { ctx: p.ctx, proxy: p, id: atomic.AddUint64(&p.clientIdGen, 1), - preparedSystemQuery: make(map[[16]byte]interface{}), + preparedSystemQuery: make(map[[preparedIdSize]byte]interface{}), } cl.conn = proxycore.NewConn(conn, cl) cl.conn.Start() @@ -288,7 +290,7 @@ type client struct { conn *proxycore.Conn keyspace string id uint64 - preparedSystemQuery map[[16]byte]interface{} + preparedSystemQuery map[[preparedIdSize]byte]interface{} preparedIdempotence sync.Map } @@ -380,24 +382,24 @@ func (c *client) handlePrepare(raw *frame.RawFrame, msg *message.Prepare) { if columns, err := parser.FilterColumns(s, systemColumns); err != nil { c.send(hdr, &message.Invalid{ErrorMessage: err.Error()}) } else { - hash := md5.Sum([]byte(msg.Query + keyspace)) + id := md5.Sum([]byte(msg.Query + keyspace)) c.send(hdr, &message.PreparedResult{ - PreparedQueryId: hash[:], + PreparedQueryId: id[:], ResultMetadata: &message.RowsMetadata{ ColumnCount: int32(len(columns)), Columns: columns, }, }) - c.preparedSystemQuery[hash] = stmt + c.preparedSystemQuery[id] = stmt } } else { c.send(hdr, &message.Invalid{ErrorMessage: "Doesn't exist"}) } case *parser.UseStatement: - hash := md5.Sum([]byte(msg.Query)) - c.preparedSystemQuery[hash] = stmt + id := md5.Sum([]byte(msg.Query)) + c.preparedSystemQuery[id] = stmt c.send(hdr, &message.PreparedResult{ - PreparedQueryId: hash[:], + PreparedQueryId: id[:], }) default: c.send(hdr, &message.ServerError{ErrorMessage: "Proxy attempted to intercept an unhandled query"}) @@ -410,13 +412,11 @@ func (c *client) handlePrepare(raw *frame.RawFrame, msg *message.Prepare) { } func (c *client) handleExecute(raw *frame.RawFrame, msg *partialExecute) { - var hash [16]byte - copy(hash[:], msg.queryId) - - if stmt, ok := c.preparedSystemQuery[hash]; ok { + id := preparedIdKey(msg.queryId) + if stmt, ok := c.preparedSystemQuery[id]; ok { 
c.interceptSystemQuery(raw.Header, stmt) } else { - idempotent, ok := c.preparedIdempotence.Load(hash) + idempotent, ok := c.preparedIdempotence.Load(id) state := notIdempotent if ok && idempotent.(bool) { state = isIdempotent @@ -570,3 +570,9 @@ func (d defaultPreparedCache) Load(id string) (entry *proxycore.PreparedEntry, o } return nil, false } + +func preparedIdKey(bytes []byte) [preparedIdSize]byte { + var buf [preparedIdSize]byte + copy(buf[:], bytes) + return buf +} diff --git a/proxy/request.go b/proxy/request.go index b5591f8..d713731 100644 --- a/proxy/request.go +++ b/proxy/request.go @@ -15,8 +15,8 @@ package proxy import ( - "crypto/md5" "io" + "reflect" "sync" "github.com/datastax/cql-proxy/parser" @@ -139,9 +139,11 @@ func (r *request) OnResult(raw *frame.RawFrame) { idempotent, err := parser.IsQueryIdempotent(r.query) if err != nil { r.client.proxy.logger.Error("error parsing query for idempotence", zap.Error(err)) + } else if result, ok := frm.Body.Message.(*message.PreparedResult); ok { + r.client.preparedIdempotence.Store(preparedIdKey(result.PreparedQueryId), idempotent) } else { - // TODO: Make sure this hash matches server-side impl. - r.client.preparedIdempotence.Store(md5.Sum([]byte(r.keyspace+r.query)), idempotent) + r.client.proxy.logger.Error("expected prepared result, but got some other type of message", + zap.Stringer("type", reflect.TypeOf(frm.Body.Message))) } } }
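
---

Usage sketch (not part of the patches above): PATCH 1/5 makes the retry behavior pluggable through the new Config.RetryPolicy field, falling back to NewDefaultRetryPolicy() when the field is nil, so an embedding application can supply its own policy. The following is a minimal, hypothetical illustration of that wiring. The Config fields, RetryDecision values, and RetryPolicy methods are exactly those added in the diffs; the import path "github.com/datastax/cql-proxy/proxy" is an assumption inferred from the "github.com/datastax/cql-proxy/proxycore" imports, and the policy name and addresses are made up for the example.

package main

import (
	"context"
	"time"

	"github.com/datastax/cql-proxy/proxy" // assumed import path; not shown in the diffs
	"github.com/datastax/cql-proxy/proxycore"
	"github.com/datastax/go-cassandra-native-protocol/message"
	"github.com/datastax/go-cassandra-native-protocol/primitive"
)

// conservativeWritesPolicy is a hypothetical policy that delegates to the
// default policy except for write timeouts, which are always returned to the
// client (never retried, not even logged batches).
type conservativeWritesPolicy struct {
	fallback proxy.RetryPolicy
}

func (p conservativeWritesPolicy) OnReadTimeout(msg *message.ReadTimeout, retryCount int) proxy.RetryDecision {
	return p.fallback.OnReadTimeout(msg, retryCount)
}

func (p conservativeWritesPolicy) OnWriteTimeout(_ *message.WriteTimeout, _ int) proxy.RetryDecision {
	return proxy.ReturnError // forward all write timeouts to the client
}

func (p conservativeWritesPolicy) OnUnavailable(msg *message.Unavailable, retryCount int) proxy.RetryDecision {
	return p.fallback.OnUnavailable(msg, retryCount)
}

func (p conservativeWritesPolicy) OnErrorResponse(msg message.Error, retryCount int) proxy.RetryDecision {
	return p.fallback.OnErrorResponse(msg, retryCount)
}

func main() {
	ctx := context.Background()
	p := proxy.NewProxy(ctx, proxy.Config{
		Version:           primitive.ProtocolVersion4,
		Resolver:          proxycore.NewResolverWithDefaultPort([]string{"127.0.0.1"}, 9042), // placeholder contact point
		ReconnectPolicy:   proxycore.NewReconnectPolicyWithDelays(200*time.Millisecond, time.Second),
		RetryPolicy:       conservativeWritesPolicy{fallback: proxy.NewDefaultRetryPolicy()},
		NumConns:          2,
		HeartBeatInterval: 30 * time.Second,
		IdleTimeout:       60 * time.Second,
	})
	if err := p.Listen("127.0.0.1:9043"); err != nil { // placeholder bind address
		panic(err)
	}
	_ = p.Serve() // runs the proxy's accept loop (the tests run this in a goroutine)
}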