Skip to content

Commit

Permalink
VReplication: Ensure that RowStreamer uses optimal index when possible (
Browse files Browse the repository at this point in the history
#13893)

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Oct 1, 2023
1 parent 3b2da8c commit d5efe8e
Show file tree
Hide file tree
Showing 14 changed files with 464 additions and 314 deletions.
164 changes: 86 additions & 78 deletions go/vt/mysqlctl/fakemysqldaemon.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion go/vt/mysqlctl/mysql_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ type MysqlDaemon interface {
GetSchema(ctx context.Context, dbName string, request *tabletmanagerdatapb.GetSchemaRequest) (*tabletmanagerdatapb.SchemaDefinition, error)
GetColumns(ctx context.Context, dbName, table string) ([]*querypb.Field, []string, error)
GetPrimaryKeyColumns(ctx context.Context, dbName, table string) ([]string, error)
GetPrimaryKeyEquivalentColumns(ctx context.Context, dbName, table string) ([]string, error)
GetPrimaryKeyEquivalentColumns(ctx context.Context, dbName, table string) ([]string, string, error)
PreflightSchemaChange(ctx context.Context, dbName string, changes []string) ([]*tabletmanagerdatapb.SchemaChangeResult, error)
ApplySchemaChange(ctx context.Context, dbName string, change *tmutils.SchemaChange) (*tabletmanagerdatapb.SchemaChangeResult, error)

Expand Down
39 changes: 27 additions & 12 deletions go/vt/mysqlctl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ import (
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
"vitess.io/vitess/go/vt/vterrors"

querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

const (
Expand Down Expand Up @@ -74,7 +75,7 @@ func encodeEntityName(name string) string {
// tableListSQL returns an IN clause "('t1', 't2'...) for a list of tables."
func tableListSQL(tables []string) (string, error) {
if len(tables) == 0 {
return "", vterrors.New(vtrpc.Code_INTERNAL, "no tables for tableListSQL")
return "", vterrors.New(vtrpcpb.Code_INTERNAL, "no tables for tableListSQL")
}

encodedTables := make([]string, len(tables))
Expand Down Expand Up @@ -566,27 +567,28 @@ func (mysqld *Mysqld) ApplySchemaChange(ctx context.Context, dbName string, chan
// GetPrimaryKeyEquivalentColumns can be used if the table has
// no defined PRIMARY KEY. It will return the columns in a
// viable PRIMARY KEY equivalent (PKE) -- a NON-NULL UNIQUE
// KEY -- in the specified table. When multiple PKE indexes
// are available it will attempt to choose the most efficient
// one based on the column data types and the number of columns
// in the index. See here for the data type storage sizes:
// KEY -- along with that index's name in the specified table.
// When multiple PKE indexes are available it will attempt to
// choose the most efficient one based on the column data types
// and the number of columns in the index. See here for the data
// type storage sizes:
//
// https://dev.mysql.com/doc/refman/en/storage-requirements.html
//
// If this function is used on a table that DOES have a
// defined PRIMARY KEY then it may return the columns for
// that index if it is likely the most efficient one amongst
// the available PKE indexes on the table.
func (mysqld *Mysqld) GetPrimaryKeyEquivalentColumns(ctx context.Context, dbName, table string) ([]string, error) {
func (mysqld *Mysqld) GetPrimaryKeyEquivalentColumns(ctx context.Context, dbName, table string) ([]string, string, error) {
conn, err := getPoolReconnect(ctx, mysqld.dbaPool)
if err != nil {
return nil, err
return nil, "", err
}
defer conn.Recycle()

// We use column name aliases to guarantee lower case for our named results.
sql := `
SELECT COLUMN_NAME AS column_name FROM information_schema.STATISTICS AS index_cols INNER JOIN
SELECT index_cols.COLUMN_NAME AS column_name, index_cols.INDEX_NAME as index_name FROM information_schema.STATISTICS AS index_cols INNER JOIN
(
SELECT stats.INDEX_NAME, SUM(
CASE LOWER(cols.DATA_TYPE)
Expand Down Expand Up @@ -629,15 +631,28 @@ func (mysqld *Mysqld) GetPrimaryKeyEquivalentColumns(ctx context.Context, dbName
sql = fmt.Sprintf(sql, encodedDbName, encodedTable, encodedDbName, encodedTable, encodedDbName, encodedTable)
qr, err := conn.ExecuteFetch(sql, 1000, true)
if err != nil {
return nil, err
return nil, "", err
}

named := qr.Named()
cols := make([]string, len(qr.Rows))
indexName := ""
for i, row := range named.Rows {
cols[i] = row.AsString("column_name", "")
in := row.AsString("index_name", "")
if in == "" { // This should never happen
return nil, "", vterrors.Errorf(vtrpcpb.Code_INTERNAL, "PKE column (%s) returned with an empty index name",
cols[i])
}
switch {
case i == 0:
indexName = in
case i > 0 && indexName != in: // This should never happen
return nil, "", vterrors.Errorf(vtrpcpb.Code_INTERNAL, "PKE columns (%s) returned for more than one index: %s, %s",
strings.Join(cols, ","), indexName, in)
}
}
return cols, err
return cols, indexName, err
}

// tableDefinitions is a sortable collection of table definitions
Expand Down
Loading

0 comments on commit d5efe8e

Please sign in to comment.