Skip to content

Commit

Permalink
[release-17.0] VReplication: Handle SQL NULL and JSON 'null' correctl…
Browse files Browse the repository at this point in the history
…y for JSON columns (#13944) (#13947)

Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
Co-authored-by: Rohit Nayak <[email protected]>
  • Loading branch information
vitess-bot[bot] and rohit-nayak-ps authored Sep 11, 2023
1 parent ccbb4ad commit 0543850
Show file tree
Hide file tree
Showing 10 changed files with 56 additions and 32 deletions.
8 changes: 4 additions & 4 deletions go/mysql/binlog/binlog_json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func TestMarshalJSONToSQL(t *testing.T) {
{
name: "null",
data: []byte{},
expected: "CAST(null as JSON)",
expected: "CAST(_utf8mb4'null' as JSON)",
},
{
name: `object {"a": "b"}`,
Expand Down Expand Up @@ -330,17 +330,17 @@ func TestMarshalJSONToSQL(t *testing.T) {
{
name: `true`,
data: []byte{4, 1},
expected: `CAST(true as JSON)`,
expected: `CAST(_utf8mb4'true' as JSON)`,
},
{
name: `false`,
data: []byte{4, 2},
expected: `CAST(false as JSON)`,
expected: `CAST(_utf8mb4'false' as JSON)`,
},
{
name: `null`,
data: []byte{4, 0},
expected: `CAST(null as JSON)`,
expected: `CAST(_utf8mb4'null' as JSON)`,
},
{
name: `-1`,
Expand Down
8 changes: 4 additions & 4 deletions go/mysql/json/marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,24 +137,24 @@ func (v *Value) marshalSQLInternal(top bool, dst []byte) []byte {
return dst
case TypeBoolean:
if top {
dst = append(dst, "CAST("...)
dst = append(dst, "CAST(_utf8mb4'"...)
}
if v == ValueTrue {
dst = append(dst, "true"...)
} else {
dst = append(dst, "false"...)
}
if top {
dst = append(dst, " as JSON)"...)
dst = append(dst, "' as JSON)"...)
}
return dst
case TypeNull:
if top {
dst = append(dst, "CAST("...)
dst = append(dst, "CAST(_utf8mb4'"...)
}
dst = append(dst, "null"...)
if top {
dst = append(dst, " as JSON)"...)
dst = append(dst, "' as JSON)"...)
}
return dst
default:
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ create table _vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431(id int, v
create table db_order_test (c_uuid varchar(64) not null default '', created_at datetime not null, dstuff varchar(128), dtstuff text, dbstuff blob, cstuff char(32), primary key (c_uuid,created_at), key (dstuff)) CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
create table vdiff_order (order_id varchar(50) collate utf8mb4_unicode_ci not null, primary key (order_id), key (order_id)) charset=utf8mb4 COLLATE=utf8mb4_unicode_ci;
create table datze (id int, dt1 datetime not null default current_timestamp, dt2 datetime not null, ts1 timestamp default current_timestamp, primary key (id), key (dt1));
create table json_tbl (id int, j1 json, j2 json, primary key(id));
create table json_tbl (id int, j1 json, j2 json, j3 json not null, primary key(id));
create table geom_tbl (id int, g geometry, p point, ls linestring, pg polygon, mp multipoint, mls multilinestring, mpg multipolygon, gc geometrycollection, primary key(id));
create table ` + "`blüb_tbl`" + ` (id int, val1 varchar(20), ` + "`blöb1`" + ` blob, val2 varbinary(20), ` + "`bl@b2`" + ` longblob, txt1 text, blb3 tinyblob, txt2 longtext, blb4 mediumblob, primary key(id));
create table reftable (id int, val1 varchar(20), primary key(id), key(val1));
Expand Down
14 changes: 9 additions & 5 deletions go/test/endtoend/vreplication/initial_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,16 @@ const NumJSONRows = 100

func insertJSONValues(t *testing.T) {
// insert null value combinations
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id) values(1)")
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j1) values(2, \"{}\")")
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j2) values(3, \"{}\")")
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j3) values(1, \"{}\")")
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j1, j3) values(2, \"{}\", \"{}\")")
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j2, j3) values(3, \"{}\", \"{}\")")
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j1, j2, j3) values(4, NULL, 'null', '\"null\"')")
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j3) values(5, JSON_QUOTE('null'))")
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j3) values(6, '{}')")

id := 4
q := "insert into json_tbl(id, j1, j2) values(%d, '%s', '%s')"
id := 8 // 6 inserted above and one after copy phase is done

q := "insert into json_tbl(id, j1, j2, j3) values(%d, '%s', '%s', '{}')"
numJsonValues := len(jsonValues)
for id <= NumJSONRows {
id++
Expand Down
2 changes: 2 additions & 0 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,8 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
dec80Replicated := false
execVtgateQuery(t, vtgateConn, sourceKs, "update customer set dec80 = 0")
execVtgateQuery(t, vtgateConn, sourceKs, "update customer set blb = \"new blob data\" where cid=3")
execVtgateQuery(t, vtgateConn, sourceKs, "update json_tbl set j1 = null, j2 = 'null', j3 = '\"null\"'")
execVtgateQuery(t, vtgateConn, sourceKs, "insert into json_tbl(id, j1, j2, j3) values (7, null, 'null', '\"null\"')")
waitForNoWorkflowLag(t, vc, targetKs, workflow)
for _, shard := range []string{"-80", "80-"} {
shardTarget := fmt.Sprintf("%s:%s", targetKs, shard)
Expand Down
17 changes: 9 additions & 8 deletions go/vt/sqlparser/parsed_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,16 @@ func (pq *ParsedQuery) AppendFromRow(buf *bytes2.Buffer, fields []*querypb.Field
case querypb.Type_TUPLE:
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected Type_TUPLE for value %d", i)
case querypb.Type_JSON:
buf2 := sqltypes.NullBytes
if col.length >= 0 {
buf2 = row.Values[col.offset : col.offset+col.length]
}
vv, err := vjson.MarshalSQLValue(buf2)
if err != nil {
return err
if col.length < 0 { // An SQL NULL and not an actual JSON value
buf.WriteString(sqltypes.NullStr)
} else { // A JSON value (which may be a JSON null literal value)
buf2 := row.Values[col.offset : col.offset+col.length]
vv, err := vjson.MarshalSQLValue(buf2)
if err != nil {
return err
}
buf.WriteString(vv.RawStr())
}
buf.WriteString(vv.RawStr())
default:
if col.length < 0 {
// -1 means a null variable; serialize it directly
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,7 @@ func customExpectData(t *testing.T, table string, values [][]string, exec func(c
if err == nil {
return
}
log.Errorf("data mismatch: %v, retrying", err)
time.Sleep(tick)
}
}
Expand All @@ -718,7 +719,7 @@ func compareQueryResults(t *testing.T, query string, values [][]string,
}
for j, val := range row {
if got := qr.Rows[i][j].ToString(); got != val {
return fmt.Errorf("mismatch at (%d, %d): %v, want %s", i, j, qr.Rows[i][j], val)
return fmt.Errorf("mismatch at (%d, %d): got '%s', want '%s'", i, j, qr.Rows[i][j].ToString(), val)
}
}
}
Expand Down
10 changes: 7 additions & 3 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,9 +386,13 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun
var newVal *sqltypes.Value
var err error
if field.Type == querypb.Type_JSON {
newVal, err = vjson.MarshalSQLValue(vals[i].Raw())
if err != nil {
return nil, err
if vals[i].IsNull() { // An SQL NULL and not an actual JSON value
newVal = &sqltypes.NULL
} else { // A JSON value (which may be a JSON null literal value)
newVal, err = vjson.MarshalSQLValue(vals[i].Raw())
if err != nil {
return nil, err
}
}
bindVar, err = tp.bindFieldVal(field, newVal)
} else {
Expand Down
11 changes: 7 additions & 4 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ func testPlayerCopyTables(t *testing.T) {

execStatements(t, []string{
"create table src1(id int, val varbinary(128), d decimal(8,0), j json, primary key(id))",
"insert into src1 values(2, 'bbb', 1, '{\"foo\": \"bar\"}'), (1, 'aaa', 0, JSON_ARRAY(123456789012345678901234567890, \"abcd\"))",
"insert into src1 values(2, 'bbb', 1, '{\"foo\": \"bar\"}'), (1, 'aaa', 0, JSON_ARRAY(123456789012345678901234567890, \"abcd\")), (3, 'ccc', 2, 'null'), (4, 'ddd', 3, '{\"name\": \"matt\", \"size\": null}'), (5, 'eee', 4, null)",
fmt.Sprintf("create table %s.dst1(id int, val varbinary(128), val2 varbinary(128), d decimal(8,0), j json, primary key(id))", vrepldb),
"create table yes(id int, val varbinary(128), primary key(id))",
fmt.Sprintf("create table %s.yes(id int, val varbinary(128), primary key(id))", vrepldb),
Expand Down Expand Up @@ -617,8 +617,8 @@ func testPlayerCopyTables(t *testing.T) {
// The first fast-forward has no starting point. So, it just saves the current position.
"/update _vt.vreplication set pos=",
"begin",
"insert into dst1(id,val,val2,d,j) values (1,'aaa','aaa',0,JSON_ARRAY(123456789012345678901234567890, _utf8mb4'abcd')), (2,'bbb','bbb',1,JSON_OBJECT(_utf8mb4'foo', _utf8mb4'bar'))",
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"id\\" type:INT32 charset:63 flags:53251} rows:{lengths:1 values:\\"2\\"}'.*`,
"insert into dst1(id,val,val2,d,j) values (1,'aaa','aaa',0,JSON_ARRAY(123456789012345678901234567890, _utf8mb4'abcd')), (2,'bbb','bbb',1,JSON_OBJECT(_utf8mb4'foo', _utf8mb4'bar')), (3,'ccc','ccc',2,CAST(_utf8mb4'null' as JSON)), (4,'ddd','ddd',3,JSON_OBJECT(_utf8mb4'name', _utf8mb4'matt', _utf8mb4'size', null)), (5,'eee','eee',4,null)",
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"id\\" type:INT32 charset:63 flags:53251} rows:{lengths:1 values:\\"5\\"}'.*`,
"commit",
// copy of dst1 is done: delete from copy_state.
"/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*dst1",
Expand All @@ -634,9 +634,12 @@ func testPlayerCopyTables(t *testing.T) {
expectData(t, "dst1", [][]string{
{"1", "aaa", "aaa", "0", "[123456789012345678901234567890, \"abcd\"]"},
{"2", "bbb", "bbb", "1", "{\"foo\": \"bar\"}"},
{"3", "ccc", "ccc", "2", "null"},
{"4", "ddd", "ddd", "3", "{\"name\": \"matt\", \"size\": null}"},
{"5", "eee", "eee", "4", ""},
})
expectData(t, "yes", [][]string{})
validateCopyRowCountStat(t, 2)
validateCopyRowCountStat(t, 5)
ctx, cancel := context.WithCancel(context.Background())

type logTestCase struct {
Expand Down
13 changes: 11 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1584,17 +1584,26 @@ func TestPlayerTypes(t *testing.T) {
},
}, {
input: "insert into vitess_json(val1,val2,val3,val4,val5) values (null,'{}','123','{\"a\":[42,100]}','{\"foo\": \"bar\"}')",
output: "insert into vitess_json(id,val1,val2,val3,val4,val5) values (1,CAST(null as JSON),JSON_OBJECT(),CAST(123 as JSON),JSON_OBJECT(_utf8mb4'a', JSON_ARRAY(42, 100)),JSON_OBJECT(_utf8mb4'foo', _utf8mb4'bar'))",
output: "insert into vitess_json(id,val1,val2,val3,val4,val5) values (1,null,JSON_OBJECT(),CAST(123 as JSON),JSON_OBJECT(_utf8mb4'a', JSON_ARRAY(42, 100)),JSON_OBJECT(_utf8mb4'foo', _utf8mb4'bar'))",
table: "vitess_json",
data: [][]string{
{"1", "", "{}", "123", `{"a": [42, 100]}`, `{"foo": "bar"}`},
},
}, {
input: "update vitess_json set val1 = '{\"bar\": \"foo\"}', val4 = '{\"a\": [98, 123]}', val5 = convert(x'7b7d' using utf8mb4)",
input: "insert into vitess_json(val1,val2,val3,val4,val5) values ('null', '{\"name\":null}','123','{\"a\":[42,100]}','{\"foo\": \"bar\"}')",
output: "insert into vitess_json(id,val1,val2,val3,val4,val5) values (2,CAST(_utf8mb4'null' as JSON),JSON_OBJECT(_utf8mb4'name', null),CAST(123 as JSON),JSON_OBJECT(_utf8mb4'a', JSON_ARRAY(42, 100)),JSON_OBJECT(_utf8mb4'foo', _utf8mb4'bar'))",
table: "vitess_json",
data: [][]string{
{"1", "", "{}", "123", `{"a": [42, 100]}`, `{"foo": "bar"}`},
{"2", "null", `{"name": null}`, "123", `{"a": [42, 100]}`, `{"foo": "bar"}`},
},
}, {
input: "update vitess_json set val1 = '{\"bar\": \"foo\"}', val4 = '{\"a\": [98, 123]}', val5 = convert(x'7b7d' using utf8mb4) where id=1",
output: "update vitess_json set val1=JSON_OBJECT(_utf8mb4'bar', _utf8mb4'foo'), val2=JSON_OBJECT(), val3=CAST(123 as JSON), val4=JSON_OBJECT(_utf8mb4'a', JSON_ARRAY(98, 123)), val5=JSON_OBJECT() where id=1",
table: "vitess_json",
data: [][]string{
{"1", `{"bar": "foo"}`, "{}", "123", `{"a": [98, 123]}`, `{}`},
{"2", "null", `{"name": null}`, "123", `{"a": [42, 100]}`, `{"foo": "bar"}`},
},
}}

Expand Down

0 comments on commit 0543850

Please sign in to comment.