From 3a6f2bf224dee0ca400131edcf93d946a5bc2c35 Mon Sep 17 00:00:00 2001 From: Doug Wettlaufer Date: Thu, 30 Sep 2021 13:22:05 -0500 Subject: [PATCH 1/2] Add compression support for clients --- go.mod | 4 ++-- go.sum | 5 +++-- proxy/codecs.go | 3 --- proxy/proxy.go | 15 ++++++++++----- proxy/request.go | 4 ++-- 5 files changed, 17 insertions(+), 14 deletions(-) diff --git a/go.mod b/go.mod index 381a375..452cf53 100644 --- a/go.mod +++ b/go.mod @@ -5,10 +5,10 @@ go 1.16 require ( github.com/alecthomas/kong v0.2.17 github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210521184019-c5ad59b459ec - github.com/datastax/go-cassandra-native-protocol v0.0.0-20210604174339-4311e5d5654d + github.com/datastax/go-cassandra-native-protocol v0.0.0-20210929152221-b2edfb76bdf8 github.com/hashicorp/golang-lru v0.5.4 github.com/stretchr/testify v1.7.0 - go.uber.org/atomic v1.8.0 // indirect + go.uber.org/atomic v1.8.0 go.uber.org/multierr v1.7.0 // indirect go.uber.org/zap v1.17.0 ) diff --git a/go.sum b/go.sum index 58e75fe..f098d15 100644 --- a/go.sum +++ b/go.sum @@ -3,8 +3,8 @@ github.com/alecthomas/kong v0.2.17/go.mod h1:ka3VZ8GZNPXv9Ov+j4YNLkI8mTuhXyr/0kt github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210521184019-c5ad59b459ec h1:EEyRvzmpEUZ+I8WmD5cw/vY8EqhambkOqy5iFr0908A= github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210521184019-c5ad59b459ec/go.mod h1:F7bn7fEU90QkQ3tnmaTx3LTKLEDqnwWODIYppRQ5hnY= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= -github.com/datastax/go-cassandra-native-protocol v0.0.0-20210604174339-4311e5d5654d h1:sGM7OKF00vOQwn3cKGVpZLixkeP6i6kQRcz9+aV1cqs= -github.com/datastax/go-cassandra-native-protocol v0.0.0-20210604174339-4311e5d5654d/go.mod h1:n6F7IFlMxffEUMfTBvNqwaxwpJMvMnRT5L93km1qeNc= +github.com/datastax/go-cassandra-native-protocol v0.0.0-20210929152221-b2edfb76bdf8 h1:PjLPs5mNlfKiMqIBFgeMqbhkAVUFpr4rualeP6bKW58= +github.com/datastax/go-cassandra-native-protocol v0.0.0-20210929152221-b2edfb76bdf8/go.mod h1:n6F7IFlMxffEUMfTBvNqwaxwpJMvMnRT5L93km1qeNc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -20,6 +20,7 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= +github.com/rs/zerolog v1.20.0 h1:38k9hgtUBdxFwE34yS8rTHmHBa4eN16E4DJlv177LNs= github.com/rs/zerolog v1.20.0/go.mod h1:IzD0RJ65iWH0w97OQQebJEvTZYvsCUm9WVLWBQrJRjo= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= diff --git a/proxy/codecs.go b/proxy/codecs.go index 8679b97..5c230a1 100644 --- a/proxy/codecs.go +++ b/proxy/codecs.go @@ -20,13 +20,10 @@ import ( "fmt" "io" - "github.com/datastax/go-cassandra-native-protocol/frame" "github.com/datastax/go-cassandra-native-protocol/message" "github.com/datastax/go-cassandra-native-protocol/primitive" ) -var codec = frame.NewRawCodec(&partialQueryCodec{}, &partialExecuteCodec{}) - type partialQueryCodec struct{} func (c *partialQueryCodec) Encode(_ message.Message, _ io.Writer, _ primitive.ProtocolVersion) error { diff --git a/proxy/proxy.go b/proxy/proxy.go index ad75ae4..5c141e6 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -29,6 +29,7 @@ import ( "cql-proxy/parser" "cql-proxy/proxycore" + nativeProtoClient "github.com/datastax/go-cassandra-native-protocol/client" "github.com/datastax/go-cassandra-native-protocol/datatype" "github.com/datastax/go-cassandra-native-protocol/frame" "github.com/datastax/go-cassandra-native-protocol/message" @@ -78,7 +79,7 @@ func (p *Proxy) OnEvent(event interface{}) { p.schemaEventClients.Range(func(key, value interface{}) bool { cl := value.(*client) err := cl.conn.Write(proxycore.SenderFunc(func(writer io.Writer) error { - return codec.EncodeFrame(frm, writer) + return cl.codec.EncodeFrame(frm, writer) })) cl.conn.LocalAddr() if err != nil { @@ -202,6 +203,7 @@ func (p *Proxy) handle(conn *net.TCPConn) { id: atomic.AddUint64(&p.clientIdGen, 1), preparedSystemQuery: make(map[[16]byte]interface{}), preparedIdempotence: make(map[[16]byte]bool), + codec: frame.NewRawCodec(&partialQueryCodec{}, &partialExecuteCodec{}), } cl.conn = proxycore.NewConn(conn, cl) cl.conn.Start() @@ -270,10 +272,11 @@ type client struct { id uint64 preparedSystemQuery map[[16]byte]interface{} preparedIdempotence map[[16]byte]bool + codec frame.RawCodec } func (c *client) Receive(reader io.Reader) error { - raw, err := codec.DecodeRawFrame(reader) + raw, err := c.codec.DecodeRawFrame(reader) if err != nil { if !errors.Is(err, io.EOF) { c.proxy.logger.Error("unable to decode frame", zap.Error(err)) @@ -286,7 +289,7 @@ func (c *client) Receive(reader io.Reader) error { return nil } - body, err := codec.DecodeBody(raw.Header, bytes.NewReader(raw.Body)) + body, err := c.codec.DecodeBody(raw.Header, bytes.NewReader(raw.Body)) if err != nil { c.proxy.logger.Error("unable to decode body", zap.Error(err)) return err @@ -294,8 +297,10 @@ func (c *client) Receive(reader io.Reader) error { switch msg := body.Message.(type) { case *message.Options: - c.send(raw.Header, &message.Supported{Options: map[string][]string{"CQL_VERSION": {"3.0.0"}, "COMPRESSION": {}}}) + c.send(raw.Header, &message.Supported{Options: map[string][]string{"CQL_VERSION": {"3.0.0"}, "COMPRESSION": {string(primitive.CompressionNone), string(primitive.CompressionSnappy), string(primitive.CompressionLz4)}}}) case *message.Startup: + // Overwrite the codec in case the client is using compression + c.codec = frame.NewRawCodecWithCompression(nativeProtoClient.NewBodyCompressor(msg.GetCompression()), &partialQueryCodec{}, &partialExecuteCodec{}) c.send(raw.Header, &message.Ready{}) case *message.Register: for _, t := range msg.EventTypes { @@ -503,7 +508,7 @@ func (c *client) interceptSystemQuery(hdr *frame.Header, stmt interface{}) { func (c *client) send(hdr *frame.Header, msg message.Message) { _ = c.conn.Write(proxycore.SenderFunc(func(writer io.Writer) error { - return codec.EncodeFrame(frame.NewFrame(hdr.Version, hdr.StreamId, msg), writer) + return c.codec.EncodeFrame(frame.NewFrame(hdr.Version, hdr.StreamId, msg), writer) })) } diff --git a/proxy/request.go b/proxy/request.go index 0657a87..3037186 100644 --- a/proxy/request.go +++ b/proxy/request.go @@ -60,14 +60,14 @@ func (r *request) Execute(next bool) { func (r *request) send(msg message.Message) { _ = r.client.conn.Write(proxycore.SenderFunc(func(writer io.Writer) error { - return codec.EncodeFrame(frame.NewFrame(r.raw.Header.Version, r.stream, msg), writer) + return r.client.codec.EncodeFrame(frame.NewFrame(r.raw.Header.Version, r.stream, msg), writer) })) } func (r *request) sendRaw(raw *frame.RawFrame) { raw.Header.StreamId = r.stream _ = r.client.conn.Write(proxycore.SenderFunc(func(writer io.Writer) error { - return codec.EncodeRawFrame(raw, writer) + return r.client.codec.EncodeRawFrame(raw, writer) })) } From 3ceb9cea6a7e6fbc4f60bc0c138a806377b8ad15 Mon Sep 17 00:00:00 2001 From: Doug Wettlaufer Date: Thu, 30 Sep 2021 16:55:35 -0500 Subject: [PATCH 2/2] Get compression between proxy and server working --- proxy/codecs.go | 3 +++ proxy/proxy.go | 12 +++++++++++- proxycore/clientconn.go | 19 +++++++++++++------ proxycore/codecs.go | 4 +++- 4 files changed, 30 insertions(+), 8 deletions(-) diff --git a/proxy/codecs.go b/proxy/codecs.go index 5c230a1..8679b97 100644 --- a/proxy/codecs.go +++ b/proxy/codecs.go @@ -20,10 +20,13 @@ import ( "fmt" "io" + "github.com/datastax/go-cassandra-native-protocol/frame" "github.com/datastax/go-cassandra-native-protocol/message" "github.com/datastax/go-cassandra-native-protocol/primitive" ) +var codec = frame.NewRawCodec(&partialQueryCodec{}, &partialExecuteCodec{}) + type partialQueryCodec struct{} func (c *partialQueryCodec) Encode(_ message.Message, _ io.Writer, _ primitive.ProtocolVersion) error { diff --git a/proxy/proxy.go b/proxy/proxy.go index 5c141e6..b2a3b56 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "net" + "strings" "sync" "sync/atomic" "time" @@ -299,8 +300,17 @@ func (c *client) Receive(reader io.Reader) error { case *message.Options: c.send(raw.Header, &message.Supported{Options: map[string][]string{"CQL_VERSION": {"3.0.0"}, "COMPRESSION": {string(primitive.CompressionNone), string(primitive.CompressionSnappy), string(primitive.CompressionLz4)}}}) case *message.Startup: + + // sent as lowercase by the driver but needs to be handled internally as uppercase + compression := primitive.CompressionNone + if strings.ToUpper(string(msg.GetCompression())) == "LZ4" { + compression = primitive.CompressionLz4 + } else if strings.ToUpper(string(msg.GetCompression())) == "SNAPPY"{ + compression = primitive.CompressionSnappy + } + // Overwrite the codec in case the client is using compression - c.codec = frame.NewRawCodecWithCompression(nativeProtoClient.NewBodyCompressor(msg.GetCompression()), &partialQueryCodec{}, &partialExecuteCodec{}) + c.codec = frame.NewRawCodecWithCompression(nativeProtoClient.NewBodyCompressor(compression), &partialQueryCodec{}, &partialExecuteCodec{}) c.send(raw.Header, &message.Ready{}) case *message.Register: for _, t := range msg.EventTypes { diff --git a/proxycore/clientconn.go b/proxycore/clientconn.go index d7d19fa..58c2dfc 100644 --- a/proxycore/clientconn.go +++ b/proxycore/clientconn.go @@ -85,7 +85,7 @@ func ConnectClient(ctx context.Context, endpoint Endpoint, config ClientConnConf func (c *ClientConn) Handshake(ctx context.Context, version primitive.ProtocolVersion, auth Authenticator) (primitive.ProtocolVersion, error) { for { - response, err := c.SendAndReceive(ctx, frame.NewFrame(version, -1, message.NewStartup())) + response, err := c.SendAndReceive(ctx, frame.NewFrame(version, -1, message.NewStartup(message.StartupOptionCompression, "LZ4"))) if err != nil { return version, err } @@ -342,11 +342,18 @@ func (c *ClientConn) maybeCachePrepared(request Request, raw *frame.RawFrame) { c.logger.Error("failed to decode prepared result response", zap.Error(err)) return } - msg := frm.Body.Message.(*message.PreparedResult) - c.preparedCache.Store(hex.EncodeToString(msg.PreparedQueryId), - &PreparedEntry{ - request.Frame().(*frame.RawFrame), // Store frame so we can re-prepare - }) + + switch v := frm.Body.Message.(type) { + case *message.PreparedResult: + msg := frm.Body.Message.(*message.PreparedResult) + c.preparedCache.Store(hex.EncodeToString(msg.PreparedQueryId), + &PreparedEntry{ + request.Frame().(*frame.RawFrame), // Store frame so we can re-prepare + }) + default: + c.logger.Warn("should have got PreparedResult but instead got ", zap.Any("type", v)) + } + } } diff --git a/proxycore/codecs.go b/proxycore/codecs.go index ec3d4d5..def83bf 100644 --- a/proxycore/codecs.go +++ b/proxycore/codecs.go @@ -16,12 +16,14 @@ package proxycore import ( "fmt" + + nativeProtoClient "github.com/datastax/go-cassandra-native-protocol/client" "github.com/datastax/go-cassandra-native-protocol/datatype" "github.com/datastax/go-cassandra-native-protocol/frame" "github.com/datastax/go-cassandra-native-protocol/primitive" ) -var codec = frame.NewRawCodec() +var codec = frame.NewRawCodecWithCompression(nativeProtoClient.NewBodyCompressor(primitive.CompressionLz4)) var primitiveCodecs = map[datatype.DataType]datatype.Codec{ datatype.Ascii: &datatype.AsciiCodec{},