Skip to content

Commit

Permalink
refac: use lookup instead of workflowFetcher for lookup vindex actions
Browse files Browse the repository at this point in the history
Signed-off-by: Noble Mittal <[email protected]>
  • Loading branch information
beingnoble03 committed Dec 16, 2024
1 parent 4d4e6f1 commit acedfde
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 31 deletions.
40 changes: 26 additions & 14 deletions go/vt/vtctl/workflow/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,33 @@ import (
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtctl/schematools"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/vttablet/tmclient"

tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// lookup is responsible for performing actions related to lookup vindexes.
type lookup struct {
ts *topo.Server
tmc tmclient.TabletManagerClient

logger logutil.Logger
parser *sqlparser.Parser
}

// prepareCreateLookup performs the preparatory steps for creating a
// Lookup Vindex.
func (wf *workflowFetcher) prepareCreateLookup(ctx context.Context, workflow, keyspace string, specs *vschemapb.Keyspace, continueAfterCopyWithOwner bool) (
func (l *lookup) prepareCreateLookup(ctx context.Context, workflow, keyspace string, specs *vschemapb.Keyspace, continueAfterCopyWithOwner bool) (
ms *vtctldatapb.MaterializeSettings, sourceVSchema, targetVSchema *vschemapb.Keyspace, cancelFunc func() error, err error) {
var (
// sourceVSchemaTable is the table info present in the vschema.
Expand All @@ -54,7 +66,7 @@ func (wf *workflowFetcher) prepareCreateLookup(ctx context.Context, workflow, ke
)

// Validate input vindex.
vindex, vInfo, err := wf.validateAndGetVindex(specs)
vindex, vInfo, err := l.validateAndGetVindex(specs)
if err != nil {
return nil, nil, nil, nil, err
}
Expand All @@ -69,7 +81,7 @@ func (wf *workflowFetcher) prepareCreateLookup(ctx context.Context, workflow, ke
return nil, nil, nil, nil, err
}

sourceVSchema, targetVSchema, err = wf.getTargetAndSourceVSchema(ctx, keyspace, vInfo.targetKeyspace)
sourceVSchema, targetVSchema, err = l.getTargetAndSourceVSchema(ctx, keyspace, vInfo.targetKeyspace)
if err != nil {
return nil, nil, nil, nil, err
}
Expand All @@ -91,7 +103,7 @@ func (wf *workflowFetcher) prepareCreateLookup(ctx context.Context, workflow, ke
}

// Validate against source schema.
sourceShards, err := wf.ts.GetServingShards(ctx, keyspace)
sourceShards, err := l.ts.GetServingShards(ctx, keyspace)
if err != nil {
return nil, nil, nil, nil, err
}
Expand All @@ -102,7 +114,7 @@ func (wf *workflowFetcher) prepareCreateLookup(ctx context.Context, workflow, ke
}

req := &tabletmanagerdatapb.GetSchemaRequest{Tables: []string{vInfo.sourceTableName}}
tableSchema, err := schematools.GetSchema(ctx, wf.ts, wf.tmc, onesource.PrimaryAlias, req)
tableSchema, err := schematools.GetSchema(ctx, l.ts, l.tmc, onesource.PrimaryAlias, req)
if err != nil {
return nil, nil, nil, nil, err
}
Expand All @@ -113,7 +125,7 @@ func (wf *workflowFetcher) prepareCreateLookup(ctx context.Context, workflow, ke
}

// Generate "create table" statement.
createDDL, err = wf.generateCreateDDLStatement(tableSchema, sourceVindexColumns, vInfo, vindex)
createDDL, err = l.generateCreateDDLStatement(tableSchema, sourceVindexColumns, vInfo, vindex)
if err != nil {
return nil, nil, nil, nil, err
}
Expand Down Expand Up @@ -191,7 +203,7 @@ func (wf *workflowFetcher) prepareCreateLookup(ctx context.Context, workflow, ke
if targetChanged {
cancelFunc = func() error {
// Restore the original target vschema.
return wf.ts.SaveVSchema(ctx, vInfo.targetKeyspace, ogTargetVSchema)
return l.ts.SaveVSchema(ctx, vInfo.targetKeyspace, ogTargetVSchema)
}
}

Expand Down Expand Up @@ -230,7 +242,7 @@ type vindexInfo struct {
}

// validateAndGetVindex validates and extracts vindex configuration
func (wf *workflowFetcher) validateAndGetVindex(specs *vschemapb.Keyspace) (*vschemapb.Vindex, *vindexInfo, error) {
func (l *lookup) validateAndGetVindex(specs *vschemapb.Keyspace) (*vschemapb.Vindex, *vindexInfo, error) {
if specs == nil {
return nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "no vindex provided")
}
Expand All @@ -245,7 +257,7 @@ func (wf *workflowFetcher) validateAndGetVindex(specs *vschemapb.Keyspace) (*vsc
return nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex %s is not a lookup type", vindex.Type)
}

targetKeyspace, targetTableName, err := wf.parser.ParseTable(vindex.Params["table"])
targetKeyspace, targetTableName, err := l.parser.ParseTable(vindex.Params["table"])
if err != nil || targetKeyspace == "" {
return nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT,
"vindex table name (%s) must be in the form <keyspace>.<table>", vindex.Params["table"])
Expand Down Expand Up @@ -306,8 +318,8 @@ func (wf *workflowFetcher) validateAndGetVindex(specs *vschemapb.Keyspace) (*vsc
}, nil
}

func (wf *workflowFetcher) getTargetAndSourceVSchema(ctx context.Context, sourceKeyspace string, targetKeyspace string) (sourceVSchema *vschemapb.Keyspace, targetVSchema *vschemapb.Keyspace, err error) {
sourceVSchema, err = wf.ts.GetVSchema(ctx, sourceKeyspace)
func (l *lookup) getTargetAndSourceVSchema(ctx context.Context, sourceKeyspace string, targetKeyspace string) (sourceVSchema *vschemapb.Keyspace, targetVSchema *vschemapb.Keyspace, err error) {
sourceVSchema, err = l.ts.GetVSchema(ctx, sourceKeyspace)
if err != nil {
return nil, nil, err
}
Expand All @@ -319,7 +331,7 @@ func (wf *workflowFetcher) getTargetAndSourceVSchema(ctx context.Context, source
if sourceKeyspace == targetKeyspace {
targetVSchema = sourceVSchema
} else {
targetVSchema, err = wf.ts.GetVSchema(ctx, targetKeyspace)
targetVSchema, err = l.ts.GetVSchema(ctx, targetKeyspace)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -367,7 +379,7 @@ func getSourceTable(specs *vschemapb.Keyspace, targetTableName string, fromCols
return sourceTable, sourceTableName, nil
}

func (wf *workflowFetcher) generateCreateDDLStatement(tableSchema *tabletmanagerdatapb.SchemaDefinition, sourceVindexColumns []string, vInfo *vindexInfo, vindex *vschemapb.Vindex) (string, error) {
func (l *lookup) generateCreateDDLStatement(tableSchema *tabletmanagerdatapb.SchemaDefinition, sourceVindexColumns []string, vInfo *vindexInfo, vindex *vschemapb.Vindex) (string, error) {
lines := strings.Split(tableSchema.TableDefinitions[0].Schema, "\n")
if len(lines) < 3 {
// Should never happen.
Expand Down Expand Up @@ -405,7 +417,7 @@ func (wf *workflowFetcher) generateCreateDDLStatement(tableSchema *tabletmanager
createDDL := strings.Join(modified, "\n")

// Confirm that our DDL is valid before we create anything.
if _, err := wf.parser.ParseStrictDDL(createDDL); err != nil {
if _, err := l.parser.ParseStrictDDL(createDDL); err != nil {
return "", vterrors.Errorf(vtrpcpb.Code_INTERNAL, "error: %v; invalid lookup table definition generated: %s",
err, createDDL)
}
Expand Down
30 changes: 15 additions & 15 deletions go/vt/vtctl/workflow/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1515,13 +1515,13 @@ func TestCreateLookupVindexCreateDDL(t *testing.T) {
setStartingVschema()
}()
}
w := &workflowFetcher{
l := &lookup{
ts: env.ws.ts,
tmc: env.ws.tmc,
logger: env.ws.Logger(),
parser: env.ws.SQLParser(),
}
outms, _, _, cancelFunc, err := w.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, tcase.specs, false)
outms, _, _, cancelFunc, err := l.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, tcase.specs, false)
if tcase.err != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tcase.err, "prepareCreateLookup(%s) err: %v, does not contain %v", tcase.description, err, tcase.err)
Expand Down Expand Up @@ -1769,13 +1769,13 @@ func TestCreateLookupVindexSourceVSchema(t *testing.T) {
t.Fatal(err)
}

w := &workflowFetcher{
l := &lookup{
ts: env.ws.ts,
tmc: env.ws.tmc,
logger: env.ws.Logger(),
parser: env.ws.SQLParser(),
}
_, got, _, _, err := w.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false)
_, got, _, _, err := l.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false)
require.NoError(t, err)
if !proto.Equal(got, tcase.out) {
t.Errorf("%s: got:\n%v, want\n%v", tcase.description, got, tcase.out)
Expand Down Expand Up @@ -2011,13 +2011,13 @@ func TestCreateLookupVindexTargetVSchema(t *testing.T) {
t.Fatal(err)
}

w := &workflowFetcher{
l := &lookup{
ts: env.ws.ts,
tmc: env.ws.tmc,
logger: env.ws.Logger(),
parser: env.ws.SQLParser(),
}
_, _, got, cancelFunc, err := w.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false)
_, _, got, cancelFunc, err := l.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false)
if tcase.err != "" {
if err == nil || !strings.Contains(err.Error(), tcase.err) {
t.Errorf("prepareCreateLookup(%s) err: %v, must contain %v", tcase.description, err, tcase.err)
Expand Down Expand Up @@ -2139,13 +2139,13 @@ func TestCreateLookupVindexSameKeyspace(t *testing.T) {
t.Fatal(err)
}

w := &workflowFetcher{
l := &lookup{
ts: env.ws.ts,
tmc: env.ws.tmc,
logger: env.ws.Logger(),
parser: env.ws.SQLParser(),
}
_, got, _, _, err := w.prepareCreateLookup(ctx, "keyspace", ms.TargetKeyspace, specs, false)
_, got, _, _, err := l.prepareCreateLookup(ctx, "keyspace", ms.TargetKeyspace, specs, false)
require.NoError(t, err)
if !proto.Equal(got, want) {
t.Errorf("same keyspace: got:\n%v, want\n%v", got, want)
Expand Down Expand Up @@ -2271,13 +2271,13 @@ func TestCreateCustomizedVindex(t *testing.T) {
t.Fatal(err)
}

w := &workflowFetcher{
l := &lookup{
ts: env.ws.ts,
tmc: env.ws.tmc,
logger: env.ws.Logger(),
parser: env.ws.SQLParser(),
}
_, got, _, _, err := w.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false)
_, got, _, _, err := l.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false)
require.NoError(t, err)
if !proto.Equal(got, want) {
t.Errorf("customize create lookup error same: got:\n%v, want\n%v", got, want)
Expand Down Expand Up @@ -2395,13 +2395,13 @@ func TestCreateLookupVindexIgnoreNulls(t *testing.T) {
t.Fatal(err)
}

w := &workflowFetcher{
l := &lookup{
ts: env.ws.ts,
tmc: env.ws.tmc,
logger: env.ws.Logger(),
parser: env.ws.SQLParser(),
}
ms, ks, _, _, err := w.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false)
ms, ks, _, _, err := l.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false)
require.NoError(t, err)
if !proto.Equal(wantKs, ks) {
t.Errorf("unexpected keyspace value: got:\n%v, want\n%v", ks, wantKs)
Expand Down Expand Up @@ -2481,17 +2481,17 @@ func TestStopAfterCopyFlag(t *testing.T) {
t.Fatal(err)
}

w := &workflowFetcher{
l := &lookup{
ts: env.ws.ts,
tmc: env.ws.tmc,
logger: env.ws.Logger(),
parser: env.ws.SQLParser(),
}
ms1, _, _, _, err := w.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false)
ms1, _, _, _, err := l.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false)
require.NoError(t, err)
require.Equal(t, ms1.StopAfterCopy, true)

ms2, _, _, _, err := w.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, true)
ms2, _, _, _, err := l.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, true)
require.NoError(t, err)
require.Equal(t, ms2.StopAfterCopy, false)
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,13 +567,13 @@ func (s *Server) LookupVindexCreate(ctx context.Context, req *vtctldatapb.Lookup
span.Annotate("cells", req.Cells)
span.Annotate("tablet_types", req.TabletTypes)

w := &workflowFetcher{
l := &lookup{
ts: s.ts,
tmc: s.tmc,
logger: s.Logger(),
parser: s.SQLParser(),
}
ms, sourceVSchema, targetVSchema, cancelFunc, err := w.prepareCreateLookup(ctx, req.Workflow, req.Keyspace, req.Vindex, req.ContinueAfterCopyWithOwner)
ms, sourceVSchema, targetVSchema, cancelFunc, err := l.prepareCreateLookup(ctx, req.Workflow, req.Keyspace, req.Vindex, req.ContinueAfterCopyWithOwner)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit acedfde

Please sign in to comment.