Skip to content

Commit

Permalink
VReplication: make flags workflow-specific and dynamically changeable (
Browse files Browse the repository at this point in the history
…#16583)

Signed-off-by: Rohit Nayak <[email protected]>
Signed-off-by: Matt Lord <[email protected]>
Co-authored-by: Matt Lord <[email protected]>
  • Loading branch information
rohit-nayak-ps and mattlord authored Sep 18, 2024
1 parent d054447 commit 56c39b2
Show file tree
Hide file tree
Showing 88 changed files with 6,438 additions and 3,822 deletions.
7 changes: 7 additions & 0 deletions changelog/21.0/21.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
- **[Support for recursive CTEs](#recursive-cte)**
- **[VTGate Tablet Balancer](#tablet-balancer)**
- **[Query Timeout Override](#query-timeout)**
- **[Dynamic VReplication Configuration](#dynamic-vreplication-configuration)**

## <a id="major-changes"/>Major Changes

Expand Down Expand Up @@ -130,3 +131,9 @@ A query can also be set to have no timeout by using the `QUERY_TIMEOUT_MS` comme

Example usage:
`select /*vt+ QUERY_TIMEOUT_MS=30 */ col from tbl`

### <a id="dynamic-vreplication-configuration"/>Dynamic VReplication Configuration
Currently many of the configuration options for VReplication Workflows are vttablet flags. This means that any change
requires restarts of vttablets. We now allow these to be overridden while creating a workflow or dynamically once
the workflow is in progress. See https://github.com/vitessio/vitess/pull/16583 for details.

24 changes: 24 additions & 0 deletions go/cmd/vtctldclient/command/vreplication/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
vttablet "vitess.io/vitess/go/vt/vttablet/common"
)

var (
Expand Down Expand Up @@ -67,6 +68,7 @@ var (
MySQLServerVersion string
TruncateUILen int
TruncateErrLen int
ConfigOverrides []string
}{}
)

Expand Down Expand Up @@ -147,6 +149,27 @@ func validateOnDDL(cmd *cobra.Command) error {
return nil
}

// ParseConfigOverrides converts a slice of key=value strings into a map of config overrides. The slice is passed
// as a flag to the command, and the key=value pairs are used to override the default vreplication config values.
func ParseConfigOverrides(overrides []string) (map[string]string, error) {
configOverrides := make(map[string]string, len(overrides))
defaultConfig, err := vttablet.NewVReplicationConfig(nil)
if err != nil {
return nil, err
}
for _, kv := range overrides {
key, value, ok := strings.Cut(kv, "=")
if !ok {
return nil, fmt.Errorf("invalid config override format (var=value expected): %s", kv)
}
if _, ok := defaultConfig.Map()[key]; !ok {
return nil, fmt.Errorf("unknown vreplication config flag: %s", key)
}
configOverrides[key] = value
}
return configOverrides, nil
}

// ValidateShards checks if the provided shard names are valid key ranges.
func ValidateShards(shards []string) error {
for _, shard := range shards {
Expand Down Expand Up @@ -232,6 +255,7 @@ func AddCommonCreateFlags(cmd *cobra.Command) {
cmd.Flags().BoolVar(&CreateOptions.DeferSecondaryKeys, "defer-secondary-keys", false, "Defer secondary index creation for a table until after it has been copied.")
cmd.Flags().BoolVar(&CreateOptions.AutoStart, "auto-start", true, "Start the workflow after creating it.")
cmd.Flags().BoolVar(&CreateOptions.StopAfterCopy, "stop-after-copy", false, "Stop the workflow after it's finished copying the existing rows and before it starts replicating changes.")
cmd.Flags().StringSliceVar(&CreateOptions.ConfigOverrides, "config-overrides", []string{}, "Specify one or more VReplication config flags to override as a comma-separated list of key=value pairs.")
}

var MirrorTrafficOptions = struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ func commandCreate(cmd *cobra.Command, args []string) error {
tsp := common.GetTabletSelectionPreference(cmd)
cli.FinishedParsing(cmd)

configOverrides, err := common.ParseConfigOverrides(common.CreateOptions.ConfigOverrides)
if err != nil {
return err
}
workflowOptions := &vtctldatapb.WorkflowOptions{
Config: configOverrides,
}

ms := &vtctldatapb.MaterializeSettings{
Workflow: common.BaseOptions.Workflow,
MaterializationIntent: vtctldatapb.MaterializationIntent_CUSTOM,
Expand All @@ -101,6 +109,7 @@ func commandCreate(cmd *cobra.Command, args []string) error {
Cell: strings.Join(common.CreateOptions.Cells, ","),
TabletTypes: topoproto.MakeStringTypeCSV(common.CreateOptions.TabletTypes),
TabletSelectionPreference: tsp,
WorkflowOptions: workflowOptions,
}

createOptions.TableSettings.parser, err = sqlparser.New(sqlparser.Options{
Expand Down
6 changes: 6 additions & 0 deletions go/cmd/vtctldclient/command/vreplication/movetables/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ func commandCreate(cmd *cobra.Command, args []string) error {
tsp := common.GetTabletSelectionPreference(cmd)
cli.FinishedParsing(cmd)

configOverrides, err := common.ParseConfigOverrides(common.CreateOptions.ConfigOverrides)
if err != nil {
return err
}
createOptions.WorkflowOptions.Config = configOverrides

req := &vtctldatapb.MoveTablesCreateRequest{
Workflow: common.BaseOptions.Workflow,
TargetKeyspace: common.BaseOptions.TargetKeyspace,
Expand Down
9 changes: 9 additions & 0 deletions go/cmd/vtctldclient/command/vreplication/reshard/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ func commandReshardCreate(cmd *cobra.Command, args []string) error {
tsp := common.GetTabletSelectionPreference(cmd)
cli.FinishedParsing(cmd)

configOverrides, err := common.ParseConfigOverrides(common.CreateOptions.ConfigOverrides)
if err != nil {
return err
}
workflowOptions := &vtctldatapb.WorkflowOptions{
Config: configOverrides,
}

req := &vtctldatapb.ReshardCreateRequest{
Workflow: common.BaseOptions.Workflow,
Keyspace: common.BaseOptions.TargetKeyspace,
Expand All @@ -72,6 +80,7 @@ func commandReshardCreate(cmd *cobra.Command, args []string) error {
SourceShards: reshardCreateOptions.sourceShards,
TargetShards: reshardCreateOptions.targetShards,
SkipSchemaCopy: reshardCreateOptions.skipSchemaCopy,
WorkflowOptions: workflowOptions,
}
resp, err := common.GetClient().ReshardCreate(common.GetCommandCtx(), req)
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions go/cmd/vtctldclient/command/vreplication/workflow/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ var (
TabletTypes []topodatapb.TabletType
TabletTypesInPreferenceOrder bool
OnDDL string
ConfigOverrides []string
}{}

// update makes a WorkflowUpdate gRPC call to a vtctld.
Expand Down Expand Up @@ -74,6 +75,9 @@ var (
return fmt.Errorf("invalid on-ddl value: %s", updateOptions.OnDDL)
}
}
if len(updateOptions.ConfigOverrides) > 0 {
changes = true
}
if !changes {
return fmt.Errorf("no configuration options specified to update")
}
Expand All @@ -95,13 +99,19 @@ func commandUpdate(cmd *cobra.Command, args []string) error {
}
}

configOverrides, err := common.ParseConfigOverrides(updateOptions.ConfigOverrides)
if err != nil {
return err
}

req := &vtctldatapb.WorkflowUpdateRequest{
Keyspace: baseOptions.Keyspace,
TabletRequest: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: baseOptions.Workflow,
Cells: updateOptions.Cells,
TabletTypes: updateOptions.TabletTypes,
TabletSelectionPreference: &tsp,
ConfigOverrides: configOverrides,
},
}

Expand Down
2 changes: 2 additions & 0 deletions go/cmd/vtctldclient/command/vreplication/workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ func registerCommands(root *cobra.Command) {
update.Flags().VarP((*topoproto.TabletTypeListFlag)(&updateOptions.TabletTypes), "tablet-types", "t", "New source tablet types to replicate from (e.g. PRIMARY,REPLICA,RDONLY).")
update.Flags().BoolVar(&updateOptions.TabletTypesInPreferenceOrder, "tablet-types-in-order", true, "When performing source tablet selection, look for candidates in the type order as they are listed in the tablet-types flag.")
update.Flags().StringVar(&updateOptions.OnDDL, "on-ddl", "", "New instruction on what to do when DDL is encountered in the VReplication stream. Possible values are IGNORE, STOP, EXEC, and EXEC_IGNORE.")
update.Flags().StringSliceVar(&updateOptions.ConfigOverrides, "config-overrides", nil, "Specify one or more VReplication config flags to override as a comma-separated list of key=value pairs.")

common.AddShardSubsetFlag(update, &baseOptions.Shards)
base.AddCommand(update)
}
Expand Down
4 changes: 2 additions & 2 deletions go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,8 @@ Flags:
--queryserver-enable-views Enable views support in vttablet.
--queryserver_enable_online_ddl Enable online DDL. (default true)
--redact-debug-ui-queries redact full queries and bind variables from debug UI
--relay_log_max_items int Maximum number of rows for VReplication target buffering. (default 5000)
--relay_log_max_size int Maximum buffer size (in bytes) for VReplication target buffering. If single rows are larger than this, a single row is buffered at a time. (default 250000)
--relay_log_max_items int Maximum number of rows for vreplication target buffering. (default 5000)
--relay_log_max_size int Maximum buffer size (in bytes) for vreplication target buffering. If single rows are larger than this, a single row is buffered at a time. (default 250000)
--remote_operation_timeout duration time to wait for a remote operation (default 15s)
--replication_connect_retry duration how long to wait in between replica reconnect attempts. Only precise to the second. (default 10s)
--restore-to-pos string (init incremental restore parameter) if set, run a point in time recovery that ends with the given position. This will attempt to use one full backup followed by zero or more incremental backups
Expand Down
4 changes: 2 additions & 2 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,8 @@ Flags:
--queryserver-enable-views Enable views support in vttablet.
--queryserver_enable_online_ddl Enable online DDL. (default true)
--redact-debug-ui-queries redact full queries and bind variables from debug UI
--relay_log_max_items int Maximum number of rows for VReplication target buffering. (default 5000)
--relay_log_max_size int Maximum buffer size (in bytes) for VReplication target buffering. If single rows are larger than this, a single row is buffered at a time. (default 250000)
--relay_log_max_items int Maximum number of rows for vreplication target buffering. (default 5000)
--relay_log_max_size int Maximum buffer size (in bytes) for vreplication target buffering. If single rows are larger than this, a single row is buffered at a time. (default 250000)
--remote_operation_timeout duration time to wait for a remote operation (default 15s)
--replication_connect_retry duration how long to wait in between replica reconnect attempts. Only precise to the second. (default 10s)
--restore-to-pos string (init incremental restore parameter) if set, run a point in time recovery that ends with the given position. This will attempt to use one full backup followed by zero or more incremental backups
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/onlineddl/flow/onlineddl_flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ import (
"vitess.io/vitess/go/test/endtoend/throttler"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/vttablet"
vttablet "vitess.io/vitess/go/vt/vttablet/common"
throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
"vitess.io/vitess/go/test/endtoend/throttler"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/vttablet"
vttablet "vitess.io/vitess/go/vt/vttablet/common"
)

type WriteMetrics struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ import (
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/vttablet"
vttablet "vitess.io/vitess/go/vt/vttablet/common"
)

type testcase struct {
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
"vitess.io/vitess/go/vt/vttablet"
vttablet "vitess.io/vitess/go/vt/vttablet/common"

vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"

Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vttablet"
vttablet "vitess.io/vitess/go/vt/vttablet/common"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)
Expand Down
46 changes: 46 additions & 0 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"testing"
"time"

"golang.org/x/exp/maps"

"github.com/buger/jsonparser"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -1022,3 +1024,47 @@ func confirmKeyspacesRoutedTo(t *testing.T, keyspace string, routedKeyspace, tab
require.Equalf(t, routedKeyspace, plan.Keyspace.Name, "for database %s, keyspace %v, tabletType %s", database, keyspace, tt)
}
}

// getVReplicationConfig returns the vreplication config for one random workflow for a given tablet. Currently, this is
// used when there is only one workflow, so we are using this simple method to get the config.
func getVReplicationConfig(t *testing.T, tab *cluster.VttabletProcess) map[string]string {
configJson, err := getDebugVar(t, tab.Port, []string{"VReplicationConfig"})
require.NoError(t, err)

var config map[string]string
err = json2.Unmarshal([]byte(configJson), &config)
require.NoError(t, err)
require.Equal(t, 1, len(config))

configJson = config[maps.Keys(config)[0]]
config = nil
err = json2.Unmarshal([]byte(configJson), &config)
require.NoError(t, err)

return config
}

// mapToCSV converts a golang map to a CSV string for use in defining the config overrides in vrep CLI commands.
func mapToCSV(m map[string]string) string {
csv := ""
if len(m) == 0 {
return csv
}
for k, v := range m {
csv += fmt.Sprintf("%s=%s,", k, v)
}
if len(csv) == 0 {
return csv
}
return csv[:len(csv)-1]
}

// validateOverrides validates that the given vttablets have the expected config overrides.
func validateOverrides(t *testing.T, tabs map[string]*cluster.VttabletProcess, want map[string]string) {
for _, tab := range tabs {
config := getVReplicationConfig(t, tab)
for k, v := range want {
require.EqualValues(t, v, config[k])
}
}
}
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/vdiff2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet"
vttablet "vitess.io/vitess/go/vt/vttablet/common"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
Expand Down
Loading

0 comments on commit 56c39b2

Please sign in to comment.