diff --git a/go/mysql/binlog/binlog_json_test.go b/go/mysql/binlog/binlog_json_test.go index f6d4fe7fcf2..5652b58567e 100644 --- a/go/mysql/binlog/binlog_json_test.go +++ b/go/mysql/binlog/binlog_json_test.go @@ -265,7 +265,7 @@ func TestMarshalJSONToSQL(t *testing.T) { { name: "null", data: []byte{}, - expected: "CAST(null as JSON)", + expected: "CAST(_utf8mb4'null' as JSON)", }, { name: `object {"a": "b"}`, @@ -330,17 +330,17 @@ func TestMarshalJSONToSQL(t *testing.T) { { name: `true`, data: []byte{4, 1}, - expected: `CAST(true as JSON)`, + expected: `CAST(_utf8mb4'true' as JSON)`, }, { name: `false`, data: []byte{4, 2}, - expected: `CAST(false as JSON)`, + expected: `CAST(_utf8mb4'false' as JSON)`, }, { name: `null`, data: []byte{4, 0}, - expected: `CAST(null as JSON)`, + expected: `CAST(_utf8mb4'null' as JSON)`, }, { name: `-1`, diff --git a/go/mysql/json/marshal.go b/go/mysql/json/marshal.go index 77d30285a69..e1ea916151d 100644 --- a/go/mysql/json/marshal.go +++ b/go/mysql/json/marshal.go @@ -137,7 +137,7 @@ func (v *Value) marshalSQLInternal(top bool, dst []byte) []byte { return dst case TypeBoolean: if top { - dst = append(dst, "CAST("...) + dst = append(dst, "CAST(_utf8mb4'"...) } if v == ValueTrue { dst = append(dst, "true"...) @@ -145,16 +145,16 @@ func (v *Value) marshalSQLInternal(top bool, dst []byte) []byte { dst = append(dst, "false"...) } if top { - dst = append(dst, " as JSON)"...) + dst = append(dst, "' as JSON)"...) } return dst case TypeNull: if top { - dst = append(dst, "CAST("...) + dst = append(dst, "CAST(_utf8mb4'"...) } dst = append(dst, "null"...) if top { - dst = append(dst, " as JSON)"...) + dst = append(dst, "' as JSON)"...) } return dst default: diff --git a/go/test/endtoend/vreplication/config_test.go b/go/test/endtoend/vreplication/config_test.go index 213ad0bcc75..b8684b04c71 100644 --- a/go/test/endtoend/vreplication/config_test.go +++ b/go/test/endtoend/vreplication/config_test.go @@ -55,7 +55,7 @@ create table _vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431(id int, v create table db_order_test (c_uuid varchar(64) not null default '', created_at datetime not null, dstuff varchar(128), dtstuff text, dbstuff blob, cstuff char(32), primary key (c_uuid,created_at), key (dstuff)) CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; create table vdiff_order (order_id varchar(50) collate utf8mb4_unicode_ci not null, primary key (order_id), key (order_id)) charset=utf8mb4 COLLATE=utf8mb4_unicode_ci; create table datze (id int, dt1 datetime not null default current_timestamp, dt2 datetime not null, ts1 timestamp default current_timestamp, primary key (id), key (dt1)); -create table json_tbl (id int, j1 json, j2 json, primary key(id)); +create table json_tbl (id int, j1 json, j2 json, j3 json not null, primary key(id)); create table geom_tbl (id int, g geometry, p point, ls linestring, pg polygon, mp multipoint, mls multilinestring, mpg multipolygon, gc geometrycollection, primary key(id)); create table ` + "`blüb_tbl`" + ` (id int, val1 varchar(20), ` + "`blöb1`" + ` blob, val2 varbinary(20), ` + "`bl@b2`" + ` longblob, txt1 text, blb3 tinyblob, txt2 longtext, blb4 mediumblob, primary key(id)); create table reftable (id int, val1 varchar(20), primary key(id), key(val1)); diff --git a/go/test/endtoend/vreplication/initial_data_test.go b/go/test/endtoend/vreplication/initial_data_test.go index 828c7136373..9443f62abc2 100644 --- a/go/test/endtoend/vreplication/initial_data_test.go +++ b/go/test/endtoend/vreplication/initial_data_test.go @@ -48,12 +48,16 @@ const NumJSONRows = 100 func insertJSONValues(t *testing.T) { // insert null value combinations - execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id) values(1)") - execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j1) values(2, \"{}\")") - execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j2) values(3, \"{}\")") + execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j3) values(1, \"{}\")") + execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j1, j3) values(2, \"{}\", \"{}\")") + execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j2, j3) values(3, \"{}\", \"{}\")") + execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j1, j2, j3) values(4, NULL, 'null', '\"null\"')") + execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j3) values(5, JSON_QUOTE('null'))") + execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j3) values(6, '{}')") - id := 4 - q := "insert into json_tbl(id, j1, j2) values(%d, '%s', '%s')" + id := 8 // 6 inserted above and one after copy phase is done + + q := "insert into json_tbl(id, j1, j2, j3) values(%d, '%s', '%s', '{}')" numJsonValues := len(jsonValues) for id <= NumJSONRows { id++ diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 38a1747103a..96109fc5a7f 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -725,6 +725,8 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl dec80Replicated := false execVtgateQuery(t, vtgateConn, sourceKs, "update customer set dec80 = 0") execVtgateQuery(t, vtgateConn, sourceKs, "update customer set blb = \"new blob data\" where cid=3") + execVtgateQuery(t, vtgateConn, sourceKs, "update json_tbl set j1 = null, j2 = 'null', j3 = '\"null\"'") + execVtgateQuery(t, vtgateConn, sourceKs, "insert into json_tbl(id, j1, j2, j3) values (7, null, 'null', '\"null\"')") waitForNoWorkflowLag(t, vc, targetKs, workflow) for _, shard := range []string{"-80", "80-"} { shardTarget := fmt.Sprintf("%s:%s", targetKs, shard) diff --git a/go/vt/sqlparser/parsed_query.go b/go/vt/sqlparser/parsed_query.go index 1c9ad47010f..b6b03a1901a 100644 --- a/go/vt/sqlparser/parsed_query.go +++ b/go/vt/sqlparser/parsed_query.go @@ -129,15 +129,16 @@ func (pq *ParsedQuery) AppendFromRow(buf *bytes2.Buffer, fields []*querypb.Field case querypb.Type_TUPLE: return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected Type_TUPLE for value %d", i) case querypb.Type_JSON: - buf2 := sqltypes.NullBytes - if col.length >= 0 { - buf2 = row.Values[col.offset : col.offset+col.length] - } - vv, err := vjson.MarshalSQLValue(buf2) - if err != nil { - return err + if col.length < 0 { // An SQL NULL and not an actual JSON value + buf.WriteString(sqltypes.NullStr) + } else { // A JSON value (which may be a JSON null literal value) + buf2 := row.Values[col.offset : col.offset+col.length] + vv, err := vjson.MarshalSQLValue(buf2) + if err != nil { + return err + } + buf.WriteString(vv.RawStr()) } - buf.WriteString(vv.RawStr()) default: if col.length < 0 { // -1 means a null variable; serialize it directly diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index 8260d95b462..6e85df8e855 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -696,6 +696,7 @@ func customExpectData(t *testing.T, table string, values [][]string, exec func(c if err == nil { return } + log.Errorf("data mismatch: %v, retrying", err) time.Sleep(tick) } } @@ -718,7 +719,7 @@ func compareQueryResults(t *testing.T, query string, values [][]string, } for j, val := range row { if got := qr.Rows[i][j].ToString(); got != val { - return fmt.Errorf("mismatch at (%d, %d): %v, want %s", i, j, qr.Rows[i][j], val) + return fmt.Errorf("mismatch at (%d, %d): got '%s', want '%s'", i, j, qr.Rows[i][j].ToString(), val) } } } diff --git a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go index b07933519a6..6d38784bccd 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go @@ -386,9 +386,13 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun var newVal *sqltypes.Value var err error if field.Type == querypb.Type_JSON { - newVal, err = vjson.MarshalSQLValue(vals[i].Raw()) - if err != nil { - return nil, err + if vals[i].IsNull() { // An SQL NULL and not an actual JSON value + newVal = &sqltypes.NULL + } else { // A JSON value (which may be a JSON null literal value) + newVal, err = vjson.MarshalSQLValue(vals[i].Raw()) + if err != nil { + return nil, err + } } bindVar, err = tp.bindFieldVal(field, newVal) } else { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go index e1b1686b0d7..1c977d6287c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go @@ -563,7 +563,7 @@ func testPlayerCopyTables(t *testing.T) { execStatements(t, []string{ "create table src1(id int, val varbinary(128), d decimal(8,0), j json, primary key(id))", - "insert into src1 values(2, 'bbb', 1, '{\"foo\": \"bar\"}'), (1, 'aaa', 0, JSON_ARRAY(123456789012345678901234567890, \"abcd\"))", + "insert into src1 values(2, 'bbb', 1, '{\"foo\": \"bar\"}'), (1, 'aaa', 0, JSON_ARRAY(123456789012345678901234567890, \"abcd\")), (3, 'ccc', 2, 'null'), (4, 'ddd', 3, '{\"name\": \"matt\", \"size\": null}'), (5, 'eee', 4, null)", fmt.Sprintf("create table %s.dst1(id int, val varbinary(128), val2 varbinary(128), d decimal(8,0), j json, primary key(id))", vrepldb), "create table yes(id int, val varbinary(128), primary key(id))", fmt.Sprintf("create table %s.yes(id int, val varbinary(128), primary key(id))", vrepldb), @@ -617,8 +617,8 @@ func testPlayerCopyTables(t *testing.T) { // The first fast-forward has no starting point. So, it just saves the current position. "/update _vt.vreplication set pos=", "begin", - "insert into dst1(id,val,val2,d,j) values (1,'aaa','aaa',0,JSON_ARRAY(123456789012345678901234567890, _utf8mb4'abcd')), (2,'bbb','bbb',1,JSON_OBJECT(_utf8mb4'foo', _utf8mb4'bar'))", - `/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"id\\" type:INT32 charset:63 flags:53251} rows:{lengths:1 values:\\"2\\"}'.*`, + "insert into dst1(id,val,val2,d,j) values (1,'aaa','aaa',0,JSON_ARRAY(123456789012345678901234567890, _utf8mb4'abcd')), (2,'bbb','bbb',1,JSON_OBJECT(_utf8mb4'foo', _utf8mb4'bar')), (3,'ccc','ccc',2,CAST(_utf8mb4'null' as JSON)), (4,'ddd','ddd',3,JSON_OBJECT(_utf8mb4'name', _utf8mb4'matt', _utf8mb4'size', null)), (5,'eee','eee',4,null)", + `/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"id\\" type:INT32 charset:63 flags:53251} rows:{lengths:1 values:\\"5\\"}'.*`, "commit", // copy of dst1 is done: delete from copy_state. "/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*dst1", @@ -634,9 +634,12 @@ func testPlayerCopyTables(t *testing.T) { expectData(t, "dst1", [][]string{ {"1", "aaa", "aaa", "0", "[123456789012345678901234567890, \"abcd\"]"}, {"2", "bbb", "bbb", "1", "{\"foo\": \"bar\"}"}, + {"3", "ccc", "ccc", "2", "null"}, + {"4", "ddd", "ddd", "3", "{\"name\": \"matt\", \"size\": null}"}, + {"5", "eee", "eee", "4", ""}, }) expectData(t, "yes", [][]string{}) - validateCopyRowCountStat(t, 2) + validateCopyRowCountStat(t, 5) ctx, cancel := context.WithCancel(context.Background()) type logTestCase struct { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index a16ecb9e4e0..7b291f6a99c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -1584,17 +1584,26 @@ func TestPlayerTypes(t *testing.T) { }, }, { input: "insert into vitess_json(val1,val2,val3,val4,val5) values (null,'{}','123','{\"a\":[42,100]}','{\"foo\": \"bar\"}')", - output: "insert into vitess_json(id,val1,val2,val3,val4,val5) values (1,CAST(null as JSON),JSON_OBJECT(),CAST(123 as JSON),JSON_OBJECT(_utf8mb4'a', JSON_ARRAY(42, 100)),JSON_OBJECT(_utf8mb4'foo', _utf8mb4'bar'))", + output: "insert into vitess_json(id,val1,val2,val3,val4,val5) values (1,null,JSON_OBJECT(),CAST(123 as JSON),JSON_OBJECT(_utf8mb4'a', JSON_ARRAY(42, 100)),JSON_OBJECT(_utf8mb4'foo', _utf8mb4'bar'))", table: "vitess_json", data: [][]string{ {"1", "", "{}", "123", `{"a": [42, 100]}`, `{"foo": "bar"}`}, }, }, { - input: "update vitess_json set val1 = '{\"bar\": \"foo\"}', val4 = '{\"a\": [98, 123]}', val5 = convert(x'7b7d' using utf8mb4)", + input: "insert into vitess_json(val1,val2,val3,val4,val5) values ('null', '{\"name\":null}','123','{\"a\":[42,100]}','{\"foo\": \"bar\"}')", + output: "insert into vitess_json(id,val1,val2,val3,val4,val5) values (2,CAST(_utf8mb4'null' as JSON),JSON_OBJECT(_utf8mb4'name', null),CAST(123 as JSON),JSON_OBJECT(_utf8mb4'a', JSON_ARRAY(42, 100)),JSON_OBJECT(_utf8mb4'foo', _utf8mb4'bar'))", + table: "vitess_json", + data: [][]string{ + {"1", "", "{}", "123", `{"a": [42, 100]}`, `{"foo": "bar"}`}, + {"2", "null", `{"name": null}`, "123", `{"a": [42, 100]}`, `{"foo": "bar"}`}, + }, + }, { + input: "update vitess_json set val1 = '{\"bar\": \"foo\"}', val4 = '{\"a\": [98, 123]}', val5 = convert(x'7b7d' using utf8mb4) where id=1", output: "update vitess_json set val1=JSON_OBJECT(_utf8mb4'bar', _utf8mb4'foo'), val2=JSON_OBJECT(), val3=CAST(123 as JSON), val4=JSON_OBJECT(_utf8mb4'a', JSON_ARRAY(98, 123)), val5=JSON_OBJECT() where id=1", table: "vitess_json", data: [][]string{ {"1", `{"bar": "foo"}`, "{}", "123", `{"a": [98, 123]}`, `{}`}, + {"2", "null", `{"name": null}`, "123", `{"a": [42, 100]}`, `{"foo": "bar"}`}, }, }}