Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

120: Support protocol v2 #119

Merged
merged 30 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
9257cbd
ZDM-71: Introduce protocol negotiation
lukasz-antoniak Jun 17, 2024
72e5518
ZDM-71: Introduce protocol negotiation
lukasz-antoniak Jun 18, 2024
9de161b
Allow to run with specific Simulacron cluster version
lukasz-antoniak Jun 18, 2024
f60bd28
Allow to run with specific Simulacron cluster version
lukasz-antoniak Jun 18, 2024
a244c8b
Better classification of ProtocolError
lukasz-antoniak Jun 18, 2024
3002a92
Validation of protocol version
lukasz-antoniak Jun 20, 2024
4bf2cdd
Protocol V2 support
lukasz-antoniak Jun 25, 2024
b8feb06
Upgrade go-cassandra-native-protocol library
lukasz-antoniak Jun 26, 2024
1d353fa
Cleanup
lukasz-antoniak Jun 26, 2024
081bec0
Protocol V2 stubbed tests
lukasz-antoniak Jun 27, 2024
65ce9a0
Protocol V2 stubbed tests
lukasz-antoniak Jun 27, 2024
d8ba2db
Update README
lukasz-antoniak Jun 27, 2024
58680ad
Tidy dependencies
lukasz-antoniak Jun 28, 2024
7247e64
Apply review comments
lukasz-antoniak Jun 28, 2024
5efe8c2
Apply review comments
lukasz-antoniak Jun 28, 2024
f8e9529
Limit number of maximum stream IDs
lukasz-antoniak Jul 11, 2024
a918eaf
Merge branch 'main' of github.com:datastax/zdm-proxy into ZDM-71
lukasz-antoniak Jul 11, 2024
f94e037
Fix merge issues
lukasz-antoniak Jul 11, 2024
a5389e0
Fix merge issues
lukasz-antoniak Jul 11, 2024
ce7179c
Fix merge issues
lukasz-antoniak Jul 11, 2024
c2e46f1
Fix merge issues
lukasz-antoniak Jul 11, 2024
4bd3785
New maximum stream IDs test
lukasz-antoniak Jul 17, 2024
7722bf1
Automated gofmt changes
Jul 17, 2024
1dabb23
Cleanup
lukasz-antoniak Jul 17, 2024
2615ce9
Use DSEv2 as default max protocol version
lukasz-antoniak Jul 18, 2024
21eebb6
More various protocol version tests
lukasz-antoniak Jul 18, 2024
5ea7eef
Stream ID verification
lukasz-antoniak Jul 19, 2024
6e4b20d
Documentation
lukasz-antoniak Jul 19, 2024
bca996c
Fix build
lukasz-antoniak Jul 19, 2024
3a18a11
Cleanup
lukasz-antoniak Jul 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,24 @@ containerized sandbox environment.

## Supported Protocol Versions

**ZDM Proxy supports protocol versions v3, v4, DSE_V1 and DSE_V2.**
**ZDM Proxy supports protocol versions v2, v3, v4, DSE_V1 and DSE_V2.**
lukasz-antoniak marked this conversation as resolved.
Show resolved Hide resolved

It technically doesn't support v5, but handles protocol negotiation so that the client application properly downgrades
the protocol version to v4 if v5 is requested. This means that any client application using a recent driver that supports
protocol version v5 can be migrated using the ZDM Proxy (as long as it does not use v5-specific functionality).

ZDM Proxy requires origin and target clusters to have at least one protocol version in common. It is therefore not feasible
to configure Apache Cassandra 2.0 as origin and 3.x / 4.x as target. Below table displays protocol versions supported by
various C* versions:

| Apache Cassandra | Protocol Version |
|------------------|------------------|
| 2.0 | V2 |
| 2.1 | V2, V3 |
| 2.2 | V3, V4 |
| 3.x | V3, V4 |
lukasz-antoniak marked this conversation as resolved.
Show resolved Hide resolved
| 4.x | V3, V4, V5 |

lukasz-antoniak marked this conversation as resolved.
Show resolved Hide resolved
---
:warning: **Thrift is not supported by ZDM Proxy.** If you are using a very old driver or cluster version that only supports Thrift
then you need to change your client application to use CQL and potentially upgrade your cluster before starting the
Expand All @@ -110,8 +122,8 @@ migration process.

In practice this means that ZDM Proxy supports the following cluster versions (as Origin and / or Target):

- Apache Cassandra from 2.1+ up to (and including) Apache Cassandra 4.x. Apache Cassandra 2.0 support will be introduced
when protocol version v2 is supported.
- Apache Cassandra from 2.1+ up to (and including) Apache Cassandra 4.x.
- Apache Cassandra 2.0 up to 2.1.
lukasz-antoniak marked this conversation as resolved.
Show resolved Hide resolved
- DataStax Enterprise 4.8+. DataStax Enterprise 4.6 and 4.7 support will be introduced when protocol version v2 is supported.
- DataStax Astra DB (both Serverless and Classic)

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.19

require (
github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20211106181442-e4c1a74c66bd
github.com/datastax/go-cassandra-native-protocol v0.0.0-20220525125956-6158d9e218b8
github.com/datastax/go-cassandra-native-protocol v0.0.0-20240626123646-2abea740da8d
github.com/gocql/gocql v0.0.0-20200624222514-34081eda590e
github.com/google/uuid v1.1.1
github.com/jpillora/backoff v1.0.0
Expand All @@ -15,7 +15,6 @@ require (
github.com/rs/zerolog v1.20.0
github.com/sirupsen/logrus v1.6.0
github.com/stretchr/testify v1.8.0
gopkg.in/yaml.v3 v3.0.1
)

require (
Expand All @@ -35,4 +34,5 @@ require (
github.com/prometheus/procfs v0.0.8 // indirect
golang.org/x/sys v0.3.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dR
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/datastax/go-cassandra-native-protocol v0.0.0-20220525125956-6158d9e218b8 h1:NKLtNzC76ssf68VOenDAzMyQGg+QkxuD2QCubX+GvLk=
github.com/datastax/go-cassandra-native-protocol v0.0.0-20220525125956-6158d9e218b8/go.mod h1:yFD0OKoVV9d1QW7Es58c1Gv6ijrqTGPcxgHv27wdC4Q=
github.com/datastax/go-cassandra-native-protocol v0.0.0-20240626123646-2abea740da8d h1:UnPtAA8Ux3GvHLazSSUydERFuoQRyxHrB8puzXyjXIE=
github.com/datastax/go-cassandra-native-protocol v0.0.0-20240626123646-2abea740da8d/go.mod h1:6FzirJfdffakAVqmHjwVfFkpru/gNbIazUOK5rIhndc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -105,7 +105,6 @@ github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
Expand All @@ -131,7 +130,6 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/asyncreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func TestAsyncReadsRequestTypes(t *testing.T) {
}

testSetup, err := setup.NewSimulacronTestSetupWithSessionAndNodesAndConfig(
t, false, false, 1, nil)
t, false, false, 1, nil, nil)
require.Nil(t, err)
defer testSetup.Cleanup()

Expand Down
89 changes: 83 additions & 6 deletions integration-tests/connect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package integration_tests
import (
"bytes"
"context"
client2 "github.com/datastax/go-cassandra-native-protocol/client"
cqlClient "github.com/datastax/go-cassandra-native-protocol/client"
"github.com/datastax/go-cassandra-native-protocol/frame"
"github.com/datastax/go-cassandra-native-protocol/message"
"github.com/datastax/go-cassandra-native-protocol/primitive"
"github.com/datastax/zdm-proxy/integration-tests/client"
"github.com/datastax/zdm-proxy/integration-tests/setup"
"github.com/datastax/zdm-proxy/integration-tests/simulacron"
"github.com/datastax/zdm-proxy/integration-tests/utils"
"github.com/datastax/zdm-proxy/proxy/pkg/config"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -45,6 +46,82 @@ func TestGoCqlConnect(t *testing.T) {
require.Equal(t, "fake", iter.Columns()[0].Name)
}

// Simulacron-based test to make sure that we can handle invalid protocol error and downgrade
// used protocol on control connection. ORIGIN and TARGET are using the same C* version
func TestControlConnectionProtocolVersionNegotiation(t *testing.T) {
tests := []struct {
name string
clusterVersion string
controlConnMaxProtocolVersion string
negotiatedProtocolVersion primitive.ProtocolVersion
}{
{
name: "Cluster2.1_MaxCCProtoVer4_NegotiatedProtoVer3",
clusterVersion: "2.1",
controlConnMaxProtocolVersion: "4",
negotiatedProtocolVersion: primitive.ProtocolVersion3, // protocol downgraded to V3, V4 is not supported
},
{
name: "Cluster3.0_MaxCCProtoVer4_NegotiatedProtoVer4",
clusterVersion: "3.0",
controlConnMaxProtocolVersion: "4",
negotiatedProtocolVersion: primitive.ProtocolVersion4, // make sure that protocol negotiation does not fail if it is not actually needed
},
{
name: "Cluster3.0_MaxCCProtoVer3_NegotiatedProtoVer3",
clusterVersion: "3.0",
controlConnMaxProtocolVersion: "3",
negotiatedProtocolVersion: primitive.ProtocolVersion3, // protocol V3 applied as it is the maximum configured
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := setup.NewTestConfig("", "")
c.ControlConnMaxProtocolVersion = tt.controlConnMaxProtocolVersion
testSetup, err := setup.NewSimulacronTestSetupWithSessionAndNodesAndConfig(t, true, false, 1, c,
&simulacron.ClusterVersion{tt.clusterVersion, tt.clusterVersion})
require.Nil(t, err)
defer testSetup.Cleanup()

query := "SELECT * FROM test"
expectedRows := simulacron.NewRowsResult(
map[string]simulacron.DataType{
"company": simulacron.DataTypeText,
}).WithRow(map[string]interface{}{
"company": "TBD",
})

err = testSetup.Origin.Prime(simulacron.WhenQuery(
query,
simulacron.NewWhenQueryOptions()).
ThenRowsSuccess(expectedRows))
require.Nil(t, err)

// Connect to proxy as a "client"
client := cqlClient.NewCqlClient("127.0.0.1:14002", nil)
cqlClientConn, err := client.ConnectAndInit(context.Background(), tt.negotiatedProtocolVersion, 0)
require.Nil(t, err)
defer cqlClientConn.Close()

cqlConn, _ := testSetup.Proxy.GetOriginControlConn().GetConnAndContactPoint()
negotiatedProto := cqlConn.GetProtocolVersion()
require.Equal(t, tt.negotiatedProtocolVersion, negotiatedProto)

queryMsg := &message.Query{
Query: "SELECT * FROM test",
Options: &message.QueryOptions{Consistency: primitive.ConsistencyLevelOne},
}
rsp, err := cqlClientConn.SendAndReceive(frame.NewFrame(primitive.ProtocolVersion3, 0, queryMsg))
if err != nil {
t.Fatal("query failed:", err)
}

require.Equal(t, 1, len(rsp.Body.Message.(*message.RowsResult).Data))
})
}
}

func TestMaxClientsThreshold(t *testing.T) {
maxClients := 10
goCqlConnectionsPerHost := 1
Expand Down Expand Up @@ -117,8 +194,8 @@ func TestRequestedProtocolVersionUnsupportedByProxy(t *testing.T) {
require.Nil(t, err)
defer testSetup.Cleanup()

testSetup.Origin.CqlServer.RequestHandlers = []client2.RequestHandler{client2.NewDriverConnectionInitializationHandler("origin", "dc1", func(_ string) {})}
testSetup.Target.CqlServer.RequestHandlers = []client2.RequestHandler{client2.NewDriverConnectionInitializationHandler("target", "dc1", func(_ string) {})}
testSetup.Origin.CqlServer.RequestHandlers = []cqlClient.RequestHandler{cqlClient.NewDriverConnectionInitializationHandler("origin", "dc1", func(_ string) {})}
testSetup.Target.CqlServer.RequestHandlers = []cqlClient.RequestHandler{cqlClient.NewDriverConnectionInitializationHandler("target", "dc1", func(_ string) {})}

err = testSetup.Start(cfg, false, primitive.ProtocolVersion3)
require.Nil(t, err)
Expand Down Expand Up @@ -172,7 +249,7 @@ func TestReturnedProtocolVersionUnsupportedByProxy(t *testing.T) {
enableHandlers := atomic.Value{}
enableHandlers.Store(false)

rawHandler := func(request *frame.Frame, conn *client2.CqlServerConnection, ctx client2.RequestHandlerContext) (response []byte) {
rawHandler := func(request *frame.Frame, conn *cqlClient.CqlServerConnection, ctx cqlClient.RequestHandlerContext) (response []byte) {
if enableHandlers.Load().(bool) && request.Header.Version == test.requestVersion {
encodedFrame, err := createFrameWithUnsupportedVersion(test.returnedVersion, request.Header.StreamId, true)
if err != nil {
Expand All @@ -184,8 +261,8 @@ func TestReturnedProtocolVersionUnsupportedByProxy(t *testing.T) {
return nil
}

testSetup.Origin.CqlServer.RequestRawHandlers = []client2.RawRequestHandler{rawHandler}
testSetup.Target.CqlServer.RequestRawHandlers = []client2.RawRequestHandler{rawHandler}
testSetup.Origin.CqlServer.RequestRawHandlers = []cqlClient.RawRequestHandler{rawHandler}
testSetup.Target.CqlServer.RequestRawHandlers = []cqlClient.RawRequestHandler{rawHandler}

err = testSetup.Start(cfg, false, primitive.ProtocolVersion4)
require.Nil(t, err)
Expand Down
52 changes: 42 additions & 10 deletions integration-tests/customhandler_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ var (
releaseVersionColumn = &message.ColumnMetadata{Keyspace: "system", Table: "local", Name: "release_version", Type: datatype.Varchar}
rpcAddressColumn = &message.ColumnMetadata{Keyspace: "system", Table: "local", Name: "rpc_address", Type: datatype.Inet}
schemaVersionColumn = &message.ColumnMetadata{Keyspace: "system", Table: "local", Name: "schema_version", Type: datatype.Uuid}
tokensColumn = &message.ColumnMetadata{Keyspace: "system", Table: "local", Name: "tokens", Type: datatype.NewSetType(datatype.Varchar)}
tokensColumn = &message.ColumnMetadata{Keyspace: "system", Table: "local", Name: "tokens", Type: datatype.NewSet(datatype.Varchar)}
)

// These columns are a subset of the total columns returned by OSS C* 3.11.2, and contain all the information that
Expand All @@ -78,6 +78,22 @@ var systemLocalColumns = []*message.ColumnMetadata{
tokensColumn,
}

// These columns are a subset of the total columns returned by OSS C* 2.0.0, and contain all the information that
// drivers need in order to establish the cluster topology and determine its characteristics. Please note that RPC address
// column is not present.
var systemLocalColumnsProtocolV2 = []*message.ColumnMetadata{
keyColumn,
clusterNameColumn,
cqlVersionColumn,
datacenterColumn,
hostIdColumn,
partitionerColumn,
rackColumn,
releaseVersionColumn,
schemaVersionColumn,
tokensColumn,
}

var (
peerColumn = &message.ColumnMetadata{Keyspace: "system", Table: "peers", Name: "peer", Type: datatype.Inet}
datacenterPeersColumn = &message.ColumnMetadata{Keyspace: "system", Table: "peers", Name: "data_center", Type: datatype.Varchar}
Expand All @@ -86,7 +102,7 @@ var (
releaseVersionPeersColumn = &message.ColumnMetadata{Keyspace: "system", Table: "peers", Name: "release_version", Type: datatype.Varchar}
rpcAddressPeersColumn = &message.ColumnMetadata{Keyspace: "system", Table: "peers", Name: "rpc_address", Type: datatype.Inet}
schemaVersionPeersColumn = &message.ColumnMetadata{Keyspace: "system", Table: "peers", Name: "schema_version", Type: datatype.Uuid}
tokensPeersColumn = &message.ColumnMetadata{Keyspace: "system", Table: "peers", Name: "tokens", Type: datatype.NewSetType(datatype.Varchar)}
tokensPeersColumn = &message.ColumnMetadata{Keyspace: "system", Table: "peers", Name: "tokens", Type: datatype.NewSet(datatype.Varchar)}
)

// These columns are a subset of the total columns returned by OSS C* 3.11.2, and contain all the information that
Expand Down Expand Up @@ -114,11 +130,13 @@ var (

func systemLocalRow(cluster string, datacenter string, customPartitioner string, addr net.Addr, version primitive.ProtocolVersion) message.Row {
addrBuf := &bytes.Buffer{}
inetAddr := addr.(*net.TCPAddr).IP
if inetAddr.To4() != nil {
addrBuf.Write(inetAddr.To4())
} else {
addrBuf.Write(inetAddr)
if addr != nil {
inetAddr := addr.(*net.TCPAddr).IP
if inetAddr.To4() != nil {
addrBuf.Write(inetAddr.To4())
} else {
addrBuf.Write(inetAddr)
}
}
// emulate {'-9223372036854775808'} (entire ring)
tokensBuf := &bytes.Buffer{}
Expand All @@ -135,18 +153,32 @@ func systemLocalRow(cluster string, datacenter string, customPartitioner string,
if customPartitioner != "" {
partitionerValue = message.Column(customPartitioner)
}
if version >= primitive.ProtocolVersion3 {
joao-r-reis marked this conversation as resolved.
Show resolved Hide resolved
return message.Row{
keyValue,
addrBuf.Bytes(),
message.Column(cluster),
cqlVersionValue,
message.Column(datacenter),
hostIdValue,
addrBuf.Bytes(),
partitionerValue,
rackValue,
releaseVersionValue,
addrBuf.Bytes(),
schemaVersionValue,
tokensBuf.Bytes(),
}
}
return message.Row{
keyValue,
addrBuf.Bytes(),
message.Column(cluster),
cqlVersionValue,
message.Column(datacenter),
hostIdValue,
addrBuf.Bytes(),
partitionerValue,
rackValue,
releaseVersionValue,
addrBuf.Bytes(),
schemaVersionValue,
tokensBuf.Bytes(),
}
Expand Down
23 changes: 13 additions & 10 deletions integration-tests/functioncalls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,7 @@ func TestNowFunctionReplacementPreparedStatement(t *testing.T) {
isReplacedNow: false,
value: []int{11, 22, 33},
valueSimulacron: []int{11, 22, 33},
dataType: datatype.NewListType(datatype.Int),
dataType: datatype.NewList(datatype.Int),
simulacronType: "list<int>",
},
{
Expand All @@ -880,7 +880,7 @@ func TestNowFunctionReplacementPreparedStatement(t *testing.T) {
{1, 2, 3},
{2, 3, 4},
},
dataType: datatype.NewListType(datatype.NewTupleType(datatype.Int, datatype.Int, datatype.Int)),
dataType: datatype.NewList(datatype.NewTuple(datatype.Int, datatype.Int, datatype.Int)),
simulacronType: "list<tuple<int, int, int>>",
},
{
Expand Down Expand Up @@ -2261,7 +2261,7 @@ func TestNowFunctionReplacementBatchStatement(t *testing.T) {
}
expectedBatchChildQueries = append(expectedBatchChildQueries, expectedBatchChildQuery)

var queryOrId interface{}
var batchChild *message.BatchChild
if childStatement.prepared {
when := simulacron.NewWhenQueryOptions()
for _, p := range expectedChildQueryParams {
Expand All @@ -2285,18 +2285,21 @@ func TestNowFunctionReplacementBatchStatement(t *testing.T) {
require.Nil(t, err)
prepared, ok := resp.Body.Message.(*message.PreparedResult)
require.True(t, ok)
queryOrId = prepared.PreparedQueryId
batchChild = &message.BatchChild{
Id: prepared.PreparedQueryId,
Values: positionalValues,
}

validateForwardedPrepare(simulacronSetup.Origin, childStatement)
validateForwardedPrepare(simulacronSetup.Target, childStatement)
} else {
queryOrId = childStatement.originalQuery
batchChild = &message.BatchChild{
Query: childStatement.originalQuery,
Values: positionalValues,
}
}

batchChildStatements = append(batchChildStatements, &message.BatchChild{
QueryOrId: queryOrId,
Values: positionalValues,
})
batchChildStatements = append(batchChildStatements, batchChild)
}

batchMsg := &message.Batch{
Expand Down Expand Up @@ -2325,7 +2328,7 @@ func TestNowFunctionReplacementBatchStatement(t *testing.T) {
actualStmt := matching[0].QueriesOrIds[idx]
actualParams := matching[0].Values[idx]
if childStatement.prepared {
b64ExpectedValue := base64.StdEncoding.EncodeToString(batchChildStatements[idx].QueryOrId.([]byte))
b64ExpectedValue := base64.StdEncoding.EncodeToString(batchChildStatements[idx].Id)
require.Equal(t, b64ExpectedValue, actualStmt, idx)
} else {
if enableNowReplacement {
Expand Down
Loading
Loading