From 569a17d1547926fee61a62e68ed9cb32b35093b3 Mon Sep 17 00:00:00 2001 From: Noble Mittal Date: Tue, 12 Nov 2024 12:24:23 +0530 Subject: [PATCH] refac: Initiate lookup.go for lookup related functions Signed-off-by: Noble Mittal --- go/vt/vtctl/workflow/lookup.go | 450 +++++++++++++++++++++++++++++++++ go/vt/vtctl/workflow/server.go | 60 +++-- 2 files changed, 484 insertions(+), 26 deletions(-) create mode 100644 go/vt/vtctl/workflow/lookup.go diff --git a/go/vt/vtctl/workflow/lookup.go b/go/vt/vtctl/workflow/lookup.go new file mode 100644 index 00000000000..96842b07e3f --- /dev/null +++ b/go/vt/vtctl/workflow/lookup.go @@ -0,0 +1,450 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workflow + +import ( + "context" + "fmt" + "slices" + "strings" + + "golang.org/x/exp/maps" + "google.golang.org/protobuf/proto" + + "vitess.io/vitess/go/sqlescape" + "vitess.io/vitess/go/vt/schema" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vtctl/schematools" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/vindexes" + + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" +) + +// prepareCreateLookup performs the preparatory steps for creating a +// Lookup Vindex. +func (w *workflow) prepareCreateLookup(ctx context.Context, workflow, keyspace string, specs *vschemapb.Keyspace, continueAfterCopyWithOwner bool) ( + ms *vtctldatapb.MaterializeSettings, sourceVSchema, targetVSchema *vschemapb.Keyspace, cancelFunc func() error, err error) { + // Important variables are pulled out here. + var ( + vindexName string + vindex *vschemapb.Vindex + targetKeyspace string + targetTableName string + vindexFromCols []string + vindexToCol string + vindexIgnoreNulls bool + + sourceTableName string + // sourceTable is the supplied table info. + sourceTable *vschemapb.Table + // sourceVSchemaTable is the table info present in the vschema. + sourceVSchemaTable *vschemapb.Table + // sourceVindexColumns are computed from the input sourceTable. + sourceVindexColumns []string + + // Target table info. + createDDL string + materializeQuery string + ) + + // Validate input vindex. + if specs == nil { + return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "no vindex provided") + } + if len(specs.Vindexes) != 1 { + return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "only one vindex must be specified") + } + vindexName = maps.Keys(specs.Vindexes)[0] + vindex = maps.Values(specs.Vindexes)[0] + if !strings.Contains(vindex.Type, "lookup") { + return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex %s is not a lookup type", vindex.Type) + } + targetKeyspace, targetTableName, err = w.parser.ParseTable(vindex.Params["table"]) + if err != nil || targetKeyspace == "" { + return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex table name (%s) must be in the form .", vindex.Params["table"]) + } + vindexFromCols = strings.Split(vindex.Params["from"], ",") + for i, col := range vindexFromCols { + vindexFromCols[i] = strings.TrimSpace(col) + } + if strings.Contains(vindex.Type, "unique") { + if len(vindexFromCols) != 1 { + return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unique vindex 'from' should have only one column") + } + } else { + if len(vindexFromCols) < 2 { + return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "non-unique vindex 'from' should have more than one column") + } + } + vindexToCol = vindex.Params["to"] + // Make the vindex write_only. If one exists already in the vschema, + // it will need to match this vindex exactly, including the write_only setting. + vindex.Params["write_only"] = "true" + // See if we can create the vindex without errors. + if _, err := vindexes.CreateVindex(vindex.Type, vindexName, vindex.Params); err != nil { + return nil, nil, nil, nil, err + } + if ignoreNullsStr, ok := vindex.Params["ignore_nulls"]; ok { + // This mirrors the behavior of vindexes.boolFromMap(). + switch ignoreNullsStr { + case "true": + vindexIgnoreNulls = true + case "false": + vindexIgnoreNulls = false + default: + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "ignore_nulls (%s) value must be 'true' or 'false'", + ignoreNullsStr) + } + } + + // Validate input table. + if len(specs.Tables) < 1 || len(specs.Tables) > 2 { + return nil, nil, nil, nil, fmt.Errorf("one or two tables must be specified") + } + // Loop executes once or twice. + for tableName, table := range specs.Tables { + if len(table.ColumnVindexes) != 1 { + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "exactly one ColumnVindex must be specified for the %s table", + tableName) + } + if tableName != targetTableName { // This is the source table. + sourceTableName = tableName + sourceTable = table + continue + } + // This is a primary vindex definition for the target table + // which allows you to override the vindex type used. + var vindexCols []string + if len(table.ColumnVindexes[0].Columns) != 0 { + vindexCols = table.ColumnVindexes[0].Columns + } else { + if table.ColumnVindexes[0].Column == "" { + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "at least one column must be specified in ColumnVindexes for the %s table", + tableName) + } + vindexCols = []string{table.ColumnVindexes[0].Column} + } + if !slices.Equal(vindexCols, vindexFromCols) { + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "columns in the lookup table %s primary vindex (%s) don't match the 'from' columns specified (%s)", + tableName, strings.Join(vindexCols, ","), strings.Join(vindexFromCols, ",")) + } + } + + // Validate input table and vindex consistency. + if sourceTable == nil || len(sourceTable.ColumnVindexes) != 1 { + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "No ColumnVindex found for the owner table (%s) in the %s keyspace", + sourceTable, keyspace) + } + if sourceTable.ColumnVindexes[0].Name != vindexName { + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "ColumnVindex name (%s) must match vindex name (%s)", + sourceTable.ColumnVindexes[0].Name, vindexName) + } + if vindex.Owner != "" && vindex.Owner != sourceTableName { + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex owner (%s) must match table name (%s)", + vindex.Owner, sourceTableName) + } + if len(sourceTable.ColumnVindexes[0].Columns) != 0 { + sourceVindexColumns = sourceTable.ColumnVindexes[0].Columns + } else { + if sourceTable.ColumnVindexes[0].Column == "" { + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "at least one column must be specified in ColumnVindexes for the %s table", + sourceTableName) + } + sourceVindexColumns = []string{sourceTable.ColumnVindexes[0].Column} + } + if len(sourceVindexColumns) != len(vindexFromCols) { + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "length of table columns (%d) differs from length of vindex columns (%d)", + len(sourceVindexColumns), len(vindexFromCols)) + } + + // Validate against source vschema. + sourceVSchema, err = w.ts.GetVSchema(ctx, keyspace) + if err != nil { + return nil, nil, nil, nil, err + } + if sourceVSchema.Vindexes == nil { + sourceVSchema.Vindexes = make(map[string]*vschemapb.Vindex) + } + // If source and target keyspaces are the same, make vschemas point + // to the same object. + if keyspace == targetKeyspace { + targetVSchema = sourceVSchema + } else { + targetVSchema, err = w.ts.GetVSchema(ctx, targetKeyspace) + if err != nil { + return nil, nil, nil, nil, err + } + } + if targetVSchema.Vindexes == nil { + targetVSchema.Vindexes = make(map[string]*vschemapb.Vindex) + } + if targetVSchema.Tables == nil { + targetVSchema.Tables = make(map[string]*vschemapb.Table) + } + if existing, ok := sourceVSchema.Vindexes[vindexName]; ok { + if !proto.Equal(existing, vindex) { // If the exact same vindex already exists then we can re-use it + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INTERNAL, "a conflicting vindex named %s already exists in the %s keyspace", + vindexName, keyspace) + } + } + sourceVSchemaTable = sourceVSchema.Tables[sourceTableName] + if sourceVSchemaTable == nil && !schema.IsInternalOperationTableName(sourceTableName) { + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INTERNAL, "table %s not found in the %s keyspace", sourceTableName, keyspace) + } + for _, colVindex := range sourceVSchemaTable.ColumnVindexes { + // For a conflict, the vindex name and column should match. + if colVindex.Name != vindexName { + continue + } + var colNames []string + if len(colVindex.Columns) == 0 { + colNames = []string{colVindex.Column} + } else { + colNames = colVindex.Columns + } + // If this is the exact same definition then we can use the existing one. If they + // are not the same then they are two distinct conflicting vindexes and we should + // not proceed. + if !slices.Equal(colNames, sourceVindexColumns) { + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting ColumnVindex on column(s) %s in table %s already exists in the %s keyspace", + strings.Join(colNames, ","), sourceTableName, keyspace) + } + } + + // Validate against source schema. + sourceShards, err := w.ts.GetServingShards(ctx, keyspace) + if err != nil { + return nil, nil, nil, nil, err + } + onesource := sourceShards[0] + if onesource.PrimaryAlias == nil { + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INTERNAL, "source shard %s has no primary", onesource.ShardName()) + } + req := &tabletmanagerdatapb.GetSchemaRequest{Tables: []string{sourceTableName}} + tableSchema, err := schematools.GetSchema(ctx, w.ts, w.tmc, onesource.PrimaryAlias, req) + if err != nil { + return nil, nil, nil, nil, err + } + if len(tableSchema.TableDefinitions) != 1 { + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected number of tables (%d) returned from %s schema", + len(tableSchema.TableDefinitions), keyspace) + } + + // Generate "create table" statement. + lines := strings.Split(tableSchema.TableDefinitions[0].Schema, "\n") + if len(lines) < 3 { + // Should never happen. + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INTERNAL, "schema looks incorrect: %s, expecting at least four lines", + tableSchema.TableDefinitions[0].Schema) + } + var modified []string + modified = append(modified, strings.Replace(lines[0], sourceTableName, targetTableName, 1)) + for i := range sourceVindexColumns { + line, err := generateColDef(lines, sourceVindexColumns[i], vindexFromCols[i]) + if err != nil { + return nil, nil, nil, nil, err + } + modified = append(modified, line) + } + + if vindex.Params["data_type"] == "" || strings.EqualFold(vindex.Type, "consistent_lookup_unique") || strings.EqualFold(vindex.Type, "consistent_lookup") { + modified = append(modified, fmt.Sprintf(" %s varbinary(128),", sqlescape.EscapeID(vindexToCol))) + } else { + modified = append(modified, fmt.Sprintf(" %s %s,", sqlescape.EscapeID(vindexToCol), sqlescape.EscapeID(vindex.Params["data_type"]))) + } + buf := sqlparser.NewTrackedBuffer(nil) + fmt.Fprintf(buf, " PRIMARY KEY (") + prefix := "" + for _, col := range vindexFromCols { + fmt.Fprintf(buf, "%s%s", prefix, sqlescape.EscapeID(col)) + prefix = ", " + } + fmt.Fprintf(buf, ")") + modified = append(modified, buf.String()) + modified = append(modified, ")") + createDDL = strings.Join(modified, "\n") + // Confirm that our DDL is valid before we create anything. + if _, err = w.parser.ParseStrictDDL(createDDL); err != nil { + return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "error: %v; invalid lookup table definition generated: %s", + err, createDDL) + } + + // Generate vreplication query. + buf = sqlparser.NewTrackedBuffer(nil) + buf.Myprintf("select ") + for i := range vindexFromCols { + buf.Myprintf("%s as %s, ", sqlparser.String(sqlparser.NewIdentifierCI(sourceVindexColumns[i])), sqlparser.String(sqlparser.NewIdentifierCI(vindexFromCols[i]))) + } + if strings.EqualFold(vindexToCol, "keyspace_id") || strings.EqualFold(vindex.Type, "consistent_lookup_unique") || strings.EqualFold(vindex.Type, "consistent_lookup") { + buf.Myprintf("keyspace_id() as %s ", sqlparser.String(sqlparser.NewIdentifierCI(vindexToCol))) + } else { + buf.Myprintf("%s as %s ", sqlparser.String(sqlparser.NewIdentifierCI(vindexToCol)), sqlparser.String(sqlparser.NewIdentifierCI(vindexToCol))) + } + buf.Myprintf("from %s", sqlparser.String(sqlparser.NewIdentifierCS(sourceTableName))) + if vindexIgnoreNulls { + buf.Myprintf(" where ") + lastValIdx := len(vindexFromCols) - 1 + for i := range vindexFromCols { + buf.Myprintf("%s is not null", sqlparser.String(sqlparser.NewIdentifierCI(vindexFromCols[i]))) + if i != lastValIdx { + buf.Myprintf(" and ") + } + } + } + if vindex.Owner != "" { + // Only backfill. + buf.Myprintf(" group by ") + for i := range vindexFromCols { + buf.Myprintf("%s, ", sqlparser.String(sqlparser.NewIdentifierCI(vindexFromCols[i]))) + } + buf.Myprintf("%s", sqlparser.String(sqlparser.NewIdentifierCI(vindexToCol))) + } + materializeQuery = buf.String() + + // Save a copy of the original vschema if we modify it and need to provide + // a cancelFunc. + ogTargetVSchema := targetVSchema.CloneVT() + targetChanged := false + + // Update targetVSchema. + targetTable := specs.Tables[targetTableName] + if targetVSchema.Sharded { + // Choose a primary vindex type for the lookup table based on the source + // definition if one was not explicitly specified. + var targetVindexType string + var targetVindex *vschemapb.Vindex + for _, field := range tableSchema.TableDefinitions[0].Fields { + if sourceVindexColumns[0] == field.Name { + if targetTable != nil && len(targetTable.ColumnVindexes) > 0 { + targetVindexType = targetTable.ColumnVindexes[0].Name + } + if targetVindexType == "" { + targetVindexType, err = vindexes.ChooseVindexForType(field.Type) + if err != nil { + return nil, nil, nil, nil, err + } + } + targetVindex = &vschemapb.Vindex{ + Type: targetVindexType, + } + break + } + } + if targetVindex == nil { + // Unreachable. We validated column names when generating the DDL. + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INTERNAL, "column %s not found in target schema %s", + sourceVindexColumns[0], tableSchema.TableDefinitions[0].Schema) + } + if existing, ok := targetVSchema.Vindexes[targetVindexType]; ok { + if !proto.Equal(existing, targetVindex) { + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting vindex named %v already exists in the %s keyspace", + targetVindexType, targetKeyspace) + } + } else { + targetVSchema.Vindexes[targetVindexType] = targetVindex + targetChanged = true + } + + targetTable = &vschemapb.Table{ + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: vindexFromCols[0], + Name: targetVindexType, + }}, + } + } else { + targetTable = &vschemapb.Table{} + } + if existing, ok := targetVSchema.Tables[targetTableName]; ok { + if !proto.Equal(existing, targetTable) { + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting table named %s already exists in the %s vschema", + targetTableName, targetKeyspace) + } + } else { + targetVSchema.Tables[targetTableName] = targetTable + targetChanged = true + } + + if targetChanged { + cancelFunc = func() error { + // Restore the original target vschema. + return w.ts.SaveVSchema(ctx, targetKeyspace, ogTargetVSchema) + } + } + + ms = &vtctldatapb.MaterializeSettings{ + Workflow: workflow, + MaterializationIntent: vtctldatapb.MaterializationIntent_CREATELOOKUPINDEX, + SourceKeyspace: keyspace, + TargetKeyspace: targetKeyspace, + StopAfterCopy: vindex.Owner != "" && !continueAfterCopyWithOwner, + TableSettings: []*vtctldatapb.TableMaterializeSettings{{ + TargetTable: targetTableName, + SourceExpression: materializeQuery, + CreateDdl: createDDL, + }}, + } + + // Update sourceVSchema + sourceVSchema.Vindexes[vindexName] = vindex + sourceVSchemaTable.ColumnVindexes = append(sourceVSchemaTable.ColumnVindexes, sourceTable.ColumnVindexes[0]) + + return ms, sourceVSchema, targetVSchema, cancelFunc, nil +} + +func generateColDef(lines []string, sourceVindexCol, vindexFromCol string) (string, error) { + source := sqlescape.EscapeID(sourceVindexCol) + target := sqlescape.EscapeID(vindexFromCol) + + for _, line := range lines[1:] { + if strings.Contains(line, source) { + line = strings.Replace(line, source, target, 1) + line = strings.Replace(line, " AUTO_INCREMENT", "", 1) + line = strings.Replace(line, " DEFAULT NULL", "", 1) + // Ensure that the column definition ends with a comma as we will + // be appending the TO column and PRIMARY KEY definitions. If the + // souce column here was the last entity defined in the source + // table's definition then it will not already have the comma. + if !strings.HasSuffix(strings.TrimSpace(line), ",") { + line += "," + } + return line, nil + } + } + return "", fmt.Errorf("column %s not found in schema %v", sourceVindexCol, lines) +} diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 8b272333890..1f1fc18b691 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -715,11 +715,8 @@ func (s *Server) Materialize(ctx context.Context, ms *vtctldatapb.MaterializeSet cells[i] = strings.TrimSpace(cells[i]) } - switch { - case len(ms.ReferenceTables) == 0 && len(ms.TableSettings) == 0: - return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "either --table-settings or --reference-tables must be specified") - case len(ms.ReferenceTables) > 0 && len(ms.TableSettings) > 0: - return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot specify both --table-settings and --reference-tables") + if err := validateMaterializeSettings(ms); err != nil { + return err } for _, table := range ms.ReferenceTables { @@ -746,6 +743,17 @@ func (s *Server) Materialize(ctx context.Context, ms *vtctldatapb.MaterializeSet return mz.startStreams(ctx) } +func validateMaterializeSettings(ms *vtctldatapb.MaterializeSettings) error { + switch { + case len(ms.ReferenceTables) == 0 && len(ms.TableSettings) == 0: + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "either --table-settings or --reference-tables must be specified") + case len(ms.ReferenceTables) > 0 && len(ms.TableSettings) > 0: + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot specify both --table-settings and --reference-tables") + } + + return nil +} + // MoveTablesCreate is part of the vtctlservicepb.VtctldServer interface. // It passes the embedded TabletRequest object to the given keyspace's // target primary tablets that will be executing the workflow. @@ -3797,27 +3805,27 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str return ms, sourceVSchema, targetVSchema, cancelFunc, nil } -func generateColDef(lines []string, sourceVindexCol, vindexFromCol string) (string, error) { - source := sqlescape.EscapeID(sourceVindexCol) - target := sqlescape.EscapeID(vindexFromCol) - - for _, line := range lines[1:] { - if strings.Contains(line, source) { - line = strings.Replace(line, source, target, 1) - line = strings.Replace(line, " AUTO_INCREMENT", "", 1) - line = strings.Replace(line, " DEFAULT NULL", "", 1) - // Ensure that the column definition ends with a comma as we will - // be appending the TO column and PRIMARY KEY definitions. If the - // souce column here was the last entity defined in the source - // table's definition then it will not already have the comma. - if !strings.HasSuffix(strings.TrimSpace(line), ",") { - line += "," - } - return line, nil - } - } - return "", fmt.Errorf("column %s not found in schema %v", sourceVindexCol, lines) -} +// func generateColDef(lines []string, sourceVindexCol, vindexFromCol string) (string, error) { +// source := sqlescape.EscapeID(sourceVindexCol) +// target := sqlescape.EscapeID(vindexFromCol) + +// for _, line := range lines[1:] { +// if strings.Contains(line, source) { +// line = strings.Replace(line, source, target, 1) +// line = strings.Replace(line, " AUTO_INCREMENT", "", 1) +// line = strings.Replace(line, " DEFAULT NULL", "", 1) +// // Ensure that the column definition ends with a comma as we will +// // be appending the TO column and PRIMARY KEY definitions. If the +// // souce column here was the last entity defined in the source +// // table's definition then it will not already have the comma. +// if !strings.HasSuffix(strings.TrimSpace(line), ",") { +// line += "," +// } +// return line, nil +// } +// } +// return "", fmt.Errorf("column %s not found in schema %v", sourceVindexCol, lines) +// } func (s *Server) MigrateCreate(ctx context.Context, req *vtctldatapb.MigrateCreateRequest) (*vtctldatapb.WorkflowStatusResponse, error) { moveTablesCreateRequest := &vtctldatapb.MoveTablesCreateRequest{