Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add compression support for clients #40

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ go 1.16
require (
github.com/alecthomas/kong v0.2.17
github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210521184019-c5ad59b459ec
github.com/datastax/go-cassandra-native-protocol v0.0.0-20210604174339-4311e5d5654d
github.com/datastax/go-cassandra-native-protocol v0.0.0-20210929152221-b2edfb76bdf8
github.com/hashicorp/golang-lru v0.5.4
github.com/stretchr/testify v1.7.0
go.uber.org/atomic v1.8.0 // indirect
go.uber.org/atomic v1.8.0
go.uber.org/multierr v1.7.0 // indirect
go.uber.org/zap v1.17.0
)
5 changes: 3 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ github.com/alecthomas/kong v0.2.17/go.mod h1:ka3VZ8GZNPXv9Ov+j4YNLkI8mTuhXyr/0kt
github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210521184019-c5ad59b459ec h1:EEyRvzmpEUZ+I8WmD5cw/vY8EqhambkOqy5iFr0908A=
github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210521184019-c5ad59b459ec/go.mod h1:F7bn7fEU90QkQ3tnmaTx3LTKLEDqnwWODIYppRQ5hnY=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/datastax/go-cassandra-native-protocol v0.0.0-20210604174339-4311e5d5654d h1:sGM7OKF00vOQwn3cKGVpZLixkeP6i6kQRcz9+aV1cqs=
github.com/datastax/go-cassandra-native-protocol v0.0.0-20210604174339-4311e5d5654d/go.mod h1:n6F7IFlMxffEUMfTBvNqwaxwpJMvMnRT5L93km1qeNc=
github.com/datastax/go-cassandra-native-protocol v0.0.0-20210929152221-b2edfb76bdf8 h1:PjLPs5mNlfKiMqIBFgeMqbhkAVUFpr4rualeP6bKW58=
github.com/datastax/go-cassandra-native-protocol v0.0.0-20210929152221-b2edfb76bdf8/go.mod h1:n6F7IFlMxffEUMfTBvNqwaxwpJMvMnRT5L93km1qeNc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand All @@ -20,6 +20,7 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
github.com/rs/zerolog v1.20.0 h1:38k9hgtUBdxFwE34yS8rTHmHBa4eN16E4DJlv177LNs=
github.com/rs/zerolog v1.20.0/go.mod h1:IzD0RJ65iWH0w97OQQebJEvTZYvsCUm9WVLWBQrJRjo=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
Expand Down
25 changes: 20 additions & 5 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import (
"fmt"
"io"
"net"
"strings"
"sync"
"sync/atomic"
"time"

"cql-proxy/parser"
"cql-proxy/proxycore"

nativeProtoClient "github.com/datastax/go-cassandra-native-protocol/client"
"github.com/datastax/go-cassandra-native-protocol/datatype"
"github.com/datastax/go-cassandra-native-protocol/frame"
"github.com/datastax/go-cassandra-native-protocol/message"
Expand Down Expand Up @@ -78,7 +80,7 @@ func (p *Proxy) OnEvent(event interface{}) {
p.schemaEventClients.Range(func(key, value interface{}) bool {
cl := value.(*client)
err := cl.conn.Write(proxycore.SenderFunc(func(writer io.Writer) error {
return codec.EncodeFrame(frm, writer)
return cl.codec.EncodeFrame(frm, writer)
}))
cl.conn.LocalAddr()
if err != nil {
Expand Down Expand Up @@ -202,6 +204,7 @@ func (p *Proxy) handle(conn *net.TCPConn) {
id: atomic.AddUint64(&p.clientIdGen, 1),
preparedSystemQuery: make(map[[16]byte]interface{}),
preparedIdempotence: make(map[[16]byte]bool),
codec: frame.NewRawCodec(&partialQueryCodec{}, &partialExecuteCodec{}),
}
cl.conn = proxycore.NewConn(conn, cl)
cl.conn.Start()
Expand Down Expand Up @@ -270,10 +273,11 @@ type client struct {
id uint64
preparedSystemQuery map[[16]byte]interface{}
preparedIdempotence map[[16]byte]bool
codec frame.RawCodec
}

func (c *client) Receive(reader io.Reader) error {
raw, err := codec.DecodeRawFrame(reader)
raw, err := c.codec.DecodeRawFrame(reader)
if err != nil {
if !errors.Is(err, io.EOF) {
c.proxy.logger.Error("unable to decode frame", zap.Error(err))
Expand All @@ -286,16 +290,27 @@ func (c *client) Receive(reader io.Reader) error {
return nil
}

body, err := codec.DecodeBody(raw.Header, bytes.NewReader(raw.Body))
body, err := c.codec.DecodeBody(raw.Header, bytes.NewReader(raw.Body))
if err != nil {
c.proxy.logger.Error("unable to decode body", zap.Error(err))
return err
}

switch msg := body.Message.(type) {
case *message.Options:
c.send(raw.Header, &message.Supported{Options: map[string][]string{"CQL_VERSION": {"3.0.0"}, "COMPRESSION": {}}})
c.send(raw.Header, &message.Supported{Options: map[string][]string{"CQL_VERSION": {"3.0.0"}, "COMPRESSION": {string(primitive.CompressionNone), string(primitive.CompressionSnappy), string(primitive.CompressionLz4)}}})
case *message.Startup:

// sent as lowercase by the driver but needs to be handled internally as uppercase
compression := primitive.CompressionNone
if strings.ToUpper(string(msg.GetCompression())) == "LZ4" {
compression = primitive.CompressionLz4
} else if strings.ToUpper(string(msg.GetCompression())) == "SNAPPY"{
compression = primitive.CompressionSnappy
}

// Overwrite the codec in case the client is using compression
c.codec = frame.NewRawCodecWithCompression(nativeProtoClient.NewBodyCompressor(compression), &partialQueryCodec{}, &partialExecuteCodec{})
c.send(raw.Header, &message.Ready{})
case *message.Register:
for _, t := range msg.EventTypes {
Expand Down Expand Up @@ -503,7 +518,7 @@ func (c *client) interceptSystemQuery(hdr *frame.Header, stmt interface{}) {

func (c *client) send(hdr *frame.Header, msg message.Message) {
_ = c.conn.Write(proxycore.SenderFunc(func(writer io.Writer) error {
return codec.EncodeFrame(frame.NewFrame(hdr.Version, hdr.StreamId, msg), writer)
return c.codec.EncodeFrame(frame.NewFrame(hdr.Version, hdr.StreamId, msg), writer)
}))
}

Expand Down
4 changes: 2 additions & 2 deletions proxy/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,14 @@ func (r *request) Execute(next bool) {

func (r *request) send(msg message.Message) {
_ = r.client.conn.Write(proxycore.SenderFunc(func(writer io.Writer) error {
return codec.EncodeFrame(frame.NewFrame(r.raw.Header.Version, r.stream, msg), writer)
return r.client.codec.EncodeFrame(frame.NewFrame(r.raw.Header.Version, r.stream, msg), writer)
}))
}

func (r *request) sendRaw(raw *frame.RawFrame) {
raw.Header.StreamId = r.stream
_ = r.client.conn.Write(proxycore.SenderFunc(func(writer io.Writer) error {
return codec.EncodeRawFrame(raw, writer)
return r.client.codec.EncodeRawFrame(raw, writer)
}))
}

Expand Down
19 changes: 13 additions & 6 deletions proxycore/clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func ConnectClient(ctx context.Context, endpoint Endpoint, config ClientConnConf

func (c *ClientConn) Handshake(ctx context.Context, version primitive.ProtocolVersion, auth Authenticator) (primitive.ProtocolVersion, error) {
for {
response, err := c.SendAndReceive(ctx, frame.NewFrame(version, -1, message.NewStartup()))
response, err := c.SendAndReceive(ctx, frame.NewFrame(version, -1, message.NewStartup(message.StartupOptionCompression, "LZ4")))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This tagged along with another change. Will need to figure out something else since we can't blindly set compression for every client between proxy and server.

if err != nil {
return version, err
}
Expand Down Expand Up @@ -342,11 +342,18 @@ func (c *ClientConn) maybeCachePrepared(request Request, raw *frame.RawFrame) {
c.logger.Error("failed to decode prepared result response", zap.Error(err))
return
}
msg := frm.Body.Message.(*message.PreparedResult)
c.preparedCache.Store(hex.EncodeToString(msg.PreparedQueryId),
&PreparedEntry{
request.Frame().(*frame.RawFrame), // Store frame so we can re-prepare
})

switch v := frm.Body.Message.(type) {
case *message.PreparedResult:
msg := frm.Body.Message.(*message.PreparedResult)
c.preparedCache.Store(hex.EncodeToString(msg.PreparedQueryId),
&PreparedEntry{
request.Frame().(*frame.RawFrame), // Store frame so we can re-prepare
})
default:
c.logger.Warn("should have got PreparedResult but instead got ", zap.Any("type", v))
}

}
}

Expand Down
4 changes: 3 additions & 1 deletion proxycore/codecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ package proxycore

import (
"fmt"

nativeProtoClient "github.com/datastax/go-cassandra-native-protocol/client"
"github.com/datastax/go-cassandra-native-protocol/datatype"
"github.com/datastax/go-cassandra-native-protocol/frame"
"github.com/datastax/go-cassandra-native-protocol/primitive"
)

var codec = frame.NewRawCodec()
var codec = frame.NewRawCodecWithCompression(nativeProtoClient.NewBodyCompressor(primitive.CompressionLz4))

var primitiveCodecs = map[datatype.DataType]datatype.Codec{
datatype.Ascii: &datatype.AsciiCodec{},
Expand Down