From 9d2edb8c03c228c112fd9715878272e73726f47e Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Sun, 14 Jul 2024 15:28:31 +0000 Subject: [PATCH] Move concurrent connection dial limit out of `healthcheck`. There's a concurrent connection dial limit implemented in the healthcheck code that doesn't semantically or logically belong here. First, the healthcheck code does neither know nor care about the network protocol used to execute healthchecks. Arguably, there's no other protocol used for this apart from `grpc`, but it seems wrong to set up a `grpc` connection specific options in the healthcheck code. Additionally, the dial concurrency limit modifies the global `grpc` connection options the first time a healthcheck is started. That seems unexpected, and I believe we want the concurrency limit set for all `grpc`` dial operations, irrespective of whether those connections are used for healtchecking or anything else. This change moves the concurrency limit into the `grpcclient` package, and sets it on any `grpc` connection opened via that package. Signed-off-by: Arthur Schreiber --- go/vt/discovery/healthcheck.go | 10 +---- go/vt/discovery/tablet_health_check.go | 37 +++--------------- go/vt/grpcclient/client.go | 54 ++++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 40 deletions(-) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 70799b0f6bc..287bbd19dc0 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -46,7 +46,6 @@ import ( "github.com/google/safehtml/template" "github.com/google/safehtml/template/uncheckedconversions" "github.com/spf13/pflag" - "golang.org/x/sync/semaphore" "vitess.io/vitess/go/flagutil" "vitess.io/vitess/go/netutil" @@ -92,9 +91,6 @@ var ( // refreshKnownTablets tells us whether to process all tablets or only new tablets. refreshKnownTablets = true - // healthCheckDialConcurrency tells us how many healthcheck connections can be opened to tablets at once. This should be less than the golang max thread limit of 10000. - healthCheckDialConcurrency int64 = 1024 - // How much to sleep between each check. waitAvailableTabletInterval = 100 * time.Millisecond @@ -177,7 +173,6 @@ func registerWebUIFlags(fs *pflag.FlagSet) { fs.StringVar(&TabletURLTemplateString, "tablet_url_template", "http://{{.GetTabletHostPort}}", "Format string describing debug tablet url formatting. See getTabletDebugURL() for how to customize this.") fs.DurationVar(&refreshInterval, "tablet_refresh_interval", 1*time.Minute, "Tablet refresh interval.") fs.BoolVar(&refreshKnownTablets, "tablet_refresh_known_tablets", true, "Whether to reload the tablet's address/port map from topo in case they change.") - fs.Int64Var(&healthCheckDialConcurrency, "healthcheck-dial-concurrency", 1024, "Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000.") ParseTabletURLTemplateFromFlag() } @@ -297,8 +292,6 @@ type HealthCheckImpl struct { subscribers map[chan *TabletHealth]struct{} // loadTablets trigger is used to immediately load a new primary tablet when the current one has been demoted loadTabletsTrigger chan struct{} - // healthCheckDialSem is used to limit how many healthcheck connections can be opened to tablets at once. - healthCheckDialSem *semaphore.Weighted } // NewHealthCheck creates a new HealthCheck object. @@ -333,7 +326,6 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur cell: localCell, retryDelay: retryDelay, healthCheckTimeout: healthCheckTimeout, - healthCheckDialSem: semaphore.NewWeighted(healthCheckDialConcurrency), healthByAlias: make(map[tabletAliasString]*tabletHealthCheck), healthData: make(map[KeyspaceShardTabletType]map[tabletAliasString]*TabletHealth), healthy: make(map[KeyspaceShardTabletType][]*TabletHealth), @@ -844,7 +836,7 @@ func (hc *HealthCheckImpl) TabletConnection(ctx context.Context, alias *topodata // TODO: test that throws this error return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "tablet: %v is either down or nonexistent", alias) } - return thc.Connection(ctx, hc), nil + return thc.Connection(ctx), nil } // getAliasByCell should only be called while holding hc.mu diff --git a/go/vt/discovery/tablet_health_check.go b/go/vt/discovery/tablet_health_check.go index 64450f4c8c6..ecadeefdf78 100644 --- a/go/vt/discovery/tablet_health_check.go +++ b/go/vt/discovery/tablet_health_check.go @@ -19,7 +19,6 @@ package discovery import ( "context" "fmt" - "net" "strings" "sync" "sync/atomic" @@ -34,16 +33,12 @@ import ( "vitess.io/vitess/go/vt/vttablet/queryservice" "vitess.io/vitess/go/vt/vttablet/tabletconn" - "google.golang.org/grpc" "google.golang.org/protobuf/proto" "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/proto/topodata" ) -// withDialerContextOnce ensures grpc.WithDialContext() is added once to the options. -var withDialerContextOnce sync.Once - // tabletHealthCheck maintains the health status of a tablet. A map of this // structure is maintained in HealthCheck. type tabletHealthCheck struct { @@ -127,8 +122,8 @@ func (thc *tabletHealthCheck) setServingState(serving bool, reason string) { } // stream streams healthcheck responses to callback. -func (thc *tabletHealthCheck) stream(ctx context.Context, hc *HealthCheckImpl, callback func(*query.StreamHealthResponse) error) error { - conn := thc.Connection(ctx, hc) +func (thc *tabletHealthCheck) stream(ctx context.Context, callback func(*query.StreamHealthResponse) error) error { + conn := thc.Connection(ctx) if conn == nil { // This signals the caller to retry return nil @@ -141,34 +136,14 @@ func (thc *tabletHealthCheck) stream(ctx context.Context, hc *HealthCheckImpl, c return err } -func (thc *tabletHealthCheck) Connection(ctx context.Context, hc *HealthCheckImpl) queryservice.QueryService { +func (thc *tabletHealthCheck) Connection(ctx context.Context) queryservice.QueryService { thc.connMu.Lock() defer thc.connMu.Unlock() - return thc.connectionLocked(ctx, hc) -} - -func healthCheckDialerFactory(hc *HealthCheckImpl) func(ctx context.Context, addr string) (net.Conn, error) { - return func(ctx context.Context, addr string) (net.Conn, error) { - // Limit the number of healthcheck connections opened in parallel to avoid high OS-thread - // usage due to blocking networking syscalls (eg: DNS lookups, TCP connection opens, - // etc). Without this limit it is possible for vtgates watching >10k tablets to hit - // the panic: 'runtime: program exceeds 10000-thread limit'. - if err := hc.healthCheckDialSem.Acquire(ctx, 1); err != nil { - return nil, err - } - defer hc.healthCheckDialSem.Release(1) - var dialer net.Dialer - return dialer.DialContext(ctx, "tcp", addr) - } + return thc.connectionLocked(ctx) } -func (thc *tabletHealthCheck) connectionLocked(ctx context.Context, hc *HealthCheckImpl) queryservice.QueryService { +func (thc *tabletHealthCheck) connectionLocked(ctx context.Context) queryservice.QueryService { if thc.Conn == nil { - withDialerContextOnce.Do(func() { - grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) { - return append(opts, grpc.WithContextDialer(healthCheckDialerFactory(hc))), nil - }) - }) conn, err := tabletconn.GetDialer()(ctx, thc.Tablet, grpcclient.FailFast(true)) if err != nil { thc.LastError = err @@ -297,7 +272,7 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) { }() // Read stream health responses. - err := thc.stream(streamCtx, hc, func(shr *query.StreamHealthResponse) error { + err := thc.stream(streamCtx, func(shr *query.StreamHealthResponse) error { // We received a message. Reset the back-off. retryDelay = hc.retryDelay // Don't block on send to avoid deadlocks. diff --git a/go/vt/grpcclient/client.go b/go/vt/grpcclient/client.go index b8a8847ac4f..d46712de42c 100644 --- a/go/vt/grpcclient/client.go +++ b/go/vt/grpcclient/client.go @@ -21,12 +21,14 @@ package grpcclient import ( "context" "crypto/tls" + "net" "sync" "time" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/spf13/pflag" + "golang.org/x/sync/semaphore" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" @@ -46,6 +48,10 @@ var ( initialConnWindowSize int initialWindowSize int + // `dialConcurrencyLimit` tells us how many tablet grpc connections can be dialed concurrently. + // This should be less than the golang max thread limit of 10000. + dialConcurrencyLimit int64 = 1024 + // every vitess binary that makes grpc client-side calls. grpcclientBinaries = []string{ "mysqlctld", @@ -74,9 +80,24 @@ func RegisterFlags(fs *pflag.FlagSet) { fs.StringVar(&credsFile, "grpc_auth_static_client_creds", credsFile, "When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server.") } +func RegisterDialConcurrencyFlagsHealthcheck(fs *pflag.FlagSet) { + // TODO: Deprecate this and rename it to `grpc-dial-concurrency-limit` + fs.Int64Var(&dialConcurrencyLimit, "healthcheck-dial-concurrency", 1024, "Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000.") +} + +func RegisterDialConcurrencyFlags(fs *pflag.FlagSet) { + fs.Int64Var(&dialConcurrencyLimit, "grpc-dial-concurrency-limit", 1024, "Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000.") +} + func init() { for _, cmd := range grpcclientBinaries { servenv.OnParseFor(cmd, RegisterFlags) + + if cmd == "vtgate" || cmd == "vtcombo" || cmd == "vtctld" { + servenv.OnParseFor(cmd, RegisterDialConcurrencyFlagsHealthcheck) + } else { + servenv.OnParseFor(cmd, RegisterDialConcurrencyFlags) + } } } @@ -129,6 +150,10 @@ func DialContext(ctx context.Context, target string, failFast FailFast, opts ... newopts = append(newopts, grpc.WithInitialWindowSize(int32(initialWindowSize))) } + if dialConcurrencyLimit > 0 { + newopts = append(newopts, dialConcurrencyLimitOption()) + } + newopts = append(newopts, opts...) var err error grpcDialOptionsMu.Lock() @@ -175,6 +200,35 @@ func SecureDialOption(cert, key, ca, crl, name string) (grpc.DialOption, error) return grpc.WithTransportCredentials(creds), nil } +var dialConcurrencyLimitOpt grpc.DialOption + +// withDialerContextOnce ensures grpc.WithDialContext() is added once to the options. +var dialConcurrencyLimitOnce sync.Once + +func dialConcurrencyLimitOption() grpc.DialOption { + dialConcurrencyLimitOnce.Do(func() { + // This semaphore is used to limit how many grpc connections can be dialed to tablets simultanously. + // This does not limit how many tablet connections can be open at the same time. + sem := semaphore.NewWeighted(dialConcurrencyLimit) + + dialConcurrencyLimitOpt = grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) { + // Limit the number of grpc connections opened in parallel to avoid high OS-thread + // usage due to blocking networking syscalls (eg: DNS lookups, TCP connection opens, + // etc). Without this limit it is possible for vtgates watching >10k tablets to hit + // the panic: 'runtime: program exceeds 10000-thread limit'. + if err := sem.Acquire(ctx, 1); err != nil { + return nil, err + } + defer sem.Release(1) + + var dialer net.Dialer + return dialer.DialContext(ctx, "tcp", addr) + }) + }) + + return dialConcurrencyLimitOpt +} + // Allows for building a chain of interceptors without knowing the total size up front type clientInterceptorBuilder struct { unaryInterceptors []grpc.UnaryClientInterceptor