Skip to content

Commit

Permalink
Move concurrent connection dial limit out of healthcheck.
Browse files Browse the repository at this point in the history
There's a concurrent connection dial limit implemented in the healthcheck code that doesn't semantically or logically belong here.

First, the healthcheck code does neither know nor care about the network protocol used to execute healthchecks. Arguably, there's no other protocol used for this apart from `grpc`, but it seems wrong to set up a `grpc` connection specific options in the healthcheck code.

Additionally, the dial concurrency limit modifies the global `grpc` connection options the first time a healthcheck is started. That seems unexpected, and I believe we want the concurrency limit set for all `grpc`` dial operations, irrespective of whether those connections are used for healtchecking or anything else.

This change moves the concurrency limit into the `grpcclient` package, and sets it on any `grpc` connection opened via that package.

Signed-off-by: Arthur Schreiber <[email protected]>
  • Loading branch information
arthurschreiber committed Jul 14, 2024
1 parent 3d36adb commit 9d2edb8
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 40 deletions.
10 changes: 1 addition & 9 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import (
"github.com/google/safehtml/template"
"github.com/google/safehtml/template/uncheckedconversions"
"github.com/spf13/pflag"
"golang.org/x/sync/semaphore"

"vitess.io/vitess/go/flagutil"
"vitess.io/vitess/go/netutil"
Expand Down Expand Up @@ -92,9 +91,6 @@ var (
// refreshKnownTablets tells us whether to process all tablets or only new tablets.
refreshKnownTablets = true

// healthCheckDialConcurrency tells us how many healthcheck connections can be opened to tablets at once. This should be less than the golang max thread limit of 10000.
healthCheckDialConcurrency int64 = 1024

// How much to sleep between each check.
waitAvailableTabletInterval = 100 * time.Millisecond

Expand Down Expand Up @@ -177,7 +173,6 @@ func registerWebUIFlags(fs *pflag.FlagSet) {
fs.StringVar(&TabletURLTemplateString, "tablet_url_template", "http://{{.GetTabletHostPort}}", "Format string describing debug tablet url formatting. See getTabletDebugURL() for how to customize this.")
fs.DurationVar(&refreshInterval, "tablet_refresh_interval", 1*time.Minute, "Tablet refresh interval.")
fs.BoolVar(&refreshKnownTablets, "tablet_refresh_known_tablets", true, "Whether to reload the tablet's address/port map from topo in case they change.")
fs.Int64Var(&healthCheckDialConcurrency, "healthcheck-dial-concurrency", 1024, "Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000.")
ParseTabletURLTemplateFromFlag()
}

Expand Down Expand Up @@ -297,8 +292,6 @@ type HealthCheckImpl struct {
subscribers map[chan *TabletHealth]struct{}
// loadTablets trigger is used to immediately load a new primary tablet when the current one has been demoted
loadTabletsTrigger chan struct{}
// healthCheckDialSem is used to limit how many healthcheck connections can be opened to tablets at once.
healthCheckDialSem *semaphore.Weighted
}

// NewHealthCheck creates a new HealthCheck object.
Expand Down Expand Up @@ -333,7 +326,6 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
cell: localCell,
retryDelay: retryDelay,
healthCheckTimeout: healthCheckTimeout,
healthCheckDialSem: semaphore.NewWeighted(healthCheckDialConcurrency),
healthByAlias: make(map[tabletAliasString]*tabletHealthCheck),
healthData: make(map[KeyspaceShardTabletType]map[tabletAliasString]*TabletHealth),
healthy: make(map[KeyspaceShardTabletType][]*TabletHealth),
Expand Down Expand Up @@ -844,7 +836,7 @@ func (hc *HealthCheckImpl) TabletConnection(ctx context.Context, alias *topodata
// TODO: test that throws this error
return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "tablet: %v is either down or nonexistent", alias)
}
return thc.Connection(ctx, hc), nil
return thc.Connection(ctx), nil
}

// getAliasByCell should only be called while holding hc.mu
Expand Down
37 changes: 6 additions & 31 deletions go/vt/discovery/tablet_health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package discovery
import (
"context"
"fmt"
"net"
"strings"
"sync"
"sync/atomic"
Expand All @@ -34,16 +33,12 @@ import (
"vitess.io/vitess/go/vt/vttablet/queryservice"
"vitess.io/vitess/go/vt/vttablet/tabletconn"

"google.golang.org/grpc"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/topodata"
)

// withDialerContextOnce ensures grpc.WithDialContext() is added once to the options.
var withDialerContextOnce sync.Once

// tabletHealthCheck maintains the health status of a tablet. A map of this
// structure is maintained in HealthCheck.
type tabletHealthCheck struct {
Expand Down Expand Up @@ -127,8 +122,8 @@ func (thc *tabletHealthCheck) setServingState(serving bool, reason string) {
}

// stream streams healthcheck responses to callback.
func (thc *tabletHealthCheck) stream(ctx context.Context, hc *HealthCheckImpl, callback func(*query.StreamHealthResponse) error) error {
conn := thc.Connection(ctx, hc)
func (thc *tabletHealthCheck) stream(ctx context.Context, callback func(*query.StreamHealthResponse) error) error {
conn := thc.Connection(ctx)
if conn == nil {
// This signals the caller to retry
return nil
Expand All @@ -141,34 +136,14 @@ func (thc *tabletHealthCheck) stream(ctx context.Context, hc *HealthCheckImpl, c
return err
}

func (thc *tabletHealthCheck) Connection(ctx context.Context, hc *HealthCheckImpl) queryservice.QueryService {
func (thc *tabletHealthCheck) Connection(ctx context.Context) queryservice.QueryService {
thc.connMu.Lock()
defer thc.connMu.Unlock()
return thc.connectionLocked(ctx, hc)
}

func healthCheckDialerFactory(hc *HealthCheckImpl) func(ctx context.Context, addr string) (net.Conn, error) {
return func(ctx context.Context, addr string) (net.Conn, error) {
// Limit the number of healthcheck connections opened in parallel to avoid high OS-thread
// usage due to blocking networking syscalls (eg: DNS lookups, TCP connection opens,
// etc). Without this limit it is possible for vtgates watching >10k tablets to hit
// the panic: 'runtime: program exceeds 10000-thread limit'.
if err := hc.healthCheckDialSem.Acquire(ctx, 1); err != nil {
return nil, err
}
defer hc.healthCheckDialSem.Release(1)
var dialer net.Dialer
return dialer.DialContext(ctx, "tcp", addr)
}
return thc.connectionLocked(ctx)
}

func (thc *tabletHealthCheck) connectionLocked(ctx context.Context, hc *HealthCheckImpl) queryservice.QueryService {
func (thc *tabletHealthCheck) connectionLocked(ctx context.Context) queryservice.QueryService {
if thc.Conn == nil {
withDialerContextOnce.Do(func() {
grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) {
return append(opts, grpc.WithContextDialer(healthCheckDialerFactory(hc))), nil
})
})
conn, err := tabletconn.GetDialer()(ctx, thc.Tablet, grpcclient.FailFast(true))
if err != nil {
thc.LastError = err
Expand Down Expand Up @@ -297,7 +272,7 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) {
}()

// Read stream health responses.
err := thc.stream(streamCtx, hc, func(shr *query.StreamHealthResponse) error {
err := thc.stream(streamCtx, func(shr *query.StreamHealthResponse) error {
// We received a message. Reset the back-off.
retryDelay = hc.retryDelay
// Don't block on send to avoid deadlocks.
Expand Down
54 changes: 54 additions & 0 deletions go/vt/grpcclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ package grpcclient
import (
"context"
"crypto/tls"
"net"
"sync"
"time"

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/spf13/pflag"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -46,6 +48,10 @@ var (
initialConnWindowSize int
initialWindowSize int

// `dialConcurrencyLimit` tells us how many tablet grpc connections can be dialed concurrently.
// This should be less than the golang max thread limit of 10000.
dialConcurrencyLimit int64 = 1024

// every vitess binary that makes grpc client-side calls.
grpcclientBinaries = []string{
"mysqlctld",
Expand Down Expand Up @@ -74,9 +80,24 @@ func RegisterFlags(fs *pflag.FlagSet) {
fs.StringVar(&credsFile, "grpc_auth_static_client_creds", credsFile, "When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server.")
}

func RegisterDialConcurrencyFlagsHealthcheck(fs *pflag.FlagSet) {
// TODO: Deprecate this and rename it to `grpc-dial-concurrency-limit`
fs.Int64Var(&dialConcurrencyLimit, "healthcheck-dial-concurrency", 1024, "Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000.")
}

func RegisterDialConcurrencyFlags(fs *pflag.FlagSet) {
fs.Int64Var(&dialConcurrencyLimit, "grpc-dial-concurrency-limit", 1024, "Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000.")
}

func init() {
for _, cmd := range grpcclientBinaries {
servenv.OnParseFor(cmd, RegisterFlags)

if cmd == "vtgate" || cmd == "vtcombo" || cmd == "vtctld" {
servenv.OnParseFor(cmd, RegisterDialConcurrencyFlagsHealthcheck)
} else {
servenv.OnParseFor(cmd, RegisterDialConcurrencyFlags)
}
}
}

Expand Down Expand Up @@ -129,6 +150,10 @@ func DialContext(ctx context.Context, target string, failFast FailFast, opts ...
newopts = append(newopts, grpc.WithInitialWindowSize(int32(initialWindowSize)))
}

if dialConcurrencyLimit > 0 {
newopts = append(newopts, dialConcurrencyLimitOption())
}

newopts = append(newopts, opts...)
var err error
grpcDialOptionsMu.Lock()
Expand Down Expand Up @@ -175,6 +200,35 @@ func SecureDialOption(cert, key, ca, crl, name string) (grpc.DialOption, error)
return grpc.WithTransportCredentials(creds), nil
}

var dialConcurrencyLimitOpt grpc.DialOption

// withDialerContextOnce ensures grpc.WithDialContext() is added once to the options.
var dialConcurrencyLimitOnce sync.Once

func dialConcurrencyLimitOption() grpc.DialOption {
dialConcurrencyLimitOnce.Do(func() {
// This semaphore is used to limit how many grpc connections can be dialed to tablets simultanously.
// This does not limit how many tablet connections can be open at the same time.
sem := semaphore.NewWeighted(dialConcurrencyLimit)

dialConcurrencyLimitOpt = grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
// Limit the number of grpc connections opened in parallel to avoid high OS-thread
// usage due to blocking networking syscalls (eg: DNS lookups, TCP connection opens,
// etc). Without this limit it is possible for vtgates watching >10k tablets to hit
// the panic: 'runtime: program exceeds 10000-thread limit'.
if err := sem.Acquire(ctx, 1); err != nil {
return nil, err
}
defer sem.Release(1)

var dialer net.Dialer
return dialer.DialContext(ctx, "tcp", addr)
})
})

return dialConcurrencyLimitOpt
}

// Allows for building a chain of interceptors without knowing the total size up front
type clientInterceptorBuilder struct {
unaryInterceptors []grpc.UnaryClientInterceptor
Expand Down

0 comments on commit 9d2edb8

Please sign in to comment.