Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-18.0] connpool: Allow time out during shutdown (#15979) #16002

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
762 changes: 762 additions & 0 deletions go/pools/smartconnpool/pool.go

Large diffs are not rendered by default.

1,082 changes: 1,082 additions & 0 deletions go/pools/smartconnpool/pool_test.go

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions go/vt/vttablet/endtoend/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,10 @@ func TestSidecarTables(t *testing.T) {
}

func TestConsolidation(t *testing.T) {
defer framework.Server.SetPoolSize(framework.Server.PoolSize())
framework.Server.SetPoolSize(1)
defer framework.Server.SetPoolSize(context.Background(), framework.Server.PoolSize())

err := framework.Server.SetPoolSize(context.Background(), 1)
require.NoError(t, err)

const tag = "Waits/Histograms/Consolidations/Count"

Expand Down
7 changes: 5 additions & 2 deletions go/vt/vttablet/endtoend/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package endtoend

import (
"context"
"errors"
"fmt"
"reflect"
Expand Down Expand Up @@ -98,11 +99,13 @@ func TestStreamConsolidation(t *testing.T) {

defaultPoolSize := framework.Server.StreamPoolSize()

framework.Server.SetStreamPoolSize(4)
err = framework.Server.SetStreamPoolSize(context.Background(), 4)
require.NoError(t, err)

framework.Server.SetStreamConsolidationBlocking(true)

defer func() {
framework.Server.SetStreamPoolSize(defaultPoolSize)
_ = framework.Server.SetStreamPoolSize(context.Background(), defaultPoolSize)
framework.Server.SetStreamConsolidationBlocking(false)
}()

Expand Down
6 changes: 0 additions & 6 deletions go/vt/vttablet/tabletserver/connpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,9 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// ErrConnPoolClosed is returned when the connection pool is closed.
var ErrConnPoolClosed = vterrors.New(vtrpcpb.Code_INTERNAL, "internal error: unexpected: conn pool is closed")

const (
getWithoutS = "GetWithoutSettings"
getWithS = "GetWithSettings"
Expand Down
11 changes: 10 additions & 1 deletion go/vt/vttablet/tabletserver/connpool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestConnPoolTimeout(t *testing.T) {
require.NoError(t, err)
defer dbConn.Recycle()
_, err = connPool.Get(context.Background(), nil)
assert.EqualError(t, err, "resource pool timed out")
assert.EqualError(t, err, "connection pool timed out")
}

func TestConnPoolMaxWaiters(t *testing.T) {
Expand Down Expand Up @@ -181,6 +181,7 @@ func TestConnPoolSetCapacity(t *testing.T) {
connPool := newPool()
connPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())
defer connPool.Close()
<<<<<<< HEAD
err := connPool.SetCapacity(-10)
if err == nil {
t.Fatalf("set capacity should return error for negative capacity")
Expand All @@ -189,6 +190,14 @@ func TestConnPoolSetCapacity(t *testing.T) {
if err != nil {
t.Fatalf("set capacity should succeed")
}
=======

assert.Panics(t, func() {
_ = connPool.SetCapacity(context.Background(), -10)
})
err := connPool.SetCapacity(context.Background(), 10)
assert.NoError(t, err)
>>>>>>> afbce6aa87 (connpool: Allow time out during shutdown (#15979))
if connPool.Capacity() != 10 {
t.Fatalf("capacity should be 10")
}
Expand Down
18 changes: 15 additions & 3 deletions go/vt/vttablet/tabletserver/debugenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package tabletserver

import (
"context"
"encoding/json"
"fmt"
"html"
Expand Down Expand Up @@ -82,6 +83,17 @@ func debugEnvHandler(tsv *TabletServer, w http.ResponseWriter, r *http.Request)
f(ival)
msg = fmt.Sprintf("Setting %v to: %v", varname, value)
}
setIntValCtx := func(f func(context.Context, int) error) {
ival, err := strconv.Atoi(value)
if err == nil {
err = f(r.Context(), ival)
if err == nil {
msg = fmt.Sprintf("Setting %v to: %v", varname, value)
return
}
}
msg = fmt.Sprintf("Failed setting value for %v: %v", varname, err)
}
setInt64Val := func(f func(int64)) {
ival, err := strconv.ParseInt(value, 10, 64)
if err != nil {
Expand Down Expand Up @@ -111,11 +123,11 @@ func debugEnvHandler(tsv *TabletServer, w http.ResponseWriter, r *http.Request)
}
switch varname {
case "PoolSize":
setIntVal(tsv.SetPoolSize)
setIntValCtx(tsv.SetPoolSize)
case "StreamPoolSize":
setIntVal(tsv.SetStreamPoolSize)
setIntValCtx(tsv.SetStreamPoolSize)
case "TxPoolSize":
setIntVal(tsv.SetTxPoolSize)
setIntValCtx(tsv.SetTxPoolSize)
case "MaxResultSize":
setIntVal(tsv.SetMaxResultSize)
case "WarnResultSize":
Expand Down
14 changes: 14 additions & 0 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,7 @@ func (qre *QueryExecutor) getConn() (*connpool.DBConn, error) {
span, ctx := trace.NewSpan(qre.ctx, "QueryExecutor.getConn")
defer span.Finish()

<<<<<<< HEAD
start := time.Now()
conn, err := qre.tsv.qe.conns.Get(ctx, qre.setting)

Expand All @@ -783,12 +784,19 @@ func (qre *QueryExecutor) getConn() (*connpool.DBConn, error) {
return nil, err
}
return nil, err
=======
defer func(start time.Time) {
qre.logStats.WaitingForConnection += time.Since(start)
}(time.Now())
return qre.tsv.qe.conns.Get(ctx, qre.setting)
>>>>>>> afbce6aa87 (connpool: Allow time out during shutdown (#15979))
}

func (qre *QueryExecutor) getStreamConn() (*connpool.DBConn, error) {
span, ctx := trace.NewSpan(qre.ctx, "QueryExecutor.getStreamConn")
defer span.Finish()

<<<<<<< HEAD
start := time.Now()
conn, err := qre.tsv.qe.streamConns.Get(ctx, qre.setting)
switch err {
Expand All @@ -799,6 +807,12 @@ func (qre *QueryExecutor) getStreamConn() (*connpool.DBConn, error) {
return nil, err
}
return nil, err
=======
defer func(start time.Time) {
qre.logStats.WaitingForConnection += time.Since(start)
}(time.Now())
return qre.tsv.qe.streamConns.Get(ctx, qre.setting)
>>>>>>> afbce6aa87 (connpool: Allow time out during shutdown (#15979))
}

// txFetch fetches from a TxConnection.
Expand Down
18 changes: 16 additions & 2 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1979,11 +1979,15 @@ func (tsv *TabletServer) EnableHistorian(enabled bool) {
}

// SetPoolSize changes the pool size to the specified value.
func (tsv *TabletServer) SetPoolSize(val int) {
func (tsv *TabletServer) SetPoolSize(ctx context.Context, val int) error {
if val <= 0 {
return
return nil
}
<<<<<<< HEAD
tsv.qe.conns.SetCapacity(val)
=======
return tsv.qe.conns.SetCapacity(ctx, int64(val))
>>>>>>> afbce6aa87 (connpool: Allow time out during shutdown (#15979))
}

// PoolSize returns the pool size.
Expand All @@ -1992,8 +1996,13 @@ func (tsv *TabletServer) PoolSize() int {
}

// SetStreamPoolSize changes the pool size to the specified value.
<<<<<<< HEAD
func (tsv *TabletServer) SetStreamPoolSize(val int) {
tsv.qe.streamConns.SetCapacity(val)
=======
func (tsv *TabletServer) SetStreamPoolSize(ctx context.Context, val int) error {
return tsv.qe.streamConns.SetCapacity(ctx, int64(val))
>>>>>>> afbce6aa87 (connpool: Allow time out during shutdown (#15979))
}

// SetStreamConsolidationBlocking sets whether the stream consolidator should wait for slow clients
Expand All @@ -2007,8 +2016,13 @@ func (tsv *TabletServer) StreamPoolSize() int {
}

// SetTxPoolSize changes the tx pool size to the specified value.
<<<<<<< HEAD
func (tsv *TabletServer) SetTxPoolSize(val int) {
tsv.te.txPool.scp.conns.SetCapacity(val)
=======
func (tsv *TabletServer) SetTxPoolSize(ctx context.Context, val int) error {
return tsv.te.txPool.scp.conns.SetCapacity(ctx, int64(val))
>>>>>>> afbce6aa87 (connpool: Allow time out during shutdown (#15979))
}

// TxPoolSize returns the tx pool size.
Expand Down
12 changes: 9 additions & 3 deletions go/vt/vttablet/tabletserver/tabletserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2046,23 +2046,29 @@ func TestConfigChanges(t *testing.T) {
newSize := 10
newDuration := time.Duration(10 * time.Millisecond)

tsv.SetPoolSize(newSize)
err := tsv.SetPoolSize(context.Background(), newSize)
require.NoError(t, err)

if val := tsv.PoolSize(); val != newSize {
t.Errorf("PoolSize: %d, want %d", val, newSize)
}
if val := int(tsv.qe.conns.Capacity()); val != newSize {
t.Errorf("tsv.qe.connPool.Capacity: %d, want %d", val, newSize)
}

tsv.SetStreamPoolSize(newSize)
err = tsv.SetStreamPoolSize(context.Background(), newSize)
require.NoError(t, err)

if val := tsv.StreamPoolSize(); val != newSize {
t.Errorf("StreamPoolSize: %d, want %d", val, newSize)
}
if val := int(tsv.qe.streamConns.Capacity()); val != newSize {
t.Errorf("tsv.qe.streamConnPool.Capacity: %d, want %d", val, newSize)
}

tsv.SetTxPoolSize(newSize)
err = tsv.SetTxPoolSize(context.Background(), newSize)
require.NoError(t, err)

if val := tsv.TxPoolSize(); val != newSize {
t.Errorf("TxPoolSize: %d, want %d", val, newSize)
}
Expand Down
7 changes: 7 additions & 0 deletions go/vt/vttablet/tabletserver/tx_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,15 @@ func primeTxPoolWithConnection(t *testing.T, ctx context.Context) (*fakesqldb.DB
db := fakesqldb.New(t)
txPool, _ := newTxPool()
// Set the capacity to 1 to ensure that the db connection is reused.
<<<<<<< HEAD
txPool.scp.conns.SetCapacity(1)
txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())
=======
err := txPool.scp.conns.SetCapacity(context.Background(), 1)
require.NoError(t, err)
params := dbconfigs.New(db.ConnParams())
txPool.Open(params, params, params)
>>>>>>> afbce6aa87 (connpool: Allow time out during shutdown (#15979))

// Run a query to trigger a database connection. That connection will be
// reused by subsequent transactions.
Expand Down
Loading