From fdf529e99d5b6e3808cfaff4e58c943aa573fd83 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 1 Nov 2024 11:36:36 -0400 Subject: [PATCH] VReplication: Fixes for generated column handling (#17107) Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/cluster_test.go | 4 +- go/test/endtoend/vreplication/config_test.go | 4 +- .../vreplication/unsharded_init_data.sql | 4 +- .../vreplication/vreplication_test.go | 14 ++++--- .../vreplication/replicator_plan.go | 23 +++++------ .../vreplication/table_plan_builder.go | 33 ++++++---------- .../vreplication/table_plan_partial.go | 17 ++------ .../vstreamer/helper_event_test.go | 17 +++++++- .../tabletserver/vstreamer/vstreamer_test.go | 39 +++++++++++++++++++ 9 files changed, 96 insertions(+), 59 deletions(-) diff --git a/go/test/endtoend/vreplication/cluster_test.go b/go/test/endtoend/vreplication/cluster_test.go index 13268fc749c..119843651bc 100644 --- a/go/test/endtoend/vreplication/cluster_test.go +++ b/go/test/endtoend/vreplication/cluster_test.go @@ -30,6 +30,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/throttler" @@ -40,8 +42,6 @@ import ( vttablet "vitess.io/vitess/go/vt/vttablet/common" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" - - "github.com/stretchr/testify/require" ) var ( diff --git a/go/test/endtoend/vreplication/config_test.go b/go/test/endtoend/vreplication/config_test.go index 25a4b734259..4b4bcfecc35 100644 --- a/go/test/endtoend/vreplication/config_test.go +++ b/go/test/endtoend/vreplication/config_test.go @@ -48,8 +48,8 @@ var ( customerTypes = []string{"'individual'", "'soho'", "'enterprise'"} initialProductSchema = fmt.Sprintf(` create table product(pid int, description varbinary(128), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key(pid), key(date1,date2)) CHARSET=utf8mb4; -create table customer(cid int auto_increment, name varchar(128) collate utf8mb4_bin, meta json default null, typ enum(%s), sport set('football','cricket','baseball'), - ts timestamp not null default current_timestamp, bits bit(2) default b'11', date1 datetime not null default '0000-00-00 00:00:00', +create table customer(cid int auto_increment, name varchar(128) collate utf8mb4_bin, meta json default null, industryCategory varchar(100) generated always as (json_extract(meta, _utf8mb4'$.industry')) virtual, + typ enum(%s), sport set('football','cricket','baseball'), ts timestamp not null default current_timestamp, bits bit(2) default b'11', date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', dec80 decimal(8,0), blb blob, primary key(cid,typ), key(name)) CHARSET=utf8mb4; create table customer_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence'; create table merchant(mname varchar(128), category varchar(128), primary key(mname), key(category)) CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; diff --git a/go/test/endtoend/vreplication/unsharded_init_data.sql b/go/test/endtoend/vreplication/unsharded_init_data.sql index 8af0cab6608..3e795cadcfd 100644 --- a/go/test/endtoend/vreplication/unsharded_init_data.sql +++ b/go/test/endtoend/vreplication/unsharded_init_data.sql @@ -1,7 +1,7 @@ -insert into customer(cid, name, typ, sport, meta) values(1, 'Jøhn "❤️" Rizzolo',1,'football,baseball','{}'); +insert into customer(cid, name, typ, sport, meta) values(1, 'Jøhn "❤️" Rizzolo',1,'football,baseball','{"industry":"IT SaaS","company":"PlanetScale"}'); insert into customer(cid, name, typ, sport, meta) values(2, 'Paül','soho','cricket',convert(x'7b7d' using utf8mb4)); -- We use a high cid value here to test the target sequence initialization. -insert into customer(cid, name, typ, sport, blb) values(999999, 'ringo','enterprise','','blob data'); +insert into customer(cid, name, typ, sport, blb, meta) values(999999, 'ringo','enterprise','','blob data', '{"industry":"Music"}'); insert into merchant(mname, category) values('Monoprice', 'eléctronics'); insert into merchant(mname, category) values('newegg', 'elec†ronics'); insert into product(pid, description) values(1, 'keyböard ⌨️'); diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 519bf8d0ce5..04a5eabc33b 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -587,12 +587,16 @@ func testVStreamCellFlag(t *testing.T) { } } -// TestCellAliasVreplicationWorkflow tests replication from a cell with an alias to test the tablet picker's alias functionality -// We also reuse the setup of this test to validate that the "vstream * from" vtgate query functionality is functional +// TestCellAliasVreplicationWorkflow tests replication from a cell with an alias to test +// the tablet picker's alias functionality. +// We also reuse the setup of this test to validate that the "vstream * from" vtgate +// query functionality is functional. func TestCellAliasVreplicationWorkflow(t *testing.T) { cells := []string{"zone1", "zone2"} - defer mainClusterConfig.enableGTIDCompression() - defer setAllVTTabletExperimentalFlags() + resetCompression := mainClusterConfig.enableGTIDCompression() + defer resetCompression() + resetExperimentalFlags := setAllVTTabletExperimentalFlags() + defer resetExperimentalFlags() vc = NewVitessCluster(t, &clusterOptions{cells: cells}) defer vc.TearDown() @@ -718,12 +722,12 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) defer vtgateConn.Close() // Confirm that the 0 scale decimal field, dec80, is replicated correctly - dec80Replicated := false execVtgateQuery(t, vtgateConn, sourceKs, "update customer set dec80 = 0") execVtgateQuery(t, vtgateConn, sourceKs, "update customer set blb = \"new blob data\" where cid=3") execVtgateQuery(t, vtgateConn, sourceKs, "update json_tbl set j1 = null, j2 = 'null', j3 = '\"null\"'") execVtgateQuery(t, vtgateConn, sourceKs, "insert into json_tbl(id, j1, j2, j3) values (7, null, 'null', '\"null\"')") waitForNoWorkflowLag(t, vc, targetKs, workflow) + dec80Replicated := false for _, tablet := range []*cluster.VttabletProcess{customerTab1, customerTab2} { // Query the tablet's mysqld directly as the targets will have denied table entries. dbc, err := tablet.TabletConn(targetKs, true) diff --git a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go index aee7f5c8909..6a416cb4414 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go @@ -111,19 +111,18 @@ func (rp *ReplicatorPlan) buildFromFields(tableName string, lastpk *sqltypes.Res } for _, field := range fields { colName := sqlparser.NewIdentifierCI(field.Name) - isGenerated := false + generated := false + // We have to loop over the columns in the plan as the columns between the + // source and target are not always 1 to 1. for _, colInfo := range tpb.colInfos { if !strings.EqualFold(colInfo.Name, field.Name) { continue } if colInfo.IsGenerated { - isGenerated = true + generated = true } break } - if isGenerated { - continue - } cexpr := &colExpr{ colName: colName, colType: field.Type, @@ -133,6 +132,7 @@ func (rp *ReplicatorPlan) buildFromFields(tableName string, lastpk *sqltypes.Res references: map[string]bool{ field.Name: true, }, + isGenerated: generated, } tpb.colExprs = append(tpb.colExprs, cexpr) } @@ -608,12 +608,13 @@ func valsEqual(v1, v2 sqltypes.Value) bool { return v1.ToString() == v2.ToString() } -// AppendFromRow behaves like Append but takes a querypb.Row directly, assuming that -// the fields in the row are in the same order as the placeholders in this query. The fields might include generated -// columns which are dropped, by checking against skipFields, before binding the variables -// note: there can be more fields than bind locations since extra columns might be requested from the source if not all -// primary keys columns are present in the target table, for example. Also some values in the row may not correspond for -// values from the database on the source: sum/count for aggregation queries, for example +// AppendFromRow behaves like Append but takes a querypb.Row directly, assuming that the +// fields in the row are in the same order as the placeholders in this query. The fields +// might include generated columns which are dropped before binding the variables note: +// there can be more fields than bind locations since extra columns might be requested +// from the source if not all primary keys columns are present in the target table, for +// example. Also some values in the row may not correspond for values from the database +// on the source: sum/count for aggregation queries, for example. func (tp *TablePlan) appendFromRow(buf *bytes2.Buffer, row *querypb.Row) error { bindLocations := tp.BulkInsertValues.BindLocations() if len(tp.Fields) < len(bindLocations) { diff --git a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go index 12afc3fec28..b8a86b94de5 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go +++ b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go @@ -80,10 +80,11 @@ type colExpr struct { // references contains all the column names referenced in the expression. references map[string]bool - isGrouped bool - isPK bool - dataType string - columnType string + isGrouped bool + isPK bool + isGenerated bool + dataType string + columnType string } // operation is the opcode for the colExpr. @@ -360,7 +361,7 @@ func (tpb *tablePlanBuilder) generate() *TablePlan { fieldsToSkip := make(map[string]bool) for _, colInfo := range tpb.colInfos { if colInfo.IsGenerated { - fieldsToSkip[colInfo.Name] = true + fieldsToSkip[strings.ToLower(colInfo.Name)] = true } } return &TablePlan{ @@ -694,7 +695,7 @@ func (tpb *tablePlanBuilder) generateInsertPart(buf *sqlparser.TrackedBuffer) *s } separator := "" for _, cexpr := range tpb.colExprs { - if tpb.isColumnGenerated(cexpr.colName) { + if cexpr.isGenerated { continue } buf.Myprintf("%s%v", separator, cexpr.colName) @@ -708,7 +709,7 @@ func (tpb *tablePlanBuilder) generateValuesPart(buf *sqlparser.TrackedBuffer, bv bvf.mode = bvAfter separator := "(" for _, cexpr := range tpb.colExprs { - if tpb.isColumnGenerated(cexpr.colName) { + if cexpr.isGenerated { continue } buf.Myprintf("%s", separator) @@ -745,7 +746,7 @@ func (tpb *tablePlanBuilder) generateSelectPart(buf *sqlparser.TrackedBuffer, bv buf.WriteString(" select ") separator := "" for _, cexpr := range tpb.colExprs { - if tpb.isColumnGenerated(cexpr.colName) { + if cexpr.isGenerated { continue } buf.Myprintf("%s", separator) @@ -781,7 +782,7 @@ func (tpb *tablePlanBuilder) generateOnDupPart(buf *sqlparser.TrackedBuffer) *sq if cexpr.isGrouped || cexpr.isPK { continue } - if tpb.isColumnGenerated(cexpr.colName) { + if cexpr.isGenerated { continue } buf.Myprintf("%s%v=", separator, cexpr.colName) @@ -812,10 +813,7 @@ func (tpb *tablePlanBuilder) generateUpdateStatement() *sqlparser.ParsedQuery { if cexpr.isPK { tpb.pkIndices[i] = true } - if cexpr.isGrouped || cexpr.isPK { - continue - } - if tpb.isColumnGenerated(cexpr.colName) { + if cexpr.isGrouped || cexpr.isPK || cexpr.isGenerated { continue } buf.Myprintf("%s%v=", separator, cexpr.colName) @@ -961,15 +959,6 @@ func (tpb *tablePlanBuilder) generatePKConstraint(buf *sqlparser.TrackedBuffer, buf.WriteString(")") } -func (tpb *tablePlanBuilder) isColumnGenerated(col sqlparser.IdentifierCI) bool { - for _, colInfo := range tpb.colInfos { - if col.EqualString(colInfo.Name) && colInfo.IsGenerated { - return true - } - } - return false -} - // bindvarFormatter is a dual mode formatter. Its behavior // can be changed dynamically changed to generate bind vars // for the 'before' row or 'after' row by setting its mode diff --git a/go/vt/vttablet/tabletmanager/vreplication/table_plan_partial.go b/go/vt/vttablet/tabletmanager/vreplication/table_plan_partial.go index cb8ac6dc515..85e0fd8e50f 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/table_plan_partial.go +++ b/go/vt/vttablet/tabletmanager/vreplication/table_plan_partial.go @@ -50,10 +50,7 @@ func (tpb *tablePlanBuilder) generatePartialValuesPart(buf *sqlparser.TrackedBuf bvf.mode = bvAfter separator := "(" for ind, cexpr := range tpb.colExprs { - if tpb.isColumnGenerated(cexpr.colName) { - continue - } - if !isBitSet(dataColumns.Cols, ind) { + if cexpr.isGenerated || !isBitSet(dataColumns.Cols, ind) { continue } buf.Myprintf("%s", separator) @@ -84,7 +81,7 @@ func (tpb *tablePlanBuilder) generatePartialInsertPart(buf *sqlparser.TrackedBuf buf.Myprintf("insert into %v(", tpb.name) separator := "" for ind, cexpr := range tpb.colExprs { - if tpb.isColumnGenerated(cexpr.colName) { + if cexpr.isGenerated { continue } if !isBitSet(dataColumns.Cols, ind) { @@ -102,7 +99,7 @@ func (tpb *tablePlanBuilder) generatePartialSelectPart(buf *sqlparser.TrackedBuf buf.WriteString(" select ") separator := "" for ind, cexpr := range tpb.colExprs { - if tpb.isColumnGenerated(cexpr.colName) { + if cexpr.isGenerated { continue } if !isBitSet(dataColumns.Cols, ind) { @@ -141,17 +138,11 @@ func (tpb *tablePlanBuilder) createPartialUpdateQuery(dataColumns *binlogdatapb. buf.Myprintf("update %v set ", tpb.name) separator := "" for i, cexpr := range tpb.colExprs { - if cexpr.isPK { - continue - } - if tpb.isColumnGenerated(cexpr.colName) { - continue - } if int64(i) >= dataColumns.Count { log.Errorf("Ran out of columns trying to generate query for %s", tpb.name.CompliantName()) return nil } - if !isBitSet(dataColumns.Cols, i) { + if cexpr.isPK || cexpr.isGenerated || !isBitSet(dataColumns.Cols, i) { continue } buf.Myprintf("%s%v=", separator, cexpr.colName) diff --git a/go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go b/go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go index c52e81564c0..675677bf820 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go @@ -142,6 +142,11 @@ type TestQuery struct { type TestRowChange struct { before []string after []string + + // If you need to customize the image you can use the raw types. + beforeRaw *query.Row + afterRaw *query.Row + dataColumnsRaw *binlogdatapb.RowChange_Bitmap } // TestRowEventSpec is used for defining a custom row event. @@ -161,7 +166,12 @@ func (s *TestRowEventSpec) String() string { if len(s.changes) > 0 { for _, c := range s.changes { rowChange := binlogdatapb.RowChange{} - if len(c.before) > 0 { + if c.dataColumnsRaw != nil { + rowChange.DataColumns = c.dataColumnsRaw + } + if c.beforeRaw != nil { + rowChange.Before = c.beforeRaw + } else if len(c.before) > 0 { rowChange.Before = &query.Row{} for _, val := range c.before { if val == sqltypes.NullStr { @@ -171,7 +181,9 @@ func (s *TestRowEventSpec) String() string { rowChange.Before.Values = append(rowChange.Before.Values, []byte(val)...) } } - if len(c.after) > 0 { + if c.afterRaw != nil { + rowChange.After = c.afterRaw + } else if len(c.after) > 0 { rowChange.After = &query.Row{} for i, val := range c.after { if val == sqltypes.NullStr { @@ -354,6 +366,7 @@ func (ts *TestSpec) getBindVarsForUpdate(stmt sqlparser.Statement) (string, map[ require.True(ts.t, ok, "field event for table %s not found", table) index := int64(0) state := ts.getCurrentState(table) + require.NotNil(ts.t, state) for i, col := range fe.cols { bv[col.name] = string(state.Values[index : index+state.Lengths[i]]) index += state.Lengths[i] diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index c265fb45b85..846d62202e7 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -41,6 +41,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer/testenv" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" ) type testcase struct { @@ -87,6 +88,9 @@ func TestNoBlob(t *testing.T) { "create table t2(id int, txt text, val varchar(4), unique key(id, val))", // t3 has a text column and a primary key. The text column will not be in update row events. "create table t3(id int, txt text, val varchar(4), primary key(id))", + // t4 has a blob column and a primary key, along with a generated virtual column. The blob + // column will not be in update row events. + "create table t4(id int, cOl varbinary(8) generated always as (concat(val, 'tsty')) virtual, blb blob, val varbinary(4), primary key(id))", }, options: &TestSpecOptions{ noblob: true, @@ -94,6 +98,18 @@ func TestNoBlob(t *testing.T) { } defer ts.Close() ts.Init() + + insertGeneratedFE := &TestFieldEvent{ + table: "t4", + db: testenv.DBName, + cols: []*TestColumn{ + {name: "id", dataType: "INT32", colType: "int(11)", len: 11, collationID: 63}, + {name: "cOl", dataType: "VARBINARY", colType: "varbinary(8)", len: 8, collationID: 63}, + {name: "blb", dataType: "BLOB", colType: "blob", len: 65535, collationID: 63}, + {name: "val", dataType: "VARBINARY", colType: "varbinary(4)", len: 4, collationID: 63}, + }, + } + ts.tests = [][]*TestQuery{{ {"begin", nil}, {"insert into t1 values (1, 'blob1', 'aaa')", nil}, @@ -107,6 +123,29 @@ func TestNoBlob(t *testing.T) { {"insert into t3 values (1, 'text1', 'aaa')", nil}, {"update t3 set val = 'bbb'", nil}, {"commit", nil}, + }, {{"begin", nil}, + {"insert into t4 (id, blb, val) values (1, 'text1', 'aaa')", []TestRowEvent{ + {event: insertGeneratedFE.String()}, + {spec: &TestRowEventSpec{table: "t4", changes: []TestRowChange{{after: []string{"1", "aaatsty", "text1", "aaa"}}}}}, + }}, + {"update t4 set val = 'bbb'", []TestRowEvent{ + // The blob column is not in the update row event's before or after image. + {spec: &TestRowEventSpec{table: "t4", changes: []TestRowChange{{ + beforeRaw: &querypb.Row{ + Lengths: []int64{1, 7, -1, 3}, // -1 for the 3rd column / blob field, as it's not present + Values: []byte("1aaatstyaaa"), + }, + afterRaw: &querypb.Row{ + Lengths: []int64{1, 7, -1, 3}, // -1 for the 3rd column / blob field, as it's not present + Values: []byte("1bbbtstybbb"), + }, + dataColumnsRaw: &binlogdatapb.RowChange_Bitmap{ + Count: 4, + Cols: []byte{0x0b}, // Columns bitmap of 00001011 as the third column/bit position representing the blob column has no data + }, + }}}}, + }}, + {"commit", nil}, }} ts.Run() }