Skip to content

Commit

Permalink
VReplication: Fixes for generated column handling (#17107)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Nov 1, 2024
1 parent 1a4f2b9 commit fdf529e
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 59 deletions.
4 changes: 2 additions & 2 deletions go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/throttler"
Expand All @@ -40,8 +42,6 @@ import (
vttablet "vitess.io/vitess/go/vt/vttablet/common"

vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"

"github.com/stretchr/testify/require"
)

var (
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/vreplication/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ var (
customerTypes = []string{"'individual'", "'soho'", "'enterprise'"}
initialProductSchema = fmt.Sprintf(`
create table product(pid int, description varbinary(128), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key(pid), key(date1,date2)) CHARSET=utf8mb4;
create table customer(cid int auto_increment, name varchar(128) collate utf8mb4_bin, meta json default null, typ enum(%s), sport set('football','cricket','baseball'),
ts timestamp not null default current_timestamp, bits bit(2) default b'11', date1 datetime not null default '0000-00-00 00:00:00',
create table customer(cid int auto_increment, name varchar(128) collate utf8mb4_bin, meta json default null, industryCategory varchar(100) generated always as (json_extract(meta, _utf8mb4'$.industry')) virtual,
typ enum(%s), sport set('football','cricket','baseball'), ts timestamp not null default current_timestamp, bits bit(2) default b'11', date1 datetime not null default '0000-00-00 00:00:00',
date2 datetime not null default '2021-00-01 00:00:00', dec80 decimal(8,0), blb blob, primary key(cid,typ), key(name)) CHARSET=utf8mb4;
create table customer_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';
create table merchant(mname varchar(128), category varchar(128), primary key(mname), key(category)) CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/vreplication/unsharded_init_data.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
insert into customer(cid, name, typ, sport, meta) values(1, 'Jøhn "❤️" Rizzolo',1,'football,baseball','{}');
insert into customer(cid, name, typ, sport, meta) values(1, 'Jøhn "❤️" Rizzolo',1,'football,baseball','{"industry":"IT SaaS","company":"PlanetScale"}');
insert into customer(cid, name, typ, sport, meta) values(2, 'Paül','soho','cricket',convert(x'7b7d' using utf8mb4));
-- We use a high cid value here to test the target sequence initialization.
insert into customer(cid, name, typ, sport, blb) values(999999, 'ringo','enterprise','','blob data');
insert into customer(cid, name, typ, sport, blb, meta) values(999999, 'ringo','enterprise','','blob data', '{"industry":"Music"}');
insert into merchant(mname, category) values('Monoprice', 'eléctronics');
insert into merchant(mname, category) values('newegg', 'elec†ronics');
insert into product(pid, description) values(1, 'keyböard ⌨️');
Expand Down
14 changes: 9 additions & 5 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,12 +587,16 @@ func testVStreamCellFlag(t *testing.T) {
}
}

// TestCellAliasVreplicationWorkflow tests replication from a cell with an alias to test the tablet picker's alias functionality
// We also reuse the setup of this test to validate that the "vstream * from" vtgate query functionality is functional
// TestCellAliasVreplicationWorkflow tests replication from a cell with an alias to test
// the tablet picker's alias functionality.
// We also reuse the setup of this test to validate that the "vstream * from" vtgate
// query functionality is functional.
func TestCellAliasVreplicationWorkflow(t *testing.T) {
cells := []string{"zone1", "zone2"}
defer mainClusterConfig.enableGTIDCompression()
defer setAllVTTabletExperimentalFlags()
resetCompression := mainClusterConfig.enableGTIDCompression()
defer resetCompression()
resetExperimentalFlags := setAllVTTabletExperimentalFlags()
defer resetExperimentalFlags()
vc = NewVitessCluster(t, &clusterOptions{cells: cells})
defer vc.TearDown()

Expand Down Expand Up @@ -718,12 +722,12 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
// Confirm that the 0 scale decimal field, dec80, is replicated correctly
dec80Replicated := false
execVtgateQuery(t, vtgateConn, sourceKs, "update customer set dec80 = 0")
execVtgateQuery(t, vtgateConn, sourceKs, "update customer set blb = \"new blob data\" where cid=3")
execVtgateQuery(t, vtgateConn, sourceKs, "update json_tbl set j1 = null, j2 = 'null', j3 = '\"null\"'")
execVtgateQuery(t, vtgateConn, sourceKs, "insert into json_tbl(id, j1, j2, j3) values (7, null, 'null', '\"null\"')")
waitForNoWorkflowLag(t, vc, targetKs, workflow)
dec80Replicated := false
for _, tablet := range []*cluster.VttabletProcess{customerTab1, customerTab2} {
// Query the tablet's mysqld directly as the targets will have denied table entries.
dbc, err := tablet.TabletConn(targetKs, true)
Expand Down
23 changes: 12 additions & 11 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,19 +111,18 @@ func (rp *ReplicatorPlan) buildFromFields(tableName string, lastpk *sqltypes.Res
}
for _, field := range fields {
colName := sqlparser.NewIdentifierCI(field.Name)
isGenerated := false
generated := false
// We have to loop over the columns in the plan as the columns between the
// source and target are not always 1 to 1.
for _, colInfo := range tpb.colInfos {
if !strings.EqualFold(colInfo.Name, field.Name) {
continue
}
if colInfo.IsGenerated {
isGenerated = true
generated = true
}
break
}
if isGenerated {
continue
}
cexpr := &colExpr{
colName: colName,
colType: field.Type,
Expand All @@ -133,6 +132,7 @@ func (rp *ReplicatorPlan) buildFromFields(tableName string, lastpk *sqltypes.Res
references: map[string]bool{
field.Name: true,
},
isGenerated: generated,
}
tpb.colExprs = append(tpb.colExprs, cexpr)
}
Expand Down Expand Up @@ -608,12 +608,13 @@ func valsEqual(v1, v2 sqltypes.Value) bool {
return v1.ToString() == v2.ToString()
}

// AppendFromRow behaves like Append but takes a querypb.Row directly, assuming that
// the fields in the row are in the same order as the placeholders in this query. The fields might include generated
// columns which are dropped, by checking against skipFields, before binding the variables
// note: there can be more fields than bind locations since extra columns might be requested from the source if not all
// primary keys columns are present in the target table, for example. Also some values in the row may not correspond for
// values from the database on the source: sum/count for aggregation queries, for example
// appendFromRow behaves like Append but takes a querypb.Row directly, assuming that the
// fields in the row are in the same order as the placeholders in this query. The fields
// might include generated columns which are dropped before binding the variables.
// Note: there can be more fields than bind locations since extra columns might be
// requested from the source if not all primary key columns are present in the target
// table, for example. Also some values in the row may not correspond to values from the
// database on the source: sum/count for aggregation queries, for example.
func (tp *TablePlan) appendFromRow(buf *bytes2.Buffer, row *querypb.Row) error {
bindLocations := tp.BulkInsertValues.BindLocations()
if len(tp.Fields) < len(bindLocations) {
Expand Down
33 changes: 11 additions & 22 deletions go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,11 @@ type colExpr struct {
// references contains all the column names referenced in the expression.
references map[string]bool

isGrouped bool
isPK bool
dataType string
columnType string
isGrouped bool
isPK bool
isGenerated bool
dataType string
columnType string
}

// operation is the opcode for the colExpr.
Expand Down Expand Up @@ -360,7 +361,7 @@ func (tpb *tablePlanBuilder) generate() *TablePlan {
fieldsToSkip := make(map[string]bool)
for _, colInfo := range tpb.colInfos {
if colInfo.IsGenerated {
fieldsToSkip[colInfo.Name] = true
fieldsToSkip[strings.ToLower(colInfo.Name)] = true
}
}
return &TablePlan{
Expand Down Expand Up @@ -694,7 +695,7 @@ func (tpb *tablePlanBuilder) generateInsertPart(buf *sqlparser.TrackedBuffer) *s
}
separator := ""
for _, cexpr := range tpb.colExprs {
if tpb.isColumnGenerated(cexpr.colName) {
if cexpr.isGenerated {
continue
}
buf.Myprintf("%s%v", separator, cexpr.colName)
Expand All @@ -708,7 +709,7 @@ func (tpb *tablePlanBuilder) generateValuesPart(buf *sqlparser.TrackedBuffer, bv
bvf.mode = bvAfter
separator := "("
for _, cexpr := range tpb.colExprs {
if tpb.isColumnGenerated(cexpr.colName) {
if cexpr.isGenerated {
continue
}
buf.Myprintf("%s", separator)
Expand Down Expand Up @@ -745,7 +746,7 @@ func (tpb *tablePlanBuilder) generateSelectPart(buf *sqlparser.TrackedBuffer, bv
buf.WriteString(" select ")
separator := ""
for _, cexpr := range tpb.colExprs {
if tpb.isColumnGenerated(cexpr.colName) {
if cexpr.isGenerated {
continue
}
buf.Myprintf("%s", separator)
Expand Down Expand Up @@ -781,7 +782,7 @@ func (tpb *tablePlanBuilder) generateOnDupPart(buf *sqlparser.TrackedBuffer) *sq
if cexpr.isGrouped || cexpr.isPK {
continue
}
if tpb.isColumnGenerated(cexpr.colName) {
if cexpr.isGenerated {
continue
}
buf.Myprintf("%s%v=", separator, cexpr.colName)
Expand Down Expand Up @@ -812,10 +813,7 @@ func (tpb *tablePlanBuilder) generateUpdateStatement() *sqlparser.ParsedQuery {
if cexpr.isPK {
tpb.pkIndices[i] = true
}
if cexpr.isGrouped || cexpr.isPK {
continue
}
if tpb.isColumnGenerated(cexpr.colName) {
if cexpr.isGrouped || cexpr.isPK || cexpr.isGenerated {
continue
}
buf.Myprintf("%s%v=", separator, cexpr.colName)
Expand Down Expand Up @@ -961,15 +959,6 @@ func (tpb *tablePlanBuilder) generatePKConstraint(buf *sqlparser.TrackedBuffer,
buf.WriteString(")")
}

// isColumnGenerated reports whether the given column name matches a
// generated (virtual/stored) column in the builder's column metadata.
func (tpb *tablePlanBuilder) isColumnGenerated(col sqlparser.IdentifierCI) bool {
	for _, info := range tpb.colInfos {
		if !info.IsGenerated {
			continue
		}
		if col.EqualString(info.Name) {
			return true
		}
	}
	return false
}

// bindvarFormatter is a dual mode formatter. Its behavior
// can be changed dynamically changed to generate bind vars
// for the 'before' row or 'after' row by setting its mode
Expand Down
17 changes: 4 additions & 13 deletions go/vt/vttablet/tabletmanager/vreplication/table_plan_partial.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,7 @@ func (tpb *tablePlanBuilder) generatePartialValuesPart(buf *sqlparser.TrackedBuf
bvf.mode = bvAfter
separator := "("
for ind, cexpr := range tpb.colExprs {
if tpb.isColumnGenerated(cexpr.colName) {
continue
}
if !isBitSet(dataColumns.Cols, ind) {
if cexpr.isGenerated || !isBitSet(dataColumns.Cols, ind) {
continue
}
buf.Myprintf("%s", separator)
Expand Down Expand Up @@ -84,7 +81,7 @@ func (tpb *tablePlanBuilder) generatePartialInsertPart(buf *sqlparser.TrackedBuf
buf.Myprintf("insert into %v(", tpb.name)
separator := ""
for ind, cexpr := range tpb.colExprs {
if tpb.isColumnGenerated(cexpr.colName) {
if cexpr.isGenerated {
continue
}
if !isBitSet(dataColumns.Cols, ind) {
Expand All @@ -102,7 +99,7 @@ func (tpb *tablePlanBuilder) generatePartialSelectPart(buf *sqlparser.TrackedBuf
buf.WriteString(" select ")
separator := ""
for ind, cexpr := range tpb.colExprs {
if tpb.isColumnGenerated(cexpr.colName) {
if cexpr.isGenerated {
continue
}
if !isBitSet(dataColumns.Cols, ind) {
Expand Down Expand Up @@ -141,17 +138,11 @@ func (tpb *tablePlanBuilder) createPartialUpdateQuery(dataColumns *binlogdatapb.
buf.Myprintf("update %v set ", tpb.name)
separator := ""
for i, cexpr := range tpb.colExprs {
if cexpr.isPK {
continue
}
if tpb.isColumnGenerated(cexpr.colName) {
continue
}
if int64(i) >= dataColumns.Count {
log.Errorf("Ran out of columns trying to generate query for %s", tpb.name.CompliantName())
return nil
}
if !isBitSet(dataColumns.Cols, i) {
if cexpr.isPK || cexpr.isGenerated || !isBitSet(dataColumns.Cols, i) {
continue
}
buf.Myprintf("%s%v=", separator, cexpr.colName)
Expand Down
17 changes: 15 additions & 2 deletions go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ type TestQuery struct {
type TestRowChange struct {
before []string
after []string

// If you need to customize the image you can use the raw types.
beforeRaw *query.Row
afterRaw *query.Row
dataColumnsRaw *binlogdatapb.RowChange_Bitmap
}

// TestRowEventSpec is used for defining a custom row event.
Expand All @@ -161,7 +166,12 @@ func (s *TestRowEventSpec) String() string {
if len(s.changes) > 0 {
for _, c := range s.changes {
rowChange := binlogdatapb.RowChange{}
if len(c.before) > 0 {
if c.dataColumnsRaw != nil {
rowChange.DataColumns = c.dataColumnsRaw
}
if c.beforeRaw != nil {
rowChange.Before = c.beforeRaw
} else if len(c.before) > 0 {
rowChange.Before = &query.Row{}
for _, val := range c.before {
if val == sqltypes.NullStr {
Expand All @@ -171,7 +181,9 @@ func (s *TestRowEventSpec) String() string {
rowChange.Before.Values = append(rowChange.Before.Values, []byte(val)...)
}
}
if len(c.after) > 0 {
if c.afterRaw != nil {
rowChange.After = c.afterRaw
} else if len(c.after) > 0 {
rowChange.After = &query.Row{}
for i, val := range c.after {
if val == sqltypes.NullStr {
Expand Down Expand Up @@ -354,6 +366,7 @@ func (ts *TestSpec) getBindVarsForUpdate(stmt sqlparser.Statement) (string, map[
require.True(ts.t, ok, "field event for table %s not found", table)
index := int64(0)
state := ts.getCurrentState(table)
require.NotNil(ts.t, state)
for i, col := range fe.cols {
bv[col.name] = string(state.Values[index : index+state.Lengths[i]])
index += state.Lengths[i]
Expand Down
39 changes: 39 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer/testenv"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
)

type testcase struct {
Expand Down Expand Up @@ -87,13 +88,28 @@ func TestNoBlob(t *testing.T) {
"create table t2(id int, txt text, val varchar(4), unique key(id, val))",
// t3 has a text column and a primary key. The text column will not be in update row events.
"create table t3(id int, txt text, val varchar(4), primary key(id))",
// t4 has a blob column and a primary key, along with a generated virtual column. The blob
// column will not be in update row events.
"create table t4(id int, cOl varbinary(8) generated always as (concat(val, 'tsty')) virtual, blb blob, val varbinary(4), primary key(id))",
},
options: &TestSpecOptions{
noblob: true,
},
}
defer ts.Close()
ts.Init()

insertGeneratedFE := &TestFieldEvent{
table: "t4",
db: testenv.DBName,
cols: []*TestColumn{
{name: "id", dataType: "INT32", colType: "int(11)", len: 11, collationID: 63},
{name: "cOl", dataType: "VARBINARY", colType: "varbinary(8)", len: 8, collationID: 63},
{name: "blb", dataType: "BLOB", colType: "blob", len: 65535, collationID: 63},
{name: "val", dataType: "VARBINARY", colType: "varbinary(4)", len: 4, collationID: 63},
},
}

ts.tests = [][]*TestQuery{{
{"begin", nil},
{"insert into t1 values (1, 'blob1', 'aaa')", nil},
Expand All @@ -107,6 +123,29 @@ func TestNoBlob(t *testing.T) {
{"insert into t3 values (1, 'text1', 'aaa')", nil},
{"update t3 set val = 'bbb'", nil},
{"commit", nil},
}, {{"begin", nil},
{"insert into t4 (id, blb, val) values (1, 'text1', 'aaa')", []TestRowEvent{
{event: insertGeneratedFE.String()},
{spec: &TestRowEventSpec{table: "t4", changes: []TestRowChange{{after: []string{"1", "aaatsty", "text1", "aaa"}}}}},
}},
{"update t4 set val = 'bbb'", []TestRowEvent{
// The blob column is not in the update row event's before or after image.
{spec: &TestRowEventSpec{table: "t4", changes: []TestRowChange{{
beforeRaw: &querypb.Row{
Lengths: []int64{1, 7, -1, 3}, // -1 for the 3rd column / blob field, as it's not present
Values: []byte("1aaatstyaaa"),
},
afterRaw: &querypb.Row{
Lengths: []int64{1, 7, -1, 3}, // -1 for the 3rd column / blob field, as it's not present
Values: []byte("1bbbtstybbb"),
},
dataColumnsRaw: &binlogdatapb.RowChange_Bitmap{
Count: 4,
Cols: []byte{0x0b}, // Columns bitmap of 00001011 as the third column/bit position representing the blob column has no data
},
}}}},
}},
{"commit", nil},
}}
ts.Run()
}
Expand Down

0 comments on commit fdf529e

Please sign in to comment.