diff --git a/go/vt/wrangler/materializer_env_test.go b/go/vt/wrangler/materializer_env_test.go index 3d5797a6bff..e3a3364c8fd 100644 --- a/go/vt/wrangler/materializer_env_test.go +++ b/go/vt/wrangler/materializer_env_test.go @@ -19,19 +19,14 @@ package wrangler import ( "context" "fmt" - "os" "regexp" - "runtime" "strconv" "strings" "sync" "testing" - "time" - - "go.uber.org/goleak" "vitess.io/vitess/go/sqltypes" - "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/mysqlctl/tmutils" "vitess.io/vitess/go/vt/sqlparser" @@ -40,7 +35,6 @@ import ( "vitess.io/vitess/go/vt/vtenv" "vitess.io/vitess/go/vt/vttablet/tmclient" - _flag "vitess.io/vitess/go/internal/flag" querypb "vitess.io/vitess/go/vt/proto/query" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -61,66 +55,9 @@ type testMaterializerEnv struct { //---------------------------------------------- // testMaterializerEnv -// EnsureNoLeaks is a helper function to fail tests if there are goroutine leaks. -// At this moment we still have a lot of goroutine leaks in the unit tests in this package. -// So we only use this while debugging and fixing the leaks. Once fixed we will use this -// in TestMain instead of just logging the number of leaked goroutines. -func EnsureNoLeaks(t testing.TB) { - if t.Failed() { - return - } - err := ensureNoGoroutines() - if err != nil { - t.Fatal(err) - } -} - -func ensureNoGoroutines() error { - // These goroutines have been found to stay around. - // Need to investigate and fix the Vitess ones at some point, if we indeed find out that they are unintended leaks. - var leaksToIgnore = []goleak.Option{ - goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"), - goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), - goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/dbconfigs.init.0.func1"), - goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vtgate.resetAggregators"), - goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vtgate.processQueryInfo"), - goleak.IgnoreTopFunction("github.com/patrickmn/go-cache.(*janitor).Run"), - } - - const ( - // give ample time for the goroutines to exit in CI. - waitTime = 100 * time.Millisecond - numIterations = 50 // 5 seconds - ) - var err error - for i := 0; i < numIterations; i++ { - err = goleak.Find(leaksToIgnore...) - if err == nil { - return nil - } - time.Sleep(waitTime) - } - return err -} - -func testMainWrapper(m *testing.M) int { - startingNumGoRoutines := runtime.NumGoroutine() - defer func() { - numGoroutines := runtime.NumGoroutine() - if numGoroutines > startingNumGoRoutines { - log.Infof("!!!!!!!!!!!! Wrangler unit tests Leaked %d goroutines", numGoroutines-startingNumGoRoutines) - } - }() - _flag.ParseFlagsForTest() - return m.Run() -} - -func TestMain(m *testing.M) { - os.Exit(testMainWrapper(m)) -} - -func newTestMaterializerEnv(t *testing.T, ctx context.Context, ms *vtctldatapb.MaterializeSettings, sources, targets []string) *testMaterializerEnv { +func newTestMaterializerEnv(t *testing.T, ms *vtctldatapb.MaterializeSettings, sources, targets []string) (*testMaterializerEnv, context.Context) { t.Helper() + ctx, cancel := context.WithCancel(context.Background()) env := &testMaterializerEnv{ ms: ms, sources: sources, @@ -167,7 +104,12 @@ func newTestMaterializerEnv(t *testing.T, ctx context.Context, ms *vtctldatapb.M if ms.Workflow != "" { env.expectValidation() } - return env + t.Cleanup(func() { + defer utils.EnsureNoLeaks(t) + env.close() + cancel() + }) + return env, ctx } func (env *testMaterializerEnv) expectValidation() { diff --git a/go/vt/wrangler/materializer_test.go b/go/vt/wrangler/materializer_test.go index bdeee1e6ac3..e7bb66f1003 100644 --- a/go/vt/wrangler/materializer_test.go +++ b/go/vt/wrangler/materializer_test.go @@ -65,10 +65,7 @@ func TestMoveTablesNoRoutingRules(t *testing.T) { SourceExpression: "select * from t1", }}, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"}) - defer env.close() + env, ctx := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{}) env.tmc.expectVRQuery(200, mzSelectFrozenQuery, &sqltypes.Result{}) @@ -93,11 +90,7 @@ func TestMigrateTables(t *testing.T) { SourceExpression: "select * from t1", }}, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"}) - defer env.close() - + env, ctx := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{}) env.tmc.expectVRQuery(200, mzSelectFrozenQuery, &sqltypes.Result{}) env.tmc.expectVRQuery(200, insertPrefix, &sqltypes.Result{}) @@ -135,12 +128,8 @@ func TestMissingTables(t *testing.T) { SourceExpression: "select * from t3", }}, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"}) - defer env.close() + env, ctx := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{}) env.tmc.expectVRQuery(200, mzSelectFrozenQuery, &sqltypes.Result{}) env.tmc.expectVRQuery(200, insertPrefix, &sqltypes.Result{}) @@ -199,11 +188,7 @@ func TestMoveTablesAllAndExclude(t *testing.T) { } for _, tcase := range testCases { t.Run("", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"}) - defer env.close() + env, ctx := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{}) env.tmc.expectVRQuery(200, mzSelectFrozenQuery, &sqltypes.Result{}) env.tmc.expectVRQuery(200, insertPrefix, &sqltypes.Result{}) @@ -231,11 +216,7 @@ func TestMoveTablesStopFlags(t *testing.T) { var err error t.Run("StopStartedAndStopAfterCopyFlags", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"}) - defer env.close() + env, ctx := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{}) env.tmc.expectVRQuery(200, mzSelectFrozenQuery, &sqltypes.Result{}) // insert expects flag stop_after_copy to be true @@ -261,12 +242,7 @@ func TestMigrateVSchema(t *testing.T) { SourceExpression: "select * from t1", }}, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"}) - defer env.close() - + env, ctx := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{}) env.tmc.expectVRQuery(200, mzSelectFrozenQuery, &sqltypes.Result{}) env.tmc.expectVRQuery(200, insertPrefix, &sqltypes.Result{}) @@ -294,11 +270,8 @@ func TestCreateLookupVindexFull(t *testing.T) { SourceKeyspace: "sourceks", TargetKeyspace: "targetks", } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"}) - defer env.close() + env, ctx := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) specs := &vschemapb.Keyspace{ Vindexes: map[string]*vschemapb.Vindex{ @@ -420,11 +393,9 @@ func TestCreateLookupVindexCreateDDL(t *testing.T) { SourceKeyspace: "sourceks", TargetKeyspace: "targetks", } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"}) - defer env.close() + env, _ := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) + vs := &vschemapb.Keyspace{ Sharded: true, Vindexes: map[string]*vschemapb.Vindex{ @@ -640,11 +611,8 @@ func TestCreateLookupVindexSourceVSchema(t *testing.T) { SourceKeyspace: "sourceks", TargetKeyspace: "targetks", } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"}) - defer env.close() + env, _ := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) specs := &vschemapb.Keyspace{ Vindexes: map[string]*vschemapb.Vindex{ @@ -879,11 +847,9 @@ func TestCreateLookupVindexTargetVSchema(t *testing.T) { SourceKeyspace: "sourceks", TargetKeyspace: "targetks", } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"}) - defer env.close() + env, _ := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) + sourcevs := &vschemapb.Keyspace{ Sharded: true, Vindexes: map[string]*vschemapb.Vindex{ @@ -1118,11 +1084,8 @@ func TestCreateLookupVindexSameKeyspace(t *testing.T) { SourceKeyspace: "ks", TargetKeyspace: "ks", } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"}) - defer env.close() + env, _ := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) specs := &vschemapb.Keyspace{ Vindexes: map[string]*vschemapb.Vindex{ @@ -1230,11 +1193,8 @@ func TestCreateCustomizedVindex(t *testing.T) { SourceKeyspace: "ks", TargetKeyspace: "ks", } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"}) - defer env.close() + env, _ := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) specs := &vschemapb.Keyspace{ Vindexes: map[string]*vschemapb.Vindex{ @@ -1343,11 +1303,8 @@ func TestCreateLookupVindexIgnoreNulls(t *testing.T) { SourceKeyspace: "ks", TargetKeyspace: "ks", } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"}) - defer env.close() + env, _ := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) specs := &vschemapb.Keyspace{ Vindexes: map[string]*vschemapb.Vindex{ @@ -1464,11 +1421,9 @@ func TestStopAfterCopyFlag(t *testing.T) { SourceKeyspace: "ks", TargetKeyspace: "ks", } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"}) - defer env.close() + env, _ := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) + specs := &vschemapb.Keyspace{ Vindexes: map[string]*vschemapb.Vindex{ "v": { @@ -1817,11 +1772,8 @@ func TestExternalizeVindex(t *testing.T) { SourceKeyspace: "sourceks", TargetKeyspace: "targetks", } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"-80", "80-"}) - defer env.close() + env, _ := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"-80", "80-"}) sourceVSchema := &vschemapb.Keyspace{ Sharded: true, @@ -1968,11 +1920,8 @@ func TestMaterializerOneToOne(t *testing.T) { topodatapb.TabletType_RDONLY, }), } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"}) - defer env.close() + env, ctx := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) env.tmc.expectVRQuery(200, mzSelectFrozenQuery, &sqltypes.Result{}) env.tmc.expectVRQuery( @@ -2012,11 +1961,8 @@ func TestMaterializerManyToOne(t *testing.T) { CreateDdl: "t2ddl", }}, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"-80", "80-"}, []string{"0"}) - defer env.close() + env, ctx := newTestMaterializerEnv(t, ms, []string{"-80", "80-"}, []string{"0"}) env.tmc.expectVRQuery(200, mzSelectFrozenQuery, &sqltypes.Result{}) env.tmc.expectVRQuery( @@ -2046,11 +1992,8 @@ func TestMaterializerOneToMany(t *testing.T) { CreateDdl: "t1ddl", }}, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"-80", "80-"}) - defer env.close() + env, ctx := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"-80", "80-"}) vs := &vschemapb.Keyspace{ Sharded: true, @@ -2106,11 +2049,8 @@ func TestMaterializerManyToMany(t *testing.T) { CreateDdl: "t1ddl", }}, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"-40", "40-"}, []string{"-80", "80-"}) - defer env.close() + env, ctx := newTestMaterializerEnv(t, ms, []string{"-40", "40-"}, []string{"-80", "80-"}) vs := &vschemapb.Keyspace{ Sharded: true, @@ -2167,11 +2107,8 @@ func TestMaterializerMulticolumnVindex(t *testing.T) { CreateDdl: "t1ddl", }}, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"-80", "80-"}) - defer env.close() + env, ctx := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"-80", "80-"}) vs := &vschemapb.Keyspace{ Sharded: true, @@ -2234,11 +2171,8 @@ func TestMaterializerDeploySchema(t *testing.T) { CreateDdl: "t2ddl", }}, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"}) - defer env.close() + env, ctx := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) delete(env.tmc.schema, "targetks.t2") @@ -2275,11 +2209,8 @@ func TestMaterializerCopySchema(t *testing.T) { CreateDdl: "t2ddl", }}, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"}) - defer env.close() + env, ctx := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) delete(env.tmc.schema, "targetks.t1") @@ -2313,11 +2244,8 @@ func TestMaterializerExplicitColumns(t *testing.T) { CreateDdl: "t1ddl", }}, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"-80", "80-"}) - defer env.close() + env, ctx := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"-80", "80-"}) vs := &vschemapb.Keyspace{ Sharded: true, @@ -2376,11 +2304,8 @@ func TestMaterializerRenamedColumns(t *testing.T) { CreateDdl: "t1ddl", }}, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"-80", "80-"}) - defer env.close() + env, ctx := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"-80", "80-"}) vs := &vschemapb.Keyspace{ Sharded: true, @@ -2444,11 +2369,8 @@ func TestMaterializerStopAfterCopy(t *testing.T) { CreateDdl: "t2ddl", }}, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"}) - defer env.close() + env, ctx := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) env.tmc.expectVRQuery(200, mzSelectFrozenQuery, &sqltypes.Result{}) env.tmc.expectVRQuery(200, insertPrefix+`.*stop_after_copy:true`, &sqltypes.Result{}) @@ -2470,11 +2392,8 @@ func TestMaterializerNoTargetVSchema(t *testing.T) { CreateDdl: "t1ddl", }}, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"-80", "80-"}) - defer env.close() + env, ctx := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"-80", "80-"}) vs := &vschemapb.Keyspace{ Sharded: true, @@ -2500,11 +2419,8 @@ func TestMaterializerNoDDL(t *testing.T) { CreateDdl: "", }}, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"}) - defer env.close() + env, ctx := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) delete(env.tmc.schema, "targetks.t1") @@ -2544,7 +2460,6 @@ func TestMaterializerNoSourcePrimary(t *testing.T) { tmc: newTestMaterializerTMClient(), } env.wr = New(vtenv.NewTestEnv(), logutil.NewConsoleLogger(), env.topoServ, env.tmc) - defer env.close() tabletID := 100 for _, shard := range sources { @@ -2577,11 +2492,8 @@ func TestMaterializerTableMismatchNonCopy(t *testing.T) { CreateDdl: "", }}, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"}) - defer env.close() + env, ctx := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) delete(env.tmc.schema, "targetks.t1") @@ -2601,11 +2513,8 @@ func TestMaterializerTableMismatchCopy(t *testing.T) { CreateDdl: "copy", }}, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"}) - defer env.close() + env, ctx := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) delete(env.tmc.schema, "targetks.t1") @@ -2625,11 +2534,8 @@ func TestMaterializerNoSourceTable(t *testing.T) { CreateDdl: "copy", }}, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"}) - defer env.close() + env, ctx := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) delete(env.tmc.schema, "targetks.t1") delete(env.tmc.schema, "sourceks.t1") @@ -2650,11 +2556,8 @@ func TestMaterializerSyntaxError(t *testing.T) { CreateDdl: "t1ddl", }}, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"}) - defer env.close() + env, ctx := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) env.tmc.expectVRQuery(200, mzSelectFrozenQuery, &sqltypes.Result{}) err := env.wr.Materialize(ctx, ms) @@ -2672,11 +2575,8 @@ func TestMaterializerNotASelect(t *testing.T) { CreateDdl: "t1ddl", }}, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"}) - defer env.close() + env, ctx := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) env.tmc.expectVRQuery(200, mzSelectFrozenQuery, &sqltypes.Result{}) err := env.wr.Materialize(ctx, ms) @@ -2694,11 +2594,8 @@ func TestMaterializerNoGoodVindex(t *testing.T) { CreateDdl: "t1ddl", }}, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"-80", "80-"}) - defer env.close() + env, ctx := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"-80", "80-"}) vs := &vschemapb.Keyspace{ Sharded: true, @@ -2743,10 +2640,8 @@ func TestMaterializerComplexVindexExpression(t *testing.T) { CreateDdl: "t1ddl", }}, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"-80", "80-"}) - defer env.close() + + env, ctx := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"-80", "80-"}) vs := &vschemapb.Keyspace{ Sharded: true, @@ -2786,10 +2681,8 @@ func TestMaterializerNoVindexInExpression(t *testing.T) { CreateDdl: "t1ddl", }}, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"-80", "80-"}) - defer env.close() + + env, ctx := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"-80", "80-"}) vs := &vschemapb.Keyspace{ Sharded: true, @@ -3244,9 +3137,7 @@ func TestMaterializerSourceShardSelection(t *testing.T) { for _, tcase := range testcases { t.Run(tcase.name, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, tcase.sourceShards, tcase.targetShards) + env, ctx := newTestMaterializerEnv(t, ms, tcase.sourceShards, tcase.targetShards) if err := env.topoServ.SaveVSchema(ctx, "targetks", tcase.targetVSchema); err != nil { t.Fatal(err) } @@ -3255,7 +3146,7 @@ func TestMaterializerSourceShardSelection(t *testing.T) { t.Fatal(err) } } - defer env.close() + for i, targetShard := range tcase.targetShards { tabletID := 200 + i*10 env.tmc.expectVRQuery(tabletID, mzSelectFrozenQuery, &sqltypes.Result{}) @@ -3293,16 +3184,12 @@ func TestMoveTablesDDLFlag(t *testing.T) { SourceExpression: "select * from t1", }}, } - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel() for onDDLAction := range binlogdatapb.OnDDLAction_value { t.Run(fmt.Sprintf("OnDDL Flag:%v", onDDLAction), func(t *testing.T) { - ctx, cancel := context.WithCancel(ctx) + env, ctx := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) + ctx, cancel := context.WithTimeout(ctx, 60*time.Second) defer cancel() - env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"}) - defer env.close() - env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{}) env.tmc.expectVRQuery(200, mzSelectFrozenQuery, &sqltypes.Result{}) if onDDLAction == binlogdatapb.OnDDLAction_IGNORE.String() { @@ -3522,8 +3409,6 @@ func TestAddTablesToVSchema(t *testing.T) { // means that even if the target keyspace is sharded, the source // does not need to perform the in_keyrange filtering. func TestKeyRangesEqualOptimization(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() workflow := "testwf" sourceKs := "sourceks" targetKs := "targetks" @@ -3698,9 +3583,9 @@ func TestKeyRangesEqualOptimization(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - env := newTestMaterializerEnv(t, ctx, tc.ms, tc.sourceShards, tc.targetShards) - defer env.close() - + env, ctx := newTestMaterializerEnv(t, tc.ms, tc.sourceShards, tc.targetShards) + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() // Target is always sharded. err := env.wr.ts.SaveVSchema(ctx, targetKs, targetVSchema) require.NoError(t, err, "SaveVSchema failed: %v", err)