From 625097303b93aee8851dec13e663175bfb4aa1bf Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Tue, 12 Dec 2023 19:05:41 +0530 Subject: [PATCH 1/2] feat: rework unserveCommon so that we don't wait for requests always Signed-off-by: Manan Gupta --- go/vt/vttablet/tabletserver/state_manager.go | 47 +++++++++++++++++--- 1 file changed, 40 insertions(+), 7 deletions(-) diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 98ed846600c..8e4c49ddfda 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -71,6 +71,8 @@ var ErrNoTarget = vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "No target") // stateManager manages state transition for all the TabletServer // subcomponents. type stateManager struct { + ctx context.Context + cancel context.CancelFunc // transitioning is a semaphore that must to be obtained // before attempting a state transition. To prevent deadlocks, // this must be acquired before the mu lock. We use a semaphore @@ -100,7 +102,7 @@ type stateManager struct { reason string transitionErr error - requests sync.WaitGroup + requestsCounter atomic.Int64 // QueryList does not have an Open or Close. statelessql *QueryList @@ -401,13 +403,13 @@ func (sm *stateManager) StartRequest(ctx context.Context, target *querypb.Target if err != nil { return err } - sm.requests.Add(1) + sm.requestsCounter.Add(1) return nil } // EndRequest unregisters the current request (a waitgroup) as done. func (sm *stateManager) EndRequest() { - sm.requests.Done() + sm.requestsCounter.Add(-1) } // VerifyTarget allows requests to be executed even in non-serving state. @@ -487,7 +489,7 @@ func (sm *stateManager) unservePrimary() error { func (sm *stateManager) serveNonPrimary(wantTabletType topodatapb.TabletType) error { // We are likely transitioning from primary. We have to honor // the shutdown grace period. - cancel := sm.handleShutdownGracePeriod() + cancel := sm.handleShutdownGracePeriod(nil) defer cancel() sm.ddle.Close() @@ -540,8 +542,10 @@ func (sm *stateManager) connect(tabletType topodatapb.TabletType) error { } func (sm *stateManager) unserveCommon() { + wg := sync.WaitGroup{} + wg.Add(1) log.Infof("Started execution of unserveCommon") - cancel := sm.handleShutdownGracePeriod() + cancel := sm.handleShutdownGracePeriod(&wg) log.Infof("Finished execution of handleShutdownGracePeriod") defer cancel() @@ -560,16 +564,45 @@ func (sm *stateManager) unserveCommon() { log.Info("Finished Killing all OLAP queries. Started tracker close") sm.tracker.Close() log.Infof("Finished tracker close. Started wait for requests") - sm.requests.Wait() + // If there is not shutdown grace period, then we should wait for all the requests to be empty. + if sm.shutdownGracePeriod == 0 { + sm.waitForRequestsToBeEmpty() + } else { + // We quickly check if the requests are empty or not. + // If they are, then we don't need to wait for the shutdown to complete. + count := sm.requestsCounter.Load() + if count == 0 { + return + } + // Otherwise, we should wait for all olap queries to be killed. + // We don't need to wait for requests to be empty since we have ensured all the queries against MySQL have been killed. + wg.Wait() + } + log.Infof("Finished wait for requests. Finished execution of unserveCommon") } -func (sm *stateManager) handleShutdownGracePeriod() (cancel func()) { +func (sm *stateManager) waitForRequestsToBeEmpty() { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + for { + count := sm.requestsCounter.Load() + if count == 0 { + return + } + <-ticker.C + } +} + +func (sm *stateManager) handleShutdownGracePeriod(wg *sync.WaitGroup) (cancel func()) { if sm.shutdownGracePeriod == 0 { return func() {} } ctx, cancel := context.WithCancel(context.TODO()) go func() { + if wg != nil { + defer wg.Done() + } if err := timer.SleepContext(ctx, sm.shutdownGracePeriod); err != nil { return } From c28226fcad60e26b005b6a78949ed8bbb3982dc6 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Thu, 14 Dec 2023 14:24:40 +0530 Subject: [PATCH 2/2] refactor: minor refactor of code to make it more readable Signed-off-by: Manan Gupta --- go/vt/vttablet/tabletserver/query_executor.go | 2 +- go/vt/vttablet/tabletserver/state_manager.go | 26 +++++++++---------- .../vttablet/tabletserver/tabletenv/config.go | 2 +- 3 files changed, 14 insertions(+), 16 deletions(-) diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index e53bdaec754..cb513b946c8 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -1104,7 +1104,7 @@ func (qre *QueryExecutor) execStreamSQL(conn *connpool.PooledConn, isTransaction // Add query detail object into QueryExecutor TableServer list w.r.t if it is a transactional or not. Previously we were adding it // to olapql list regardless but that resulted in problems, where long-running stream queries which can be stateful (or transactional) - // weren't getting cleaned up during unserveCommon>handleShutdownGracePeriod in state_manager.go. + // weren't getting cleaned up during unserveCommon>terminateAllQueries in state_manager.go. // This change will ensure that long-running streaming stateful queries get gracefully shutdown during ServingTypeChange // once their grace period is over. qd := NewQueryDetail(qre.logStats.Ctx, conn.Conn) diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 8e4c49ddfda..1f94daaba08 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -487,11 +487,6 @@ func (sm *stateManager) unservePrimary() error { } func (sm *stateManager) serveNonPrimary(wantTabletType topodatapb.TabletType) error { - // We are likely transitioning from primary. We have to honor - // the shutdown grace period. - cancel := sm.handleShutdownGracePeriod(nil) - defer cancel() - sm.ddle.Close() sm.tableGC.Close() sm.messager.Close() @@ -545,8 +540,8 @@ func (sm *stateManager) unserveCommon() { wg := sync.WaitGroup{} wg.Add(1) log.Infof("Started execution of unserveCommon") - cancel := sm.handleShutdownGracePeriod(&wg) - log.Infof("Finished execution of handleShutdownGracePeriod") + cancel := sm.terminateAllQueries(&wg) + log.Infof("Finished execution of terminateAllQueries") defer cancel() log.Infof("Started online ddl executor close") @@ -564,7 +559,12 @@ func (sm *stateManager) unserveCommon() { log.Info("Finished Killing all OLAP queries. Started tracker close") sm.tracker.Close() log.Infof("Finished tracker close. Started wait for requests") - // If there is not shutdown grace period, then we should wait for all the requests to be empty. + sm.handleShutdownGracePeriod(&wg) + log.Infof("Finished handling grace period. Finished execution of unserveCommon") +} + +func (sm *stateManager) handleShutdownGracePeriod(wg *sync.WaitGroup) { + // If there is no shutdown grace period specified, then we should wait for all the requests to be empty. if sm.shutdownGracePeriod == 0 { sm.waitForRequestsToBeEmpty() } else { @@ -578,8 +578,6 @@ func (sm *stateManager) unserveCommon() { // We don't need to wait for requests to be empty since we have ensured all the queries against MySQL have been killed. wg.Wait() } - - log.Infof("Finished wait for requests. Finished execution of unserveCommon") } func (sm *stateManager) waitForRequestsToBeEmpty() { @@ -594,7 +592,7 @@ func (sm *stateManager) waitForRequestsToBeEmpty() { } } -func (sm *stateManager) handleShutdownGracePeriod(wg *sync.WaitGroup) (cancel func()) { +func (sm *stateManager) terminateAllQueries(wg *sync.WaitGroup) (cancel func()) { if sm.shutdownGracePeriod == 0 { return func() {} } @@ -608,7 +606,7 @@ func (sm *stateManager) handleShutdownGracePeriod(wg *sync.WaitGroup) (cancel fu } log.Infof("Grace Period %v exceeded. Killing all OLTP queries.", sm.shutdownGracePeriod) sm.statelessql.TerminateAll() - log.Infof("Killed all stateful OLTP queries.") + log.Infof("Killed all stateless OLTP queries.") sm.statefulql.TerminateAll() log.Infof("Killed all OLTP queries.") }() @@ -658,7 +656,7 @@ func (sm *stateManager) setState(tabletType topodatapb.TabletType, state serving log.Infof("TabletServer transition: %v -> %v for tablet %s:%s/%s", sm.stateStringLocked(sm.target.TabletType, sm.state), sm.stateStringLocked(tabletType, state), sm.target.Cell, sm.target.Keyspace, sm.target.Shard) - sm.handleGracePeriod(tabletType) + sm.handleTransitionGracePeriod(tabletType) sm.target.TabletType = tabletType if sm.state == StateNotConnected { // If we're transitioning out of StateNotConnected, we have @@ -677,7 +675,7 @@ func (sm *stateManager) stateStringLocked(tabletType topodatapb.TabletType, stat return fmt.Sprintf("%v: %v, %v", tabletType, state, sm.ptsTimestamp.Local().Format("Jan 2, 2006 at 15:04:05 (MST)")) } -func (sm *stateManager) handleGracePeriod(tabletType topodatapb.TabletType) { +func (sm *stateManager) handleTransitionGracePeriod(tabletType topodatapb.TabletType) { if tabletType != topodatapb.TabletType_PRIMARY { // We allow serving of previous type only for a primary transition. sm.alsoAllow = nil diff --git a/go/vt/vttablet/tabletserver/tabletenv/config.go b/go/vt/vttablet/tabletserver/tabletenv/config.go index ac2629709b9..885bb884152 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config.go @@ -124,7 +124,7 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) { fs.IntVar(¤tConfig.MessagePostponeParallelism, "queryserver-config-message-postpone-cap", defaultConfig.MessagePostponeParallelism, "query server message postpone cap is the maximum number of messages that can be postponed at any given time. Set this number to substantially lower than transaction cap, so that the transaction pool isn't exhausted by the message subsystem.") currentConfig.Oltp.TxTimeoutSeconds = defaultConfig.Oltp.TxTimeoutSeconds.Clone() fs.Var(¤tConfig.Oltp.TxTimeoutSeconds, currentConfig.Oltp.TxTimeoutSeconds.Name(), "query server transaction timeout (in seconds), a transaction will be killed if it takes longer than this value") - currentConfig.GracePeriods.ShutdownSeconds = flagutil.NewDeprecatedFloat64Seconds(defaultConfig.GracePeriods.ShutdownSeconds.Name(), defaultConfig.GracePeriods.TransitionSeconds.Get()) + currentConfig.GracePeriods.ShutdownSeconds = flagutil.NewDeprecatedFloat64Seconds(defaultConfig.GracePeriods.ShutdownSeconds.Name(), defaultConfig.GracePeriods.ShutdownSeconds.Get()) fs.Var(¤tConfig.GracePeriods.ShutdownSeconds, currentConfig.GracePeriods.ShutdownSeconds.Name(), "how long to wait (in seconds) for queries and transactions to complete during graceful shutdown.") fs.IntVar(¤tConfig.Oltp.MaxRows, "queryserver-config-max-result-size", defaultConfig.Oltp.MaxRows, "query server max result size, maximum number of rows allowed to return from vttablet for non-streaming queries.") fs.IntVar(¤tConfig.Oltp.WarnRows, "queryserver-config-warn-result-size", defaultConfig.Oltp.WarnRows, "query server result size warning threshold, warn if number of rows returned from vttablet for non-streaming queries exceeds this")