From 39b7deae9bd1eaee9732df87f6f1173bdb7d20d8 Mon Sep 17 00:00:00 2001 From: Olga Shestopalova Date: Mon, 25 Sep 2023 16:05:12 -0400 Subject: [PATCH 1/4] bugfix: change column name and type to json Signed-off-by: Olga Shestopalova --- .../vreplication/replicator_plan_test.go | 49 +++++++++++++++++++ .../vreplication/table_plan_builder.go | 16 +++++- .../tabletserver/vstreamer/planbuilder.go | 16 +++++- .../vstreamer/planbuilder_test.go | 23 +++++++++ 4 files changed, 102 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan_test.go b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan_test.go index 8b97f02dc1e..780b1c0d064 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan_test.go @@ -414,6 +414,55 @@ func TestBuildPlayerPlan(t *testing.T) { }, }, }, + }, { + input: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select c1, convert(c using utf8mb4) as c2 from t1", + }}, + }, + plan: &TestReplicatorPlan{ + VStreamFilter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select c1, convert(c using utf8mb4) as c2 from t1", + }}, + }, + TargetTables: []string{"t1"}, + TablePlans: map[string]*TestTablePlan{ + "t1": { + TargetName: "t1", + SendRule: "t1", + PKReferences: []string{"c1"}, + InsertFront: "insert into t1(c1,c2)", + InsertValues: "(:a_c1,convert(:a_c using utf8mb4))", + Insert: "insert into t1(c1,c2) values (:a_c1,convert(:a_c using utf8mb4))", + Update: "update t1 set c2=convert(:a_c using utf8mb4) where c1=:b_c1", + Delete: "delete from t1 where c1=:b_c1", + }, + }, + }, + planpk: &TestReplicatorPlan{ + VStreamFilter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select c1, convert(c using utf8mb4) as c2, pk1, pk2 from t1", + }}, + }, + TargetTables: []string{"t1"}, + TablePlans: map[string]*TestTablePlan{ + "t1": { + TargetName: "t1", + SendRule: "t1", + PKReferences: []string{"c1", "pk1", "pk2"}, + InsertFront: "insert into t1(c1,c2)", + InsertValues: "(:a_c1,convert(:a_c using utf8mb4))", + Insert: "insert into t1(c1,c2) select :a_c1, convert(:a_c using utf8mb4) from dual where (:a_pk1,:a_pk2) <= (1,'aaa')", + Update: "update t1 set c2=convert(:a_c using utf8mb4) where c1=:b_c1 and (:b_pk1,:b_pk2) <= (1,'aaa')", + Delete: "delete from t1 where c1=:b_c1 and (:b_pk1,:b_pk2) <= (1,'aaa')", + }, + }, + }, }, { // Keywords as names. input: &binlogdatapb.Filter{ diff --git a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go index 3d0f27c7d24..0acd0f398cc 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go +++ b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go @@ -425,9 +425,23 @@ func (tpb *tablePlanBuilder) analyzeExpr(selExpr sqlparser.SelectExpr) (*colExpr references: make(map[string]bool), } if expr, ok := aliased.Expr.(*sqlparser.ConvertUsingExpr); ok { + var colName sqlparser.IdentifierCI + err := sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) { + switch node := node.(type) { + case *sqlparser.ColName: + if !node.Qualifier.IsEmpty() { + return false, fmt.Errorf("unsupported qualifier for column: %v", sqlparser.String(node)) + } + colName = node.Name + } + return true, nil + }, aliased.Expr) + if err != nil { + return nil, fmt.Errorf("failed to find column name for convert using expression: %v", sqlparser.String(aliased.Expr)) + } selExpr := &sqlparser.ConvertUsingExpr{ Type: "utf8mb4", - Expr: &sqlparser.ColName{Name: as}, + Expr: &sqlparser.ColName{Name: colName}, } cexpr.expr = expr cexpr.operation = opExpr diff --git a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go index a938bda6dff..ff81d8b1b01 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go +++ b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go @@ -727,7 +727,21 @@ func (plan *Plan) analyzeExpr(vschema *localVSchema, selExpr sqlparser.SelectExp FixedValue: sqltypes.NewInt64(num), }, nil case *sqlparser.ConvertUsingExpr: - colnum, err := findColumn(plan.Table, aliased.As) + var colName sqlparser.IdentifierCI + err := sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) { + switch node := node.(type) { + case *sqlparser.ColName: + if !node.Qualifier.IsEmpty() { + return false, fmt.Errorf("unsupported qualifier for column: %v", sqlparser.String(node)) + } + colName = node.Name + } + return true, nil + }, aliased.Expr) + if err != nil { + return ColExpr{}, fmt.Errorf("failed to find column name for convert using expression: %v", sqlparser.String(aliased.Expr)) + } + colnum, err := findColumn(plan.Table, colName) if err != nil { return ColExpr{}, err } diff --git a/go/vt/vttablet/tabletserver/vstreamer/planbuilder_test.go b/go/vt/vttablet/tabletserver/vstreamer/planbuilder_test.go index aed178998cb..03001362073 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/planbuilder_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/planbuilder_test.go @@ -479,6 +479,29 @@ func TestPlanBuilder(t *testing.T) { KeyRange: nil, }}, }, + }, { + inTable: t1, + inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select convert(val using utf8mb4) as val2, id as id from t1"}, + outPlan: &Plan{ + ColExprs: []ColExpr{{ + ColNum: 1, + Field: &querypb.Field{ + Name: "val", + Type: sqltypes.VarBinary, + Charset: collations.CollationBinaryID, + Flags: uint32(querypb.MySqlFlag_BINARY_FLAG), + }, + }, { + ColNum: 0, + Field: &querypb.Field{ + Name: "id", + Type: sqltypes.Int64, + Charset: collations.CollationBinaryID, + Flags: uint32(querypb.MySqlFlag_NUM_FLAG), + }, + }}, + convertUsingUTF8Columns: map[string]bool{"val": true}, + }, }, { inTable: regional, inRule: &binlogdatapb.Rule{Match: "regional", Filter: "select id, keyspace_id() from regional"}, From d874a73f2d937a193c6cc55fed9e3bff4432ab0c Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 26 Sep 2023 12:43:48 +0300 Subject: [PATCH 2/4] adding a onlineddl_vrepl_suite test Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Signed-off-by: Olga Shestopalova --- .../vrepl_suite/onlineddl_vrepl_suite_test.go | 2 +- .../testdata/rename-retype-json/after_columns | 1 + .../testdata/rename-retype-json/alter | 1 + .../rename-retype-json/before_columns | 1 + .../testdata/rename-retype-json/create.sql | 22 +++++++++++++++++++ 5 files changed, 26 insertions(+), 1 deletion(-) create mode 100644 go/test/endtoend/onlineddl/vrepl_suite/testdata/rename-retype-json/after_columns create mode 100644 go/test/endtoend/onlineddl/vrepl_suite/testdata/rename-retype-json/alter create mode 100644 go/test/endtoend/onlineddl/vrepl_suite/testdata/rename-retype-json/before_columns create mode 100644 go/test/endtoend/onlineddl/vrepl_suite/testdata/rename-retype-json/create.sql diff --git a/go/test/endtoend/onlineddl/vrepl_suite/onlineddl_vrepl_suite_test.go b/go/test/endtoend/onlineddl/vrepl_suite/onlineddl_vrepl_suite_test.go index c8b87215036..939d3900068 100644 --- a/go/test/endtoend/onlineddl/vrepl_suite/onlineddl_vrepl_suite_test.go +++ b/go/test/endtoend/onlineddl/vrepl_suite/onlineddl_vrepl_suite_test.go @@ -58,7 +58,7 @@ var ( ) const ( - testDataPath = "testdata" + testDataPath = "tmptestdata" defaultSQLMode = "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION" ) diff --git a/go/test/endtoend/onlineddl/vrepl_suite/testdata/rename-retype-json/after_columns b/go/test/endtoend/onlineddl/vrepl_suite/testdata/rename-retype-json/after_columns new file mode 100644 index 00000000000..99f86097862 --- /dev/null +++ b/go/test/endtoend/onlineddl/vrepl_suite/testdata/rename-retype-json/after_columns @@ -0,0 +1 @@ +id, c1j diff --git a/go/test/endtoend/onlineddl/vrepl_suite/testdata/rename-retype-json/alter b/go/test/endtoend/onlineddl/vrepl_suite/testdata/rename-retype-json/alter new file mode 100644 index 00000000000..f2e64ff0894 --- /dev/null +++ b/go/test/endtoend/onlineddl/vrepl_suite/testdata/rename-retype-json/alter @@ -0,0 +1 @@ +change column c1 c1j json diff --git a/go/test/endtoend/onlineddl/vrepl_suite/testdata/rename-retype-json/before_columns b/go/test/endtoend/onlineddl/vrepl_suite/testdata/rename-retype-json/before_columns new file mode 100644 index 00000000000..b791aa0d27a --- /dev/null +++ b/go/test/endtoend/onlineddl/vrepl_suite/testdata/rename-retype-json/before_columns @@ -0,0 +1 @@ +id, c1 diff --git a/go/test/endtoend/onlineddl/vrepl_suite/testdata/rename-retype-json/create.sql b/go/test/endtoend/onlineddl/vrepl_suite/testdata/rename-retype-json/create.sql new file mode 100644 index 00000000000..5280498e9fd --- /dev/null +++ b/go/test/endtoend/onlineddl/vrepl_suite/testdata/rename-retype-json/create.sql @@ -0,0 +1,22 @@ +drop table if exists onlineddl_test; +create table onlineddl_test ( + id int auto_increment, + c1 int not null, + primary key (id) +) auto_increment=1; + +insert into onlineddl_test values (1, 11); +insert into onlineddl_test values (2, 13); + +drop event if exists onlineddl_test; +delimiter ;; +create event onlineddl_test + on schedule every 1 second + starts current_timestamp + ends current_timestamp + interval 60 second + on completion not preserve + enable + do +begin + insert into onlineddl_test values (null, 17); +end ;; From 90b4bffbc82ed887ba8e0b8aa13bc3b74f7aab8f Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 26 Sep 2023 13:09:30 +0300 Subject: [PATCH 3/4] fix testdata path Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Signed-off-by: Olga Shestopalova --- .../onlineddl/vrepl_suite/onlineddl_vrepl_suite_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/test/endtoend/onlineddl/vrepl_suite/onlineddl_vrepl_suite_test.go b/go/test/endtoend/onlineddl/vrepl_suite/onlineddl_vrepl_suite_test.go index 939d3900068..c8b87215036 100644 --- a/go/test/endtoend/onlineddl/vrepl_suite/onlineddl_vrepl_suite_test.go +++ b/go/test/endtoend/onlineddl/vrepl_suite/onlineddl_vrepl_suite_test.go @@ -58,7 +58,7 @@ var ( ) const ( - testDataPath = "tmptestdata" + testDataPath = "testdata" defaultSQLMode = "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION" ) From 30e21a4f9bb2ef1c337bf0810374f3b2b4dc5166 Mon Sep 17 00:00:00 2001 From: Olga Shestopalova Date: Tue, 26 Sep 2023 08:18:40 -0400 Subject: [PATCH 4/4] add comment explaining the code Signed-off-by: Olga Shestopalova --- .../tabletmanager/vreplication/table_plan_builder.go | 7 ++++++- go/vt/vttablet/tabletserver/vstreamer/planbuilder.go | 7 ++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go index 0acd0f398cc..da1b4dfc2f3 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go +++ b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go @@ -425,6 +425,11 @@ func (tpb *tablePlanBuilder) analyzeExpr(selExpr sqlparser.SelectExpr) (*colExpr references: make(map[string]bool), } if expr, ok := aliased.Expr.(*sqlparser.ConvertUsingExpr); ok { + // Here we find the actual column name in the convert, in case + // this is a column rename and the AS is the new column. + // For example, in convert(c1 using utf8mb4) as c2, we want to find + // c1, because c1 exists in the current table whereas c2 is the renamed column + // in the desired table. var colName sqlparser.IdentifierCI err := sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) { switch node := node.(type) { @@ -437,7 +442,7 @@ func (tpb *tablePlanBuilder) analyzeExpr(selExpr sqlparser.SelectExpr) (*colExpr return true, nil }, aliased.Expr) if err != nil { - return nil, fmt.Errorf("failed to find column name for convert using expression: %v", sqlparser.String(aliased.Expr)) + return nil, fmt.Errorf("failed to find column name for convert using expression: %v, %v", sqlparser.String(aliased.Expr), err) } selExpr := &sqlparser.ConvertUsingExpr{ Type: "utf8mb4", diff --git a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go index ff81d8b1b01..c9bb0121571 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go +++ b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go @@ -727,6 +727,11 @@ func (plan *Plan) analyzeExpr(vschema *localVSchema, selExpr sqlparser.SelectExp FixedValue: sqltypes.NewInt64(num), }, nil case *sqlparser.ConvertUsingExpr: + // Here we find the actual column name in the convert, in case + // this is a column rename and the AS is the new column. + // For example, in convert(c1 using utf8mb4) as c2, we want to find + // c1, because c1 exists in the current table whereas c2 is the renamed column + // in the desired table. var colName sqlparser.IdentifierCI err := sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) { switch node := node.(type) { @@ -739,7 +744,7 @@ func (plan *Plan) analyzeExpr(vschema *localVSchema, selExpr sqlparser.SelectExp return true, nil }, aliased.Expr) if err != nil { - return ColExpr{}, fmt.Errorf("failed to find column name for convert using expression: %v", sqlparser.String(aliased.Expr)) + return ColExpr{}, fmt.Errorf("failed to find column name for convert using expression: %v, %v", sqlparser.String(aliased.Expr), err) } colnum, err := findColumn(plan.Table, colName) if err != nil {