Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TableGC: speed up GC process via RequestChecks(). Utilized by Online DDL for artifact cleanup #14431

Merged
merged 8 commits into from
Dec 4, 2023
182 changes: 113 additions & 69 deletions go/test/endtoend/tabletmanager/tablegc/tablegc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/gc"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/onlineddl"
Expand Down Expand Up @@ -128,13 +129,18 @@ func TestMain(m *testing.M) {
os.Exit(exitCode)
}

func checkTableRows(t *testing.T, tableName string, expect int64) {
func getTableRows(t *testing.T, tableName string) int64 {
require.NotEmpty(t, tableName)
query := `select count(*) as c from %a`
parsed := sqlparser.BuildParsedQuery(query, tableName)
rs, err := primaryTablet.VttabletProcess.QueryTablet(parsed.Query, keyspaceName, true)
require.NoError(t, err)
count := rs.Named().Row().AsInt64("c", 0)
return count
}

func checkTableRows(t *testing.T, tableName string, expect int64) {
count := getTableRows(t, tableName)
assert.Equal(t, expect, count)
}

Expand Down Expand Up @@ -176,19 +182,18 @@ func validateTableDoesNotExist(t *testing.T, tableExpr string) {
defer cancel()

ticker := time.NewTicker(time.Second)
var foundTableName string
var exists bool
var err error
defer ticker.Stop()

for {
exists, foundTableName, err := tableExists(tableExpr)
require.NoError(t, err)
if !exists {
return
}
select {
case <-ticker.C:
exists, foundTableName, err = tableExists(tableExpr)
require.NoError(t, err)
if !exists {
return
}
case <-ctx.Done():
assert.NoError(t, ctx.Err(), "validateTableDoesNotExist timed out, table %v still exists (%v)", tableExpr, foundTableName)
assert.Failf(t, "validateTableDoesNotExist timed out, table %v still exists (%v)", tableExpr, foundTableName)
return
}
}
Expand All @@ -199,59 +204,78 @@ func validateTableExists(t *testing.T, tableExpr string) {
defer cancel()

ticker := time.NewTicker(time.Second)
var exists bool
var err error
defer ticker.Stop()

for {
exists, _, err := tableExists(tableExpr)
require.NoError(t, err)
if exists {
return
}
select {
case <-ticker.C:
exists, _, err = tableExists(tableExpr)
require.NoError(t, err)
if exists {
return
}
case <-ctx.Done():
assert.NoError(t, ctx.Err(), "validateTableExists timed out, table %v still does not exist", tableExpr)
assert.Failf(t, "validateTableExists timed out, table %v still does not exist", tableExpr)
return
}
}
}

func validateAnyState(t *testing.T, expectNumRows int64, states ...schema.TableGCState) {
for _, state := range states {
expectTableToExist := true
searchExpr := ""
switch state {
case schema.HoldTableGCState:
searchExpr = `\_vt\_HOLD\_%`
case schema.PurgeTableGCState:
searchExpr = `\_vt\_PURGE\_%`
case schema.EvacTableGCState:
searchExpr = `\_vt\_EVAC\_%`
case schema.DropTableGCState:
searchExpr = `\_vt\_DROP\_%`
case schema.TableDroppedGCState:
searchExpr = `\_vt\_%`
expectTableToExist = false
default:
t.Log("Unknown state")
t.Fail()
}
exists, tableName, err := tableExists(searchExpr)
require.NoError(t, err)

if exists {
if expectNumRows >= 0 {
checkTableRows(t, tableName, expectNumRows)
t.Run(fmt.Sprintf("validateAnyState: expectNumRows=%v, states=%v", expectNumRows, states), func(t *testing.T) {
timeout := gc.NextChecksIntervals[len(gc.NextChecksIntervals)-1] + 5*time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
// Attempt validation:
for _, state := range states {
expectTableToExist := true
searchExpr := ""
switch state {
case schema.HoldTableGCState:
searchExpr = `\_vt\_HOLD\_%`
case schema.PurgeTableGCState:
searchExpr = `\_vt\_PURGE\_%`
case schema.EvacTableGCState:
searchExpr = `\_vt\_EVAC\_%`
case schema.DropTableGCState:
searchExpr = `\_vt\_DROP\_%`
case schema.TableDroppedGCState:
searchExpr = `\_vt\_%`
expectTableToExist = false
default:
require.Failf(t, "unknown state", "%v", state)
}
exists, tableName, err := tableExists(searchExpr)
require.NoError(t, err)

var foundRows int64
if exists {
foundRows = getTableRows(t, tableName)
// Now that the table is validated, we can drop it (test cleanup)
dropTable(t, tableName)
}
t.Logf("=== exists: %v, tableName: %v, rows: %v", exists, tableName, foundRows)
if exists == expectTableToExist {
// expectNumRows < 0 means "don't care"
if expectNumRows < 0 || (expectNumRows == foundRows) {
// All conditions are met
return
}
}
}
select {
case <-ticker.C:
case <-ctx.Done():
assert.Failf(t, "timeout in validateAnyState", " waiting for any of these states: %v, expecting rows: %v", states, expectNumRows)
return
}
// Now that the table is validated, we can drop it
dropTable(t, tableName)
}
if exists == expectTableToExist {
// condition met
return
}
}
assert.Failf(t, "could not match any of the states", "states=%v", states)
})
}

// dropTable drops a table
Expand Down Expand Up @@ -309,17 +333,22 @@ func TestHold(t *testing.T) {
}

func TestEvac(t *testing.T) {
populateTable(t)
query, tableName, err := schema.GenerateRenameStatement("t1", schema.EvacTableGCState, time.Now().UTC().Add(tableTransitionExpiration))
assert.NoError(t, err)

_, err = primaryTablet.VttabletProcess.QueryTablet(query, keyspaceName, true)
assert.NoError(t, err)

validateTableDoesNotExist(t, "t1")

time.Sleep(tableTransitionExpiration / 2)
{
var tableName string
t.Run("setting up EVAC table", func(t *testing.T) {
populateTable(t)
var query string
var err error
query, tableName, err = schema.GenerateRenameStatement("t1", schema.EvacTableGCState, time.Now().UTC().Add(tableTransitionExpiration))
assert.NoError(t, err)

_, err = primaryTablet.VttabletProcess.QueryTablet(query, keyspaceName, true)
assert.NoError(t, err)

validateTableDoesNotExist(t, "t1")
})

t.Run("validating before expiration", func(t *testing.T) {
time.Sleep(tableTransitionExpiration / 2)
// Table was created with +10s timestamp, so it should still exist
if fastDropTable {
// EVAC state is skipped in mysql 8.0.23 and beyond
Expand All @@ -328,13 +357,28 @@ func TestEvac(t *testing.T) {
validateTableExists(t, tableName)
checkTableRows(t, tableName, 1024)
}
}

time.Sleep(tableTransitionExpiration)
// We're now both beyond table's timestamp as well as a tableGC interval
validateTableDoesNotExist(t, tableName)
// Table should be renamed as _vt_DROP_... and then dropped!
validateAnyState(t, 0, schema.DropTableGCState, schema.TableDroppedGCState)
})

t.Run("validating rows evacuated", func(t *testing.T) {
// ctx, cancel := context.WithTimeout(ctx, tableTransitionExpiration+gc.CheckTablesReentryMinInterval)
// defer cancel()

// ticker := time.NewTicker(time.Second)
// defer ticker.Stop()

// for {
// select {

// case <-ctx.Done():
// case <-ticker.C:
// }
// }
// time.Sleep(tableTransitionExpiration + gc.CheckTablesReentryMinInterval)
// We're now both beyond table's timestamp as well as a tableGC interval
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to keep this commented code around?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed.

validateTableDoesNotExist(t, tableName)
// Table should be renamed as _vt_DROP_... and then dropped!
validateAnyState(t, 0, schema.DropTableGCState, schema.TableDroppedGCState)
})
}

func TestDrop(t *testing.T) {
Expand Down
20 changes: 14 additions & 6 deletions go/timer/suspendable_ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type SuspendableTicker struct {
// C is user facing
C chan time.Time

suspended int64
suspended atomic.Bool
}

// NewSuspendableTicker creates a new suspendable ticker, indicating whether the ticker should start
Expand All @@ -39,7 +39,7 @@ func NewSuspendableTicker(d time.Duration, initiallySuspended bool) *Suspendable
C: make(chan time.Time),
}
if initiallySuspended {
s.suspended = 1
s.suspended.Store(true)
}
go s.loop()
return s
Expand All @@ -48,12 +48,12 @@ func NewSuspendableTicker(d time.Duration, initiallySuspended bool) *Suspendable
// Suspend stops sending time events on the channel C
// time events sent during suspended time are lost
func (s *SuspendableTicker) Suspend() {
atomic.StoreInt64(&s.suspended, 1)
s.suspended.Store(true)
}

// Resume re-enables time events on channel C
func (s *SuspendableTicker) Resume() {
atomic.StoreInt64(&s.suspended, 0)
s.suspended.Store(false)
}

// Stop completely stops the timer, like time.Timer
Expand All @@ -64,15 +64,23 @@ func (s *SuspendableTicker) Stop() {
// TickNow generates a tick at this point in time. It may block
// if nothing consumes the tick.
func (s *SuspendableTicker) TickNow() {
if atomic.LoadInt64(&s.suspended) == 0 {
if !s.suspended.Load() {
// not suspended
s.C <- time.Now()
}
}

// TickAfter generates a tick after given duration has passed.
// It runs asynchronously and returns immediately.
func (s *SuspendableTicker) TickAfter(d time.Duration) {
time.AfterFunc(d, func() {
s.TickNow()
})
}

func (s *SuspendableTicker) loop() {
for t := range s.ticker.C {
if atomic.LoadInt64(&s.suspended) == 0 {
if !s.suspended.Load() {
// not suspended
s.C <- t
}
Expand Down
Loading
Loading