Skip to content

Commit

Permalink
MaxRequestsPerConn cluster param was added
Browse files Browse the repository at this point in the history
  • Loading branch information
testisnullus committed Oct 14, 2024
1 parent 953e0df commit 65fc5c4
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 12 deletions.
6 changes: 6 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ type ClusterConfig struct {
// Default: 2
NumConns int

// Maximum number of inflight requests allowed per connection.
// Default: 32768 for CQL v3 and newer
// Default: 128 for older CQL versions
MaxRequestsPerConn int

// Default consistency level.
// Default: Quorum
Consistency Consistency
Expand Down Expand Up @@ -282,6 +287,7 @@ func NewCluster(hosts ...string) *ClusterConfig {
ConnectTimeout: 11 * time.Second,
Port: 9042,
NumConns: 2,
MaxRequestsPerConn: 0,
Consistency: Quorum,
MaxPreparedStmts: defaultMaxPreparedStmts,
MaxRoutingKeyInfo: 1000,
Expand Down
1 change: 1 addition & 0 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestNewCluster_Defaults(t *testing.T) {
assertEqual(t, "cluster config timeout", 11*time.Second, cfg.Timeout)
assertEqual(t, "cluster config port", 9042, cfg.Port)
assertEqual(t, "cluster config num-conns", 2, cfg.NumConns)
assertEqual(t, "cluster config max requests per conn", 0, cfg.MaxRequestsPerConn)
assertEqual(t, "cluster config consistency", Quorum, cfg.Consistency)
assertEqual(t, "cluster config max prepared statements", defaultMaxPreparedStmts, cfg.MaxPreparedStmts)
assertEqual(t, "cluster config max routing key info", 1000, cfg.MaxRoutingKeyInfo)
Expand Down
2 changes: 1 addition & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func (s *Session) dialWithoutObserver(ctx context.Context, host *HostInfo, cfg *
errorHandler: errorHandler,
compressor: cfg.Compressor,
session: s,
streams: streams.New(cfg.ProtoVersion),
streams: streams.NewStreamIDGenerator(s.cfg.ProtoVersion, s.cfg.MaxRequestsPerConn),
host: host,
isSchemaV2: true, // Try using "system.peers_v2" until proven otherwise
frameObserver: s.frameObserver,
Expand Down
14 changes: 12 additions & 2 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,7 @@ func TestQueryTimeoutClose(t *testing.T) {
func TestStream0(t *testing.T) {
// TODO: replace this with type check
const expErr = "gocql: received unexpected frame on stream 0"
const maxRequestsPerConn = 13

var buf bytes.Buffer
f := newFramer(nil, protoVersion4)
Expand All @@ -706,13 +707,22 @@ func TestStream0(t *testing.T) {
t.Fatal(err)
}

srv := NewTestServer(t, defaultProto, context.Background())
defer srv.Stop()
cluster := testCluster(defaultProto, srv.Address)
s, err := cluster.CreateSession()
s.cfg.MaxRequestsPerConn = maxRequestsPerConn
if err != nil {
t.Fatalf("NewCluster: %v", err)
}

conn := &Conn{
r: bufio.NewReader(&buf),
streams: streams.New(protoVersion4),
streams: streams.NewStreamIDGenerator(defaultProto, s.cfg.MaxRequestsPerConn),
logger: &defaultLogger{},
}

err := conn.recv(context.Background())
err = conn.recv(context.Background())
if err == nil {
t.Fatal("expected to get an error on stream 0")
} else if !strings.HasPrefix(err.Error(), expErr) {
Expand Down
18 changes: 10 additions & 8 deletions connectionpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,10 @@ func setupTLSConfig(sslOpts *SslOptions) (*tls.Config, error) {
type policyConnPool struct {
session *Session

port int
numConns int
keyspace string
port int
numConns int
maxRequestsPerConn int
keyspace string

mu sync.RWMutex
hostConnPools map[string]*hostConnPool
Expand Down Expand Up @@ -161,11 +162,12 @@ func connConfig(cfg *ClusterConfig) (*ConnConfig, error) {
func newPolicyConnPool(session *Session) *policyConnPool {
// create the pool
pool := &policyConnPool{
session: session,
port: session.cfg.Port,
numConns: session.cfg.NumConns,
keyspace: session.cfg.Keyspace,
hostConnPools: map[string]*hostConnPool{},
session: session,
port: session.cfg.Port,
numConns: session.cfg.NumConns,
maxRequestsPerConn: session.cfg.MaxRequestsPerConn,
keyspace: session.cfg.Keyspace,
hostConnPools: map[string]*hostConnPool{},
}

return pool
Expand Down
14 changes: 13 additions & 1 deletion internal/streams/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,29 @@ type IDGenerator struct {
offset uint32
}

func NewStreamIDGenerator(protocol, maxRequestsPerConn int) *IDGenerator {
if maxRequestsPerConn > 0 {
return NewLimited(maxRequestsPerConn)
}
return New(protocol)
}

func New(protocol int) *IDGenerator {
maxStreams := 128
if protocol > 2 {
maxStreams = 32768
}
return NewLimited(maxStreams)
}

func NewLimited(maxStreams int) *IDGenerator {
// Round up maxStreams to a nearest multiple of 64
maxStreams = ((maxStreams + 63) / 64) * 64

buckets := maxStreams / 64
// reserve stream 0
streams := make([]uint64, buckets)
streams[0] = 1 << 63

return &IDGenerator{
NumStreams: maxStreams,
streams: streams,
Expand Down

0 comments on commit 65fc5c4

Please sign in to comment.