diff --git a/go/vt/vtctl/workflow/materializer.go b/go/vt/vtctl/workflow/materializer.go index 43b2f96a064..55ccca129c6 100644 --- a/go/vt/vtctl/workflow/materializer.go +++ b/go/vt/vtctl/workflow/materializer.go @@ -25,6 +25,7 @@ import ( "time" "vitess.io/vitess/go/ptr" + "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/textutil" "vitess.io/vitess/go/vt/concurrency" @@ -372,9 +373,9 @@ func (mz *materializer) deploySchema() error { } if removeAutoInc { - var replaceFunc func(columnName string) + var replaceFunc func(columnName string) error if mz.ms.GetWorkflowOptions().ShardedAutoIncrementHandling == vtctldatapb.ShardedAutoIncrementHandling_REPLACE { - replaceFunc = func(columnName string) { + replaceFunc = func(columnName string) error { mu.Lock() defer mu.Unlock() // At this point we've already confirmed that the table exists in the target @@ -382,7 +383,21 @@ func (mz *materializer) deploySchema() error { table := targetVSchema.Tables[ts.TargetTable] // Don't override or redo anything that already exists. if table != nil && table.AutoIncrement == nil { - seqTableName := fmt.Sprintf(autoSequenceTableFormat, ts.TargetTable) + tableName, err := sqlescape.UnescapeID(ts.TargetTable) + if err != nil { + return err + } + seqTableName, err := sqlescape.EnsureEscaped(fmt.Sprintf(autoSequenceTableFormat, tableName)) + if err != nil { + return err + } + if mz.ms.GetWorkflowOptions().GlobalKeyspace != "" { + seqKeyspace, err := sqlescape.EnsureEscaped(mz.ms.WorkflowOptions.GlobalKeyspace) + if err != nil { + return err + } + seqTableName = fmt.Sprintf("%s.%s", seqKeyspace, seqTableName) + } // Create a Vitess AutoIncrement definition -- which uses a sequence -- to // replace the MySQL auto_increment definition that we removed. table.AutoIncrement = &vschemapb.AutoIncrement{ @@ -391,6 +406,7 @@ func (mz *materializer) deploySchema() error { } updatedVSchema = true } + return nil } } ddl, err = stripAutoIncrement(ddl, mz.env.Parser(), replaceFunc) diff --git a/go/vt/vtctl/workflow/materializer_env_test.go b/go/vt/vtctl/workflow/materializer_env_test.go index 500558bdd32..6ac0b6c0057 100644 --- a/go/vt/vtctl/workflow/materializer_env_test.go +++ b/go/vt/vtctl/workflow/materializer_env_test.go @@ -120,7 +120,7 @@ func newTestMaterializerEnv(t *testing.T, ctx context.Context, ms *vtctldatapb.M tableName := ts.TargetTable table, err := venv.Parser().TableFromStatement(ts.SourceExpression) if err == nil { - tableName = table.Name.String() + tableName = sqlparser.String(table.Name) } var ( cols []string diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index 20aa8b6df5c..2b04a49e810 100644 --- a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -608,11 +608,11 @@ func TestMoveTablesDDLFlag(t *testing.T) { // 2. REMOVE the tables' MySQL auto_increment clauses // 3. REPLACE the table's MySQL auto_increment clauses with Vitess sequences func TestShardedAutoIncHandling(t *testing.T) { - tableName := "t1" + tableName := "`t-1`" tableDDL := fmt.Sprintf("create table %s (id int not null auto_increment primary key, c1 varchar(10))", tableName) ms := &vtctldatapb.MaterializeSettings{ Workflow: "workflow", - SourceKeyspace: "sourceks", + SourceKeyspace: "source-ks", TargetKeyspace: "targetks", TableSettings: []*vtctldatapb.TableMaterializeSettings{{ TargetTable: tableName, @@ -847,7 +847,7 @@ func TestShardedAutoIncHandling(t *testing.T) { }, AutoIncrement: &vschemapb.AutoIncrement{ // AutoIncrement definition added Column: "id", - Sequence: fmt.Sprintf(autoSequenceTableFormat, tableName), + Sequence: fmt.Sprintf("`%s`.`%s`", ms.SourceKeyspace, fmt.Sprintf(autoSequenceTableFormat, strings.ReplaceAll(tableName, "`", ""))), }, }, }, @@ -909,7 +909,7 @@ func TestShardedAutoIncHandling(t *testing.T) { if tc.wantTargetVSchema != nil { targetVSchema, err := env.ws.ts.GetVSchema(ctx, ms.TargetKeyspace) require.NoError(t, err) - require.True(t, proto.Equal(targetVSchema, tc.wantTargetVSchema)) + require.True(t, proto.Equal(targetVSchema, tc.wantTargetVSchema), "got: %v, want: %v", targetVSchema, tc.wantTargetVSchema) } } }) diff --git a/go/vt/vtctl/workflow/utils.go b/go/vt/vtctl/workflow/utils.go index 5021e3938c8..665ec500b0c 100644 --- a/go/vt/vtctl/workflow/utils.go +++ b/go/vt/vtctl/workflow/utils.go @@ -222,24 +222,30 @@ func stripTableForeignKeys(ddl string, parser *sqlparser.Parser) (string, error) // table definition. If an optional replace function is specified then that // callback will be used to e.g. replace the MySQL clause with a Vitess // VSchema AutoIncrement definition. -func stripAutoIncrement(ddl string, parser *sqlparser.Parser, replace func(columnName string)) (string, error) { +func stripAutoIncrement(ddl string, parser *sqlparser.Parser, replace func(columnName string) error) (string, error) { newDDL, err := parser.ParseStrictDDL(ddl) if err != nil { return "", err } - _ = sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) { + err = sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) { switch node := node.(type) { case *sqlparser.ColumnDefinition: if node.Type.Options.Autoincrement { node.Type.Options.Autoincrement = false if replace != nil { - replace(sqlparser.String(node.Name)) + if err := replace(sqlparser.String(node.Name)); err != nil { + return false, vterrors.Wrapf(err, "failed to replace auto_increment column %q in %q", sqlparser.String(node.Name), ddl) + } + } } } return true, nil }, newDDL) + if err != nil { + return "", err + } return sqlparser.String(newDDL), nil }