Skip to content

Commit

Permalink
VDiff: Support diffing tables without a defined Primary Key (#14794)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Dec 20, 2023
1 parent 071454f commit be7b670
Show file tree
Hide file tree
Showing 12 changed files with 200 additions and 63 deletions.
10 changes: 10 additions & 0 deletions go/test/endtoend/vreplication/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ create table geom_tbl (id int, g geometry, p point, ls linestring, pg polygon, m
create table ` + "`blüb_tbl`" + ` (id int, val1 varchar(20), ` + "`blöb1`" + ` blob, val2 varbinary(20), ` + "`bl@b2`" + ` longblob, txt1 text, blb3 tinyblob, txt2 longtext, blb4 mediumblob, primary key(id));
create table reftable (id int, val1 varchar(20), primary key(id), key(val1));
create table loadtest (id int, name varchar(256), primary key(id), key(name));
create table nopk (name varchar(128), age int unsigned);
`
// These should always be ignored in vreplication
internalSchema = `
Expand Down Expand Up @@ -94,6 +95,7 @@ create table loadtest (id int, name varchar(256), primary key(id), key(name));
"db_order_test": {},
"vdiff_order": {},
"datze": {},
"nopk": {},
"reftable": {
"type": "reference"
}
Expand Down Expand Up @@ -216,6 +218,14 @@ create table loadtest (id int, name varchar(256), primary key(id), key(name));
}
]
},
"nopk": {
"column_vindexes": [
{
"columns": ["name"],
"name": "unicode_loose_md5"
}
]
},
"reftable": {
"type": "reference"
}
Expand Down
10 changes: 7 additions & 3 deletions go/test/endtoend/vreplication/vdiff2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ var testCases = []*testCase{
sourceShards: "0",
targetShards: "-80,80-",
tabletBaseID: 200,
tables: "customer,Lead,Lead-1",
tables: "customer,Lead,Lead-1,nopk",
autoRetryError: true,
retryInsert: `insert into customer(cid, name, typ) values(1991234, 'Testy McTester', 'soho')`,
resume: true,
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestVDiff2(t *testing.T) {
sourceShards := []string{"0"}
targetKs := "customer"
targetShards := []string{"-80", "80-"}
// This forces us to use multiple vstream packets even with small test tables
// This forces us to use multiple vstream packets even with small test tables.
extraVTTabletArgs = []string{"--vstream_packet_size=1"}

vc = NewVitessCluster(t, "TestVDiff2", strings.Split(allCellNames, ","), mainClusterConfig)
Expand Down Expand Up @@ -150,7 +150,11 @@ func TestVDiff2(t *testing.T) {
query := `insert into customer(cid, name, typ, sport) values(1001, null, 'soho','')`
execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:%s", sourceKs, sourceShards[0]), query)

generateMoreCustomers(t, sourceKs, 100)
generateMoreCustomers(t, sourceKs, 1000)

// Create rows in the nopk table using the customer names and random ages between 20 and 100.
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.nopk(name, age) select name, floor(rand()*80)+20 from %s.customer", sourceKs, sourceKs), -1, false)
require.NoError(t, err, "failed to insert rows into nopk table: %v", err)

// The primary tablet is only added in the first cell.
// We ONLY add primary tablets in this test.
Expand Down
1 change: 0 additions & 1 deletion go/vt/mysqlctl/mysql_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ type MysqlDaemon interface {
GetSchema(ctx context.Context, dbName string, request *tabletmanagerdatapb.GetSchemaRequest) (*tabletmanagerdatapb.SchemaDefinition, error)
GetColumns(ctx context.Context, dbName, table string) ([]*querypb.Field, []string, error)
GetPrimaryKeyColumns(ctx context.Context, dbName, table string) ([]string, error)
GetPrimaryKeyEquivalentColumns(ctx context.Context, dbName, table string) ([]string, string, error)
PreflightSchemaChange(ctx context.Context, dbName string, changes []string) ([]*tabletmanagerdatapb.SchemaChangeResult, error)
ApplySchemaChange(ctx context.Context, dbName string, change *tmutils.SchemaChange) (*tabletmanagerdatapb.SchemaChangeResult, error)

Expand Down
10 changes: 2 additions & 8 deletions go/vt/mysqlctl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,13 +579,7 @@ func (mysqld *Mysqld) ApplySchemaChange(ctx context.Context, dbName string, chan
// defined PRIMARY KEY then it may return the columns for
// that index if it is likely the most efficient one amongst
// the available PKE indexes on the table.
func (mysqld *Mysqld) GetPrimaryKeyEquivalentColumns(ctx context.Context, dbName, table string) ([]string, string, error) {
conn, err := getPoolReconnect(ctx, mysqld.dbaPool)
if err != nil {
return nil, "", err
}
defer conn.Recycle()

func GetPrimaryKeyEquivalentColumns(ctx context.Context, exec func(string, int, bool) (*sqltypes.Result, error), dbName, table string) ([]string, string, error) {
// We use column name aliases to guarantee lower case for our named results.
sql := `
SELECT index_cols.COLUMN_NAME AS column_name, index_cols.INDEX_NAME as index_name FROM information_schema.STATISTICS AS index_cols INNER JOIN
Expand Down Expand Up @@ -629,7 +623,7 @@ func (mysqld *Mysqld) GetPrimaryKeyEquivalentColumns(ctx context.Context, dbName
encodedDbName := encodeEntityName(dbName)
encodedTable := encodeEntityName(table)
sql = fmt.Sprintf(sql, encodedDbName, encodedTable, encodedDbName, encodedTable, encodedDbName, encodedTable)
qr, err := conn.Conn.ExecuteFetch(sql, 1000, true)
qr, err := exec(sql, 1000, true)
if err != nil {
return nil, "", err
}
Expand Down
22 changes: 16 additions & 6 deletions go/vt/vttablet/tabletmanager/vdiff/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,26 @@ var (
Columns: []string{"id", "dt"},
PrimaryKeyColumns: []string{"id"},
Fields: sqltypes.MakeTestFields("id|dt", "int64|datetime"),
}, {
Name: "nopk",
Columns: []string{"c1", "c2", "c3"},
Fields: sqltypes.MakeTestFields("c1|c2|c3", "int64|int64|int64"),
}, {
Name: "nopkwithpke",
Columns: []string{"c1", "c2", "c3"},
Fields: sqltypes.MakeTestFields("c1|c2|c3", "int64|int64|int64"),
},
},
}
tableDefMap = map[string]int{
"t1": 0,
"nonpktext": 1,
"pktext": 2,
"multipk": 3,
"aggr": 4,
"datze": 5,
"t1": 0,
"nonpktext": 1,
"pktext": 2,
"multipk": 3,
"aggr": 4,
"datze": 5,
"nopk": 6,
"nopkwithpke": 7,
}
)

Expand Down
51 changes: 44 additions & 7 deletions go/vt/vttablet/tabletmanager/vdiff/table_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,23 @@ limitations under the License.
package vdiff

import (
"context"
"fmt"
"strings"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/engine"
"vitess.io/vitess/go/vt/vtgate/engine/opcode"

querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

const sqlSelectColumnCollations = "select column_name as column_name, collation_name as collation_name from information_schema.columns where table_schema=%a and table_name=%a and column_name in %a"
Expand Down Expand Up @@ -75,7 +77,7 @@ func (td *tableDiffer) buildTablePlan(dbClient binlogplayer.DBClient, dbName str

sourceSelect := &sqlparser.Select{}
targetSelect := &sqlparser.Select{}
// aggregates is the list of Aggregate functions, if any.
// Aggregates is the list of Aggregate functions, if any.
var aggregates []*engine.AggregateParams
for _, selExpr := range sel.SelectExprs {
switch selExpr := selExpr.(type) {
Expand Down Expand Up @@ -153,10 +155,25 @@ func (td *tableDiffer) buildTablePlan(dbClient binlogplayer.DBClient, dbName str
},
}

if len(tp.table.PrimaryKeyColumns) == 0 {
// We use the columns from a PKE if there is one.
pkeCols, err := tp.getPKEquivalentColumns(dbClient)
if err != nil {
return nil, vterrors.Wrapf(err, "error getting PK equivalent columns for table %s", tp.table.Name)
}
if len(pkeCols) > 0 {
tp.table.PrimaryKeyColumns = append(tp.table.PrimaryKeyColumns, pkeCols...)
} else {
// We use every column together as a substitute PK.
tp.table.PrimaryKeyColumns = append(tp.table.PrimaryKeyColumns, tp.table.Columns...)
}
}

err = tp.findPKs(dbClient, targetSelect, collationEnv)
if err != nil {
return nil, err
}

// Remove in_keyrange. It's not understood by mysql.
sourceSelect.Where = sel.Where // removeKeyrange(sel.Where)
// The source should also perform the group by.
Expand All @@ -178,6 +195,9 @@ func (td *tableDiffer) buildTablePlan(dbClient binlogplayer.DBClient, dbName str

// findPKs identifies PKs and removes them from the columns to do data comparison.
func (tp *tablePlan) findPKs(dbClient binlogplayer.DBClient, targetSelect *sqlparser.Select, collationEnv *collations.Environment) error {
if len(tp.table.PrimaryKeyColumns) == 0 {
return nil
}
var orderby sqlparser.OrderBy
for _, pk := range tp.table.PrimaryKeyColumns {
found := false
Expand All @@ -196,7 +216,7 @@ func (tp *tablePlan) findPKs(dbClient binlogplayer.DBClient, targetSelect *sqlpa
tp.compareCols[i].isPK = true
tp.comparePKs = append(tp.comparePKs, tp.compareCols[i])
tp.selectPks = append(tp.selectPks, i)
// We'll be comparing pks separately. So, remove them from compareCols.
// We'll be comparing PKs separately. So, remove them from compareCols.
tp.pkCols = append(tp.pkCols, i)
found = true
break
Expand Down Expand Up @@ -224,6 +244,9 @@ func (tp *tablePlan) findPKs(dbClient binlogplayer.DBClient, targetSelect *sqlpa
// saves the collations in the tablePlan's comparePKs column info
// structs for those subsequent operations.
func (tp *tablePlan) getPKColumnCollations(dbClient binlogplayer.DBClient, collationEnv *collations.Environment) error {
if len(tp.comparePKs) == 0 {
return nil
}
columnList := make([]string, len(tp.comparePKs))
for i := range tp.comparePKs {
columnList[i] = tp.comparePKs[i].colName
Expand Down Expand Up @@ -259,3 +282,17 @@ func (tp *tablePlan) getPKColumnCollations(dbClient binlogplayer.DBClient, colla
}
return nil
}

func (tp *tablePlan) getPKEquivalentColumns(dbClient binlogplayer.DBClient) ([]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), BackgroundOperationTimeout/2)
defer cancel()
executeFetch := func(query string, maxrows int, wantfields bool) (*sqltypes.Result, error) {
// This sets wantfields to true.
return dbClient.ExecuteFetch(query, maxrows)
}
pkeCols, _, err := mysqlctl.GetPrimaryKeyEquivalentColumns(ctx, executeFetch, tp.dbName, tp.table.Name)
if err != nil {
return nil, err
}
return pkeCols, nil
}
Loading

0 comments on commit be7b670

Please sign in to comment.