From 65fc5c40006df5b17b6ebc36791617e76c1dfa05 Mon Sep 17 00:00:00 2001 From: testisnullus Date: Thu, 2 May 2024 14:26:43 +0300 Subject: [PATCH] MaxRequestsPerConn cluster param was added --- cluster.go | 6 ++++++ cluster_test.go | 1 + conn.go | 2 +- conn_test.go | 14 ++++++++++++-- connectionpool.go | 18 ++++++++++-------- internal/streams/streams.go | 14 +++++++++++++- 6 files changed, 43 insertions(+), 12 deletions(-) diff --git a/cluster.go b/cluster.go index 13e62f3b0..595bd25b5 100644 --- a/cluster.go +++ b/cluster.go @@ -106,6 +106,11 @@ type ClusterConfig struct { // Default: 2 NumConns int + // Maximum number of inflight requests allowed per connection. + // Default: 32768 for CQL v3 and newer + // Default: 128 for older CQL versions + MaxRequestsPerConn int + // Default consistency level. // Default: Quorum Consistency Consistency @@ -282,6 +287,7 @@ func NewCluster(hosts ...string) *ClusterConfig { ConnectTimeout: 11 * time.Second, Port: 9042, NumConns: 2, + MaxRequestsPerConn: 0, Consistency: Quorum, MaxPreparedStmts: defaultMaxPreparedStmts, MaxRoutingKeyInfo: 1000, diff --git a/cluster_test.go b/cluster_test.go index adc21fd05..bef447248 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -37,6 +37,7 @@ func TestNewCluster_Defaults(t *testing.T) { assertEqual(t, "cluster config timeout", 11*time.Second, cfg.Timeout) assertEqual(t, "cluster config port", 9042, cfg.Port) assertEqual(t, "cluster config num-conns", 2, cfg.NumConns) + assertEqual(t, "cluster config max requests per conn", 0, cfg.MaxRequestsPerConn) assertEqual(t, "cluster config consistency", Quorum, cfg.Consistency) assertEqual(t, "cluster config max prepared statements", defaultMaxPreparedStmts, cfg.MaxPreparedStmts) assertEqual(t, "cluster config max routing key info", 1000, cfg.MaxRoutingKeyInfo) diff --git a/conn.go b/conn.go index 3daca6250..96b8c8e4a 100644 --- a/conn.go +++ b/conn.go @@ -277,7 +277,7 @@ func (s *Session) dialWithoutObserver(ctx context.Context, host *HostInfo, cfg * errorHandler: errorHandler, compressor: cfg.Compressor, session: s, - streams: streams.New(cfg.ProtoVersion), + streams: streams.NewStreamIDGenerator(s.cfg.ProtoVersion, s.cfg.MaxRequestsPerConn), host: host, isSchemaV2: true, // Try using "system.peers_v2" until proven otherwise frameObserver: s.frameObserver, diff --git a/conn_test.go b/conn_test.go index cab4c2f8f..b1b060c3a 100644 --- a/conn_test.go +++ b/conn_test.go @@ -693,6 +693,7 @@ func TestQueryTimeoutClose(t *testing.T) { func TestStream0(t *testing.T) { // TODO: replace this with type check const expErr = "gocql: received unexpected frame on stream 0" + const maxRequestsPerConn = 13 var buf bytes.Buffer f := newFramer(nil, protoVersion4) @@ -706,13 +707,22 @@ func TestStream0(t *testing.T) { t.Fatal(err) } + srv := NewTestServer(t, defaultProto, context.Background()) + defer srv.Stop() + cluster := testCluster(defaultProto, srv.Address) + s, err := cluster.CreateSession() + s.cfg.MaxRequestsPerConn = maxRequestsPerConn + if err != nil { + t.Fatalf("NewCluster: %v", err) + } + conn := &Conn{ r: bufio.NewReader(&buf), - streams: streams.New(protoVersion4), + streams: streams.NewStreamIDGenerator(defaultProto, s.cfg.MaxRequestsPerConn), logger: &defaultLogger{}, } - err := conn.recv(context.Background()) + err = conn.recv(context.Background()) if err == nil { t.Fatal("expected to get an error on stream 0") } else if !strings.HasPrefix(err.Error(), expErr) { diff --git a/connectionpool.go b/connectionpool.go index 2ccd3c8a7..6f5c13255 100644 --- a/connectionpool.go +++ b/connectionpool.go @@ -99,9 +99,10 @@ func setupTLSConfig(sslOpts *SslOptions) (*tls.Config, error) { type policyConnPool struct { session *Session - port int - numConns int - keyspace string + port int + numConns int + maxRequestsPerConn int + keyspace string mu sync.RWMutex hostConnPools map[string]*hostConnPool @@ -161,11 +162,12 @@ func connConfig(cfg *ClusterConfig) (*ConnConfig, error) { func newPolicyConnPool(session *Session) *policyConnPool { // create the pool pool := &policyConnPool{ - session: session, - port: session.cfg.Port, - numConns: session.cfg.NumConns, - keyspace: session.cfg.Keyspace, - hostConnPools: map[string]*hostConnPool{}, + session: session, + port: session.cfg.Port, + numConns: session.cfg.NumConns, + maxRequestsPerConn: session.cfg.MaxRequestsPerConn, + keyspace: session.cfg.Keyspace, + hostConnPools: map[string]*hostConnPool{}, } return pool diff --git a/internal/streams/streams.go b/internal/streams/streams.go index 1bb372f38..b5c9aa4a0 100644 --- a/internal/streams/streams.go +++ b/internal/streams/streams.go @@ -43,17 +43,29 @@ type IDGenerator struct { offset uint32 } +func NewStreamIDGenerator(protocol, maxRequestsPerConn int) *IDGenerator { + if maxRequestsPerConn > 0 { + return NewLimited(maxRequestsPerConn) + } + return New(protocol) +} + func New(protocol int) *IDGenerator { maxStreams := 128 if protocol > 2 { maxStreams = 32768 } + return NewLimited(maxStreams) +} + +func NewLimited(maxStreams int) *IDGenerator { + // Round up maxStreams to a nearest multiple of 64 + maxStreams = ((maxStreams + 63) / 64) * 64 buckets := maxStreams / 64 // reserve stream 0 streams := make([]uint64, buckets) streams[0] = 1 << 63 - return &IDGenerator{ NumStreams: maxStreams, streams: streams,