Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework unserveCommon so that we don't wait for requests always #14761

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1111,7 +1111,7 @@ func (qre *QueryExecutor) execStreamSQL(conn *connpool.PooledConn, isTransaction

// Add query detail object into QueryExecutor TableServer list w.r.t if it is a transactional or not. Previously we were adding it
// to olapql list regardless but that resulted in problems, where long-running stream queries which can be stateful (or transactional)
// weren't getting cleaned up during unserveCommon>handleShutdownGracePeriod in state_manager.go.
// weren't getting cleaned up during unserveCommon>terminateAllQueries in state_manager.go.
// This change will ensure that long-running streaming stateful queries get gracefully shutdown during ServingTypeChange
// once their grace period is over.
qd := NewQueryDetail(qre.logStats.Ctx, conn.Conn)
Expand Down
63 changes: 47 additions & 16 deletions go/vt/vttablet/tabletserver/state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ var ErrNoTarget = vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "No target")
// stateManager manages state transition for all the TabletServer
// subcomponents.
type stateManager struct {
ctx context.Context
cancel context.CancelFunc
// transitioning is a semaphore that must to be obtained
// before attempting a state transition. To prevent deadlocks,
// this must be acquired before the mu lock. We use a semaphore
Expand Down Expand Up @@ -100,7 +102,7 @@ type stateManager struct {
reason string
transitionErr error

requests sync.WaitGroup
requestsCounter atomic.Int64

// QueryList does not have an Open or Close.
statelessql *QueryList
Expand Down Expand Up @@ -401,13 +403,13 @@ func (sm *stateManager) StartRequest(ctx context.Context, target *querypb.Target
if err != nil {
return err
}
sm.requests.Add(1)
sm.requestsCounter.Add(1)
return nil
}

// EndRequest unregisters the current request (a waitgroup) as done.
func (sm *stateManager) EndRequest() {
sm.requests.Done()
sm.requestsCounter.Add(-1)
}

// VerifyTarget allows requests to be executed even in non-serving state.
Expand Down Expand Up @@ -485,11 +487,6 @@ func (sm *stateManager) unservePrimary() error {
}

func (sm *stateManager) serveNonPrimary(wantTabletType topodatapb.TabletType) error {
// We are likely transitioning from primary. We have to honor
// the shutdown grace period.
cancel := sm.handleShutdownGracePeriod()
defer cancel()

sm.ddle.Close()
sm.tableGC.Close()
sm.messager.Close()
Expand Down Expand Up @@ -540,9 +537,11 @@ func (sm *stateManager) connect(tabletType topodatapb.TabletType) error {
}

func (sm *stateManager) unserveCommon() {
wg := sync.WaitGroup{}
wg.Add(1)
log.Infof("Started execution of unserveCommon")
cancel := sm.handleShutdownGracePeriod()
log.Infof("Finished execution of handleShutdownGracePeriod")
cancel := sm.terminateAllQueries(&wg)
log.Infof("Finished execution of terminateAllQueries")
defer cancel()

log.Infof("Started online ddl executor close")
Expand All @@ -560,22 +559,54 @@ func (sm *stateManager) unserveCommon() {
log.Info("Finished Killing all OLAP queries. Started tracker close")
sm.tracker.Close()
log.Infof("Finished tracker close. Started wait for requests")
sm.requests.Wait()
log.Infof("Finished wait for requests. Finished execution of unserveCommon")
sm.handleShutdownGracePeriod(&wg)
log.Infof("Finished handling grace period. Finished execution of unserveCommon")
}

func (sm *stateManager) handleShutdownGracePeriod() (cancel func()) {
func (sm *stateManager) handleShutdownGracePeriod(wg *sync.WaitGroup) {
// If there is no shutdown grace period specified, then we should wait for all the requests to be empty.
if sm.shutdownGracePeriod == 0 {
sm.waitForRequestsToBeEmpty()
} else {
// We quickly check if the requests are empty or not.
// If they are, then we don't need to wait for the shutdown to complete.
count := sm.requestsCounter.Load()
if count == 0 {
return
}
// Otherwise, we should wait for all olap queries to be killed.
// We don't need to wait for requests to be empty since we have ensured all the queries against MySQL have been killed.
wg.Wait()
}
}

func (sm *stateManager) waitForRequestsToBeEmpty() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
count := sm.requestsCounter.Load()
if count == 0 {
return
}
<-ticker.C
}
}

func (sm *stateManager) terminateAllQueries(wg *sync.WaitGroup) (cancel func()) {
if sm.shutdownGracePeriod == 0 {
return func() {}
}
ctx, cancel := context.WithCancel(context.TODO())
go func() {
if wg != nil {
defer wg.Done()
}
if err := timer.SleepContext(ctx, sm.shutdownGracePeriod); err != nil {
return
}
log.Infof("Grace Period %v exceeded. Killing all OLTP queries.", sm.shutdownGracePeriod)
sm.statelessql.TerminateAll()
log.Infof("Killed all stateful OLTP queries.")
log.Infof("Killed all stateless OLTP queries.")
sm.statefulql.TerminateAll()
log.Infof("Killed all OLTP queries.")
}()
Expand Down Expand Up @@ -625,7 +656,7 @@ func (sm *stateManager) setState(tabletType topodatapb.TabletType, state serving
log.Infof("TabletServer transition: %v -> %v for tablet %s:%s/%s",
sm.stateStringLocked(sm.target.TabletType, sm.state), sm.stateStringLocked(tabletType, state),
sm.target.Cell, sm.target.Keyspace, sm.target.Shard)
sm.handleGracePeriod(tabletType)
sm.handleTransitionGracePeriod(tabletType)
sm.target.TabletType = tabletType
if sm.state == StateNotConnected {
// If we're transitioning out of StateNotConnected, we have
Expand All @@ -644,7 +675,7 @@ func (sm *stateManager) stateStringLocked(tabletType topodatapb.TabletType, stat
return fmt.Sprintf("%v: %v, %v", tabletType, state, sm.ptsTimestamp.Local().Format("Jan 2, 2006 at 15:04:05 (MST)"))
}

func (sm *stateManager) handleGracePeriod(tabletType topodatapb.TabletType) {
func (sm *stateManager) handleTransitionGracePeriod(tabletType topodatapb.TabletType) {
if tabletType != topodatapb.TabletType_PRIMARY {
// We allow serving of previous type only for a primary transition.
sm.alsoAllow = nil
Expand Down
Loading