Skip to content

Commit

Permalink
test: Add missing tests for traffic_switcher.go (vitessio#17334)
Browse files Browse the repository at this point in the history
Signed-off-by: Noble Mittal <[email protected]>
  • Loading branch information
beingnoble03 authored Dec 12, 2024
1 parent a3d4001 commit 69f2f8f
Show file tree
Hide file tree
Showing 2 changed files with 263 additions and 0 deletions.
9 changes: 9 additions & 0 deletions go/vt/vtctl/workflow/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,11 @@ func (tmc *testTMClient) ExecuteFetchAsAllPrivs(ctx context.Context, tablet *top
return nil, nil
}

func (tmc *testTMClient) ExecuteFetchAsApp(ctx context.Context, tablet *topodatapb.Tablet, usePool bool, req *tabletmanagerdatapb.ExecuteFetchAsAppRequest) (*querypb.QueryResult, error) {
// Reuse VReplicationExec.
return tmc.VReplicationExec(ctx, tablet, string(req.Query))
}

func (tmc *testTMClient) expectApplySchemaRequest(tabletID uint32, req *applySchemaRequestResponse) {
tmc.mu.Lock()
defer tmc.mu.Unlock()
Expand Down Expand Up @@ -619,6 +624,10 @@ func (tmc *testTMClient) HasVReplicationWorkflows(ctx context.Context, tablet *t
}, nil
}

func (tmc *testTMClient) ResetSequences(ctx context.Context, tablet *topodatapb.Tablet, tables []string) error {
return nil
}

func (tmc *testTMClient) ReadVReplicationWorkflows(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ReadVReplicationWorkflowsRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()
Expand Down
254 changes: 254 additions & 0 deletions go/vt/vtctl/workflow/traffic_switcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
"vitess.io/vitess/go/vt/proto/vschema"
"vitess.io/vitess/go/vt/sqlparser"
Expand All @@ -36,6 +37,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"

tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

Expand Down Expand Up @@ -649,3 +651,255 @@ func TestTrafficSwitchPositionHandling(t *testing.T) {
})
require.NoError(t, err)
}

func TestInitializeTargetSequences(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

workflowName := "wf1"
tableName := "t1"
sourceKeyspaceName := "sourceks"
targetKeyspaceName := "targetks"

schema := map[string]*tabletmanagerdatapb.SchemaDefinition{
tableName: {
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{
{
Name: tableName,
Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName),
},
},
},
}

sourceKeyspace := &testKeyspace{
KeyspaceName: sourceKeyspaceName,
ShardNames: []string{"0"},
}
targetKeyspace := &testKeyspace{
KeyspaceName: targetKeyspaceName,
ShardNames: []string{"0"},
}

env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace)
defer env.close()
env.tmc.schema = schema

ts, _, err := env.ws.getWorkflowState(ctx, targetKeyspaceName, workflowName)
require.NoError(t, err)
sw := &switcher{ts: ts, s: env.ws}

sequencesByBackingTable := map[string]*sequenceMetadata{
"my-seq1": {
backingTableName: "my-seq1",
backingTableKeyspace: sourceKeyspaceName,
backingTableDBName: fmt.Sprintf("vt_%s", sourceKeyspaceName),
usingTableName: tableName,
usingTableDBName: "vt_targetks",
usingTableDefinition: &vschema.Table{
AutoIncrement: &vschema.AutoIncrement{
Column: "my-col",
Sequence: fmt.Sprintf("%s.my-seq1", sourceKeyspace.KeyspaceName),
},
},
},
}

env.tmc.expectVRQuery(200, "/select max.*", sqltypes.MakeTestResult(sqltypes.MakeTestFields("maxval", "int64"), "34"))
// Expect the insert query to be executed with 35 as a params, since we provide a maxID of 34 in the last query
env.tmc.expectVRQuery(100, "/insert into.*35.*", &sqltypes.Result{RowsAffected: 1})

err = sw.initializeTargetSequences(ctx, sequencesByBackingTable)
assert.NoError(t, err)

// Expect the queries to be cleared
assert.Empty(t, env.tmc.vrQueries[100])
assert.Empty(t, env.tmc.vrQueries[200])
}

func TestAddTenantFilter(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

workflowName := "wf1"
tableName := "t1"
sourceKeyspaceName := "sourceks"
targetKeyspaceName := "targetks"

sourceKeyspace := &testKeyspace{
KeyspaceName: sourceKeyspaceName,
ShardNames: []string{"0"},
}
targetKeyspace := &testKeyspace{
KeyspaceName: targetKeyspaceName,
ShardNames: []string{"0"},
}

schema := map[string]*tabletmanagerdatapb.SchemaDefinition{
tableName: {
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{
{
Name: tableName,
Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName),
},
},
},
}

env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace)
defer env.close()
env.tmc.schema = schema

err := env.ts.SaveVSchema(ctx, targetKeyspaceName, &vschema.Keyspace{
MultiTenantSpec: &vschema.MultiTenantSpec{
TenantIdColumnName: "tenant_id",
TenantIdColumnType: sqltypes.Int64,
},
})
require.NoError(t, err)

ts, _, err := env.ws.getWorkflowState(ctx, targetKeyspaceName, workflowName)
require.NoError(t, err)

ts.options.TenantId = "123"

filter, err := ts.addTenantFilter(ctx, fmt.Sprintf("select * from %s where id < 5", tableName))
assert.NoError(t, err)
assert.Equal(t, "select * from t1 where tenant_id = 123 and id < 5", filter)
}

func TestChangeShardRouting(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

workflowName := "wf1"
tableName := "t1"
sourceKeyspaceName := "sourceks"
targetKeyspaceName := "targetks"

sourceKeyspace := &testKeyspace{
KeyspaceName: sourceKeyspaceName,
ShardNames: []string{"0"},
}
targetKeyspace := &testKeyspace{
KeyspaceName: targetKeyspaceName,
ShardNames: []string{"0"},
}

schema := map[string]*tabletmanagerdatapb.SchemaDefinition{
tableName: {
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{
{
Name: tableName,
Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName),
},
},
},
}

env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace)
defer env.close()
env.tmc.schema = schema

ts, _, err := env.ws.getWorkflowState(ctx, targetKeyspaceName, workflowName)
require.NoError(t, err)

err = env.ws.ts.UpdateSrvKeyspace(ctx, "cell", targetKeyspaceName, &topodatapb.SrvKeyspace{
Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{
{
ShardReferences: []*topodatapb.ShardReference{
{
Name: "0",
},
},
},
},
})
require.NoError(t, err)

err = env.ws.ts.UpdateSrvKeyspace(ctx, "cell", sourceKeyspaceName, &topodatapb.SrvKeyspace{
Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{
{
ShardReferences: []*topodatapb.ShardReference{
{
Name: "0",
},
},
},
},
})
require.NoError(t, err)

ctx, _, err = env.ws.ts.LockShard(ctx, targetKeyspaceName, "0", "targetks0")
require.NoError(t, err)

ctx, _, err = env.ws.ts.LockKeyspace(ctx, targetKeyspaceName, "targetks0")
require.NoError(t, err)

err = ts.changeShardRouting(ctx)
assert.NoError(t, err)

sourceShardInfo, err := env.ws.ts.GetShard(ctx, sourceKeyspaceName, "0")
assert.NoError(t, err)
assert.False(t, sourceShardInfo.IsPrimaryServing, "source shard shouldn't have it's primary serving after changeShardRouting() is called.")

targetShardInfo, err := env.ws.ts.GetShard(ctx, targetKeyspaceName, "0")
assert.NoError(t, err)
assert.True(t, targetShardInfo.IsPrimaryServing, "target shard should have it's primary serving after changeShardRouting() is called.")
}

func TestAddParticipatingTablesToKeyspace(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

workflowName := "wf1"
tableName := "t1"
sourceKeyspaceName := "sourceks"
targetKeyspaceName := "targetks"

sourceKeyspace := &testKeyspace{
KeyspaceName: sourceKeyspaceName,
ShardNames: []string{"0"},
}
targetKeyspace := &testKeyspace{
KeyspaceName: targetKeyspaceName,
ShardNames: []string{"0"},
}

schema := map[string]*tabletmanagerdatapb.SchemaDefinition{
tableName: {
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{
{
Name: tableName,
Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName),
},
},
},
}

env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace)
defer env.close()
env.tmc.schema = schema

ts, _, err := env.ws.getWorkflowState(ctx, targetKeyspaceName, workflowName)
require.NoError(t, err)

err = ts.addParticipatingTablesToKeyspace(ctx, sourceKeyspaceName, "")
assert.NoError(t, err)

vs, err := env.ts.GetVSchema(ctx, sourceKeyspaceName)
assert.NoError(t, err)
assert.NotNil(t, vs.Tables["t1"])
assert.Empty(t, vs.Tables["t1"])

specs := `{"t1":{"column_vindexes":[{"column":"col1","name":"v1"}, {"column":"col2","name":"v2"}]},"t2":{"column_vindexes":[{"column":"col2","name":"v2"}]}}`
err = ts.addParticipatingTablesToKeyspace(ctx, sourceKeyspaceName, specs)
assert.NoError(t, err)

vs, err = env.ts.GetVSchema(ctx, sourceKeyspaceName)
assert.NoError(t, err)
require.NotNil(t, vs.Tables["t1"])
require.NotNil(t, vs.Tables["t2"])
assert.Len(t, vs.Tables["t1"].ColumnVindexes, 2)
assert.Len(t, vs.Tables["t2"].ColumnVindexes, 1)
}

0 comments on commit 69f2f8f

Please sign in to comment.