Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Ensure that RowStreamer uses optimal index when possible #13893

Merged
merged 9 commits into from
Oct 1, 2023
164 changes: 86 additions & 78 deletions go/vt/mysqlctl/fakemysqldaemon.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion go/vt/mysqlctl/mysql_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ type MysqlDaemon interface {
GetSchema(ctx context.Context, dbName string, request *tabletmanagerdatapb.GetSchemaRequest) (*tabletmanagerdatapb.SchemaDefinition, error)
GetColumns(ctx context.Context, dbName, table string) ([]*querypb.Field, []string, error)
GetPrimaryKeyColumns(ctx context.Context, dbName, table string) ([]string, error)
GetPrimaryKeyEquivalentColumns(ctx context.Context, dbName, table string) ([]string, error)
GetPrimaryKeyEquivalentColumns(ctx context.Context, dbName, table string) ([]string, string, error)
PreflightSchemaChange(ctx context.Context, dbName string, changes []string) ([]*tabletmanagerdatapb.SchemaChangeResult, error)
ApplySchemaChange(ctx context.Context, dbName string, change *tmutils.SchemaChange) (*tabletmanagerdatapb.SchemaChangeResult, error)

Expand Down
39 changes: 27 additions & 12 deletions go/vt/mysqlctl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ import (
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
"vitess.io/vitess/go/vt/vterrors"

querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

const (
Expand Down Expand Up @@ -74,7 +75,7 @@ func encodeEntityName(name string) string {
// tableListSQL returns an IN clause "('t1', 't2'...) for a list of tables."
func tableListSQL(tables []string) (string, error) {
if len(tables) == 0 {
return "", vterrors.New(vtrpc.Code_INTERNAL, "no tables for tableListSQL")
return "", vterrors.New(vtrpcpb.Code_INTERNAL, "no tables for tableListSQL")
}

encodedTables := make([]string, len(tables))
Expand Down Expand Up @@ -566,27 +567,28 @@ func (mysqld *Mysqld) ApplySchemaChange(ctx context.Context, dbName string, chan
// GetPrimaryKeyEquivalentColumns can be used if the table has
// no defined PRIMARY KEY. It will return the columns in a
// viable PRIMARY KEY equivalent (PKE) -- a NON-NULL UNIQUE
// KEY -- in the specified table. When multiple PKE indexes
// are available it will attempt to choose the most efficient
// one based on the column data types and the number of columns
// in the index. See here for the data type storage sizes:
// KEY -- along with that index's name in the specified table.
// When multiple PKE indexes are available it will attempt to
// choose the most efficient one based on the column data types
// and the number of columns in the index. See here for the data
// type storage sizes:
//
// https://dev.mysql.com/doc/refman/en/storage-requirements.html
//
// If this function is used on a table that DOES have a
// defined PRIMARY KEY then it may return the columns for
// that index if it is likely the most efficient one amongst
// the available PKE indexes on the table.
func (mysqld *Mysqld) GetPrimaryKeyEquivalentColumns(ctx context.Context, dbName, table string) ([]string, error) {
func (mysqld *Mysqld) GetPrimaryKeyEquivalentColumns(ctx context.Context, dbName, table string) ([]string, string, error) {
conn, err := getPoolReconnect(ctx, mysqld.dbaPool)
if err != nil {
return nil, err
return nil, "", err
}
defer conn.Recycle()

// We use column name aliases to guarantee lower case for our named results.
sql := `
SELECT COLUMN_NAME AS column_name FROM information_schema.STATISTICS AS index_cols INNER JOIN
SELECT index_cols.COLUMN_NAME AS column_name, index_cols.INDEX_NAME as index_name FROM information_schema.STATISTICS AS index_cols INNER JOIN
(
SELECT stats.INDEX_NAME, SUM(
CASE LOWER(cols.DATA_TYPE)
Expand Down Expand Up @@ -629,15 +631,28 @@ func (mysqld *Mysqld) GetPrimaryKeyEquivalentColumns(ctx context.Context, dbName
sql = fmt.Sprintf(sql, encodedDbName, encodedTable, encodedDbName, encodedTable, encodedDbName, encodedTable)
qr, err := conn.ExecuteFetch(sql, 1000, true)
if err != nil {
return nil, err
return nil, "", err
}

named := qr.Named()
cols := make([]string, len(qr.Rows))
indexName := ""
for i, row := range named.Rows {
cols[i] = row.AsString("column_name", "")
in := row.AsString("index_name", "")
if in == "" { // This should never happen
return nil, "", vterrors.Errorf(vtrpcpb.Code_INTERNAL, "PKE column (%s) returned with an empty index name",
cols[i])
}
switch {
case i == 0:
indexName = in
case i > 0 && indexName != in: // This should never happen
return nil, "", vterrors.Errorf(vtrpcpb.Code_INTERNAL, "PKE columns (%s) returned for more than one index: %s, %s",
strings.Join(cols, ","), indexName, in)
}
}
return cols, err
return cols, indexName, err
}

// tableDefinitions is a sortable collection of table definitions
Expand Down
Loading