Skip to content

Commit

Permalink
v18 backport: Online DDL: avoid SQL's CONVERT(...), convert program…
Browse files Browse the repository at this point in the history
…matically if needed (#16604)

Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Aug 15, 2024
1 parent 33672f2 commit 051a978
Show file tree
Hide file tree
Showing 12 changed files with 187 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
MODIFY `t1` varchar(128) CHARACTER SET utf8mb4 NOT NULL, MODIFY `t2` varchar(128) CHARACTER SET latin2 NOT NULL, MODIFY `tutf8` varchar(128) CHARACTER SET latin1 NOT NULL
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
drop table if exists onlineddl_test;
create table onlineddl_test (
id int auto_increment,
t1 varchar(128) charset latin1 collate latin1_swedish_ci,
t2 varchar(128) charset latin1 collate latin1_swedish_ci,
tutf8 varchar(128) charset utf8,
tutf8mb4 varchar(128) charset utf8mb4,
tlatin1 varchar(128) charset latin1 collate latin1_swedish_ci,
primary key(id)
) auto_increment=1;

insert into onlineddl_test values (null, md5(rand()), md5(rand()), md5(rand()), md5(rand()), md5(rand()));
insert into onlineddl_test values (null, 'átesting', 'átesting', 'átesting', 'átesting', 'átesting');
insert into onlineddl_test values (null, 'testátest', 'testátest', 'testátest', '🍻😀', 'átesting');
insert into onlineddl_test values (null, 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog');
insert into onlineddl_test values (null, 'testátest-binlog', 'testátest-binlog', 'testátest-binlog', '🍻😀', 'átesting-binlog');
insert into onlineddl_test values (null, 'átesting-bnull', 'átesting-bnull', 'átesting-bnull', null, null);

drop event if exists onlineddl_test;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
(5.5|5.6|5.7)
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
drop table if exists onlineddl_test;
create table onlineddl_test (
id varchar(128) charset latin1 collate latin1_swedish_ci,
t1 varchar(128) charset latin1 collate latin1_swedish_ci,
t2 varchar(128) charset latin1 collate latin1_swedish_ci,
tutf8 varchar(128) charset utf8,
tutf8mb4 varchar(128) charset utf8mb4,
tlatin1 varchar(128) charset latin1 collate latin1_swedish_ci,
primary key(id)
) auto_increment=1;

insert into onlineddl_test values (concat('átesting-', md5(rand())), md5(rand()), md5(rand()), md5(rand()), md5(rand()), md5(rand()));
insert into onlineddl_test values (concat('átesting-', md5(rand())), 'átesting', 'átesting', 'átesting', 'átesting', 'átesting');
insert into onlineddl_test values (concat('átesting-', md5(rand())), 'testátest', 'testátest', 'testátest', '🍻😀', 'átesting');

drop event if exists onlineddl_test;
delimiter ;;
create event onlineddl_test
on schedule every 1 second
starts current_timestamp
ends current_timestamp + interval 60 second
on completion not preserve
enable
do
begin
insert into onlineddl_test values (concat('átesting-', md5(rand())), md5(rand()), md5(rand()), md5(rand()), md5(rand()), md5(rand()));
insert into onlineddl_test values (concat('átesting-', md5(rand())), 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog');
insert into onlineddl_test values (concat('átesting-', md5(rand())), 'testátest-binlog', 'testátest-binlog', 'testátest-binlog', '🍻😀', 'átesting-binlog');
insert into onlineddl_test values (concat('átesting-', md5(rand())), 'átesting-bnull', 'átesting-bnull', 'átesting-bnull', null, null);
end ;;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
(5.5|5.6|5.7)
16 changes: 10 additions & 6 deletions go/vt/sqlparser/parsed_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type ParsedQuery struct {
}

type bindLocation struct {
offset, length int
Offset, Length int
}

// NewParsedQuery returns a ParsedQuery of the ast.
Expand Down Expand Up @@ -67,8 +67,8 @@ func (pq *ParsedQuery) GenerateQuery(bindVariables map[string]*querypb.BindVaria
func (pq *ParsedQuery) Append(buf *strings.Builder, bindVariables map[string]*querypb.BindVariable, extras map[string]Encodable) error {
current := 0
for _, loc := range pq.bindLocations {
buf.WriteString(pq.Query[current:loc.offset])
name := pq.Query[loc.offset : loc.offset+loc.length]
buf.WriteString(pq.Query[current:loc.Offset])
name := pq.Query[loc.Offset : loc.Offset+loc.Length]
if encodable, ok := extras[name[1:]]; ok {
encodable.EncodeSQL(buf)
} else {
Expand All @@ -78,7 +78,7 @@ func (pq *ParsedQuery) Append(buf *strings.Builder, bindVariables map[string]*qu
}
EncodeValue(buf, supplied)
}
current = loc.offset + loc.length
current = loc.Offset + loc.Length
}
buf.WriteString(pq.Query[current:])
return nil
Expand Down Expand Up @@ -122,7 +122,7 @@ func (pq *ParsedQuery) AppendFromRow(buf *bytes2.Buffer, fields []*querypb.Field
var offsetQuery int
for i, loc := range pq.bindLocations {
col := rowInfo[i]
buf.WriteString(pq.Query[offsetQuery:loc.offset])
buf.WriteString(pq.Query[offsetQuery:loc.Offset])
typ := col.typ

switch typ {
Expand All @@ -148,12 +148,16 @@ func (pq *ParsedQuery) AppendFromRow(buf *bytes2.Buffer, fields []*querypb.Field
vv.EncodeSQLBytes2(buf)
}
}
offsetQuery = loc.offset + loc.length
offsetQuery = loc.Offset + loc.Length
}
buf.WriteString(pq.Query[offsetQuery:])
return nil
}

func (pq *ParsedQuery) BindLocations() []bindLocation {
return pq.bindLocations
}

// MarshalJSON is a custom JSON marshaler for ParsedQuery.
// Note that any queries longer that 512 bytes will be truncated.
func (pq *ParsedQuery) MarshalJSON() ([]byte, error) {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/sqlparser/parsed_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestNewParsedQuery(t *testing.T) {
pq := NewParsedQuery(stmt)
want := &ParsedQuery{
Query: "select * from a where id = :id",
bindLocations: []bindLocation{{offset: 27, length: 3}},
bindLocations: []bindLocation{{Offset: 27, Length: 3}},
}
if !reflect.DeepEqual(pq, want) {
t.Errorf("GenerateParsedQuery: %+v, want %+v", pq, want)
Expand Down
4 changes: 2 additions & 2 deletions go/vt/sqlparser/tracked_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,8 @@ func areBothISExpr(op Expr, val Expr) bool {
// tracking information for future substitutions.
func (buf *TrackedBuffer) WriteArg(prefix, arg string) {
buf.bindLocations = append(buf.bindLocations, bindLocation{
offset: buf.Len(),
length: len(prefix) + len(arg),
Offset: buf.Len(),
Length: len(prefix) + len(arg),
})
buf.WriteString(prefix)
buf.WriteString(arg)
Expand Down
15 changes: 9 additions & 6 deletions go/vt/vttablet/onlineddl/vrepl.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,9 @@ func (v *VRepl) generateFilterQuery(ctx context.Context) error {
sb.WriteString(fmt.Sprintf("CONCAT(%s)", escapeName(name)))
case sourceCol.Type == vrepl.JSONColumnType:
sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name)))
case targetCol.Type == vrepl.JSONColumnType:
// Convert any type to JSON: encode the type as utf8mb4 text
sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name)))
case sourceCol.Type == vrepl.StringColumnType:
// Check source and target charset/encoding. If needed, create
// a binlogdatapb.CharsetConversion entry (later written to vreplication)
Expand All @@ -523,19 +526,19 @@ func (v *VRepl) generateFilterQuery(ctx context.Context) error {
if targetCol.Type == vrepl.StringColumnType && toCollation == collations.Unknown {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Character set %s not supported for column %s", targetCol.Charset, targetCol.Name)
}

if trivialCharset(fromCollation) && trivialCharset(toCollation) && targetCol.Type != vrepl.JSONColumnType {
if trivialCharset(fromCollation) && trivialCharset(toCollation) {
sb.WriteString(escapeName(name))
} else if fromCollation == toCollation {
// No need for charset conversions as both have the same collation.
sb.WriteString(escapeName(name))
} else {
// Charset conversion required:
v.convertCharset[targetName] = &binlogdatapb.CharsetConversion{
FromCharset: sourceCol.Charset,
ToCharset: targetCol.Charset,
}
sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name)))
sb.WriteString(escapeName(name))
}
case targetCol.Type == vrepl.JSONColumnType && sourceCol.Type != vrepl.JSONColumnType:
// Convert any type to JSON: encode the type as utf8mb4 text
sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name)))
default:
sb.WriteString(escapeName(name))
}
Expand Down
119 changes: 113 additions & 6 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"vitess.io/vitess/go/mysql/collations/charset"
"vitess.io/vitess/go/mysql/collations/colldata"
vjson "vitess.io/vitess/go/mysql/json"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
Expand Down Expand Up @@ -252,7 +253,7 @@ func (tp *TablePlan) applyBulkInsert(sqlbuffer *bytes2.Buffer, rows []*querypb.R
if i > 0 {
sqlbuffer.WriteString(", ")
}
if err := tp.BulkInsertValues.AppendFromRow(sqlbuffer, tp.Fields, row, tp.FieldsToSkip); err != nil {
if err := tp.appendFromRow(sqlbuffer, row); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -307,6 +308,30 @@ func (tp *TablePlan) isOutsidePKRange(bindvars map[string]*querypb.BindVariable,
return false
}

// convertStringCharset does a charset conversion given raw data and an applicable conversion rule.
// In case of a conversion error, it returns an equivalent of MySQL error 1366, which is what you'd
// get in a failed `CONVERT()` function, e.g.:
//
// > create table tascii(v varchar(100) charset ascii);
// > insert into tascii values ('€');
// ERROR 1366 (HY000): Incorrect string value: '\xE2\x82\xAC' for column 'v' at row 1
func (tp *TablePlan) convertStringCharset(raw []byte, conversion *binlogdatapb.CharsetConversion, fieldName string) ([]byte, error) {
fromCollation := collations.Local().DefaultCollationForCharset(conversion.FromCharset)
if fromCollation == collations.Unknown {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "character set %s not supported for column %s", conversion.FromCharset, fieldName)
}
toCollation := collations.Local().DefaultCollationForCharset(conversion.ToCharset)
if toCollation == collations.Unknown {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "character set %s not supported for column %s", conversion.ToCharset, fieldName)
}

out, err := charset.Convert(nil, colldata.Lookup(toCollation).Charset(), raw, colldata.Lookup(fromCollation).Charset())
if err != nil {
return nil, sqlerror.NewSQLError(sqlerror.ERTruncatedWrongValueForField, sqlerror.SSUnknownSQLState, "Incorrect string value: %s", err.Error())
}
return out, nil
}

// bindFieldVal returns a bind variable based on given field and value.
// Most values will just bind directly. But some values may need manipulation:
// - text values with charset conversion
Expand All @@ -315,11 +340,7 @@ func (tp *TablePlan) isOutsidePKRange(bindvars map[string]*querypb.BindVariable,
func (tp *TablePlan) bindFieldVal(field *querypb.Field, val *sqltypes.Value) (*querypb.BindVariable, error) {
if conversion, ok := tp.ConvertCharset[field.Name]; ok && !val.IsNull() {
// Non-null string value, for which we have a charset conversion instruction
fromCollation := collations.Local().DefaultCollationForCharset(conversion.FromCharset)
if fromCollation == collations.Unknown {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Character set %s not supported for column %s", conversion.FromCharset, field.Name)
}
out, err := charset.Convert(nil, charset.Charset_utf8mb4{}, val.Raw(), colldata.Lookup(fromCollation).Charset())
out, err := tp.convertStringCharset(val.Raw(), conversion, field.Name)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -486,3 +507,89 @@ func valsEqual(v1, v2 sqltypes.Value) bool {
// Compare content only if none are null.
return v1.ToString() == v2.ToString()
}

// AppendFromRow behaves like Append but takes a querypb.Row directly, assuming that
// the fields in the row are in the same order as the placeholders in this query. The fields might include generated
// columns which are dropped, by checking against skipFields, before binding the variables
// note: there can be more fields than bind locations since extra columns might be requested from the source if not all
// primary keys columns are present in the target table, for example. Also some values in the row may not correspond for
// values from the database on the source: sum/count for aggregation queries, for example
func (tp *TablePlan) appendFromRow(buf *bytes2.Buffer, row *querypb.Row) error {
bindLocations := tp.BulkInsertValues.BindLocations()
if len(tp.Fields) < len(bindLocations) {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of fields: got %d fields for %d bind locations ",
len(tp.Fields), len(bindLocations))
}

type colInfo struct {
typ querypb.Type
length int64
offset int64
field *querypb.Field
}
rowInfo := make([]*colInfo, 0)

offset := int64(0)
for i, field := range tp.Fields { // collect info required for fields to be bound
length := row.Lengths[i]
if !tp.FieldsToSkip[strings.ToLower(field.Name)] {
rowInfo = append(rowInfo, &colInfo{
typ: field.Type,
length: length,
offset: offset,
field: field,
})
}
if length > 0 {
offset += row.Lengths[i]
}
}

// bind field values to locations
var offsetQuery int
for i, loc := range bindLocations {
col := rowInfo[i]
buf.WriteString(tp.BulkInsertValues.Query[offsetQuery:loc.Offset])
typ := col.typ

switch typ {
case querypb.Type_TUPLE:
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected Type_TUPLE for value %d", i)
case querypb.Type_JSON:
if col.length < 0 { // An SQL NULL and not an actual JSON value
buf.WriteString(sqltypes.NullStr)
} else { // A JSON value (which may be a JSON null literal value)
buf2 := row.Values[col.offset : col.offset+col.length]
vv, err := vjson.MarshalSQLValue(buf2)
if err != nil {
return err
}
buf.WriteString(vv.RawStr())
}
default:
if col.length < 0 {
// -1 means a null variable; serialize it directly
buf.WriteString(sqltypes.NullStr)
} else {
raw := row.Values[col.offset : col.offset+col.length]
var vv sqltypes.Value

if conversion, ok := tp.ConvertCharset[col.field.Name]; ok && col.length > 0 {
// Non-null string value, for which we have a charset conversion instruction
out, err := tp.convertStringCharset(raw, conversion, col.field.Name)
if err != nil {
return err
}
vv = sqltypes.MakeTrusted(typ, out)
} else {
vv = sqltypes.MakeTrusted(typ, raw)
}

vv.EncodeSQLBytes2(buf)
}
}
offsetQuery = loc.Offset + loc.Length
}
buf.WriteString(tp.BulkInsertValues.Query[offsetQuery:])
return nil
}

0 comments on commit 051a978

Please sign in to comment.