Skip to content

Commit

Permalink
Support protocol v2 and protocol negotiation
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasz-antoniak authored Jul 24, 2024
1 parent b68856d commit 7e7b1f2
Show file tree
Hide file tree
Showing 33 changed files with 1,282 additions and 207 deletions.
17 changes: 14 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,24 @@ containerized sandbox environment.

## Supported Protocol Versions

**ZDM Proxy supports protocol versions v3, v4, DSE_V1 and DSE_V2.**
**ZDM Proxy supports protocol versions v2, v3, v4, DSE_V1 and DSE_V2.**

It technically doesn't support v5, but handles protocol negotiation so that the client application properly downgrades
the protocol version to v4 if v5 is requested. This means that any client application using a recent driver that supports
protocol version v5 can be migrated using the ZDM Proxy (as long as it does not use v5-specific functionality).

ZDM Proxy requires origin and target clusters to have at least one protocol version in common. It is therefore not feasible
to configure Apache Cassandra 2.0 as origin and 3.x / 4.x as target. Below table displays protocol versions supported by
various C* versions:

| Apache Cassandra | Protocol Version |
|------------------|------------------|
| 2.0 | V2 |
| 2.1 | V2, V3 |
| 2.2 | V2, V3, V4 |
| 3.x | V3, V4 |
| 4.x | V3, V4, V5 |

---
:warning: **Thrift is not supported by ZDM Proxy.** If you are using a very old driver or cluster version that only supports Thrift
then you need to change your client application to use CQL and potentially upgrade your cluster before starting the
Expand All @@ -110,8 +122,7 @@ migration process.

In practice this means that ZDM Proxy supports the following cluster versions (as Origin and / or Target):

- Apache Cassandra from 2.1+ up to (and including) Apache Cassandra 4.x. Apache Cassandra 2.0 support will be introduced
when protocol version v2 is supported.
- Apache Cassandra from 2.0+ up to (and including) Apache Cassandra 4.x. (although both clusters have to support a common protocol version as mentioned above).
- DataStax Enterprise 4.8+. DataStax Enterprise 4.6 and 4.7 support will be introduced when protocol version v2 is supported.
- DataStax Astra DB (both Serverless and Classic)

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.19

require (
github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20211106181442-e4c1a74c66bd
github.com/datastax/go-cassandra-native-protocol v0.0.0-20220525125956-6158d9e218b8
github.com/datastax/go-cassandra-native-protocol v0.0.0-20240626123646-2abea740da8d
github.com/gocql/gocql v0.0.0-20200624222514-34081eda590e
github.com/google/uuid v1.1.1
github.com/jpillora/backoff v1.0.0
Expand All @@ -15,7 +15,6 @@ require (
github.com/rs/zerolog v1.20.0
github.com/sirupsen/logrus v1.6.0
github.com/stretchr/testify v1.8.0
gopkg.in/yaml.v3 v3.0.1
)

require (
Expand All @@ -35,4 +34,5 @@ require (
github.com/prometheus/procfs v0.0.8 // indirect
golang.org/x/sys v0.3.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dR
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/datastax/go-cassandra-native-protocol v0.0.0-20220525125956-6158d9e218b8 h1:NKLtNzC76ssf68VOenDAzMyQGg+QkxuD2QCubX+GvLk=
github.com/datastax/go-cassandra-native-protocol v0.0.0-20220525125956-6158d9e218b8/go.mod h1:yFD0OKoVV9d1QW7Es58c1Gv6ijrqTGPcxgHv27wdC4Q=
github.com/datastax/go-cassandra-native-protocol v0.0.0-20240626123646-2abea740da8d h1:UnPtAA8Ux3GvHLazSSUydERFuoQRyxHrB8puzXyjXIE=
github.com/datastax/go-cassandra-native-protocol v0.0.0-20240626123646-2abea740da8d/go.mod h1:6FzirJfdffakAVqmHjwVfFkpru/gNbIazUOK5rIhndc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -105,7 +105,6 @@ github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
Expand All @@ -131,7 +130,6 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/asyncreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func TestAsyncReadsRequestTypes(t *testing.T) {
}

testSetup, err := setup.NewSimulacronTestSetupWithSessionAndNodesAndConfig(
t, false, false, 1, nil)
t, false, false, 1, nil, nil)
require.Nil(t, err)
defer testSetup.Cleanup()

Expand Down
117 changes: 101 additions & 16 deletions integration-tests/connect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package integration_tests
import (
"bytes"
"context"
client2 "github.com/datastax/go-cassandra-native-protocol/client"
cqlClient "github.com/datastax/go-cassandra-native-protocol/client"
"github.com/datastax/go-cassandra-native-protocol/frame"
"github.com/datastax/go-cassandra-native-protocol/message"
"github.com/datastax/go-cassandra-native-protocol/primitive"
"github.com/datastax/zdm-proxy/integration-tests/client"
"github.com/datastax/zdm-proxy/integration-tests/setup"
"github.com/datastax/zdm-proxy/integration-tests/simulacron"
"github.com/datastax/zdm-proxy/integration-tests/utils"
"github.com/datastax/zdm-proxy/proxy/pkg/config"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -45,6 +46,82 @@ func TestGoCqlConnect(t *testing.T) {
require.Equal(t, "fake", iter.Columns()[0].Name)
}

// Simulacron-based test to make sure that we can handle invalid protocol error and downgrade
// used protocol on control connection. ORIGIN and TARGET are using the same C* version
func TestControlConnectionProtocolVersionNegotiation(t *testing.T) {
tests := []struct {
name string
clusterVersion string
controlConnMaxProtocolVersion string
negotiatedProtocolVersion primitive.ProtocolVersion
}{
{
name: "Cluster2.1_MaxCCProtoVer4_NegotiatedProtoVer3",
clusterVersion: "2.1",
controlConnMaxProtocolVersion: "4",
negotiatedProtocolVersion: primitive.ProtocolVersion3, // protocol downgraded to V3, V4 is not supported
},
{
name: "Cluster3.0_MaxCCProtoVer4_NegotiatedProtoVer4",
clusterVersion: "3.0",
controlConnMaxProtocolVersion: "4",
negotiatedProtocolVersion: primitive.ProtocolVersion4, // make sure that protocol negotiation does not fail if it is not actually needed
},
{
name: "Cluster3.0_MaxCCProtoVer3_NegotiatedProtoVer3",
clusterVersion: "3.0",
controlConnMaxProtocolVersion: "3",
negotiatedProtocolVersion: primitive.ProtocolVersion3, // protocol V3 applied as it is the maximum configured
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := setup.NewTestConfig("", "")
c.ControlConnMaxProtocolVersion = tt.controlConnMaxProtocolVersion
testSetup, err := setup.NewSimulacronTestSetupWithSessionAndNodesAndConfig(t, true, false, 1, c,
&simulacron.ClusterVersion{tt.clusterVersion, tt.clusterVersion})
require.Nil(t, err)
defer testSetup.Cleanup()

query := "SELECT * FROM test"
expectedRows := simulacron.NewRowsResult(
map[string]simulacron.DataType{
"company": simulacron.DataTypeText,
}).WithRow(map[string]interface{}{
"company": "TBD",
})

err = testSetup.Origin.Prime(simulacron.WhenQuery(
query,
simulacron.NewWhenQueryOptions()).
ThenRowsSuccess(expectedRows))
require.Nil(t, err)

// Connect to proxy as a "client"
client := cqlClient.NewCqlClient("127.0.0.1:14002", nil)
cqlClientConn, err := client.ConnectAndInit(context.Background(), tt.negotiatedProtocolVersion, 0)
require.Nil(t, err)
defer cqlClientConn.Close()

cqlConn, _ := testSetup.Proxy.GetOriginControlConn().GetConnAndContactPoint()
negotiatedProto := cqlConn.GetProtocolVersion()
require.Equal(t, tt.negotiatedProtocolVersion, negotiatedProto)

queryMsg := &message.Query{
Query: "SELECT * FROM test",
Options: &message.QueryOptions{Consistency: primitive.ConsistencyLevelOne},
}
rsp, err := cqlClientConn.SendAndReceive(frame.NewFrame(primitive.ProtocolVersion3, 0, queryMsg))
if err != nil {
t.Fatal("query failed:", err)
}

require.Equal(t, 1, len(rsp.Body.Message.(*message.RowsResult).Data))
})
}
}

func TestMaxClientsThreshold(t *testing.T) {
maxClients := 10
goCqlConnectionsPerHost := 1
Expand Down Expand Up @@ -82,20 +159,23 @@ func TestMaxClientsThreshold(t *testing.T) {

func TestRequestedProtocolVersionUnsupportedByProxy(t *testing.T) {
tests := []struct {
name string
requestVersion primitive.ProtocolVersion
expectedVersion primitive.ProtocolVersion
errExpected string
name string
requestVersion primitive.ProtocolVersion
negotiatedVersion string
expectedVersion primitive.ProtocolVersion
errExpected string
}{
{
"request v5, response v4",
primitive.ProtocolVersion5,
"4",
primitive.ProtocolVersion4,
"Invalid or unsupported protocol version (5)",
},
{
"request v1, response v4",
primitive.ProtocolVersion(0x1),
"4",
primitive.ProtocolVersion4,
"Invalid or unsupported protocol version (1)",
},
Expand All @@ -112,13 +192,14 @@ func TestRequestedProtocolVersionUnsupportedByProxy(t *testing.T) {
defer zerolog.SetGlobalLevel(oldZeroLogLevel)

cfg := setup.NewTestConfig("127.0.1.1", "127.0.1.2")
cfg.ControlConnMaxProtocolVersion = test.negotiatedVersion
cfg.LogLevel = "TRACE" // saw 1 test failure here once but logs didn't show enough info
testSetup, err := setup.NewCqlServerTestSetup(t, cfg, false, false, false)
require.Nil(t, err)
defer testSetup.Cleanup()

testSetup.Origin.CqlServer.RequestHandlers = []client2.RequestHandler{client2.NewDriverConnectionInitializationHandler("origin", "dc1", func(_ string) {})}
testSetup.Target.CqlServer.RequestHandlers = []client2.RequestHandler{client2.NewDriverConnectionInitializationHandler("target", "dc1", func(_ string) {})}
testSetup.Origin.CqlServer.RequestHandlers = []cqlClient.RequestHandler{cqlClient.NewDriverConnectionInitializationHandler("origin", "dc1", func(_ string) {})}
testSetup.Target.CqlServer.RequestHandlers = []cqlClient.RequestHandler{cqlClient.NewDriverConnectionInitializationHandler("target", "dc1", func(_ string) {})}

err = testSetup.Start(cfg, false, primitive.ProtocolVersion3)
require.Nil(t, err)
Expand All @@ -141,38 +222,42 @@ func TestRequestedProtocolVersionUnsupportedByProxy(t *testing.T) {

func TestReturnedProtocolVersionUnsupportedByProxy(t *testing.T) {
type test struct {
name string
requestVersion primitive.ProtocolVersion
returnedVersion primitive.ProtocolVersion
expectedVersion primitive.ProtocolVersion
errExpected string
name string
requestVersion primitive.ProtocolVersion
negotiatedVersion string
returnedVersion primitive.ProtocolVersion
expectedVersion primitive.ProtocolVersion
errExpected string
}
tests := []*test{
{
"DSE_V2 request, v5 returned, v4 expected",
primitive.ProtocolVersionDse2,
"4",
primitive.ProtocolVersion5,
primitive.ProtocolVersion4,
"Invalid or unsupported protocol version (5)",
},
{
"DSE_V2 request, v1 returned, v4 expected",
primitive.ProtocolVersionDse2,
"4",
primitive.ProtocolVersion(0x01),
primitive.ProtocolVersion4,
"Invalid or unsupported protocol version (1)",
},
}

runTestFunc := func(t *testing.T, test *test, cfg *config.Config) {
cfg.ControlConnMaxProtocolVersion = test.negotiatedVersion // simulate what version was negotiated on control connection
testSetup, err := setup.NewCqlServerTestSetup(t, cfg, false, false, false)
require.Nil(t, err)
defer testSetup.Cleanup()

enableHandlers := atomic.Value{}
enableHandlers.Store(false)

rawHandler := func(request *frame.Frame, conn *client2.CqlServerConnection, ctx client2.RequestHandlerContext) (response []byte) {
rawHandler := func(request *frame.Frame, conn *cqlClient.CqlServerConnection, ctx cqlClient.RequestHandlerContext) (response []byte) {
if enableHandlers.Load().(bool) && request.Header.Version == test.requestVersion {
encodedFrame, err := createFrameWithUnsupportedVersion(test.returnedVersion, request.Header.StreamId, true)
if err != nil {
Expand All @@ -184,8 +269,8 @@ func TestReturnedProtocolVersionUnsupportedByProxy(t *testing.T) {
return nil
}

testSetup.Origin.CqlServer.RequestRawHandlers = []client2.RawRequestHandler{rawHandler}
testSetup.Target.CqlServer.RequestRawHandlers = []client2.RawRequestHandler{rawHandler}
testSetup.Origin.CqlServer.RequestRawHandlers = []cqlClient.RawRequestHandler{rawHandler}
testSetup.Target.CqlServer.RequestRawHandlers = []cqlClient.RawRequestHandler{rawHandler}

err = testSetup.Start(cfg, false, primitive.ProtocolVersion4)
require.Nil(t, err)
Expand Down Expand Up @@ -222,7 +307,7 @@ func TestReturnedProtocolVersionUnsupportedByProxy(t *testing.T) {
}

func createFrameWithUnsupportedVersion(version primitive.ProtocolVersion, streamId int16, isResponse bool) ([]byte, error) {
mostSimilarVersion := primitive.ProtocolVersion4
mostSimilarVersion := version
if version > primitive.ProtocolVersionDse2 {
mostSimilarVersion = primitive.ProtocolVersionDse2
} else if version < primitive.ProtocolVersion2 {
Expand Down
Loading

0 comments on commit 7e7b1f2

Please sign in to comment.