diff --git a/go/mysql/binlog/binlog_json.go b/go/mysql/binlog/binlog_json.go index 8db78edfd23..33a9c474bd8 100644 --- a/go/mysql/binlog/binlog_json.go +++ b/go/mysql/binlog/binlog_json.go @@ -127,7 +127,7 @@ func ParseBinaryJSONDiff(data []byte) (sqltypes.Value, error) { diff.WriteByte('\'') if opType == jsonDiffOpRemove { // No value for remove - diff.WriteString(")") + diff.WriteByte(')') } else { diff.WriteString(", ") valueLen, readTo := readVariableLength(data, pos) @@ -141,7 +141,8 @@ func ParseBinaryJSONDiff(data []byte) (sqltypes.Value, error) { if value.Type() == json.TypeString { diff.WriteString(sqlparser.Utf8mb4Str) } - diff.WriteString(fmt.Sprintf("%s)", value)) + diff.Write(value.MarshalTo(nil)) + diff.WriteByte(')') } outer = true diff --git a/go/test/utils/binlog.go b/go/test/utils/binlog.go index d3f686f1a8a..a9eafc4941d 100644 --- a/go/test/utils/binlog.go +++ b/go/test/utils/binlog.go @@ -27,7 +27,8 @@ const ( BinlogRowImageCnf = "binlog-row-image.cnf" ) -// SetBinlogRowImageMode creates a temp cnf file to set binlog_row_image to noblob for vreplication unit tests. +// SetBinlogRowImageMode creates a temp cnf file to set binlog_row_image=NOBLOB and +// binlog_row_value_options=PARTIAL_JSON for vreplication unit tests. // It adds it to the EXTRA_MY_CNF environment variable which appends text from them into my.cnf. func SetBinlogRowImageMode(mode string, cnfDir string) error { var newCnfs []string @@ -55,6 +56,15 @@ func SetBinlogRowImageMode(mode string, cnfDir string) error { if err != nil { return err } + lm := strings.ToLower(mode) + if lm == "noblob" || lm == "minimal" { + // We're testing partial binlog row images so let's also test partial + // JSON values in the images. + _, err = f.WriteString("\nbinlog_row_value_options=PARTIAL_JSON\n") + if err != nil { + return err + } + } err = f.Close() if err != nil { return err diff --git a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go index 43e1e2dc1e1..fa3a225b18b 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go @@ -528,7 +528,7 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun } if !isBitSet(rowChange.DataColumns.Cols, i) { return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, - "binary log event missing a needed value for %s.%s due to the usage of binlog-row-image=NOBLOB; you will need to re-run the workflow with binlog-row-image=FULL", + "binary log event missing a needed value for %s.%s due to not using binlog-row-image=FULL; you will need to re-run the workflow with binlog-row-image=FULL", tp.TargetName, field.Name) } } @@ -629,11 +629,28 @@ func (tp *TablePlan) applyBulkInsertChanges(rowInserts []*binlogdatapb.RowChange newStmt := true for _, rowInsert := range rowInserts { + var ( + err error + bindVar *querypb.BindVariable + ) rowValues := &strings.Builder{} bindvars := make(map[string]*querypb.BindVariable, len(tp.Fields)) vals := sqltypes.MakeRowTrusted(tp.Fields, rowInsert.After) for n, field := range tp.Fields { - bindVar, err := tp.bindFieldVal(field, &vals[n]) + if field.Type == querypb.Type_JSON { + var jsVal *sqltypes.Value + if vals[n].IsNull() { // An SQL NULL and not an actual JSON value + jsVal = &sqltypes.NULL + } else { // A JSON value (which may be a JSON null literal value) + jsVal, err = vjson.MarshalSQLValue(vals[n].Raw()) + if err != nil { + return nil, err + } + } + bindVar, err = tp.bindFieldVal(field, jsVal) + } else { + bindVar, err = tp.bindFieldVal(field, &vals[n]) + } if err != nil { return nil, err } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index 9c17780b66e..90b514b5b6d 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -1519,13 +1519,20 @@ func TestPlayerRowMove(t *testing.T) { validateQueryCountStat(t, "replicate", 3) } -/* TODO: build this out and get it working -func TestPlayerUpdatePK(t *testing.T) { - defer deleteTablet(addTablet(100)) +// TestPlayerPartialImagesUpdatePK tests the behavior of the vplayer when we +// have partial binlog images, meaning that binlog-row-image=NOBLOB and +// binlog-row-value-options=PARTIAL_JSON. These are both set together when +// running the unit tests with runNoBlobTest=true. So we skip the test if +// it's not set. +func TestPlayerPartialImagesUpdatePK(t *testing.T) { + if !runNoBlobTest { + t.Skip("Skipping test as runNoBlobTest is not set") + } + defer deleteTablet(addTablet(100)) execStatements(t, []string{ - "create table src(id int, bd blob, jd json, primary key(id))", - fmt.Sprintf("create table %s.dst(id int, bd blob, jd json, primary key(id))", vrepldb), + "create table src (id int, jd json, bd blob, primary key(id))", + fmt.Sprintf("create table %s.dst (id int, jd json, bd blob, primary key(id))", vrepldb), }) defer execStatements(t, []string{ "drop table src", @@ -1547,38 +1554,73 @@ func TestPlayerUpdatePK(t *testing.T) { cancel, _ := startVReplication(t, bls, "") defer cancel() - execStatements(t, []string{ - "insert into src values(1, 'blob data', _utf8mb4'{\"key1\":\"val1\"}'), (2, 'blob data2', _utf8mb4'{\"key2\":\"val2\"}'), (3, 'blob data3', _utf8mb4'{\"key3\":\"val3\"}')", - }) - expectDBClientQueries(t, qh.Expect( - "begin", - "insert into dst(id,bd,jd) values (1,_binary'blob data','{\"key1\": \"val1\"}'), (2,_binary'blob data2','{\"key2\": \"val2\"}'), (3,_binary'blob data3','{\"key3\": \"val3\"}')", - "/update _vt.vreplication set pos=", - "commit", - )) - expectData(t, "dst", [][]string{ - {"1", "1", "1"}, - {"2", "5", "2"}, - }) - validateQueryCountStat(t, "replicate", 1) - - execStatements(t, []string{ - "update src set val1=1, val2=4 where id=3", - }) - expectDBClientQueries(t, qh.Expect( - "begin", - "update dst set sval2=sval2-ifnull(3, 0), rcount=rcount-1 where val1=2", - "insert into dst(val1,sval2,rcount) values (1,ifnull(4, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1", - "/update _vt.vreplication set pos=", - "commit", - )) - expectData(t, "dst", [][]string{ - {"1", "5", "2"}, - {"2", "2", "1"}, - }) - validateQueryCountStat(t, "replicate", 3) + testCases := []struct { + input string + output []string + data [][]string + error string + }{ + { + input: "insert into src (id, jd, bd) values (1,'{\"key1\": \"val1\"}','blob data'), (2,'{\"key2\": \"val2\"}','blob data2'), (3,'{\"key3\": \"val3\"}','blob data3')", + output: []string{"insert into dst(id,jd,bd) values (1,JSON_OBJECT(_utf8mb4'key1', _utf8mb4'val1'),_binary'blob data'), (2,JSON_OBJECT(_utf8mb4'key2', _utf8mb4'val2'),_binary'blob data2'), (3,JSON_OBJECT(_utf8mb4'key3', _utf8mb4'val3'),_binary'blob data3')"}, + data: [][]string{ + {"1", "{\"key1\": \"val1\"}", "blob data"}, + {"2", "{\"key2\": \"val2\"}", "blob data2"}, + {"3", "{\"key3\": \"val3\"}", "blob data3"}, + }, + }, + { + input: `update src set jd=JSON_SET(jd, '$.color', 'red') where id = 1`, + output: []string{"update dst set jd=JSON_INSERT(`jd`, _utf8mb4'$.color', _utf8mb4\"red\") where id=1"}, + data: [][]string{ + {"1", "{\"key1\": \"val1\", \"color\": \"red\"}", "blob data"}, + {"2", "{\"key2\": \"val2\"}", "blob data2"}, + {"3", "{\"key3\": \"val3\"}", "blob data3"}, + }, + }, + { + input: `update src set id = id+10, bd = 'new blob data' where id = 2`, + output: []string{ + "delete from dst where id=2", + "insert into dst(id,jd,bd) values (12,JSON_OBJECT(_utf8mb4'key2', _utf8mb4'val2'),_binary'new blob data')", + }, + data: [][]string{ + {"1", "{\"key1\": \"val1\", \"color\": \"red\"}", "blob data"}, + {"3", "{\"key3\": \"val3\"}", "blob data3"}, + {"12", "{\"key2\": \"val2\"}", "new blob data"}, + }, + }, + { + input: `update src set id = id+10 where id = 3`, + error: "binary log event missing a needed value for dst.bd due to not using binlog-row-image=FULL", + }, + } + + for _, tc := range testCases { + t.Run(tc.input, func(t *testing.T) { + execStatements(t, []string{tc.input}) + var want qh.ExpectationSequencer + if tc.error != "" { + want = qh.Expect( + "rollback", + ).Then(qh.Immediately( + fmt.Sprintf("/update _vt.vreplication set message=.*%s.*", tc.error), + )) + expectDBClientQueries(t, want) + } else { + want = qh.Expect( + "begin", + tc.output..., + ).Then(qh.Immediately( + "/update _vt.vreplication set pos=", + "commit", + )) + expectDBClientQueries(t, want) + expectData(t, "dst", tc.data) + } + }) + } } -*/ func TestPlayerTypes(t *testing.T) { defer deleteTablet(addTablet(100)) @@ -1714,15 +1756,30 @@ func TestPlayerTypes(t *testing.T) { {"1", "", "{}", "123", `{"a": [42, 100]}`, `{"foo": "bar"}`}, {"2", "null", `{"name": null}`, "123", `{"a": [42, 100]}`, `{"foo": "bar"}`}, }, - }, { - input: "update vitess_json set val1 = '{\"bar\": \"foo\"}', val4 = '{\"a\": [98, 123]}', val5 = convert(x'7b7d' using utf8mb4) where id=1", - output: "update vitess_json set val1=JSON_OBJECT(_utf8mb4'bar', _utf8mb4'foo'), val2=JSON_OBJECT(), val3=CAST(123 as JSON), val4=JSON_OBJECT(_utf8mb4'a', JSON_ARRAY(98, 123)), val5=JSON_OBJECT() where id=1", - table: "vitess_json", - data: [][]string{ - {"1", `{"bar": "foo"}`, "{}", "123", `{"a": [98, 123]}`, `{}`}, - {"2", "null", `{"name": null}`, "123", `{"a": [42, 100]}`, `{"foo": "bar"}`}, - }, }} + if !runNoBlobTest { + testcases = append(testcases, testcase{ + input: "update vitess_json set val1 = '{\"bar\": \"foo\"}', val4 = '{\"a\": [98, 123]}', val5 = convert(x'7b7d' using utf8mb4) where id=1", + output: "update vitess_json set val1=JSON_OBJECT(_utf8mb4'bar', _utf8mb4'foo'), val2=JSON_OBJECT(), val3=CAST(123 as JSON), val4=JSON_OBJECT(_utf8mb4'a', JSON_ARRAY(98, 123)), val5=JSON_OBJECT() where id=1", + table: "vitess_json", + data: [][]string{ + {"1", `{"bar": "foo"}`, "{}", "123", `{"a": [98, 123]}`, `{}`}, + {"2", "null", `{"name": null}`, "123", `{"a": [42, 100]}`, `{"foo": "bar"}`}, + }, + }) + } else { + // With partial JSON values we don't replicate the JSON columns that aren't + // actually updated. + testcases = append(testcases, testcase{ + input: "update vitess_json set val1 = '{\"bar\": \"foo\"}', val4 = '{\"a\": [98, 123]}', val5 = convert(x'7b7d' using utf8mb4) where id=1", + output: "update vitess_json set val1=JSON_OBJECT(_utf8mb4'bar', _utf8mb4'foo'), val4=JSON_OBJECT(_utf8mb4'a', JSON_ARRAY(98, 123)), val5=JSON_OBJECT() where id=1", + table: "vitess_json", + data: [][]string{ + {"1", `{"bar": "foo"}`, "{}", "123", `{"a": [98, 123]}`, `{}`}, + {"2", "null", `{"name": null}`, "123", `{"a": [42, 100]}`, `{"foo": "bar"}`}, + }, + }) + } for _, tcases := range testcases { execStatements(t, []string{tcases.input})