Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit concurrency of .AddTablet() calls in discovery healthcheck #180

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cyphar/filepath-securejoin v0.2.3 h1:YX6ebbZCZP7VkM3scTTokDgBL2TY741X51MTk3ycuNI=
github.com/cyphar/filepath-securejoin v0.2.3/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4=
github.com/cyphar/filepath-securejoin v0.2.4 h1:Ugdm7cg7i6ZK6x3xDF1oEu1nfkyfH53EtKeQYTC3kyg=
github.com/cyphar/filepath-securejoin v0.2.4/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4=
Expand Down
2 changes: 2 additions & 0 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ Usage of vtgate:
gRPC server permit client keepalive pings even when there are no active streams (RPCs)
--grpc_use_effective_callerid
If set, and SSL is not used, will set the immediate caller id from the effective caller id's principal.
--healthcheck_concurrency int
concurrent healthchecks (default 32)
--healthcheck_retry_delay duration
health check retry delay (default 2ms)
--healthcheck_timeout duration
Expand Down
2 changes: 2 additions & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,8 @@ Usage of vttablet:
gRPC server permit client keepalive pings even when there are no active streams (RPCs)
--health_check_interval duration
Interval between health checks (default 20s)
--healthcheck_concurrency int
concurrent healthchecks (default 32)
--heartbeat_enable
If true, vttablet records (if master) or checks (if replica) the current time of a replication heartbeat in the table _vt.heartbeat. The result is used to inform the serving state of the vttablet via healthchecks.
--heartbeat_interval duration
Expand Down
7 changes: 7 additions & 0 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ var (
refreshKnownTablets = flag.Bool("tablet_refresh_known_tablets", true, "tablet refresh reloads the tablet address/port map from topo in case it changes")
// topoReadConcurrency tells us how many topo reads are allowed in parallel
topoReadConcurrency = flag.Int("topo_read_concurrency", 32, "concurrent topo reads")
// healthCheckConcurrency tells us how many tablets can be healthchecked in parallel
healthCheckConcurrency = flag.Int("healthcheck_concurrency", 32, "concurrent healthchecks")
)

// See the documentation for NewHealthCheck below for an explanation of these parameters.
Expand Down Expand Up @@ -260,6 +262,8 @@ type HealthCheckImpl struct {
subMu sync.Mutex
// subscribers
subscribers map[chan *TabletHealth]struct{}
// healthCheckSem
healthCheckSem chan int
}

// NewHealthCheck creates a new HealthCheck object.
Expand Down Expand Up @@ -294,6 +298,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
cell: localCell,
retryDelay: retryDelay,
healthCheckTimeout: healthCheckTimeout,
healthCheckSem: make(chan int, *healthCheckConcurrency),
healthByAlias: make(map[tabletAliasString]*tabletHealthCheck),
healthData: make(map[KeyspaceShardTabletType]map[tabletAliasString]*TabletHealth),
healthy: make(map[KeyspaceShardTabletType][]*TabletHealth),
Expand Down Expand Up @@ -384,9 +389,11 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodata.Tablet) {
}
hc.healthData[key][tabletAliasString(tabletAlias)] = res

hc.healthCheckSem <- 1 // Wait for active queue to drain.
hc.broadcast(res)
hc.connsWG.Add(1)
go thc.checkConn(hc)
<-hc.healthCheckSem
}

// RemoveTablet removes the tablet, and stops the health check.
Expand Down
Loading