diff --git a/go/test/endtoend/onlineddl/vrepl_suite/testdata/alter-charset-non-utf8-80-vcopier/allow_schemadiff_normalization b/go/test/endtoend/onlineddl/vrepl_suite/testdata/alter-charset-non-utf8-80-vcopier/allow_schemadiff_normalization new file mode 100644 index 00000000000..e69de29bb2d diff --git a/go/test/endtoend/onlineddl/vrepl_suite/testdata/alter-charset-non-utf8-80-vcopier/alter b/go/test/endtoend/onlineddl/vrepl_suite/testdata/alter-charset-non-utf8-80-vcopier/alter new file mode 100644 index 00000000000..b5ec82b1a8b --- /dev/null +++ b/go/test/endtoend/onlineddl/vrepl_suite/testdata/alter-charset-non-utf8-80-vcopier/alter @@ -0,0 +1 @@ +MODIFY `t1` varchar(128) CHARACTER SET utf8mb4 NOT NULL, MODIFY `t2` varchar(128) CHARACTER SET latin2 NOT NULL, MODIFY `tutf8` varchar(128) CHARACTER SET latin1 NOT NULL diff --git a/go/test/endtoend/onlineddl/vrepl_suite/testdata/alter-charset-non-utf8-80-vcopier/create.sql b/go/test/endtoend/onlineddl/vrepl_suite/testdata/alter-charset-non-utf8-80-vcopier/create.sql new file mode 100644 index 00000000000..79e8fda23ee --- /dev/null +++ b/go/test/endtoend/onlineddl/vrepl_suite/testdata/alter-charset-non-utf8-80-vcopier/create.sql @@ -0,0 +1,19 @@ +drop table if exists onlineddl_test; +create table onlineddl_test ( + id int auto_increment, + t1 varchar(128) charset latin1 collate latin1_swedish_ci, + t2 varchar(128) charset latin1 collate latin1_swedish_ci, + tutf8 varchar(128) charset utf8, + tutf8mb4 varchar(128) charset utf8mb4, + tlatin1 varchar(128) charset latin1 collate latin1_swedish_ci, + primary key(id) +) auto_increment=1; + +insert into onlineddl_test values (null, md5(rand()), md5(rand()), md5(rand()), md5(rand()), md5(rand())); +insert into onlineddl_test values (null, 'átesting', 'átesting', 'átesting', 'átesting', 'átesting'); +insert into onlineddl_test values (null, 'testátest', 'testátest', 'testátest', '🍻😀', 'átesting'); +insert into onlineddl_test values (null, 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog'); +insert into onlineddl_test values (null, 'testátest-binlog', 'testátest-binlog', 'testátest-binlog', '🍻😀', 'átesting-binlog'); +insert into onlineddl_test values (null, 'átesting-bnull', 'átesting-bnull', 'átesting-bnull', null, null); + +drop event if exists onlineddl_test; diff --git a/go/test/endtoend/onlineddl/vrepl_suite/testdata/alter-charset-non-utf8-80-vcopier/ignore_versions b/go/test/endtoend/onlineddl/vrepl_suite/testdata/alter-charset-non-utf8-80-vcopier/ignore_versions new file mode 100644 index 00000000000..0790a1e68fd --- /dev/null +++ b/go/test/endtoend/onlineddl/vrepl_suite/testdata/alter-charset-non-utf8-80-vcopier/ignore_versions @@ -0,0 +1 @@ +(5.5|5.6|5.7) diff --git a/go/test/endtoend/onlineddl/vrepl_suite/testdata/non-utf8-charset-pk/allow_schemadiff_normalization b/go/test/endtoend/onlineddl/vrepl_suite/testdata/non-utf8-charset-pk/allow_schemadiff_normalization new file mode 100644 index 00000000000..e69de29bb2d diff --git a/go/test/endtoend/onlineddl/vrepl_suite/testdata/non-utf8-charset-pk/create.sql b/go/test/endtoend/onlineddl/vrepl_suite/testdata/non-utf8-charset-pk/create.sql new file mode 100644 index 00000000000..c0313e62c8d --- /dev/null +++ b/go/test/endtoend/onlineddl/vrepl_suite/testdata/non-utf8-charset-pk/create.sql @@ -0,0 +1,30 @@ +drop table if exists onlineddl_test; +create table onlineddl_test ( + id varchar(128) charset latin1 collate latin1_swedish_ci, + t1 varchar(128) charset latin1 collate latin1_swedish_ci, + t2 varchar(128) charset latin1 collate latin1_swedish_ci, + tutf8 varchar(128) charset utf8, + tutf8mb4 varchar(128) charset utf8mb4, + tlatin1 varchar(128) charset latin1 collate latin1_swedish_ci, + primary key(id) +) auto_increment=1; + +insert into onlineddl_test values (concat('átesting-', md5(rand())), md5(rand()), md5(rand()), md5(rand()), md5(rand()), md5(rand())); +insert into onlineddl_test values (concat('átesting-', md5(rand())), 'átesting', 'átesting', 'átesting', 'átesting', 'átesting'); +insert into onlineddl_test values (concat('átesting-', md5(rand())), 'testátest', 'testátest', 'testátest', '🍻😀', 'átesting'); + +drop event if exists onlineddl_test; +delimiter ;; +create event onlineddl_test + on schedule every 1 second + starts current_timestamp + ends current_timestamp + interval 60 second + on completion not preserve + enable + do +begin + insert into onlineddl_test values (concat('átesting-', md5(rand())), md5(rand()), md5(rand()), md5(rand()), md5(rand()), md5(rand())); + insert into onlineddl_test values (concat('átesting-', md5(rand())), 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog'); + insert into onlineddl_test values (concat('átesting-', md5(rand())), 'testátest-binlog', 'testátest-binlog', 'testátest-binlog', '🍻😀', 'átesting-binlog'); + insert into onlineddl_test values (concat('átesting-', md5(rand())), 'átesting-bnull', 'átesting-bnull', 'átesting-bnull', null, null); +end ;; diff --git a/go/test/endtoend/onlineddl/vrepl_suite/testdata/non-utf8-charset-pk/ignore_versions b/go/test/endtoend/onlineddl/vrepl_suite/testdata/non-utf8-charset-pk/ignore_versions new file mode 100644 index 00000000000..0790a1e68fd --- /dev/null +++ b/go/test/endtoend/onlineddl/vrepl_suite/testdata/non-utf8-charset-pk/ignore_versions @@ -0,0 +1 @@ +(5.5|5.6|5.7) diff --git a/go/vt/vttablet/onlineddl/vrepl.go b/go/vt/vttablet/onlineddl/vrepl.go index 847e40e3fbc..fe6d2bd9141 100644 --- a/go/vt/vttablet/onlineddl/vrepl.go +++ b/go/vt/vttablet/onlineddl/vrepl.go @@ -571,6 +571,9 @@ func (v *VRepl) generateFilterQuery(ctx context.Context) error { sb.WriteString(fmt.Sprintf("CONCAT(%s)", escapeName(name))) case sourceCol.Type == vrepl.JSONColumnType: sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name))) + case targetCol.Type == vrepl.JSONColumnType: + // Convert any type to JSON: encode the type as utf8mb4 text + sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name))) case sourceCol.Type == vrepl.StringColumnType: // Check source and target charset/encoding. If needed, create // a binlogdatapb.CharsetConversion entry (later written to vreplication) @@ -583,19 +586,19 @@ func (v *VRepl) generateFilterQuery(ctx context.Context) error { if targetCol.Type == vrepl.StringColumnType && toCollation == collations.Unknown { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Character set %s not supported for column %s", targetCol.Charset, targetCol.Name) } - - if trivialCharset(fromCollation) && trivialCharset(toCollation) && targetCol.Type != vrepl.JSONColumnType { + if trivialCharset(fromCollation) && trivialCharset(toCollation) { + sb.WriteString(escapeName(name)) + } else if fromCollation == toCollation { + // No need for charset conversions as both have the same collation. sb.WriteString(escapeName(name)) } else { + // Charset conversion required: v.convertCharset[targetName] = &binlogdatapb.CharsetConversion{ FromCharset: sourceCol.Charset, ToCharset: targetCol.Charset, } - sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name))) + sb.WriteString(escapeName(name)) } - case targetCol.Type == vrepl.JSONColumnType && sourceCol.Type != vrepl.JSONColumnType: - // Convert any type to JSON: encode the type as utf8mb4 text - sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name))) default: sb.WriteString(escapeName(name)) } diff --git a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go index 3bef997d0be..98cbb378fdd 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go @@ -27,6 +27,7 @@ import ( "vitess.io/vitess/go/mysql/collations/charset" "vitess.io/vitess/go/mysql/collations/colldata" vjson "vitess.io/vitess/go/mysql/json" + "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/sqlparser" @@ -257,7 +258,7 @@ func (tp *TablePlan) applyBulkInsert(sqlbuffer *bytes2.Buffer, rows []*querypb.R if i > 0 { sqlbuffer.WriteString(", ") } - if err := appendFromRow(tp.BulkInsertValues, sqlbuffer, tp.Fields, row, tp.FieldsToSkip); err != nil { + if err := tp.appendFromRow(sqlbuffer, row); err != nil { return nil, err } } @@ -312,6 +313,30 @@ func (tp *TablePlan) isOutsidePKRange(bindvars map[string]*querypb.BindVariable, return false } +// convertStringCharset does a charset conversion given raw data and an applicable conversion rule. +// In case of a conversion error, it returns an equivalent of MySQL error 1366, which is what you'd +// get in a failed `CONVERT()` function, e.g.: +// +// > create table tascii(v varchar(100) charset ascii); +// > insert into tascii values ('€'); +// ERROR 1366 (HY000): Incorrect string value: '\xE2\x82\xAC' for column 'v' at row 1 +func (tp *TablePlan) convertStringCharset(raw []byte, conversion *binlogdatapb.CharsetConversion, fieldName string) ([]byte, error) { + fromCollation := tp.CollationEnv.DefaultCollationForCharset(conversion.FromCharset) + if fromCollation == collations.Unknown { + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "character set %s not supported for column %s", conversion.FromCharset, fieldName) + } + toCollation := tp.CollationEnv.DefaultCollationForCharset(conversion.ToCharset) + if toCollation == collations.Unknown { + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "character set %s not supported for column %s", conversion.ToCharset, fieldName) + } + + out, err := charset.Convert(nil, colldata.Lookup(toCollation).Charset(), raw, colldata.Lookup(fromCollation).Charset()) + if err != nil { + return nil, sqlerror.NewSQLError(sqlerror.ERTruncatedWrongValueForField, sqlerror.SSUnknownSQLState, "Incorrect string value: %s", err.Error()) + } + return out, nil +} + // bindFieldVal returns a bind variable based on given field and value. // Most values will just bind directly. But some values may need manipulation: // - text values with charset conversion @@ -320,11 +345,7 @@ func (tp *TablePlan) isOutsidePKRange(bindvars map[string]*querypb.BindVariable, func (tp *TablePlan) bindFieldVal(field *querypb.Field, val *sqltypes.Value) (*querypb.BindVariable, error) { if conversion, ok := tp.ConvertCharset[field.Name]; ok && !val.IsNull() { // Non-null string value, for which we have a charset conversion instruction - fromCollation := tp.CollationEnv.DefaultCollationForCharset(conversion.FromCharset) - if fromCollation == collations.Unknown { - return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Character set %s not supported for column %s", conversion.FromCharset, field.Name) - } - out, err := charset.Convert(nil, charset.Charset_utf8mb4{}, val.Raw(), colldata.Lookup(fromCollation).Charset()) + out, err := tp.convertStringCharset(val.Raw(), conversion, field.Name) if err != nil { return nil, err } @@ -590,28 +611,30 @@ func valsEqual(v1, v2 sqltypes.Value) bool { // note: there can be more fields than bind locations since extra columns might be requested from the source if not all // primary keys columns are present in the target table, for example. Also some values in the row may not correspond for // values from the database on the source: sum/count for aggregation queries, for example -func appendFromRow(pq *sqlparser.ParsedQuery, buf *bytes2.Buffer, fields []*querypb.Field, row *querypb.Row, skipFields map[string]bool) error { - bindLocations := pq.BindLocations() - if len(fields) < len(bindLocations) { +func (tp *TablePlan) appendFromRow(buf *bytes2.Buffer, row *querypb.Row) error { + bindLocations := tp.BulkInsertValues.BindLocations() + if len(tp.Fields) < len(bindLocations) { return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of fields: got %d fields for %d bind locations ", - len(fields), len(bindLocations)) + len(tp.Fields), len(bindLocations)) } type colInfo struct { typ querypb.Type length int64 offset int64 + field *querypb.Field } rowInfo := make([]*colInfo, 0) offset := int64(0) - for i, field := range fields { // collect info required for fields to be bound + for i, field := range tp.Fields { // collect info required for fields to be bound length := row.Lengths[i] - if !skipFields[strings.ToLower(field.Name)] { + if !tp.FieldsToSkip[strings.ToLower(field.Name)] { rowInfo = append(rowInfo, &colInfo{ typ: field.Type, length: length, offset: offset, + field: field, }) } if length > 0 { @@ -623,7 +646,7 @@ func appendFromRow(pq *sqlparser.ParsedQuery, buf *bytes2.Buffer, fields []*quer var offsetQuery int for i, loc := range bindLocations { col := rowInfo[i] - buf.WriteString(pq.Query[offsetQuery:loc.Offset]) + buf.WriteString(tp.BulkInsertValues.Query[offsetQuery:loc.Offset]) typ := col.typ switch typ { @@ -645,12 +668,25 @@ func appendFromRow(pq *sqlparser.ParsedQuery, buf *bytes2.Buffer, fields []*quer // -1 means a null variable; serialize it directly buf.WriteString(sqltypes.NullStr) } else { - vv := sqltypes.MakeTrusted(typ, row.Values[col.offset:col.offset+col.length]) + raw := row.Values[col.offset : col.offset+col.length] + var vv sqltypes.Value + + if conversion, ok := tp.ConvertCharset[col.field.Name]; ok && col.length > 0 { + // Non-null string value, for which we have a charset conversion instruction + out, err := tp.convertStringCharset(raw, conversion, col.field.Name) + if err != nil { + return err + } + vv = sqltypes.MakeTrusted(typ, out) + } else { + vv = sqltypes.MakeTrusted(typ, raw) + } + vv.EncodeSQLBytes2(buf) } } offsetQuery = loc.Offset + loc.Length } - buf.WriteString(pq.Query[offsetQuery:]) + buf.WriteString(tp.BulkInsertValues.Query[offsetQuery:]) return nil }