Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix prepared queries #28

Merged
merged 14 commits into from
Sep 3, 2021
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ go 1.16
require (
github.com/alecthomas/kong v0.2.17
github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210521184019-c5ad59b459ec
github.com/cespare/xxhash v1.1.0 // indirect
github.com/datastax/go-cassandra-native-protocol v0.0.0-20210604174339-4311e5d5654d
github.com/dgraph-io/ristretto v0.1.0 // indirect
github.com/stretchr/testify v1.7.0
go.uber.org/atomic v1.8.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
Expand Down
17 changes: 17 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,13 +1,25 @@
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/alecthomas/kong v0.2.17 h1:URDISCI96MIgcIlQyoCAlhOmrSw6pZScBNkctg8r0W0=
github.com/alecthomas/kong v0.2.17/go.mod h1:ka3VZ8GZNPXv9Ov+j4YNLkI8mTuhXyr/0ktSlqIydQQ=
github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210521184019-c5ad59b459ec h1:EEyRvzmpEUZ+I8WmD5cw/vY8EqhambkOqy5iFr0908A=
github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210521184019-c5ad59b459ec/go.mod h1:F7bn7fEU90QkQ3tnmaTx3LTKLEDqnwWODIYppRQ5hnY=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/datastax/go-cassandra-native-protocol v0.0.0-20210604174339-4311e5d5654d h1:sGM7OKF00vOQwn3cKGVpZLixkeP6i6kQRcz9+aV1cqs=
github.com/datastax/go-cassandra-native-protocol v0.0.0-20210604174339-4311e5d5654d/go.mod h1:n6F7IFlMxffEUMfTBvNqwaxwpJMvMnRT5L93km1qeNc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/ristretto v0.1.0 h1:Jv3CGQHp9OjuMBSne1485aDpUkTKEcUqF+jm/LuerPI=
github.com/dgraph-io/ristretto v0.1.0/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/snappy v0.0.2 h1:aeE13tS0IiQgFjYdoL8qN3K1N2bXXtI6Vi51/y7BpMw=
github.com/golang/snappy v0.0.2/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/pierrec/lz4/v4 v4.0.3 h1:vNQKSVZNYUEAvRY9FaUXAF1XPbSOHJtDTiP41kzDz2E=
Expand All @@ -19,9 +31,11 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
github.com/rs/zerolog v1.20.0/go.mod h1:IzD0RJ65iWH0w97OQQebJEvTZYvsCUm9WVLWBQrJRjo=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand All @@ -37,11 +51,14 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190828213141-aed303cbaa74/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
5 changes: 3 additions & 2 deletions proxy/codecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ import (
"encoding/hex"
"errors"
"fmt"
"io"

"github.com/datastax/go-cassandra-native-protocol/frame"
"github.com/datastax/go-cassandra-native-protocol/message"
"github.com/datastax/go-cassandra-native-protocol/primitive"
"io"
)

var codec = frame.NewRawCodec(&partialQueryCodec{}, &partialExecuteCodec{})
Expand Down Expand Up @@ -110,4 +111,4 @@ func (c *partialExecuteCodec) Decode(source io.Reader, _ primitive.ProtocolVersi

func (c *partialExecuteCodec) GetOpCode() primitive.OpCode {
return primitive.OpCodeExecute
}
}
67 changes: 59 additions & 8 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ import (
"net"
"sync"
"sync/atomic"
"unsafe"

"cql-proxy/parser"
"cql-proxy/proxycore"

"github.com/datastax/go-cassandra-native-protocol/datatype"
"github.com/datastax/go-cassandra-native-protocol/frame"
"github.com/datastax/go-cassandra-native-protocol/message"
"github.com/datastax/go-cassandra-native-protocol/primitive"
"github.com/dgraph-io/ristretto"
"go.uber.org/zap"
)

Expand All @@ -46,6 +49,9 @@ type Config struct {
ReconnectPolicy proxycore.ReconnectPolicy
NumConns int
Logger *zap.Logger
// PreparedCache a cache that stores prepared queries. If not set it uses the default implementation with a max
// capacity of 100MB.
PreparedCache proxycore.PreparedCache
}

type Proxy struct {
Expand All @@ -55,8 +61,9 @@ type Proxy struct {
listener net.Listener
cluster *proxycore.Cluster
sessions sync.Map
sessMu *sync.Mutex
sessMu sync.Mutex
schemaEventClients sync.Map
preparedCache proxycore.PreparedCache
clientIdGen uint64
lb proxycore.LoadBalancer
systemLocalValues map[string]message.Column
Expand Down Expand Up @@ -86,12 +93,9 @@ func (p *Proxy) OnEvent(event interface{}) {

func NewProxy(ctx context.Context, config Config) *Proxy {
return &Proxy{
ctx: ctx,
config: config,
logger: proxycore.GetOrCreateNopLogger(config.Logger),
sessions: sync.Map{},
sessMu: &sync.Mutex{},
schemaEventClients: sync.Map{},
ctx: ctx,
config: config,
logger: proxycore.GetOrCreateNopLogger(config.Logger),
}
}

Expand All @@ -106,6 +110,11 @@ func (p *Proxy) ListenAndServe(address string) error {
func (p *Proxy) Listen(address string) error {
var err error

p.preparedCache, err = getOrCreateDefaultPreparedCache(p.config.PreparedCache)
if err != nil {
return fmt.Errorf("unable to create prepared cache %w", err)
}

p.cluster, err = proxycore.ConnectCluster(p.ctx, proxycore.ClusterConfig{
Version: p.config.Version,
Auth: p.config.Auth,
Expand Down Expand Up @@ -135,6 +144,7 @@ func (p *Proxy) Listen(address string) error {
NumConns: p.config.NumConns,
Version: p.cluster.NegotiatedVersion,
Auth: p.config.Auth,
PreparedCache: p.preparedCache,
})

if err != nil {
Expand Down Expand Up @@ -163,6 +173,10 @@ func (p *Proxy) Serve() error {
}
}

func (p *Proxy) Shutdown() error {
return p.listener.Close()
}

func (p *Proxy) handle(conn net.Conn) {
cl := &client{
ctx: p.ctx,
Expand All @@ -184,6 +198,7 @@ func (p *Proxy) maybeCreateSession(keyspace string) error {
NumConns: p.config.NumConns,
Version: p.cluster.NegotiatedVersion,
Auth: p.config.Auth,
PreparedCache: p.preparedCache,
Keyspace: keyspace,
})
if err != nil {
Expand Down Expand Up @@ -293,7 +308,7 @@ func (c *client) execute(raw *frame.RawFrame, idempotent bool) {
qp: c.proxy.newQueryPlan(),
raw: raw,
}
req.execute()
req.Execute(true)
} else {
c.send(raw.Header, &message.ServerError{ErrorMessage: "Attempted to use invalid keyspace"})
}
Expand Down Expand Up @@ -475,3 +490,39 @@ func (c *client) send(hdr *frame.Header, msg message.Message) {
func (c *client) Closing(_ error) {
c.proxy.schemaEventClients.Delete(c.id)
}

func getOrCreateDefaultPreparedCache(cache proxycore.PreparedCache) (proxycore.PreparedCache, error) {
if cache == nil {
return NewDefaultPreparedCache(1e8) // 100MB
mpenick marked this conversation as resolved.
Show resolved Hide resolved
}
return cache, nil
}

// NewDefaultPreparedCache creates a new default prepared cache capping the max capacity to `maxBytes`.
func NewDefaultPreparedCache(maxBytes int64) (proxycore.PreparedCache, error) {
cache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: maxBytes / 64, // Assuming an average size of ~64 bytes per entry. Underestimating is preferred.
MaxCost: maxBytes,
BufferItems: 64,
})
if err != nil {
return nil, err
}
return &defaultPreparedCache{cache}, nil
}

type defaultPreparedCache struct {
cache *ristretto.Cache
}

func (d defaultPreparedCache) Store(id string, entry *proxycore.PreparedEntry) {
cost := int64(len(id)) + int64(unsafe.Sizeof(frame.Header{})) + int64(unsafe.Sizeof(frame.Body{})) + int64(len(entry.PreparedFrame.Body))
d.cache.Set(id, entry, cost)
}

func (d defaultPreparedCache) Load(id string) (entry *proxycore.PreparedEntry, ok bool) {
if val, ok := d.cache.Get(id); ok {
return val.(*proxycore.PreparedEntry), true
}
return nil, false
}
103 changes: 102 additions & 1 deletion proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ import (
"errors"
"net"
"strconv"
"sync"
"testing"
"time"

"cql-proxy/proxycore"

"github.com/datastax/go-cassandra-native-protocol/datatype"
"github.com/datastax/go-cassandra-native-protocol/frame"
"github.com/datastax/go-cassandra-native-protocol/message"
Expand All @@ -41,6 +43,7 @@ func TestProxy_ListenAndServe(t *testing.T) {
const proxyContactPoint = "127.0.0.1:9042"

cluster := proxycore.NewMockCluster(net.ParseIP("127.0.0.0"), clusterPort)
defer cluster.Shutdown()

cluster.Handlers = proxycore.NewMockRequestHandlers(proxycore.MockRequestHandlers{
primitive.OpCodeQuery: func(cl *proxycore.MockClient, frm *frame.Frame) message.Message {
Expand Down Expand Up @@ -88,13 +91,16 @@ func TestProxy_ListenAndServe(t *testing.T) {
})

err = proxy.Listen(proxyContactPoint)
defer func(proxy *Proxy) {
_ = proxy.Shutdown()
}(proxy)
require.NoError(t, err)

go func() {
_ = proxy.Serve()
}()

cl, err := proxycore.ConnectClient(ctx, proxycore.NewEndpoint(proxyContactPoint))
cl, err := proxycore.ConnectClient(ctx, proxycore.NewEndpoint(proxyContactPoint), proxycore.ClientConnConfig{})
require.NoError(t, err)

version, err := cl.Handshake(ctx, primitive.ProtocolVersion4, nil)
Expand Down Expand Up @@ -125,6 +131,101 @@ func TestProxy_ListenAndServe(t *testing.T) {
assert.True(t, added)
}

func TestProxy_Unprepared(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

const numNodes = 3

const clusterContactPoint = "127.0.0.1:8000"
const clusterPort = 8000

const proxyContactPoint = "127.0.0.1:9042"
const version = primitive.ProtocolVersion4

preparedId := []byte("abc")

cluster := proxycore.NewMockCluster(net.ParseIP("127.0.0.0"), clusterPort)
defer cluster.Shutdown()

var prepared sync.Map

cluster.Handlers = proxycore.NewMockRequestHandlers(proxycore.MockRequestHandlers{
primitive.OpCodePrepare: func(cl *proxycore.MockClient, frm *frame.Frame) message.Message {
prepared.Store(cl.Local().IP, true)
return &message.PreparedResult{
PreparedQueryId: preparedId,
}
},
primitive.OpCodeExecute: func(cl *proxycore.MockClient, frm *frame.Frame) message.Message {
if _, ok := prepared.Load(cl.Local().IP); ok {
return &message.RowsResult{
Metadata: &message.RowsMetadata{
ColumnCount: 0,
},
Data: message.RowSet{},
}
} else {
ex := frm.Body.Message.(*message.Execute)
assert.Equal(t, preparedId, ex.QueryId)
return &message.Unprepared{Id: ex.QueryId}
}
},
})

for i := 1; i <= numNodes; i++ {
err := cluster.Add(ctx, i)
require.NoError(t, err)
}

proxy := NewProxy(ctx, Config{
Version: version,
Resolver: proxycore.NewResolverWithDefaultPort([]string{clusterContactPoint}, clusterPort),
ReconnectPolicy: proxycore.NewReconnectPolicyWithDelays(200*time.Millisecond, time.Second),
NumConns: 2,
})

err := proxy.Listen(proxyContactPoint)
defer func(proxy *Proxy) {
_ = proxy.Shutdown()
}(proxy)
require.NoError(t, err)

go func() {
_ = proxy.Serve()
}()

cl, err := proxycore.ConnectClient(ctx, proxycore.NewEndpoint(proxyContactPoint), proxycore.ClientConnConfig{})
require.NoError(t, err)

negotiated, err := cl.Handshake(ctx, version, nil)
require.NoError(t, err)
assert.Equal(t, version, negotiated)

// Only prepare on a single node
resp, err := cl.SendAndReceive(ctx, frame.NewFrame(version, 0, &message.Prepare{Query: "SELECT * FROM test.test"}))
require.NoError(t, err)
assert.Equal(t, primitive.OpCodeResult, resp.Header.OpCode)
_, ok := resp.Body.Message.(*message.PreparedResult)
assert.True(t, ok, "expected prepared result")

for i := 0; i < numNodes; i++ {
resp, err = cl.SendAndReceive(ctx, frame.NewFrame(version, 0, &message.Execute{QueryId: preparedId}))
require.NoError(t, err)
assert.Equal(t, primitive.OpCodeResult, resp.Header.OpCode)
_, ok = resp.Body.Message.(*message.RowsResult)
assert.True(t, ok, "expected rows result")
}

// Count the number of unique nodes that were prepared
count := 0
prepared.Range(func(_, _ interface{}) bool {
count++
return true
})
assert.Equal(t, numNodes, count)
}

func testQueryHosts(ctx context.Context, cl *proxycore.ClientConn) (map[string]struct{}, error) {
hosts := make(map[string]struct{})
for i := 0; i < 3; i++ {
Expand Down
Loading