diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 862d41b115d..fd4978a2fe9 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -1111,7 +1111,7 @@ func (qre *QueryExecutor) execStreamSQL(conn *connpool.PooledConn, isTransaction // Add query detail object into QueryExecutor TableServer list w.r.t if it is a transactional or not. Previously we were adding it // to olapql list regardless but that resulted in problems, where long-running stream queries which can be stateful (or transactional) - // weren't getting cleaned up during unserveCommon>handleShutdownGracePeriod in state_manager.go. + // weren't getting cleaned up during unserveCommon>terminateAllQueries in state_manager.go. // This change will ensure that long-running streaming stateful queries get gracefully shutdown during ServingTypeChange // once their grace period is over. qd := NewQueryDetail(qre.logStats.Ctx, conn.Conn) diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 8aa7776957f..26952b784c9 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -71,6 +71,8 @@ var ErrNoTarget = vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "No target") // stateManager manages state transition for all the TabletServer // subcomponents. type stateManager struct { + ctx context.Context + cancel context.CancelFunc // transitioning is a semaphore that must to be obtained // before attempting a state transition. To prevent deadlocks, // this must be acquired before the mu lock. We use a semaphore @@ -100,7 +102,7 @@ type stateManager struct { reason string transitionErr error - requests sync.WaitGroup + requestsCounter atomic.Int64 // QueryList does not have an Open or Close. statelessql *QueryList @@ -401,13 +403,13 @@ func (sm *stateManager) StartRequest(ctx context.Context, target *querypb.Target if err != nil { return err } - sm.requests.Add(1) + sm.requestsCounter.Add(1) return nil } // EndRequest unregisters the current request (a waitgroup) as done. func (sm *stateManager) EndRequest() { - sm.requests.Done() + sm.requestsCounter.Add(-1) } // VerifyTarget allows requests to be executed even in non-serving state. @@ -485,11 +487,6 @@ func (sm *stateManager) unservePrimary() error { } func (sm *stateManager) serveNonPrimary(wantTabletType topodatapb.TabletType) error { - // We are likely transitioning from primary. We have to honor - // the shutdown grace period. - cancel := sm.handleShutdownGracePeriod() - defer cancel() - sm.ddle.Close() sm.tableGC.Close() sm.messager.Close() @@ -540,9 +537,11 @@ func (sm *stateManager) connect(tabletType topodatapb.TabletType) error { } func (sm *stateManager) unserveCommon() { + wg := sync.WaitGroup{} + wg.Add(1) log.Infof("Started execution of unserveCommon") - cancel := sm.handleShutdownGracePeriod() - log.Infof("Finished execution of handleShutdownGracePeriod") + cancel := sm.terminateAllQueries(&wg) + log.Infof("Finished execution of terminateAllQueries") defer cancel() log.Infof("Started online ddl executor close") @@ -560,22 +559,54 @@ func (sm *stateManager) unserveCommon() { log.Info("Finished Killing all OLAP queries. Started tracker close") sm.tracker.Close() log.Infof("Finished tracker close. Started wait for requests") - sm.requests.Wait() - log.Infof("Finished wait for requests. Finished execution of unserveCommon") + sm.handleShutdownGracePeriod(&wg) + log.Infof("Finished handling grace period. Finished execution of unserveCommon") } -func (sm *stateManager) handleShutdownGracePeriod() (cancel func()) { +func (sm *stateManager) handleShutdownGracePeriod(wg *sync.WaitGroup) { + // If there is no shutdown grace period specified, then we should wait for all the requests to be empty. + if sm.shutdownGracePeriod == 0 { + sm.waitForRequestsToBeEmpty() + } else { + // We quickly check if the requests are empty or not. + // If they are, then we don't need to wait for the shutdown to complete. + count := sm.requestsCounter.Load() + if count == 0 { + return + } + // Otherwise, we should wait for all olap queries to be killed. + // We don't need to wait for requests to be empty since we have ensured all the queries against MySQL have been killed. + wg.Wait() + } +} + +func (sm *stateManager) waitForRequestsToBeEmpty() { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + for { + count := sm.requestsCounter.Load() + if count == 0 { + return + } + <-ticker.C + } +} + +func (sm *stateManager) terminateAllQueries(wg *sync.WaitGroup) (cancel func()) { if sm.shutdownGracePeriod == 0 { return func() {} } ctx, cancel := context.WithCancel(context.TODO()) go func() { + if wg != nil { + defer wg.Done() + } if err := timer.SleepContext(ctx, sm.shutdownGracePeriod); err != nil { return } log.Infof("Grace Period %v exceeded. Killing all OLTP queries.", sm.shutdownGracePeriod) sm.statelessql.TerminateAll() - log.Infof("Killed all stateful OLTP queries.") + log.Infof("Killed all stateless OLTP queries.") sm.statefulql.TerminateAll() log.Infof("Killed all OLTP queries.") }() @@ -625,7 +656,7 @@ func (sm *stateManager) setState(tabletType topodatapb.TabletType, state serving log.Infof("TabletServer transition: %v -> %v for tablet %s:%s/%s", sm.stateStringLocked(sm.target.TabletType, sm.state), sm.stateStringLocked(tabletType, state), sm.target.Cell, sm.target.Keyspace, sm.target.Shard) - sm.handleGracePeriod(tabletType) + sm.handleTransitionGracePeriod(tabletType) sm.target.TabletType = tabletType if sm.state == StateNotConnected { // If we're transitioning out of StateNotConnected, we have @@ -644,7 +675,7 @@ func (sm *stateManager) stateStringLocked(tabletType topodatapb.TabletType, stat return fmt.Sprintf("%v: %v, %v", tabletType, state, sm.ptsTimestamp.Local().Format("Jan 2, 2006 at 15:04:05 (MST)")) } -func (sm *stateManager) handleGracePeriod(tabletType topodatapb.TabletType) { +func (sm *stateManager) handleTransitionGracePeriod(tabletType topodatapb.TabletType) { if tabletType != topodatapb.TabletType_PRIMARY { // We allow serving of previous type only for a primary transition. sm.alsoAllow = nil