-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
VReplication: Support binlog_row_value_options=PARTIAL_JSON #17345
Changes from 28 commits
3c337a4
c9cf71c
2aed1aa
7b75ed6
748589d
0b2b9cf
7be99a3
bd3aa67
96ffca4
69afa52
42d5db4
6e288df
f2a5445
1e492b9
4fbbe49
4d15ca2
d6f132a
8171e14
dd324ef
eb3f1b0
91b1e33
1980b89
7b25395
799850d
e68c035
8ad1fc0
c168ff0
dcec5f6
64e0422
a230cb1
f23f73c
76cd316
93aa37c
6264397
3a172c0
7dc29b3
79d3f08
5deec92
aeef8e9
77cb55e
27edece
3e3ab1d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ limitations under the License. | |
package binlog | ||
|
||
import ( | ||
"bytes" | ||
"encoding/binary" | ||
"fmt" | ||
"math" | ||
|
@@ -25,9 +26,12 @@ import ( | |
"vitess.io/vitess/go/hack" | ||
"vitess.io/vitess/go/mysql/format" | ||
"vitess.io/vitess/go/mysql/json" | ||
"vitess.io/vitess/go/sqltypes" | ||
"vitess.io/vitess/go/vt/sqlparser" | ||
"vitess.io/vitess/go/vt/vterrors" | ||
|
||
querypb "vitess.io/vitess/go/vt/proto/query" | ||
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" | ||
"vitess.io/vitess/go/vt/vterrors" | ||
) | ||
|
||
/* | ||
|
@@ -44,6 +48,14 @@ https://github.com/shyiko/mysql-binlog-connector-java/pull/119/files | |
https://github.com/noplay/python-mysql-replication/blob/175df28cc8b536a68522ff9b09dc5440adad6094/pymysqlreplication/packet.py | ||
*/ | ||
|
||
type jsonDiffOp uint8 | ||
|
||
const ( | ||
jsonDiffOpReplace = jsonDiffOp(0) | ||
jsonDiffOpInsert = jsonDiffOp(1) | ||
jsonDiffOpRemove = jsonDiffOp(2) | ||
) | ||
|
||
// ParseBinaryJSON provides the parsing function from the mysql binary json | ||
// representation to a JSON value instance. | ||
func ParseBinaryJSON(data []byte) (*json.Value, error) { | ||
|
@@ -60,6 +72,84 @@ func ParseBinaryJSON(data []byte) (*json.Value, error) { | |
return node, nil | ||
} | ||
|
||
// ParseBinaryJSONDiff provides the parsing function from the binary MySQL | ||
// JSON diff representation to an SQL expression. | ||
func ParseBinaryJSONDiff(data []byte) (sqltypes.Value, error) { | ||
diff := bytes.Buffer{} | ||
// Reasonable estimate of the space we'll need to build the SQL | ||
// expression in order to try and avoid reallocations w/o | ||
// overallocating too much. | ||
diff.Grow(int(float32(len(data)) * 1.25)) | ||
pos := 0 | ||
outer := false | ||
innerStr := "" | ||
|
||
for pos < len(data) { | ||
opType := jsonDiffOp(data[pos]) | ||
pos++ | ||
if outer { | ||
innerStr = diff.String() | ||
diff.Reset() | ||
} | ||
switch opType { | ||
case jsonDiffOpReplace: | ||
diff.WriteString("JSON_REPLACE(") | ||
case jsonDiffOpInsert: | ||
diff.WriteString("JSON_INSERT(") | ||
case jsonDiffOpRemove: | ||
diff.WriteString("JSON_REMOVE(") | ||
default: | ||
// Can be a JSON null. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you generally clarify (by code comments) what's going on here? (And otherwise in the rest of the function). I feel like there's some intimate knowledge here that I'm missing. What exactly needs to be done? What are we solving? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added here: 5deec92 |
||
js, err := ParseBinaryJSON(data) | ||
if err == nil && js.Type() == json.TypeNull { | ||
return sqltypes.MakeTrusted(sqltypes.Expression, js.MarshalTo(nil)), nil | ||
} | ||
return sqltypes.Value{}, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, | ||
"invalid JSON diff operation: %d", opType) | ||
} | ||
if outer { | ||
diff.WriteString(innerStr) | ||
diff.WriteString(", ") | ||
} else { // Only the inner most function has the field name | ||
diff.WriteString("%s, ") // This will later be replaced by the field name | ||
} | ||
outer = true | ||
|
||
pathLen, readTo := readVariableLength(data, pos) | ||
pos = readTo | ||
path := data[pos : pos+pathLen] | ||
pos += pathLen | ||
// We have to specify the unicode character set for the strings we | ||
// use in the expression as the connection can be using a different | ||
// character set (e.g. vreplication always uses set names binary). | ||
diff.WriteString(sqlparser.Utf8mb4Str) | ||
diff.WriteByte('\'') | ||
diff.Write(path) | ||
diff.WriteByte('\'') | ||
if opType == jsonDiffOpRemove { // No value for remove | ||
diff.WriteByte(')') | ||
continue | ||
} | ||
|
||
diff.WriteString(", ") | ||
valueLen, readTo := readVariableLength(data, pos) | ||
pos = readTo | ||
value, err := ParseBinaryJSON(data[pos : pos+valueLen]) | ||
if err != nil { | ||
return sqltypes.Value{}, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, | ||
"cannot read JSON diff value for path %s: %v", path, err) | ||
} | ||
pos += valueLen | ||
if value.Type() == json.TypeString { | ||
diff.WriteString(sqlparser.Utf8mb4Str) | ||
} | ||
diff.Write(value.MarshalTo(nil)) | ||
diff.WriteByte(')') | ||
} | ||
|
||
return sqltypes.MakeTrusted(sqltypes.Expression, diff.Bytes()), nil | ||
} | ||
|
||
// jsonDataType has the values used in the mysql json binary representation to denote types. | ||
// We have string, literal(true/false/null), number, object or array types. | ||
// large object => doc size > 64K: you get pointers instead of inline values. | ||
|
@@ -315,7 +405,7 @@ func binparserOpaque(_ jsonDataType, data []byte, pos int) (node *json.Value, er | |
precision := decimalData[0] | ||
scale := decimalData[1] | ||
metadata := (uint16(precision) << 8) + uint16(scale) | ||
val, _, err := CellValue(decimalData, 2, TypeNewDecimal, metadata, &querypb.Field{Type: querypb.Type_DECIMAL}) | ||
val, _, err := CellValue(decimalData, 2, TypeNewDecimal, metadata, &querypb.Field{Type: querypb.Type_DECIMAL}, false) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This didn't help with the periodic failures, but good to upgrade anyway as the latest GA is 5.1.