Skip to content

Commit

Permalink
Move semaphore to HealthCheckImpl
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <[email protected]>
  • Loading branch information
timvaillancourt committed Jan 19, 2024
1 parent 115238f commit 6331043
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 18 deletions.
4 changes: 2 additions & 2 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ Usage of vtgate:
gRPC server permit client keepalive pings even when there are no active streams (RPCs)
--grpc_use_effective_callerid
If set, and SSL is not used, will set the immediate caller id from the effective caller id's principal.
--healthcheck_concurrency int
concurrent healthchecks (default 32)
--healthcheck_retry_delay duration
health check retry delay (default 2ms)
--healthcheck_timeout duration
Expand Down Expand Up @@ -313,8 +315,6 @@ Usage of vtgate:
the number of bytes sent from vtgate for each stream call. It's recommended to keep this value in sync with vttablet's query-server-config-stream-buffer-size. (default 32768)
--structured-logging
whether to use structured logging (Zap) or the original (glog) logger
--tablet_discovery_concurrency int
concurrent tablet discoveries (default 1024)
--tablet_filters value
Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch
--tablet_grpc_ca string
Expand Down
4 changes: 2 additions & 2 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,8 @@ Usage of vttablet:
gRPC server permit client keepalive pings even when there are no active streams (RPCs)
--health_check_interval duration
Interval between health checks (default 20s)
--healthcheck_concurrency int
concurrent healthchecks (default 32)
--heartbeat_enable
If true, vttablet records (if master) or checks (if replica) the current time of a replication heartbeat in the table _vt.heartbeat. The result is used to inform the serving state of the vttablet via healthchecks.
--heartbeat_interval duration
Expand Down Expand Up @@ -809,8 +811,6 @@ Usage of vttablet:
YAML file config for tablet
--tablet_dir string
The directory within the vtdataroot to store vttablet/mysql files. Defaults to being generated by the tablet uid.
--tablet_discovery_concurrency int
concurrent tablet discoveries (default 1024)
--tablet_filters value
Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch
--tablet_grpc_ca string
Expand Down
11 changes: 8 additions & 3 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ var (
refreshKnownTablets = flag.Bool("tablet_refresh_known_tablets", true, "tablet refresh reloads the tablet address/port map from topo in case it changes")
// topoReadConcurrency tells us how many topo reads are allowed in parallel
topoReadConcurrency = flag.Int("topo_read_concurrency", 32, "concurrent topo reads")
// tabletDiscoveryConcurrency tells us how many tablets can be discovered in parallel
tabletDiscoveryConcurrency = flag.Int("tablet_discovery_concurrency", 1024, "concurrent tablet discoveries")
// healthCheckConcurrency tells us how many tablets can be healthchecked in parallel
healthCheckConcurrency = flag.Int("healthcheck_concurrency", 32, "concurrent healthchecks")
)

// See the documentation for NewHealthCheck below for an explanation of these parameters.
Expand Down Expand Up @@ -262,6 +262,8 @@ type HealthCheckImpl struct {
subMu sync.Mutex
// subscribers
subscribers map[chan *TabletHealth]struct{}
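// healthCheckSem is a buffered channel used as a semaphore by AddTablet; its capacity comes from the healthcheck_concurrency flag.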
healthCheckSem chan int
}

// NewHealthCheck creates a new HealthCheck object.
Expand Down Expand Up @@ -296,6 +298,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
cell: localCell,
retryDelay: retryDelay,
healthCheckTimeout: healthCheckTimeout,
healthCheckSem: make(chan int, *healthCheckConcurrency),
healthByAlias: make(map[tabletAliasString]*tabletHealthCheck),
healthData: make(map[KeyspaceShardTabletType]map[tabletAliasString]*TabletHealth),
healthy: make(map[KeyspaceShardTabletType][]*TabletHealth),
Expand Down Expand Up @@ -326,7 +329,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
} else if len(KeyspacesToWatch) > 0 {
filter = NewFilterByKeyspace(KeyspacesToWatch)
}
topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filter, c, *refreshInterval, *refreshKnownTablets, *topoReadConcurrency, *tabletDiscoveryConcurrency))
topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filter, c, *refreshInterval, *refreshKnownTablets, *topoReadConcurrency))
}

hc.topoWatchers = topoWatchers
Expand Down Expand Up @@ -386,9 +389,11 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodata.Tablet) {
}
hc.healthData[key][tabletAliasString(tabletAlias)] = res

hc.healthCheckSem <- 1 // Wait for active queue to drain.
hc.broadcast(res)
hc.connsWG.Add(1)
go thc.checkConn(hc)
<-hc.healthCheckSem
}

// RemoveTablet removes the tablet, and stops the health check.
Expand Down
10 changes: 3 additions & 7 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ type TopologyWatcher struct {
refreshKnownTablets bool
getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)
sem chan int
tabletDiscoverySem chan int
ctx context.Context
cancelFunc context.CancelFunc
// wg keeps track of all launched Go routines.
Expand All @@ -95,7 +94,7 @@ type TopologyWatcher struct {

// NewTopologyWatcher returns a TopologyWatcher that monitors all
// the tablets in a cell, and starts refreshing.
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, tr TabletRecorder, filter TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency, tabletDiscoveryConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)) *TopologyWatcher {
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, tr TabletRecorder, filter TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)) *TopologyWatcher {
tw := &TopologyWatcher{
topoServer: topoServer,
tabletRecorder: tr,
Expand All @@ -105,7 +104,6 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, tr TabletR
refreshKnownTablets: refreshKnownTablets,
getTablets: getTablets,
sem: make(chan int, topoReadConcurrency),
tabletDiscoverySem: make(chan int, tabletDiscoveryConcurrency),
tablets: make(map[string]*tabletInfo),
}
tw.firstLoadChan = make(chan struct{})
Expand All @@ -118,8 +116,8 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, tr TabletR

// NewCellTabletsWatcher returns a TopologyWatcher that monitors all
// the tablets in a cell, and starts refreshing.
func NewCellTabletsWatcher(ctx context.Context, topoServer *topo.Server, tr TabletRecorder, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency, tabletDiscoveryConcurrency int) *TopologyWatcher {
return NewTopologyWatcher(ctx, topoServer, tr, f, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, tabletDiscoveryConcurrency, func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) {
func NewCellTabletsWatcher(ctx context.Context, topoServer *topo.Server, tr TabletRecorder, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher {
return NewTopologyWatcher(ctx, topoServer, tr, f, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) {
return tw.topoServer.GetTabletAliasesByCell(ctx, tw.cell)
})
}
Expand Down Expand Up @@ -232,11 +230,9 @@ func (tw *TopologyWatcher) loadTablets() {
topologyWatcherOperations.Add(topologyWatcherOpReplaceTablet, 1)
}
} else {
tw.tabletDiscoverySem <- 1 // Wait for active queue to drain.
// This is a new tablet record, let's add it to the healthcheck
tw.tabletRecorder.AddTablet(newVal.tablet)
topologyWatcherOperations.Add(topologyWatcherOpAddTablet, 1)
<-tw.tabletDiscoverySem // Done; enable next request to run
}
}

Expand Down
8 changes: 4 additions & 4 deletions go/vt/discovery/topology_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestStartAndCloseTopoWatcher(t *testing.T) {
ts := memorytopo.NewServer("aa")
fhc := NewFakeHealthCheck(nil)
topologyWatcherOperations.ZeroAll()
tw := NewCellTabletsWatcher(context.Background(), ts, fhc, nil, "aa", 100*time.Microsecond, true, 5, 5)
tw := NewCellTabletsWatcher(context.Background(), ts, fhc, nil, "aa", 100*time.Microsecond, true, 5)

done := make(chan bool, 3)
result := make(chan bool, 1)
Expand Down Expand Up @@ -115,7 +115,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
logger := logutil.NewMemoryLogger()
topologyWatcherOperations.ZeroAll()
counts := topologyWatcherOperations.Counts()
tw := NewCellTabletsWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, refreshKnownTablets, 5, 5)
tw := NewCellTabletsWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, refreshKnownTablets, 5)

counts = checkOpCounts(t, counts, map[string]int64{})
checkChecksum(t, tw, 0)
Expand Down Expand Up @@ -432,7 +432,7 @@ func TestFilterByKeyspace(t *testing.T) {
hc := NewFakeHealthCheck(nil)
f := NewFilterByKeyspace(testKeyspacesToWatch)
ts := memorytopo.NewServer(testCell)
tw := NewCellTabletsWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true, 5, 5)
tw := NewCellTabletsWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true, 5)

for _, test := range testFilterByKeyspace {
// Add a new tablet to the topology.
Expand Down Expand Up @@ -514,7 +514,7 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) {
topologyWatcherOperations.ZeroAll()
counts := topologyWatcherOperations.Counts()
f := NewFilterByKeyspace(testKeyspacesToWatch)
tw := NewCellTabletsWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/, 5, 5)
tw := NewCellTabletsWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/, 5)

counts = checkOpCounts(t, counts, map[string]int64{})
checkChecksum(t, tw, 0)
Expand Down

0 comments on commit 6331043

Please sign in to comment.