Skip to content

Commit

Permalink
Get N diffs working
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Dec 9, 2024
1 parent 0b2b9cf commit 7be99a3
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 124 deletions.
92 changes: 50 additions & 42 deletions go/mysql/binlog/binlog_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,53 +74,61 @@ func ParseBinaryJSON(data []byte) (*json.Value, error) {
// ParseBinaryJSONDiff provides the parsing function from the MySQL JSON
// diff representation to an SQL expression.
func ParseBinaryJSONDiff(data []byte) (sqltypes.Value, error) {
pos := 0
opType := jsonDiffOp(data[pos])
pos++
diff := bytes.Buffer{}
diff.Grow(int(float32(len(data)) * 1.5))
pos := 0
outer := false
innerStr := ""

switch opType {
case jsonDiffOpReplace:
diff.WriteString("JSON_REPLACE(")
case jsonDiffOpInsert:
diff.WriteString("JSON_INSERT(")
case jsonDiffOpRemove:
diff.WriteString("JSON_REMOVE(")
}
diff.WriteString("%s, ") // This will later be replaced by the field name

pathLen, readTo, ok := readLenEncInt(data, pos)
if !ok {
return sqltypes.Value{}, fmt.Errorf("cannot read JSON diff path length")
}
pos = readTo

path := data[pos : uint64(pos)+pathLen]
pos += int(pathLen)
// We have to specify the unicode character set for the strings we
// use in the expression as the connection can be using a different
// character set (e.g. vreplication always uses set names binary).
diff.WriteString(fmt.Sprintf("_utf8mb4'%s', ", path))

if opType == jsonDiffOpRemove { // No value for remove
diff.WriteString(")")
return sqltypes.MakeTrusted(sqltypes.Expression, diff.Bytes()), nil
}
for pos < len(data) {
if outer {
innerStr = diff.String()
diff.Reset()
}
opType := jsonDiffOp(data[pos])
pos++
switch opType {
case jsonDiffOpReplace:
diff.WriteString("JSON_REPLACE(")
case jsonDiffOpInsert:
diff.WriteString("JSON_INSERT(")
case jsonDiffOpRemove:
diff.WriteString("JSON_REMOVE(")
}
if outer {
diff.WriteString(innerStr)
diff.WriteString(", ")
} else { // Only the inner most function has the field name
diff.WriteString("%s, ") // This will later be replaced by the field name
}

valueLen, readTo, ok := readLenEncInt(data, pos)
if !ok {
return sqltypes.Value{}, fmt.Errorf("cannot read JSON diff path length")
}
pos = readTo
pathLen, readTo := readVariableLength(data, pos)
pos = readTo
path := data[pos : pos+pathLen]
pos += pathLen
// We have to specify the unicode character set for the strings we
// use in the expression as the connection can be using a different
// character set (e.g. vreplication always uses set names binary).
diff.WriteString(fmt.Sprintf("_utf8mb4'%s'", path))

if opType == jsonDiffOpRemove { // No value for remove
diff.WriteString(")")
} else {
valueLen, readTo := readVariableLength(data, pos)
pos = readTo
value, err := ParseBinaryJSON(data[pos : pos+valueLen])
if err != nil {
return sqltypes.Value{}, fmt.Errorf("cannot read JSON diff value for path %s: %w", path, err)
}
pos += valueLen
if value.Type() == json.TypeString {
diff.WriteString(", _utf8mb4")
}
diff.WriteString(fmt.Sprintf("%s)", value))
}

value, err := ParseBinaryJSON(data[pos : uint64(pos)+valueLen])
if err != nil {
return sqltypes.Value{}, fmt.Errorf("cannot read JSON diff value for path %s: %w", path, err)
}
if value.Type() == json.TypeString {
diff.WriteString("_utf8mb4")
outer = true
}
diff.WriteString(fmt.Sprintf("%s)", value))

return sqltypes.MakeTrusted(sqltypes.Expression, diff.Bytes()), nil
}
Expand Down
172 changes: 90 additions & 82 deletions go/mysql/binlog_event_mysql56_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,78 +251,6 @@ func TestMysql56SemiSyncAck(t *testing.T) {
}

func TestMySQL56PartialUpdateRowsEvent(t *testing.T) {
// The mysqlbinlog -vvv --base64-output=decode-rows output for the following event:
// ### UPDATE `vt_commerce`.`customer`
// ### WHERE
// ### @1=1 /* LONGINT meta=0 nullable=0 is_null=0 */
// ### @2='[email protected]' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */
// ### @3='{"salary": 100}' /* JSON meta=4 nullable=1 is_null=0 */
// ### SET
// ### @1=1 /* LONGINT meta=0 nullable=0 is_null=0 */
// ### @2='[email protected]' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */
// ### @3=JSON_INSERT(@3, '$.role', 'manager') /* JSON meta=4 nullable=1 is_null=0 */
// ### UPDATE `vt_commerce`.`customer`
// ### WHERE
// ### @1=2 /* LONGINT meta=0 nullable=0 is_null=0 */
// ### @2='[email protected]' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */
// ### @3='{"salary": 99}' /* JSON meta=4 nullable=1 is_null=0 */
// ### SET
// ### @1=2 /* LONGINT meta=0 nullable=0 is_null=0 */
// ### @2='[email protected]' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */
// ### @3=JSON_INSERT(@3, '$.role', 'manager') /* JSON meta=4 nullable=1 is_null=0 */
// ### UPDATE `vt_commerce`.`customer`
// ### WHERE
// ### @1=3 /* LONGINT meta=0 nullable=0 is_null=0 */
// ### @2='[email protected]' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */
// ### @3='{"salary": 99}' /* JSON meta=4 nullable=1 is_null=0 */
// ### SET
// ### @1=3 /* LONGINT meta=0 nullable=0 is_null=0 */
// ### @2='[email protected]' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */
// ### @3=JSON_INSERT(@3, '$.role', 'manager') /* JSON meta=4 nullable=1 is_null=0 */
// ### UPDATE `vt_commerce`.`customer`
// ### WHERE
// ### @1=4 /* LONGINT meta=0 nullable=0 is_null=0 */
// ### @2='[email protected]' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */
// ### @3='{"salary": 99}' /* JSON meta=4 nullable=1 is_null=0 */
// ### SET
// ### @1=4 /* LONGINT meta=0 nullable=0 is_null=0 */
// ### @2='[email protected]' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */
// ### @3=JSON_INSERT(@3, '$.role', 'manager') /* JSON meta=4 nullable=1 is_null=0 */
// ### UPDATE `vt_commerce`.`customer`
// ### WHERE
// ### @1=5 /* LONGINT meta=0 nullable=0 is_null=0 */
// ### @2='[email protected]' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */
// ### @3='{"salary": 100}' /* JSON meta=4 nullable=1 is_null=0 */
// ### SET
// ### @1=5 /* LONGINT meta=0 nullable=0 is_null=0 */
// ### @2='[email protected]' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */
// ### @3=JSON_INSERT(@3, '$.role', 'manager') /* JSON meta=4 nullable=1 is_null=0 */
mysql56PartialUpdateRowEvent := NewMysql56BinlogEvent([]byte{0x67, 0xc4, 0x54, 0x67, 0x27, 0x1f, 0x91, 0x10, 0x76, 0x14, 0x02, 0x00, 0x00, 0x19, 0x69,
0x00, 0x00, 0x00, 0x00, 0xb0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x02, 0x00, 0x03, 0xff, 0xff, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x10, 0x61, 0x6c, 0x69, 0x63, 0x65, 0x40, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x2e, 0x63, 0x6f, 0x6d, 0x16, 0x00, 0x00, 0x00, 0x00,
0x01, 0x00, 0x15, 0x00, 0x0b, 0x00, 0x03, 0x00, 0x0c, 0x0e, 0x00, 0x73, 0x65, 0x78, 0x06, 0x66, 0x65, 0x6d, 0x61, 0x6c, 0x65, 0x01, 0x01, 0x00,
0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x61, 0x6c, 0x69, 0x63, 0x65, 0x40, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x2e, 0x63, 0x6f,
0x6d, 0x12, 0x00, 0x00, 0x00, 0x01, 0x06, 0x24, 0x2e, 0x72, 0x6f, 0x6c, 0x65, 0x09, 0x0c, 0x07, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x00,
0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0e, 0x62, 0x6f, 0x62, 0x40, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x2e, 0x63, 0x6f, 0x6d, 0x14,
0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x13, 0x00, 0x0b, 0x00, 0x03, 0x00, 0x0c, 0x0e, 0x00, 0x73, 0x65, 0x78, 0x04, 0x6d, 0x61, 0x6c, 0x65, 0x01,
0x01, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0e, 0x62, 0x6f, 0x62, 0x40, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x2e, 0x63, 0x6f,
0x6d, 0x12, 0x00, 0x00, 0x00, 0x01, 0x06, 0x24, 0x2e, 0x72, 0x6f, 0x6c, 0x65, 0x09, 0x0c, 0x07, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x00,
0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x12, 0x63, 0x68, 0x61, 0x72, 0x6c, 0x69, 0x65, 0x40, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x2e,
0x63, 0x6f, 0x6d, 0x14, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x13, 0x00, 0x0b, 0x00, 0x03, 0x00, 0x0c, 0x0e, 0x00, 0x73, 0x65, 0x78, 0x04, 0x6d,
0x61, 0x6c, 0x65, 0x01, 0x01, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x12, 0x63, 0x68, 0x61, 0x72, 0x6c, 0x69, 0x65, 0x40, 0x64,
0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x2e, 0x63, 0x6f, 0x6d, 0x12, 0x00, 0x00, 0x00, 0x01, 0x06, 0x24, 0x2e, 0x72, 0x6f, 0x6c, 0x65, 0x09, 0x0c, 0x07,
0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0e, 0x64, 0x61, 0x6e, 0x40, 0x64, 0x6f, 0x6d,
0x61, 0x69, 0x6e, 0x2e, 0x63, 0x6f, 0x6d, 0x14, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x13, 0x00, 0x0b, 0x00, 0x03, 0x00, 0x0c, 0x0e, 0x00, 0x73,
0x65, 0x78, 0x04, 0x6d, 0x61, 0x6c, 0x65, 0x01, 0x01, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0e, 0x64, 0x61, 0x6e, 0x40, 0x64,
0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x2e, 0x63, 0x6f, 0x6d, 0x12, 0x00, 0x00, 0x00, 0x01, 0x06, 0x24, 0x2e, 0x72, 0x6f, 0x6c, 0x65, 0x09, 0x0c, 0x07,
0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x00, 0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0e, 0x65, 0x76, 0x65, 0x40, 0x64, 0x6f, 0x6d,
0x61, 0x69, 0x6e, 0x2e, 0x63, 0x6f, 0x6d, 0x16, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x15, 0x00, 0x0b, 0x00, 0x03, 0x00, 0x0c, 0x0e, 0x00, 0x73,
0x65, 0x78, 0x06, 0x66, 0x65, 0x6d, 0x61, 0x6c, 0x65, 0x01, 0x01, 0x00, 0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0e, 0x65, 0x76, 0x65,
0x40, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x2e, 0x63, 0x6f, 0x6d, 0x12, 0x00, 0x00, 0x00, 0x01, 0x06, 0x24, 0x2e, 0x72, 0x6f, 0x6c, 0x65, 0x09,
0x0c, 0x07, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72})

require.True(t, mysql56PartialUpdateRowEvent.IsPartialUpdateRows())

format := BinlogFormat{
HeaderSizes: []byte{
0, 13, 0, 8, 0, 0, 0, 0, 4, 0, 4, 0, 0, 0, 98, 0, 4, 26, 8, 0, 0, 0, 8, 8, 8, 2, 0, 0, 0, 10, 10, 10, 42, 42, 0, 18, 52, 0, 10, 40, 0,
Expand All @@ -344,16 +272,96 @@ func TestMySQL56PartialUpdateRowsEvent(t *testing.T) {
Metadata: []uint16{0, 128, 4},
ColumnCollationIDs: []collations.ID{63},
}
ev, err := mysql56PartialUpdateRowEvent.Rows(format, tm)
require.NoError(t, err)

assert.Equal(t, 5, len(ev.Rows))
require.NoError(t, err)
for i := range ev.Rows {
vals, err := ev.StringValuesForTests(tm, i)
require.NoError(t, err)
// The third column is the JSON column.
require.Equal(t, `JSON_INSERT(%s, _utf8mb4'$.role', _utf8mb4"manager")`, vals[2])
t.Logf("Rows: %v", vals)
testCases := []struct {
name string
rawEvent []byte
want string
}{
{
name: "REMOVE and REPLACE",
// The mysqlbinlog -vvv --base64-output=decode-rows output for the following event:
// ### UPDATE `vt_commerce`.`customer`
// ### WHERE
// ### @1=1 /* LONGINT meta=0 nullable=0 is_null=0 */
// ### @2='[email protected]' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */
// ### @3='{"day": "friday", "role": "manager", "color": "red", "salary": 100, "favorite_color": "black"}' /* JSON meta=4 nullable=1 is_null=0 */
// ### SET
// ### @1=1 /* LONGINT meta=0 nullable=0 is_null=0 */
// ### @2='[email protected]' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */
// ### @3=JSON_REMOVE(
// ### JSON_REPLACE(@3, '$.day', 'monday'),
// ### '$.favorite_color') /* JSON meta=4 nullable=1 is_null=0 */
// ### UPDATE `vt_commerce`.`customer`
// ### WHERE
// ### @1=2 /* LONGINT meta=0 nullable=0 is_null=0 */
// ### @2='[email protected]' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */
// ### @3='{"day": "friday", "role": "manager", "color": "red", "salary": 99, "favorite_color": "black"}' /* JSON meta=4 nullable=1 is_null=0 */
// ### SET
// ### @1=2 /* LONGINT meta=0 nullable=0 is_null=0 */
// ### @2='[email protected]' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */
// ### @3=JSON_REMOVE(
// ### JSON_REPLACE(@3, '$.day', 'monday'),
// ### '$.favorite_color') /* JSON meta=4 nullable=1 is_null=0 */
// ### UPDATE `vt_commerce`.`customer`
// ### WHERE
// ### @1=3 /* LONGINT meta=0 nullable=0 is_null=0 */
// ### @2='[email protected]' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */
// ### @3='{"day": "friday", "role": "manager", "color": "red", "salary": 99, "favorite_color": "black"}' /* JSON meta=4 nullable=1 is_null=0 */
// ### SET
// ### @1=3 /* LONGINT meta=0 nullable=0 is_null=0 */
// ### @2='[email protected]' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */
// ### @3=JSON_REMOVE(
// ### JSON_REPLACE(@3, '$.day', 'monday'),
// ### '$.favorite_color') /* JSON meta=4 nullable=1 is_null=0 */
// ### UPDATE `vt_commerce`.`customer`
// ### WHERE
// ### @1=4 /* LONGINT meta=0 nullable=0 is_null=0 */
// ### @2='[email protected]' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */
// ### @3='{"day": "friday", "role": "manager", "color": "red", "salary": 99, "favorite_color": "black"}' /* JSON meta=4 nullable=1 is_null=0 */
// ### SET
// ### @1=4 /* LONGINT meta=0 nullable=0 is_null=0 */
// ### @2='[email protected]' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */
// ### @3=JSON_REMOVE(
// ### JSON_REPLACE(@3, '$.day', 'monday'),
// ### '$.favorite_color') /* JSON meta=4 nullable=1 is_null=0 */
// ### UPDATE `vt_commerce`.`customer`
// ### WHERE
// ### @1=5 /* LONGINT meta=0 nullable=0 is_null=0 */
// ### @2='[email protected]' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */
// ### @3='{"day": "friday", "role": "manager", "color": "red", "salary": 100, "favorite_color": "black"}' /* JSON meta=4 nullable=1 is_null=0 */
// ### SET
// ### @1=5 /* LONGINT meta=0 nullable=0 is_null=0 */
// ### @2='[email protected]' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */
// ### @3=JSON_REMOVE(
// ### JSON_REPLACE(@3, '$.day', 'monday'),
// ### '$.favorite_color') /* JSON meta=4 nullable=1 is_null=0 */
rawEvent: []byte{
227, 240, 86, 103, 39, 74, 58, 208, 33, 225, 3, 0, 0, 173, 122, 0, 0, 0, 0, 176, 0, 0, 0, 0, 0, 1, 0, 2, 0, 3, 255, 255, 0, 1, 0, 0, 0, 0,
0, 0, 0, 16, 97, 108, 105, 99, 101, 64, 100, 111, 109, 97, 105, 110, 46, 99, 111, 109, 97, 0, 0, 0, 0, 5, 0, 96, 0, 39, 0, 3, 0, 42, 0, 4,
0, 46, 0, 5, 0, 51, 0, 6, 0, 57, 0, 14, 0, 12, 71, 0, 12, 78, 0, 12, 86, 0, 5, 100, 0, 12, 90, 0, 100, 97, 121, 114, 111, 108, 101, 99, 111, 108, 111, 114, 115, 97, 108, 97, 114, 121, 102, 97, 118, 111, 114, 105, 116, 101, 95, 99, 111, 108, 111, 114, 6, 102, 114, 105, 100, 97, 121, 7, 109, 97, 110, 97, 103, 101, 114, 3, 114, 101, 100, 5, 98, 108, 97, 99, 107, 1, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 16, 97, 108, 105, 99, 101, 64, 100, 111, 109, 97, 105, 110, 46, 99, 111, 109, 34, 0, 0, 0, 0, 5, 36, 46, 100, 97, 121, 8, 12, 6, 109, 111, 110, 100, 97, 121, 2, 16, 36, 46, 102, 97, 118, 111, 114, 105, 116, 101, 95, 99, 111, 108, 111, 114, 0, 2, 0, 0, 0, 0, 0, 0, 0, 14, 98, 111, 98, 64, 100, 111, 109, 97, 105, 110, 46, 99, 111, 109, 97, 0, 0, 0, 0, 5, 0, 96, 0, 39, 0, 3, 0, 42, 0, 4, 0, 46, 0, 5, 0, 51, 0, 6, 0, 57, 0, 14, 0, 12, 71, 0, 12, 78, 0, 12, 86, 0, 5, 99, 0, 12, 90, 0, 100, 97, 121, 114, 111, 108, 101, 99, 111, 108, 111, 114, 115, 97, 108, 97, 114, 121, 102, 97, 118, 111, 114, 105, 116, 101, 95, 99, 111, 108, 111, 114, 6, 102, 114, 105, 100, 97, 121, 7, 109, 97, 110, 97, 103, 101, 114, 3, 114, 101, 100, 5, 98, 108, 97, 99, 107, 1, 1, 0, 2, 0, 0, 0, 0, 0, 0, 0, 14, 98, 111, 98, 64, 100, 111, 109, 97, 105, 110, 46, 99, 111, 109, 34, 0, 0, 0, 0, 5, 36, 46, 100, 97, 121, 8, 12, 6, 109, 111, 110, 100, 97, 121, 2, 16, 36, 46, 102, 97, 118, 111, 114, 105, 116, 101, 95, 99, 111, 108, 111, 114, 0, 3, 0, 0, 0, 0, 0, 0, 0, 18, 99, 104, 97, 114, 108, 105, 101, 64, 100, 111, 109, 97, 105, 110, 46, 99, 111, 109, 97, 0, 0, 0, 0, 5, 0, 96, 0, 39, 0, 3, 0, 42, 0, 4, 0, 46, 0, 5, 0, 51, 0, 6, 0, 57, 0, 14, 0, 12, 71, 0, 12, 78, 0, 12, 86, 0, 5, 99, 0, 12, 90, 0, 100, 97, 121, 114, 111, 108, 101, 99, 111, 108, 111, 114, 115, 97, 108, 97, 114, 121, 102, 97, 118, 111, 114, 105, 116, 101, 95, 99, 111, 108, 111, 114, 6, 102, 114, 105, 100, 97, 121, 7, 109, 97, 110, 97, 103, 101, 114, 3, 114, 101, 100, 5, 98, 108, 97, 99, 107, 1, 1, 0, 3, 0, 0, 0, 0, 0, 0, 0, 18, 99, 104, 97, 114, 108, 105, 101, 64, 100, 111, 109, 97, 105, 110, 46, 99, 111, 109, 34, 0, 0, 0, 0, 5, 36, 46, 100, 97, 121, 8, 12, 6, 109, 111, 110, 100, 97, 121, 2, 16, 36, 46, 102, 97, 118, 111, 114, 105, 116, 101, 95, 99, 111, 108, 111, 114, 0, 4, 0, 0, 0, 0, 0, 0, 0, 14, 100, 97, 110, 64, 100, 111, 109, 97, 105, 110, 46, 99, 111, 109, 97, 0, 0, 0, 0, 5, 0, 96, 0, 39, 0, 3, 0, 42, 0, 4, 0, 46, 0, 5, 0, 51, 0, 6, 0, 57, 0, 14, 0, 12, 71, 0, 12, 78, 0, 12, 86, 0, 5, 99, 0, 12, 90, 0, 100, 97, 121, 114, 111, 108, 101, 99, 111, 108, 111, 114, 115, 97, 108, 97, 114, 121, 102, 97, 118, 111, 114, 105, 116, 101, 95, 99, 111, 108, 111, 114, 6, 102, 114, 105, 100, 97, 121, 7, 109, 97, 110, 97, 103, 101, 114, 3, 114, 101, 100, 5, 98, 108, 97, 99, 107, 1, 1, 0, 4, 0, 0, 0, 0, 0, 0, 0, 14, 100, 97, 110, 64, 100, 111, 109, 97, 105, 110, 46, 99, 111, 109, 34, 0, 0, 0, 0, 5, 36, 46, 100, 97, 121, 8, 12, 6, 109, 111, 110, 100, 97, 121, 2, 16, 36, 46, 102, 97, 118, 111, 114, 105, 116, 101, 95, 99, 111, 108, 111, 114, 0, 5, 0, 0, 0, 0, 0, 0, 0, 14, 101, 118, 101, 64, 100, 111, 109, 97, 105, 110, 46, 99, 111, 109, 97, 0, 0, 0, 0, 5, 0, 96, 0, 39, 0, 3, 0, 42, 0, 4, 0, 46, 0, 5, 0, 51, 0, 6, 0, 57, 0, 14, 0, 12, 71, 0, 12, 78, 0, 12, 86, 0, 5, 100, 0, 12, 90, 0, 100, 97, 121, 114, 111, 108, 101, 99, 111, 108, 111, 114, 115, 97, 108, 97, 114, 121, 102, 97, 118, 111, 114, 105, 116, 101, 95, 99, 111, 108, 111, 114, 6, 102, 114, 105, 100, 97, 121, 7, 109, 97, 110, 97, 103, 101, 114, 3, 114, 101, 100, 5, 98, 108, 97, 99, 107, 1, 1, 0, 5, 0, 0, 0, 0, 0, 0, 0, 14, 101, 118, 101, 64, 100, 111, 109, 97, 105, 110, 46, 99, 111, 109, 34, 0, 0, 0, 0, 5, 36, 46, 100, 97, 121, 8, 12, 6, 109, 111, 110, 100, 97, 121, 2, 16, 36, 46, 102, 97, 118, 111, 114, 105, 116, 101, 95, 99, 111, 108, 111, 114,
},
want: "JSON_REMOVE(JSON_REPLACE(%s, _utf8mb4'$.day', _utf8mb4\"monday\"), _utf8mb4'$.favorite_color')",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
mysql56PartialUpdateRowEvent := NewMysql56BinlogEvent(tc.rawEvent)
require.True(t, mysql56PartialUpdateRowEvent.IsPartialUpdateRows())

ev, err := mysql56PartialUpdateRowEvent.Rows(format, tm)
require.NoError(t, err)

assert.Equal(t, 5, len(ev.Rows))
require.NoError(t, err)
for i := range ev.Rows {
vals, err := ev.StringValuesForTests(tm, i)
require.NoError(t, err)
// The third column is the JSON column.
require.Equal(t, `JSON_REMOVE(JSON_REPLACE(%s, _utf8mb4'$.day', _utf8mb4"monday"), _utf8mb4'$.favorite_color')`, vals[2])
t.Logf("Rows: %v", vals)
}
})
}
}
3 changes: 3 additions & 0 deletions go/mysql/binlog_event_rbr.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,9 @@ func (ev binlogEvent) Rows(f BinlogFormat, tm *TableMap) (Rows, error) {
typ == eDeleteRowsEventV1 || typ == eDeleteRowsEventV2
hasData := typ == eWriteRowsEventV1 || typ == eWriteRowsEventV2 ||
typ == eUpdateRowsEventV1 || typ == ePartialUpdateRowsEvent || typ == eUpdateRowsEventV2
//if typ == ePartialUpdateRowsEvent {
// log.Errorf("DEBUG: PartialUpdateRowsEvent bytes: %v", ev.Bytes())
//}

result := Rows{}
pos := 6
Expand Down

0 comments on commit 7be99a3

Please sign in to comment.