Skip to content

Commit

Permalink
ApplySchema: reroute ALTER VITESS_MIGRATION ... THROTTLE ... via …
Browse files Browse the repository at this point in the history
…`UpdateThrottlerConfig` (#16030)

Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Jun 2, 2024
1 parent ab6c7af commit 79b66d6
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 6 deletions.
47 changes: 47 additions & 0 deletions go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"vitess.io/vitess/go/test/endtoend/throttler"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -358,6 +359,52 @@ func TestInitialThrottler(t *testing.T) {
})
}

func TestThrottleViaApplySchema(t *testing.T) {
defer cluster.PanicHandler(t)
t.Run("throttling via ApplySchema", func(t *testing.T) {
vtctlParams := &cluster.ApplySchemaParams{DDLStrategy: "online"}
_, err := clusterInstance.VtctldClientProcess.ApplySchemaWithOutput(
keyspaceName, "alter vitess_migration throttle all", *vtctlParams,
)
assert.NoError(t, err)
})
t.Run("validate keyspace configuration after throttle", func(t *testing.T) {
keyspace, err := clusterInstance.VtctldClientProcess.GetKeyspace(keyspaceName)
require.NoError(t, err)
require.NotNil(t, keyspace)
require.NotNil(t, keyspace.Keyspace.ThrottlerConfig)
require.NotEmpty(t, keyspace.Keyspace.ThrottlerConfig.ThrottledApps, "throttler config: %+v", keyspace.Keyspace.ThrottlerConfig)
appRule, ok := keyspace.Keyspace.ThrottlerConfig.ThrottledApps[throttlerapp.OnlineDDLName.String()]
require.True(t, ok, "throttled apps: %v", keyspace.Keyspace.ThrottlerConfig.ThrottledApps)
require.NotNil(t, appRule)
assert.Equal(t, throttlerapp.OnlineDDLName.String(), appRule.Name)
assert.EqualValues(t, 1.0, appRule.Ratio)
expireAt := time.Unix(appRule.ExpiresAt.Seconds, int64(appRule.ExpiresAt.Nanoseconds))
assert.True(t, expireAt.After(time.Now()), "expected rule to expire in the future: %v", expireAt)
})
t.Run("unthrottling via ApplySchema", func(t *testing.T) {
vtctlParams := &cluster.ApplySchemaParams{DDLStrategy: "online"}
_, err := clusterInstance.VtctldClientProcess.ApplySchemaWithOutput(
keyspaceName, "alter vitess_migration unthrottle all", *vtctlParams,
)
assert.NoError(t, err)
})
t.Run("validate keyspace configuration after unthrottle", func(t *testing.T) {
keyspace, err := clusterInstance.VtctldClientProcess.GetKeyspace(keyspaceName)
require.NoError(t, err)
require.NotNil(t, keyspace)
require.NotNil(t, keyspace.Keyspace.ThrottlerConfig)
require.NotEmpty(t, keyspace.Keyspace.ThrottlerConfig.ThrottledApps, "throttler config: %+v", keyspace.Keyspace.ThrottlerConfig)
appRule, ok := keyspace.Keyspace.ThrottlerConfig.ThrottledApps[throttlerapp.OnlineDDLName.String()]
require.True(t, ok, "throttled apps: %v", keyspace.Keyspace.ThrottlerConfig.ThrottledApps)
require.NotNil(t, appRule)
assert.Equal(t, throttlerapp.OnlineDDLName.String(), appRule.Name)
assert.EqualValues(t, 1.0, appRule.Ratio)
expireAt := time.Unix(appRule.ExpiresAt.Seconds, int64(appRule.ExpiresAt.Nanoseconds))
assert.True(t, expireAt.Before(time.Now()), "expected rule to have expired, but it has not: %v", expireAt)
})
}

func TestThrottlerAfterMetricsCollected(t *testing.T) {
defer cluster.PanicHandler(t)

Expand Down
100 changes: 94 additions & 6 deletions go/vt/schemamanager/tablet_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,30 @@ package schemamanager
import (
"context"
"fmt"
"strconv"
"strings"
"sync"
"time"

"golang.org/x/sync/semaphore"

"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtctl/schematools"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
"vitess.io/vitess/go/vt/vttablet/tmclient"

querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vtrpc"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// TabletExecutor applies schema changes to all tablets.
Expand Down Expand Up @@ -146,7 +151,7 @@ func (exec *TabletExecutor) parseDDLs(sqls []string) error {
for _, sql := range sqls {
stmt, err := exec.parser.Parse(sql)
if err != nil {
return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "failed to parse sql: %s, got error: %v", sql, err)
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "failed to parse sql: %s, got error: %v", sql, err)
}
switch stmt.(type) {
case sqlparser.DDLStatement:
Expand All @@ -155,7 +160,7 @@ func (exec *TabletExecutor) parseDDLs(sqls []string) error {
case *sqlparser.AlterMigration:
default:
if len(exec.tablets) != 1 {
return vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "non-ddl statements can only be executed for single shard keyspaces: %s", sql)
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "non-ddl statements can only be executed for single shard keyspaces: %s", sql)
}
}
}
Expand Down Expand Up @@ -190,6 +195,76 @@ func (exec *TabletExecutor) isOnlineSchemaDDL(stmt sqlparser.Statement) (isOnlin
return false
}

func validateThrottleParams(alterMigrationType sqlparser.AlterMigrationType, expireString string, ratioLiteral *sqlparser.Literal) (duration time.Duration, ratio float64, err error) {
switch alterMigrationType {
case sqlparser.UnthrottleMigrationType,
sqlparser.UnthrottleAllMigrationType:
// Unthrottling is like throttling with duration=0
duration = 0
default:
duration = throttle.DefaultAppThrottleDuration
if expireString != "" {
duration, err = time.ParseDuration(expireString)
if err != nil || duration < 0 {
return duration, ratio, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid EXPIRE value: %s. Try '120s', '30m', '1h', etc. Allowed units are (s)ec, (m)in, (h)hour", expireString)
}
}
}
ratio = 1.0
if ratioLiteral != nil {
ratio, err = strconv.ParseFloat(ratioLiteral.Val, 64)
if err != nil || ratio < 0 || ratio > 1 {
return duration, ratio, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid RATIO value: %s. Try any decimal number between '0.0' (no throttle) and `1.0` (fully throttled)", ratioLiteral.Val)
}
}
return duration, ratio, nil
}

func (exec *TabletExecutor) executeAlterMigrationThrottle(ctx context.Context, alterMigration *sqlparser.AlterMigration) (err error) {
duration, ratio, err := validateThrottleParams(alterMigration.Type, alterMigration.Expire, alterMigration.Ratio)
if err != nil {
return err
}
expireAt := time.Now().Add(duration)
appName := alterMigration.UUID
if appName == "" {
appName = throttlerapp.OnlineDDLName.String()
}
throttledAppRule := &topodatapb.ThrottledAppRule{
Name: appName,
ExpiresAt: protoutil.TimeToProto(expireAt),
Ratio: ratio,
}

req := &vtctldatapb.UpdateThrottlerConfigRequest{
Keyspace: exec.keyspace,
ThrottledApp: throttledAppRule,
}

update := func(throttlerConfig *topodatapb.ThrottlerConfig) *topodatapb.ThrottlerConfig {
if throttlerConfig == nil {
throttlerConfig = &topodatapb.ThrottlerConfig{}
}
if throttlerConfig.ThrottledApps == nil {
throttlerConfig.ThrottledApps = make(map[string]*topodatapb.ThrottledAppRule)
}
throttlerConfig.ThrottledApps[req.ThrottledApp.Name] = req.ThrottledApp
return throttlerConfig
}
// We have already locked the keyspace
ki, err := exec.ts.GetKeyspace(ctx, req.Keyspace)
if err != nil {
return err
}
ki.ThrottlerConfig = update(ki.ThrottlerConfig)
err = exec.ts.UpdateKeyspace(ctx, ki)
if err != nil {
return err
}
_, err = exec.ts.UpdateSrvKeyspaceThrottlerConfig(ctx, req.Keyspace, []string{}, update)
return err
}

// executeSQL executes a single SQL statement either as online DDL or synchronously on all tablets.
// In online DDL case, the query may be exploded into multiple queries during
func (exec *TabletExecutor) executeSQL(ctx context.Context, sql string, providedUUID string, execResult *ExecuteResult) (executedAsynchronously bool, err error) {
Expand Down Expand Up @@ -235,8 +310,21 @@ func (exec *TabletExecutor) executeSQL(ctx context.Context, sql string, provided
exec.logger.Printf("%s\n", onlineDDL.UUID)
return true, nil
case *sqlparser.AlterMigration:
exec.executeOnAllTablets(ctx, execResult, sql, true)
return true, nil
switch stmt.Type {
case sqlparser.ThrottleMigrationType,
sqlparser.ThrottleAllMigrationType,
sqlparser.UnthrottleMigrationType,
sqlparser.UnthrottleAllMigrationType:
err := exec.executeAlterMigrationThrottle(ctx, stmt)
if err != nil {
execResult.ExecutorErr = err.Error()
return false, err
}
return true, nil
default:
exec.executeOnAllTablets(ctx, execResult, sql, true)
return true, nil
}
}
// Got here? The statement needs to be executed directly.
return executeViaFetch()
Expand Down Expand Up @@ -267,7 +355,7 @@ func allSQLsAreCreateQueries(sqls []string, parser *sqlparser.Parser) (bool, err
for _, sql := range sqls {
stmt, err := parser.Parse(sql)
if err != nil {
return false, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "failed to parse sql: %s, got error: %v", sql, err)
return false, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "failed to parse sql: %s, got error: %v", sql, err)
}
switch stmt.(type) {
case *sqlparser.CreateTable, *sqlparser.CreateView:
Expand Down

0 comments on commit 79b66d6

Please sign in to comment.