diff --git a/go/test/endtoend/onlineddl/vrepl_suite/onlineddl_vrepl_suite_test.go b/go/test/endtoend/onlineddl/vrepl_suite/onlineddl_vrepl_suite_test.go index c24a55a9fc8..6122a71aa44 100644 --- a/go/test/endtoend/onlineddl/vrepl_suite/onlineddl_vrepl_suite_test.go +++ b/go/test/endtoend/onlineddl/vrepl_suite/onlineddl_vrepl_suite_test.go @@ -59,7 +59,7 @@ var ( ) const ( - testDataPath = "/tmp/onlineddltests" + testDataPath = "testdata" ) func TestMain(m *testing.M) { diff --git a/go/vt/vttablet/onlineddl/vrepl.go b/go/vt/vttablet/onlineddl/vrepl.go index 6a2d7f944bc..847e40e3fbc 100644 --- a/go/vt/vttablet/onlineddl/vrepl.go +++ b/go/vt/vttablet/onlineddl/vrepl.go @@ -476,19 +476,17 @@ func (v *VRepl) analyzeTables(ctx context.Context, conn *dbconnpool.DBConnection return err } - /* - for _, sourcePKColumn := range sharedPKColumns.Columns() { - mappedColumn := v.targetSharedColumns.GetColumn(sourcePKColumn.Name) - if sourcePKColumn.Type == vrepl.EnumColumnType && mappedColumn.Type == vrepl.EnumColumnType { - // An ENUM as part of PRIMARY KEY. We must convert it to text because OMG that's complicated. - // There's a scenario where a query may modify the enum value (and it's bad practice, seeing - // that it's part of the PK, but it's still valid), and in that case we must have the string value - // to be able to DELETE the old row - v.targetSharedColumns.SetEnumToTextConversion(mappedColumn.Name, sourcePKColumn.EnumValues) - v.enumToTextMap[sourcePKColumn.Name] = sourcePKColumn.EnumValues - } + for _, sourcePKColumn := range sharedPKColumns.Columns() { + mappedColumn := v.targetSharedColumns.GetColumn(sourcePKColumn.Name) + if sourcePKColumn.Type == vrepl.EnumColumnType && mappedColumn.Type == vrepl.EnumColumnType { + // An ENUM as part of PRIMARY KEY. We must convert it to text because OMG that's complicated. + // There's a scenario where a query may modify the enum value (and it's bad practice, seeing + // that it's part of the PK, but it's still valid), and in that case we must have the string value + // to be able to DELETE the old row + v.targetSharedColumns.SetEnumToTextConversion(mappedColumn.Name, sourcePKColumn.EnumValues) + v.enumToTextMap[sourcePKColumn.Name] = sourcePKColumn.EnumValues } - */ + } for i := range v.sourceSharedColumns.Columns() { sourceColumn := v.sourceSharedColumns.Columns()[i] diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 0938c218b37..16dd062399e 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -751,27 +751,13 @@ func (vs *vstreamer) buildTablePlan(id uint64, tm *mysql.TableMap) (*binlogdatap vs.plans[id] = nil return nil, nil } + if err := addEnumAndSetMappingstoPlan(plan, cols); err != nil { + return nil, fmt.Errorf("failed to build ENUM and SET column integer to string mappings: %v", err) + } vs.plans[id] = &streamerPlan{ Plan: plan, TableMap: tm, } - // Add any necessary ENUM and SET integer position to string mappings. - for i, col := range cols { - if col.Type == querypb.Type_ENUM || col.Type == querypb.Type_SET { - if plan.EnumSetValuesMap == nil { - plan.EnumSetValuesMap = make(map[int]map[int]string) - } - // Strip the enum() / set() parts out. - begin := strings.Index(col.ColumnType, "(") - end := strings.LastIndex(col.ColumnType, ")") - if begin == -1 || end == -1 { - return nil, fmt.Errorf("enum or set column %s does not have valid string values: %s", - col.Name, col.ColumnType) - } - plan.EnumSetValuesMap[i] = schemautils.ParseEnumOrSetTokensMap(col.ColumnType[begin+1 : end]) - log.Errorf("DEBUG: enum values for %s: %v", col.Name, plan.EnumSetValuesMap[i]) - } - } return &binlogdatapb.VEvent{ Type: binlogdatapb.VEventType_FIELD, FieldEvent: &binlogdatapb.FieldEvent{ @@ -963,12 +949,6 @@ nextrow: } func (vs *vstreamer) processRowEvent(vevents []*binlogdatapb.VEvent, plan *streamerPlan, rows mysql.Rows) ([]*binlogdatapb.VEvent, error) { - defer func() { - if r := recover(); r != nil { - log.Errorf("DEBUG: caught panic: %v", r) - log.Flush() - } - }() rowChanges := make([]*binlogdatapb.RowChange, 0, len(rows.Rows)) for _, row := range rows.Rows { beforeOK, beforeValues, _, err := vs.extractRowAndFilter(plan, row.Identify, rows.IdentifyColumns, row.NullIdentifyColumns) @@ -1041,12 +1021,6 @@ func (vs *vstreamer) rebuildPlans() error { // - data values, array of one value per column // - true, if the row image was partial (i.e. binlog_row_image=noblob and dml doesn't update one or more blob/text columns) func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataColumns, nullColumns mysql.Bitmap) (bool, []sqltypes.Value, bool, error) { - defer func() { - if r := recover(); r != nil { - log.Errorf("DEBUG: caught panic: %v", r) - log.Flush() - } - }() if len(data) == 0 { return false, nil, false, nil } @@ -1076,6 +1050,11 @@ func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataCo } pos += l + if plan.EnumSetValuesMap == nil { + if err := addEnumAndSetMappingstoPlan(plan.Plan, plan.Table.Fields); err != nil { + return false, nil, false, fmt.Errorf("failed to build ENUM and SET column integer to string mappings: %v", err) + } + } // Convert the integer values in the binlog event for SET and ENUM fields into their // string representations. if plan.Table.Fields[colNum].Type == querypb.Type_ENUM { @@ -1086,14 +1065,12 @@ func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataCo } strVal, ok := plan.EnumSetValuesMap[colNum][int(iv)] if !ok { - return false, nil, false, fmt.Errorf("no string value found for ENUM column %s in table %s using the found integer value: %d", - plan.Table.Fields[colNum].Name, plan.Table.Name, iv) + return false, nil, false, fmt.Errorf("no string value found for ENUM column %s in table %s -- with available values being: %v -- using the found integer value: %d", + plan.Table.Fields[colNum].Name, plan.Table.Name, plan.EnumSetValuesMap[colNum], iv) } value = sqltypes.MakeTrusted(plan.Table.Fields[colNum].Type, []byte(strVal)) - log.Errorf("DEBUG: extractRowAndFilter: mapped string value for col %d: %v", colNum, strVal) } if plan.Table.Fields[colNum].Type == querypb.Type_SET { - log.Errorf("DEBUG: column %s is a SET column", plan.Table.Name) val := bytes.Buffer{} // A SET column can have 64 unique values: https://dev.mysql.com/doc/refman/en/set.html // For this reason the binlog event contains the values encoded as an unsigned 64-bit @@ -1108,22 +1085,19 @@ func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataCo // See what bits are set in the uint64 using bitmasks. for b := uint64(1); b < 1<<63; b <<= 1 { if iv&b > 0 { - log.Errorf("DEBUG: bit at position %d is set", idx) strVal, ok := plan.EnumSetValuesMap[colNum][idx] if !ok { - return false, nil, false, fmt.Errorf("no string value found for SET column %s in table %s using the found bit map: %b", - plan.Table.Fields[colNum].Name, plan.Table.Name, iv) + return false, nil, false, fmt.Errorf("no string value found for SET column %s in table %s -- with available values being: %v -- using the found bit map: %b", + plan.Table.Fields[colNum].Name, plan.Table.Name, plan.EnumSetValuesMap[colNum], iv) } if val.Len() > 0 { val.WriteByte(',') } val.WriteString(strVal) - log.Errorf("DEBUG: extractRowAndFilter: mapped string value for col %s: %v", plan.Table.Name, strVal) } idx++ } value = sqltypes.MakeTrusted(plan.Table.Fields[colNum].Type, val.Bytes()) - log.Errorf("DEBUG: extractRowAndFilter: mapped string value for col %d: %v", colNum, val.String()) } charsets[colNum] = collations.ID(plan.Table.Fields[colNum].Charset) @@ -1135,6 +1109,24 @@ func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataCo return ok, filtered, partial, err } +// Add any necessary ENUM and SET integer position to string mappings. +func addEnumAndSetMappingstoPlan(plan *Plan, cols []*querypb.Field) error { + plan.EnumSetValuesMap = make(map[int]map[int]string) + for i, col := range cols { + if col.Type == querypb.Type_ENUM || col.Type == querypb.Type_SET { + // Strip the enum() / set() parts out. + begin := strings.Index(col.ColumnType, "(") + end := strings.LastIndex(col.ColumnType, ")") + if begin == -1 || end == -1 { + return fmt.Errorf("enum or set column %s does not have valid string values: %s", + col.Name, col.ColumnType) + } + plan.EnumSetValuesMap[i] = schemautils.ParseEnumOrSetTokensMap(col.ColumnType[begin+1 : end]) + } + } + return nil +} + func wrapError(err error, stopPos replication.Position, vse *Engine) error { if err != nil { vse.vstreamersEndedWithErrors.Add(1)