From 16fa7a3641af3cbfbce7525c57e4dc462e2c555b Mon Sep 17 00:00:00 2001 From: Manan Gupta <35839558+GuptaManan100@users.noreply.github.com> Date: Wed, 27 Nov 2024 15:10:01 +0530 Subject: [PATCH 01/17] Errant GTID detection fix for VTOrc (#17287) Signed-off-by: Manan Gupta --- go/vt/vtorc/inst/instance_dao.go | 11 ++++-- go/vt/vtorc/inst/instance_dao_test.go | 49 +++++++++++++++++++++++++-- 2 files changed, 55 insertions(+), 5 deletions(-) diff --git a/go/vt/vtorc/inst/instance_dao.go b/go/vt/vtorc/inst/instance_dao.go index c8ff218710f..45291394c56 100644 --- a/go/vt/vtorc/inst/instance_dao.go +++ b/go/vt/vtorc/inst/instance_dao.go @@ -362,7 +362,7 @@ Cleanup: // Add replication group ancestry UUID as well. Otherwise, VTOrc thinks there are errant GTIDs in group // members and its replicas, even though they are not. instance.AncestryUUID = strings.Trim(instance.AncestryUUID, ",") - err = detectErrantGTIDs(tabletAlias, instance, tablet) + err = detectErrantGTIDs(instance, tablet) } latency.Stop("instance") @@ -390,13 +390,18 @@ Cleanup: } // detectErrantGTIDs detects the errant GTIDs on an instance. -func detectErrantGTIDs(tabletAlias string, instance *Instance, tablet *topodatapb.Tablet) (err error) { +func detectErrantGTIDs(instance *Instance, tablet *topodatapb.Tablet) (err error) { // If the tablet is not replicating from anyone, then it could be the previous primary. // We should check for errant GTIDs by finding the difference with the shard's current primary. if instance.primaryExecutedGtidSet == "" && instance.SourceHost == "" { var primaryInstance *Instance primaryAlias, _, _ := ReadShardPrimaryInformation(tablet.Keyspace, tablet.Shard) if primaryAlias != "" { + // Check if the current tablet is the primary. + // If it is, then we don't need to run errant gtid detection on it. + if primaryAlias == instance.InstanceAlias { + return nil + } primaryInstance, _, _ = ReadInstance(primaryAlias) } // Only run errant GTID detection, if we are sure that the data read of the current primary @@ -434,7 +439,7 @@ func detectErrantGTIDs(tabletAlias string, instance *Instance, tablet *topodatap if err == nil { var gtidCount int64 gtidCount, err = replication.GTIDCount(instance.GtidErrant) - currentErrantGTIDCount.Set(tabletAlias, gtidCount) + currentErrantGTIDCount.Set(instance.InstanceAlias, gtidCount) } } } diff --git a/go/vt/vtorc/inst/instance_dao_test.go b/go/vt/vtorc/inst/instance_dao_test.go index f248ded5e2b..0d59de0588f 100644 --- a/go/vt/vtorc/inst/instance_dao_test.go +++ b/go/vt/vtorc/inst/instance_dao_test.go @@ -854,7 +854,7 @@ func TestDetectErrantGTIDs(t *testing.T) { primaryTablet := &topodatapb.Tablet{ Alias: &topodatapb.TabletAlias{ Cell: "zone-1", - Uid: 100, + Uid: 101, }, Keyspace: keyspaceName, Shard: shardName, @@ -881,7 +881,8 @@ func TestDetectErrantGTIDs(t *testing.T) { require.NoError(t, err) } - err = detectErrantGTIDs(topoproto.TabletAliasString(tablet.Alias), tt.instance, tablet) + tt.instance.InstanceAlias = topoproto.TabletAliasString(tablet.Alias) + err = detectErrantGTIDs(tt.instance, tablet) if tt.wantErr { require.Error(t, err) return @@ -891,3 +892,47 @@ func TestDetectErrantGTIDs(t *testing.T) { }) } } + +// TestPrimaryErrantGTIDs tests that we don't run Errant GTID detection on the primary tablet itself! +func TestPrimaryErrantGTIDs(t *testing.T) { + // Clear the database after the test. The easiest way to do that is to run all the initialization commands again. + defer func() { + db.ClearVTOrcDatabase() + }() + db.ClearVTOrcDatabase() + keyspaceName := "ks" + shardName := "0" + tablet := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone-1", + Uid: 100, + }, + Keyspace: keyspaceName, + Shard: shardName, + } + instance := &Instance{ + SourceHost: "", + ExecutedGtidSet: "230ea8ea-81e3-11e4-972a-e25ec4bd140a:1-10589,8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-34,316d193c-70e5-11e5-adb2-ecf4bb2262ff:1-341", + InstanceAlias: topoproto.TabletAliasString(tablet.Alias), + } + + // Save shard record for the primary tablet. + err := SaveShard(topo.NewShardInfo(keyspaceName, shardName, &topodatapb.Shard{ + PrimaryAlias: tablet.Alias, + }, nil)) + require.NoError(t, err) + + // Store the tablet record and the instance. + err = SaveTablet(tablet) + require.NoError(t, err) + err = WriteInstance(instance, true, nil) + require.NoError(t, err) + + // After this if we read a new information for the record that updates its + // gtid set further, we shouldn't be detecting errant GTIDs on it since it is the primary! + // We shouldn't be comparing it with a previous version of itself! + instance.ExecutedGtidSet = "230ea8ea-81e3-11e4-972a-e25ec4bd140a:1-10589,8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-34,316d193c-70e5-11e5-adb2-ecf4bb2262ff:1-351" + err = detectErrantGTIDs(instance, tablet) + require.NoError(t, err) + require.EqualValues(t, "", instance.GtidErrant) +} From 0726ea6ad92792da0fb998504c4e61add3999be9 Mon Sep 17 00:00:00 2001 From: Florent Poinsard <35779988+frouioui@users.noreply.github.com> Date: Wed, 27 Nov 2024 13:58:55 -0600 Subject: [PATCH 02/17] Fix how we cancel the context in the builtin backup engine (#17285) Signed-off-by: Florent Poinsard --- go/vt/mysqlctl/builtinbackupengine.go | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index f3cbe5364a0..5aa759f6f7a 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -604,7 +604,15 @@ func (be *BuiltinBackupEngine) backupFiles( wg := sync.WaitGroup{} ctxCancel, cancel := context.WithCancel(ctx) - defer cancel() + defer func() { + // We may still have operations in flight that require a valid context, such as adding files to S3. + // Unless we encountered an error, we should not cancel the context, this is taken care of later + // in the process. If we encountered an error however, we can safely cancel the context as we should + // no longer work on anything and exit fast. + if finalErr != nil { + cancel() + } + }() for i := range fes { wg.Add(1) @@ -1037,7 +1045,15 @@ func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreP wg := sync.WaitGroup{} ctxCancel, cancel := context.WithCancel(ctx) - defer cancel() + defer func() { + // We may still have operations in flight that require a valid context, such as adding files to S3. + // Unless we encountered an error, we should not cancel the context. This is taken care of later + // in the process. If we encountered an error however, we can safely cancel the context as we should + // no longer work on anything and exit fast. + if err != nil { + cancel() + } + }() for i := range fes { wg.Add(1) From a1830b37ccc46e15a0619e9efcb8c9266ca7c143 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Thu, 28 Nov 2024 18:53:27 +0530 Subject: [PATCH 03/17] vttablet debugenv changes (#17161) Signed-off-by: Harshit Gangal Signed-off-by: Manan Gupta Co-authored-by: Manan Gupta --- go/vt/vttablet/endtoend/config_test.go | 4 +- go/vt/vttablet/tabletserver/debugenv.go | 211 ++++++++++++++---------- 2 files changed, 130 insertions(+), 85 deletions(-) diff --git a/go/vt/vttablet/endtoend/config_test.go b/go/vt/vttablet/endtoend/config_test.go index 4abf5b36c21..c3ad5f8a9db 100644 --- a/go/vt/vttablet/endtoend/config_test.go +++ b/go/vt/vttablet/endtoend/config_test.go @@ -36,7 +36,7 @@ import ( ) func TestPoolSize(t *testing.T) { - revert := changeVar(t, "PoolSize", "1") + revert := changeVar(t, "ReadPoolSize", "1") defer revert() vstart := framework.DebugVars() @@ -92,7 +92,7 @@ func TestTxPoolSize(t *testing.T) { defer client2.Rollback() verifyIntValue(t, framework.DebugVars(), "FoundRowsPoolAvailable", framework.FetchInt(vstart, "FoundRowsPoolAvailable")-1) - revert := changeVar(t, "TxPoolSize", "1") + revert := changeVar(t, "TransactionPoolSize", "1") defer revert() vend := framework.DebugVars() verifyIntValue(t, vend, "TransactionPoolAvailable", 0) diff --git a/go/vt/vttablet/tabletserver/debugenv.go b/go/vt/vttablet/tabletserver/debugenv.go index 54cf09db7d6..9a802a5d49c 100644 --- a/go/vt/vttablet/tabletserver/debugenv.go +++ b/go/vt/vttablet/tabletserver/debugenv.go @@ -70,90 +70,131 @@ func debugEnvHandler(tsv *TabletServer, w http.ResponseWriter, r *http.Request) return } + switch r.Method { + case http.MethodPost: + handlePost(tsv, w, r) + case http.MethodGet: + handleGet(tsv, w, r) + default: + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + } +} + +func handlePost(tsv *TabletServer, w http.ResponseWriter, r *http.Request) { + varname := r.FormValue("varname") + value := r.FormValue("value") + var msg string - if r.Method == "POST" { - varname := r.FormValue("varname") - value := r.FormValue("value") - setIntVal := func(f func(int)) { - ival, err := strconv.Atoi(value) - if err != nil { - msg = fmt.Sprintf("Failed setting value for %v: %v", varname, err) - return - } - f(ival) - msg = fmt.Sprintf("Setting %v to: %v", varname, value) + if varname == "" || value == "" { + http.Error(w, "Missing varname or value", http.StatusBadRequest) + return + } + + setIntVal := func(f func(int)) error { + ival, err := strconv.Atoi(value) + if err != nil { + return fmt.Errorf("invalid int value for %v: %v", varname, err) } - setIntValCtx := func(f func(context.Context, int) error) { - ival, err := strconv.Atoi(value) - if err == nil { - err = f(r.Context(), ival) - if err == nil { - msg = fmt.Sprintf("Setting %v to: %v", varname, value) - return - } - } - msg = fmt.Sprintf("Failed setting value for %v: %v", varname, err) + f(ival) + msg = fmt.Sprintf("Setting %v to: %v", varname, value) + return nil + } + + setIntValCtx := func(f func(context.Context, int) error) error { + ival, err := strconv.Atoi(value) + if err == nil { + err = f(r.Context(), ival) } - setInt64Val := func(f func(int64)) { - ival, err := strconv.ParseInt(value, 10, 64) - if err != nil { - msg = fmt.Sprintf("Failed setting value for %v: %v", varname, err) - return - } - f(ival) - msg = fmt.Sprintf("Setting %v to: %v", varname, value) + if err != nil { + return fmt.Errorf("failed setting value for %v: %v", varname, err) } - setDurationVal := func(f func(time.Duration)) { - durationVal, err := time.ParseDuration(value) - if err != nil { - msg = fmt.Sprintf("Failed setting value for %v: %v", varname, err) - return - } - f(durationVal) - msg = fmt.Sprintf("Setting %v to: %v", varname, value) + msg = fmt.Sprintf("Setting %v to: %v", varname, value) + return nil + } + + setInt64Val := func(f func(int64)) error { + ival, err := strconv.ParseInt(value, 10, 64) + if err != nil { + return fmt.Errorf("invalid int64 value for %v: %v", varname, err) } - setFloat64Val := func(f func(float64)) { - fval, err := strconv.ParseFloat(value, 64) - if err != nil { - msg = fmt.Sprintf("Failed setting value for %v: %v", varname, err) - return - } - f(fval) - msg = fmt.Sprintf("Setting %v to: %v", varname, value) + f(ival) + msg = fmt.Sprintf("Setting %v to: %v", varname, value) + return nil + } + + setDurationVal := func(f func(time.Duration)) error { + durationVal, err := time.ParseDuration(value) + if err != nil { + return fmt.Errorf("invalid duration value for %v: %v", varname, err) } - switch varname { - case "PoolSize": - setIntValCtx(tsv.SetPoolSize) - case "StreamPoolSize": - setIntValCtx(tsv.SetStreamPoolSize) - case "TxPoolSize": - setIntValCtx(tsv.SetTxPoolSize) - case "MaxResultSize": - setIntVal(tsv.SetMaxResultSize) - case "WarnResultSize": - setIntVal(tsv.SetWarnResultSize) - case "RowStreamerMaxInnoDBTrxHistLen": - setInt64Val(func(val int64) { tsv.Config().RowStreamer.MaxInnoDBTrxHistLen = val }) - case "RowStreamerMaxMySQLReplLagSecs": - setInt64Val(func(val int64) { tsv.Config().RowStreamer.MaxMySQLReplLagSecs = val }) - case "UnhealthyThreshold": - setDurationVal(func(d time.Duration) { tsv.Config().Healthcheck.UnhealthyThreshold = d }) - setDurationVal(tsv.hs.SetUnhealthyThreshold) - setDurationVal(tsv.sm.SetUnhealthyThreshold) - case "ThrottleMetricThreshold": - setFloat64Val(tsv.SetThrottleMetricThreshold) - case "Consolidator": - tsv.SetConsolidatorMode(value) - msg = fmt.Sprintf("Setting %v to: %v", varname, value) + f(durationVal) + msg = fmt.Sprintf("Setting %v to: %v", varname, value) + return nil + } + + setFloat64Val := func(f func(float64)) error { + fval, err := strconv.ParseFloat(value, 64) + if err != nil { + return fmt.Errorf("invalid float64 value for %v: %v", varname, err) } + f(fval) + msg = fmt.Sprintf("Setting %v to: %v", varname, value) + return nil + } + + var err error + switch varname { + case "ReadPoolSize": + err = setIntValCtx(tsv.SetPoolSize) + case "StreamPoolSize": + err = setIntValCtx(tsv.SetStreamPoolSize) + case "TransactionPoolSize": + err = setIntValCtx(tsv.SetTxPoolSize) + case "MaxResultSize": + err = setIntVal(tsv.SetMaxResultSize) + case "WarnResultSize": + err = setIntVal(tsv.SetWarnResultSize) + case "RowStreamerMaxInnoDBTrxHistLen": + err = setInt64Val(func(val int64) { tsv.Config().RowStreamer.MaxInnoDBTrxHistLen = val }) + case "RowStreamerMaxMySQLReplLagSecs": + err = setInt64Val(func(val int64) { tsv.Config().RowStreamer.MaxMySQLReplLagSecs = val }) + case "UnhealthyThreshold": + err = setDurationVal(func(d time.Duration) { tsv.Config().Healthcheck.UnhealthyThreshold = d }) + case "ThrottleMetricThreshold": + err = setFloat64Val(tsv.SetThrottleMetricThreshold) + case "Consolidator": + tsv.SetConsolidatorMode(value) + msg = fmt.Sprintf("Setting %v to: %v", varname, value) + } + + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return } + vars := getVars(tsv) + sendResponse(r, w, vars, msg) +} + +func handleGet(tsv *TabletServer, w http.ResponseWriter, r *http.Request) { + vars := getVars(tsv) + sendResponse(r, w, vars, "") +} + +func sendResponse(r *http.Request, w http.ResponseWriter, vars []envValue, msg string) { + format := r.FormValue("format") + if format == "json" { + respondWithJSON(w, vars, msg) + return + } + respondWithHTML(w, vars, msg) +} + +func getVars(tsv *TabletServer) []envValue { var vars []envValue - vars = addVar(vars, "PoolSize", tsv.PoolSize) + vars = addVar(vars, "ReadPoolSize", tsv.PoolSize) vars = addVar(vars, "StreamPoolSize", tsv.StreamPoolSize) - vars = addVar(vars, "TxPoolSize", tsv.TxPoolSize) - vars = addVar(vars, "QueryCacheCapacity", tsv.QueryPlanCacheCap) // QueryCacheCapacity is deprecated in v21, it is replaced by QueryEnginePlanCacheCapacity - vars = addVar(vars, "QueryEnginePlanCacheCapacity", tsv.QueryPlanCacheCap) + vars = addVar(vars, "TransactionPoolSize", tsv.TxPoolSize) vars = addVar(vars, "MaxResultSize", tsv.MaxResultSize) vars = addVar(vars, "WarnResultSize", tsv.WarnResultSize) vars = addVar(vars, "RowStreamerMaxInnoDBTrxHistLen", func() int64 { return tsv.Config().RowStreamer.MaxInnoDBTrxHistLen }) @@ -165,18 +206,22 @@ func debugEnvHandler(tsv *TabletServer, w http.ResponseWriter, r *http.Request) Value: tsv.ConsolidatorMode(), }) - format := r.FormValue("format") - if format == "json" { - mvars := make(map[string]string) - for _, v := range vars { - mvars[v.Name] = v.Value - } - w.Header().Set("Content-Type", "application/json") - _ = json.NewEncoder(w).Encode(mvars) - return + return vars +} + +func respondWithJSON(w http.ResponseWriter, vars []envValue, msg string) { + mvars := make(map[string]string) + for _, v := range vars { + mvars[v.Name] = v.Value } + if msg != "" { + mvars["ResponseMessage"] = msg + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(mvars) +} - // gridTable is reused from twopcz.go. +func respondWithHTML(w http.ResponseWriter, vars []envValue, msg string) { w.Write(gridTable) w.Write([]byte("

Internal Variables

\n")) if msg != "" { From 2f281de91c1e5ed6fe17d8f53174a36a55018175 Mon Sep 17 00:00:00 2001 From: Manan Gupta <35839558+GuptaManan100@users.noreply.github.com> Date: Thu, 28 Nov 2024 18:59:55 +0530 Subject: [PATCH 04/17] Fix flakiness in `TestReadTransactionStatus` (#17277) Signed-off-by: Manan Gupta --- .github/CODEOWNERS | 1 + .../endtoend/transaction/twopc/twopc_test.go | 24 +++++++++++++------ 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 6778ec7e0de..7b9b54a19bb 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -28,6 +28,7 @@ go.sum @ajm188 @deepthi @harshit-gangal @mattlord @rohit-nayak-ps @systay @froui /go/test/endtoend/onlineddl @rohit-nayak-ps @shlomi-noach /go/test/endtoend/messaging @mattlord @rohit-nayak-ps @derekperkins /go/test/endtoend/schemadiff @shlomi-noach @mattlord +/go/test/endtoend/transaction @harshit-gangal @systay @frouioui @GuptaManan100 /go/test/endtoend/*throttler* @shlomi-noach @mattlord @timvaillancourt /go/test/endtoend/vtgate @harshit-gangal @systay @frouioui /go/test/endtoend/vtorc @deepthi @shlomi-noach @GuptaManan100 @timvaillancourt diff --git a/go/test/endtoend/transaction/twopc/twopc_test.go b/go/test/endtoend/transaction/twopc/twopc_test.go index 033d93f8792..df064fb16cd 100644 --- a/go/test/endtoend/transaction/twopc/twopc_test.go +++ b/go/test/endtoend/transaction/twopc/twopc_test.go @@ -1395,8 +1395,6 @@ func TestReadTransactionStatus(t *testing.T) { "insert into twopc_t1(id, col) values(6, 4)", "insert into twopc_t1(id, col) values(9, 4)", }) - // Allow enough time for the commit to have started. - time.Sleep(1 * time.Second) // Create a tablet manager client and use it to read the transaction state. tmc := grpctmclient.NewClient() @@ -1405,12 +1403,24 @@ func TestReadTransactionStatus(t *testing.T) { defer cancel() primaryTablet := getTablet(clusterInstance.Keyspaces[0].Shards[2].FindPrimaryTablet().GrpcPort) + // Wait for the transaction to show up in the unresolved list. var unresTransaction *querypb.TransactionMetadata - for _, shard := range clusterInstance.Keyspaces[0].Shards { - urtRes, err := tmc.GetUnresolvedTransactions(ctx, getTablet(shard.FindPrimaryTablet().GrpcPort), 1) - require.NoError(t, err) - if len(urtRes) > 0 { - unresTransaction = urtRes[0] + timeout := time.After(10 * time.Second) + for { + for _, shard := range clusterInstance.Keyspaces[0].Shards { + urtRes, err := tmc.GetUnresolvedTransactions(ctx, getTablet(shard.FindPrimaryTablet().GrpcPort), 1) + require.NoError(t, err) + if len(urtRes) > 0 { + unresTransaction = urtRes[0] + } + } + if unresTransaction != nil { + break + } + select { + case <-timeout: + require.Fail(t, "timed out waiting for unresolved transaction") + default: } } require.NotNil(t, unresTransaction) From aad2e469aee458443a884ad7f200cac6e215ed92 Mon Sep 17 00:00:00 2001 From: Manan Gupta <35839558+GuptaManan100@users.noreply.github.com> Date: Fri, 29 Nov 2024 00:45:43 +0530 Subject: [PATCH 05/17] Add CI for VTop example (#16007) Signed-off-by: Manan Gupta Signed-off-by: Florent Poinsard Signed-off-by: Rohit Nayak Co-authored-by: Florent Poinsard Co-authored-by: Rohit Nayak --- .github/workflows/vtop_example.yml | 97 ++++++++++++++++++++++ examples/operator/101_initial_cluster.yaml | 30 ++----- test/config.json | 11 +++ test/vtop_example.sh | 3 +- tools/get_kubectl_kind.sh | 4 +- 5 files changed, 121 insertions(+), 24 deletions(-) create mode 100644 .github/workflows/vtop_example.yml diff --git a/.github/workflows/vtop_example.yml b/.github/workflows/vtop_example.yml new file mode 100644 index 00000000000..fb5ae87c101 --- /dev/null +++ b/.github/workflows/vtop_example.yml @@ -0,0 +1,97 @@ +name: vtop_example +on: [push, pull_request] +concurrency: + group: format('{0}-{1}', ${{ github.ref }}, 'vtop_example') + cancel-in-progress: true + +jobs: + build: + name: VTop Example + runs-on: self-hosted + + steps: + - name: Skip CI + run: | + if [[ "${{contains( github.event.pull_request.labels.*.name, 'Skip CI')}}" == "true" ]]; then + echo "skipping CI due to the 'Skip CI' label" + exit 1 + fi + + - name: Check if workflow needs to be skipped + id: skip-workflow + run: | + skip='false' + if [[ "${{github.event.pull_request}}" == "" ]] && [[ "${{github.ref}}" != "refs/heads/main" ]] && [[ ! "${{github.ref}}" =~ ^refs/heads/release-[0-9]+\.[0-9]$ ]] && [[ ! "${{github.ref}}" =~ "refs/tags/.*" ]]; then + skip='true' + fi + echo Skip ${skip} + echo "skip-workflow=${skip}" >> $GITHUB_OUTPUT + + PR_DATA=$(curl -s\ + -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \ + -H "Accept: application/vnd.github.v3+json" \ + "https://api.github.com/repos/${{ github.repository }}/pulls/${{ github.event.pull_request.number }}") + draft=$(echo "$PR_DATA" | jq .draft -r) + echo "is_draft=${draft}" >> $GITHUB_OUTPUT + + - name: Check out code + if: steps.skip-workflow.outputs.skip-workflow == 'false' + uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7 + + - name: Check for changes in relevant files + if: steps.skip-workflow.outputs.skip-workflow == 'false' + uses: dorny/paths-filter@ebc4d7e9ebcb0b1eb21480bb8f43113e996ac77a # v3.0.1 + id: changes + with: + token: '' + filters: | + end_to_end: + - 'go/**/*.go' + - 'go/vt/sidecardb/**/*.sql' + - 'test.go' + - 'Makefile' + - 'build.env' + - 'go.[sumod]' + - 'proto/*.proto' + - 'tools/**' + - 'config/**' + - 'bootstrap.sh' + - 'examples/**' + - 'test/**' + - '.github/workflows/vtop_example.yml' + + - name: Set up Go + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' + uses: actions/setup-go@0a12ed9d6a96ab950c8f026ed9f722fe0da7ef32 # v5.0.2 + with: + go-version-file: go.mod + + - name: Tune the OS + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' + run: | + echo '1024 65535' | sudo tee -a /proc/sys/net/ipv4/ip_local_port_range + + - name: Get dependencies + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' + run: | + # Install everything we need, and configure + sudo apt-get install -y eatmydata make + go mod download + + # needed for vtctldclient + - name: Build vitess + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' + run: | + make build + + - name: Install kubectl & kind + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' + run: | + make install_kubectl_kind + + - name: vtop_example + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' + timeout-minutes: 60 + run: | + source build.env + eatmydata -- go run test.go -docker=false -skip-build -print-log -follow -retry=1 -timeout=60m vtop_example \ No newline at end of file diff --git a/examples/operator/101_initial_cluster.yaml b/examples/operator/101_initial_cluster.yaml index c26219254f1..a4f9f447e21 100644 --- a/examples/operator/101_initial_cluster.yaml +++ b/examples/operator/101_initial_cluster.yaml @@ -15,7 +15,7 @@ spec: vtbackup: vitess/lite:latest vtorc: vitess/lite:latest mysqld: - mysql80Compatible: vitess/lite:latest + mysql80Compatible: mysql:8.0.30 mysqldExporter: prom/mysqld-exporter:v0.11.0 cells: - name: zone1 @@ -155,23 +155,6 @@ stringData: # Vitess defaults ############################################################################### - # Vitess-internal database. - CREATE DATABASE IF NOT EXISTS _vt; - # Note that definitions of local_metadata and shard_metadata should be the same - # as in production which is defined in go/vt/mysqlctl/metadata_tables.go. - CREATE TABLE IF NOT EXISTS _vt.local_metadata ( - name VARCHAR(255) NOT NULL, - value VARCHAR(255) NOT NULL, - db_name VARBINARY(255) NOT NULL, - PRIMARY KEY (db_name, name) - ) ENGINE=InnoDB; - CREATE TABLE IF NOT EXISTS _vt.shard_metadata ( - name VARCHAR(255) NOT NULL, - value MEDIUMBLOB NOT NULL, - db_name VARBINARY(255) NOT NULL, - PRIMARY KEY (db_name, name) - ) ENGINE=InnoDB; - # Admin user with all privileges. CREATE USER 'vt_dba'@'localhost'; GRANT ALL ON *.* TO 'vt_dba'@'localhost'; @@ -200,12 +183,10 @@ stringData: ON *.* TO 'vt_allprivs'@'localhost'; # User for slave replication connections. - # TODO: Should we set a password on this since it allows remote connections? CREATE USER 'vt_repl'@'%'; GRANT REPLICATION SLAVE ON *.* TO 'vt_repl'@'%'; - # User for Vitess filtered replication (binlog player). - # Same permissions as vt_app. + # User for Vitess VReplication (base vstreamers and vplayer). CREATE USER 'vt_filtered'@'localhost'; GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, RELOAD, PROCESS, FILE, REFERENCES, INDEX, ALTER, SHOW DATABASES, CREATE TEMPORARY TABLES, @@ -213,6 +194,13 @@ stringData: SHOW VIEW, CREATE ROUTINE, ALTER ROUTINE, CREATE USER, EVENT, TRIGGER ON *.* TO 'vt_filtered'@'localhost'; + # User for general MySQL monitoring. + CREATE USER 'vt_monitoring'@'localhost'; + GRANT SELECT, PROCESS, SUPER, REPLICATION CLIENT, RELOAD + ON *.* TO 'vt_monitoring'@'localhost'; + GRANT SELECT, UPDATE, DELETE, DROP + ON performance_schema.* TO 'vt_monitoring'@'localhost'; + # custom sql is used to add custom scripts like creating users/passwords. We use it in our tests # {{custom_sql}} diff --git a/test/config.json b/test/config.json index c911232ce74..1e278546c7a 100644 --- a/test/config.json +++ b/test/config.json @@ -1238,6 +1238,17 @@ "RetryMax": 1, "Tags": [] }, + "vtop_example": { + "File": "", + "Args": [], + "Command": [ + "test/vtop_example.sh" + ], + "Manual": false, + "Shard": "", + "RetryMax": 1, + "Tags": [] + }, "vtorc_primary_failure": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vtorc/primaryfailure"], diff --git a/test/vtop_example.sh b/test/vtop_example.sh index 5ff90a2be7e..c537c0f844c 100755 --- a/test/vtop_example.sh +++ b/test/vtop_example.sh @@ -482,11 +482,12 @@ EOF waitForKeyspaceToBeServing customer 80- 1 } +kind delete cluster --name kind || true # Build the docker image for vitess/lite using the local code docker build -f docker/lite/Dockerfile -t vitess/lite:pr . # Build the docker image for vitess/vtadmin using the local code -docker build -f docker/binaries/vtadmin/Dockerfile --build-arg VT_BASE_VER=pr -t vitess/vtadmin:pr . +docker build -f docker/binaries/vtadmin/Dockerfile --build-arg VT_BASE_VER=pr -t vitess/vtadmin:pr ./docker/binaries/vtadmin # Print the docker images available docker image ls diff --git a/tools/get_kubectl_kind.sh b/tools/get_kubectl_kind.sh index 57df414fdd8..169b120aaa0 100755 --- a/tools/get_kubectl_kind.sh +++ b/tools/get_kubectl_kind.sh @@ -12,7 +12,7 @@ source build.env mkdir -p "$VTROOT/bin" cd "$VTROOT/bin" -KUBE_VERSION="${KUBE_VERSION:-v1.21.1}" +KUBE_VERSION="${KUBE_VERSION:-v1.31.0}" KUBERNETES_RELEASE_URL="${KUBERNETES_RELEASE_URL:-https://dl.k8s.io}" # Download kubectl if needed. @@ -28,7 +28,7 @@ ln -sf "kubectl-${KUBE_VERSION}" kubectl if ! command -v kind &> /dev/null then echo "Downloading kind..." - curl -L https://kind.sigs.k8s.io/dl/v0.12.0/kind-linux-amd64 > "kind" + curl -L https://kind.sigs.k8s.io/dl/v0.22.0/kind-linux-amd64 > "kind" chmod +x "kind" echo "Installed kind" else From 9422e32d10eec94e355204af148053a15deef14c Mon Sep 17 00:00:00 2001 From: Rohit Nayak <57520317+rohit-nayak-ps@users.noreply.github.com> Date: Fri, 29 Nov 2024 11:24:59 +0100 Subject: [PATCH 06/17] Tool to determine mapping from vindex and value to shard (#17290) Signed-off-by: Rohit Nayak --- tools/map-shard-for-value/Makefile | 22 ++ .../map-shard-for-value.go | 207 ++++++++++++++++++ .../map-shard-for-value.md | 47 ++++ .../map-shard-for-value_test.go | 90 ++++++++ 4 files changed, 366 insertions(+) create mode 100644 tools/map-shard-for-value/Makefile create mode 100755 tools/map-shard-for-value/map-shard-for-value.go create mode 100644 tools/map-shard-for-value/map-shard-for-value.md create mode 100644 tools/map-shard-for-value/map-shard-for-value_test.go diff --git a/tools/map-shard-for-value/Makefile b/tools/map-shard-for-value/Makefile new file mode 100644 index 00000000000..61bc88ac0ed --- /dev/null +++ b/tools/map-shard-for-value/Makefile @@ -0,0 +1,22 @@ +# Copyright 2024 The Vitess Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +build: + go build map-shard-for-value.go + +test: + echo "1\n-1\n99" | go run map-shard-for-value.go --total_shards=4 --vindex=xxhash + +clean: + rm -f map-shard-for-value diff --git a/tools/map-shard-for-value/map-shard-for-value.go b/tools/map-shard-for-value/map-shard-for-value.go new file mode 100755 index 00000000000..18a092d1371 --- /dev/null +++ b/tools/map-shard-for-value/map-shard-for-value.go @@ -0,0 +1,207 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "bufio" + "context" + "encoding/hex" + "fmt" + "log" + "os" + "strconv" + "strings" + + flag "github.com/spf13/pflag" + + "vitess.io/vitess/go/vt/topo" + + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/key" + "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/vtgate/vindexes" +) + +/* + * This tool reads a list of values from stdin and prints the + * corresponding keyspace ID and shard for each value. It uses the given vindex + * and shard ranges to determine the shard. The vindex is expected to be a + * single-column vindex. The shard ranges are specified as a comma-separated + * list of key ranges, example "-80,80-". + * If you have uniformly distributed shards, you can specify the total number + * of shards using the -total_shards flag, and the tool will generate the shard ranges + * using the same logic as the Vitess operator does (using the key.GenerateShardRanges() function). + * + * Example usage: + * echo "1\n2\n3" | go run shard-from-id.go -vindex=hash -shards=-80,80- + * + * Currently tested only for integer values and hash/xxhash vindexes. + */ + +func mapShard(allShards []*topodata.ShardReference, ksid key.DestinationKeyspaceID) (string, error) { + foundShard := "" + addShard := func(shard string) error { + foundShard = shard + return nil + } + if err := ksid.Resolve(allShards, addShard); err != nil { + return "", fmt.Errorf("failed to resolve keyspace ID: %v:: %s", ksid.String(), err) + } + + if foundShard == "" { + return "", fmt.Errorf("no shard found for keyspace ID: %v", ksid) + } + return foundShard, nil +} + +func selectShard(vindex vindexes.Vindex, value sqltypes.Value, allShards []*topodata.ShardReference) (string, key.DestinationKeyspaceID, error) { + ctx := context.Background() + + destinations, err := vindexes.Map(ctx, vindex, nil, [][]sqltypes.Value{{value}}) + if err != nil { + return "", nil, fmt.Errorf("failed to map value to keyspace ID: %w", err) + } + + if len(destinations) != 1 { + return "", nil, fmt.Errorf("unexpected number of destinations: %d", len(destinations)) + } + + ksid, ok := destinations[0].(key.DestinationKeyspaceID) + if !ok { + return "", nil, fmt.Errorf("unexpected destination type: %T", destinations[0]) + } + + foundShard, err := mapShard(allShards, ksid) + if err != nil { + return "", nil, fmt.Errorf("failed to map shard, original value %v, keyspace id %s: %w", value, ksid, err) + } + return foundShard, ksid, nil +} + +func getValue(valueStr, valueType string) (sqltypes.Value, error) { + var value sqltypes.Value + + switch valueType { + case "int": + valueInt, err := strconv.ParseInt(valueStr, 10, 64) + if err != nil { + return value, fmt.Errorf("failed to parse int value: %w", err) + } + value = sqltypes.NewInt64(valueInt) + case "uint": + valueUint, err := strconv.ParseUint(valueStr, 10, 64) + if err != nil { + return value, fmt.Errorf("failed to parse uint value: %w", err) + } + value = sqltypes.NewUint64(valueUint) + case "string": + value = sqltypes.NewVarChar(valueStr) + default: + return value, fmt.Errorf("unsupported value type: %s", valueType) + } + + return value, nil +} + +func getShardMap(shardsCSV *string) []*topodata.ShardReference { + var allShards []*topodata.ShardReference + + for _, shard := range strings.Split(*shardsCSV, ",") { + _, keyRange, err := topo.ValidateShardName(shard) + if err != nil { + log.Fatalf("invalid shard range: %s", shard) + } + allShards = append(allShards, &topodata.ShardReference{ + Name: shard, + KeyRange: keyRange, + }) + } + return allShards +} + +type output struct { + Value string + KeyspaceID string + Shard string +} + +func processValues(scanner *bufio.Scanner, shardsCSV *string, vindexName string, valueType string) ([]output, error) { + allShards := getShardMap(shardsCSV) + + vindex, err := vindexes.CreateVindex(vindexName, vindexName, nil) + if err != nil { + return nil, fmt.Errorf("failed to create vindex: %v", err) + } + var outputs []output + for scanner.Scan() { + valueStr := scanner.Text() + if valueStr == "" { + continue + } + value, err := getValue(valueStr, valueType) + if err != nil { + return nil, fmt.Errorf("failed to get value for: %v, value_type %s:: %v", valueStr, valueType, err) + } + shard, ksid, err := selectShard(vindex, value, allShards) + if err != nil { + // ignore errors so that we can go ahead with the computation for other values + continue + } + outputs = append(outputs, output{Value: valueStr, KeyspaceID: hex.EncodeToString(ksid), Shard: shard}) + } + return outputs, nil +} + +func printOutput(outputs []output) { + fmt.Println("value,keyspaceID,shard") + for _, output := range outputs { + fmt.Printf("%s,%s,%s\n", output.Value, output.KeyspaceID, output.Shard) + } +} + +func main() { + // Explicitly configuring the logger since it was flaky in displaying logs locally without this. + log.SetOutput(os.Stderr) + log.SetFlags(log.LstdFlags) + log.SetPrefix("LOG: ") + + vindexName := flag.String("vindex", "xxhash", "name of the vindex") + shardsCSV := flag.String("shards", "", "comma-separated list of shard ranges") + totalShards := flag.Int("total_shards", 0, "total number of uniformly distributed shards") + valueType := flag.String("value_type", "int", "type of the value (int, uint, or string)") + flag.Parse() + + if *totalShards > 0 { + if *shardsCSV != "" { + log.Fatalf("cannot specify both total_shards and shards") + } + shardArr, err := key.GenerateShardRanges(*totalShards) + if err != nil { + log.Fatalf("failed to generate shard ranges: %v", err) + } + *shardsCSV = strings.Join(shardArr, ",") + } + if *shardsCSV == "" { + log.Fatal("shards or total_shards must be specified") + } + scanner := bufio.NewScanner(os.Stdin) + outputs, err := processValues(scanner, shardsCSV, *vindexName, *valueType) + if err != nil { + log.Fatalf("failed to process values: %v", err) + } + printOutput(outputs) +} diff --git a/tools/map-shard-for-value/map-shard-for-value.md b/tools/map-shard-for-value/map-shard-for-value.md new file mode 100644 index 00000000000..17daf7f5fe5 --- /dev/null +++ b/tools/map-shard-for-value/map-shard-for-value.md @@ -0,0 +1,47 @@ +## Map Shard for Value Tool + +### Overview + +The `map-shard-for-value` tool maps a given value to a specific shard. This tool helps in determining +which shard a particular value belongs to, based on the vindex algorithm and shard ranges. + +### Features +- + +- Allows specifying the vindex type (e.g., `hash`, `xxhash`). +- Allows specifying the shard list of (for uniformly distributed shard ranges) the total number of shards to generate. +- Designed as a _filter_: Reads input values from `stdin` and outputs the corresponding shard information, so it can be + used to map values from a file or another program. + +### Usage + +```sh +make build +``` + +```sh +echo "1\n-1\n99" | ./map-shard-for-value --total_shards=4 --vindex=xxhash +value,keyspaceID,shard +1,d46405367612b4b7,c0- +-1,d8e2a6a7c8c7623d,c0- +99,200533312244abca,-40 + +echo "1\n-1\n99" | ./map-shard-for-value --vindex=hash --shards="-80,80-" +value,keyspaceID,shard +1,166b40b44aba4bd6,-80 +-1,355550b2150e2451,-80 +99,2c40ad56f4593c47,-80 +``` + +#### Flags + +- `--vindex`: Specifies the name of the vindex to use (e.g., `hash`, `xxhash`) (default `xxhash`) + +One (and only one) of these is required: + +- `--shards`: Comma-separated list of shard ranges +- `--total_shards`: Total number of shards, only if shards are uniformly distributed + +Optional: +- `--value_type`: Type of the value to map, one of int, uint, string (default `int`) + diff --git a/tools/map-shard-for-value/map-shard-for-value_test.go b/tools/map-shard-for-value/map-shard-for-value_test.go new file mode 100644 index 00000000000..ca014818bb9 --- /dev/null +++ b/tools/map-shard-for-value/map-shard-for-value_test.go @@ -0,0 +1,90 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "bufio" + "fmt" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestProcess(t *testing.T) { + type testCase struct { + name string + shardsCSV string + vindexType string + values []int + valueType string + expected []output + } + testCases := []testCase{ + { + name: "hash,2 shards", + shardsCSV: "-80,80-", + vindexType: "hash", + values: []int{1, 99}, + valueType: "int", + expected: []output{ + { + Value: "1", + KeyspaceID: "166b40b44aba4bd6", + Shard: "-80", + }, + { + Value: "99", + KeyspaceID: "2c40ad56f4593c47", + Shard: "-80", + }, + }, + }, + { + name: "xxhash,4 shards", + shardsCSV: "-40,40-80,80-c0,c0-", + vindexType: "xxhash", + values: []int{1, 99}, + valueType: "int", + expected: []output{ + { + Value: "1", + KeyspaceID: "d46405367612b4b7", + Shard: "c0-", + }, + { + Value: "99", + KeyspaceID: "200533312244abca", + Shard: "-40", + }, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var input strings.Builder + for _, num := range tc.values { + fmt.Fprintf(&input, "%d\n", num) + } + reader := strings.NewReader(input.String()) + scanner := bufio.NewScanner(reader) + got, err := processValues(scanner, &tc.shardsCSV, tc.vindexType, tc.valueType) + require.NoError(t, err) + require.EqualValues(t, tc.expected, got) + }) + } +} From 68b25b30f210511eda149fe96871704ec800ab18 Mon Sep 17 00:00:00 2001 From: Noble Mittal <62551163+beingnoble03@users.noreply.github.com> Date: Fri, 29 Nov 2024 17:02:23 +0530 Subject: [PATCH 07/17] VTAdmin: Support for schema migrations view/create (#17134) Signed-off-by: Noble Mittal --- go/vt/proto/vtadmin/vtadmin.pb.go | 35 ++- go/vt/proto/vtadmin/vtadmin_vtproto.pb.go | 88 ++++++ go/vt/vtadmin/api.go | 26 ++ go/vt/vtadmin/http/schema_migrations.go | 15 +- proto/vtadmin.proto | 6 +- web/vtadmin/src/api/http.ts | 38 +++ web/vtadmin/src/components/App.tsx | 12 + web/vtadmin/src/components/NavRail.tsx | 3 + .../createWorkflow => dialog}/ErrorDialog.tsx | 4 +- .../components/routes/SchemaMigrations.tsx | 195 +++++++++++++ .../CreateSchemaMigration.module.scss | 30 ++ .../CreateSchemaMigration.tsx | 270 ++++++++++++++++++ .../createWorkflow/CreateMaterialize.tsx | 2 +- .../createWorkflow/CreateMoveTables.tsx | 2 +- .../routes/createWorkflow/CreateReshard.tsx | 2 +- web/vtadmin/src/hooks/api.ts | 24 ++ web/vtadmin/src/proto/vtadmin.d.ts | 12 + web/vtadmin/src/proto/vtadmin.js | 48 +++- web/vtadmin/src/util/schemaMigrations.ts | 31 ++ 19 files changed, 825 insertions(+), 18 deletions(-) rename web/vtadmin/src/components/{routes/createWorkflow => dialog}/ErrorDialog.tsx (94%) create mode 100644 web/vtadmin/src/components/routes/SchemaMigrations.tsx create mode 100644 web/vtadmin/src/components/routes/createSchemaMigration/CreateSchemaMigration.module.scss create mode 100644 web/vtadmin/src/components/routes/createSchemaMigration/CreateSchemaMigration.tsx create mode 100644 web/vtadmin/src/util/schemaMigrations.ts diff --git a/go/vt/proto/vtadmin/vtadmin.pb.go b/go/vt/proto/vtadmin/vtadmin.pb.go index e85385ec409..8b6a6997c8d 100644 --- a/go/vt/proto/vtadmin/vtadmin.pb.go +++ b/go/vt/proto/vtadmin/vtadmin.pb.go @@ -1203,8 +1203,12 @@ type ApplySchemaRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - ClusterId string `protobuf:"bytes,1,opt,name=cluster_id,json=clusterId,proto3" json:"cluster_id,omitempty"` - Request *vtctldata.ApplySchemaRequest `protobuf:"bytes,2,opt,name=request,proto3" json:"request,omitempty"` + ClusterId string `protobuf:"bytes,1,opt,name=cluster_id,json=clusterId,proto3" json:"cluster_id,omitempty"` + // Request.Sql will be overriden by this Sql field. + Sql string `protobuf:"bytes,2,opt,name=sql,proto3" json:"sql,omitempty"` + // Request.CallerId will be overriden by this CallerId field. + CallerId string `protobuf:"bytes,3,opt,name=caller_id,json=callerId,proto3" json:"caller_id,omitempty"` + Request *vtctldata.ApplySchemaRequest `protobuf:"bytes,4,opt,name=request,proto3" json:"request,omitempty"` } func (x *ApplySchemaRequest) Reset() { @@ -1244,6 +1248,20 @@ func (x *ApplySchemaRequest) GetClusterId() string { return "" } +func (x *ApplySchemaRequest) GetSql() string { + if x != nil { + return x.Sql + } + return "" +} + +func (x *ApplySchemaRequest) GetCallerId() string { + if x != nil { + return x.CallerId + } + return "" +} + func (x *ApplySchemaRequest) GetRequest() *vtctldata.ApplySchemaRequest { if x != nil { return x.Request @@ -7731,11 +7749,14 @@ var file_vtadmin_proto_rawDesc = []byte{ 0x2e, 0x76, 0x74, 0x63, 0x74, 0x6c, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x53, 0x77, 0x69, 0x74, 0x63, 0x68, 0x54, 0x72, 0x61, 0x66, 0x66, 0x69, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x52, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x22, 0x6c, 0x0a, 0x12, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, - 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x75, 0x73, - 0x74, 0x65, 0x72, 0x49, 0x64, 0x12, 0x37, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x76, 0x74, 0x63, 0x74, 0x6c, 0x64, 0x61, + 0x22, 0x9b, 0x01, 0x0a, 0x12, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x6c, 0x75, 0x73, 0x74, + 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x75, + 0x73, 0x74, 0x65, 0x72, 0x49, 0x64, 0x12, 0x10, 0x0a, 0x03, 0x73, 0x71, 0x6c, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x03, 0x73, 0x71, 0x6c, 0x12, 0x1b, 0x0a, 0x09, 0x63, 0x61, 0x6c, 0x6c, + 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x63, 0x61, 0x6c, + 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x37, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x76, 0x74, 0x63, 0x74, 0x6c, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x52, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x80, 0x01, 0x0a, 0x1c, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x4d, diff --git a/go/vt/proto/vtadmin/vtadmin_vtproto.pb.go b/go/vt/proto/vtadmin/vtadmin_vtproto.pb.go index bc0746b7b8a..82cca2cea06 100644 --- a/go/vt/proto/vtadmin/vtadmin_vtproto.pb.go +++ b/go/vt/proto/vtadmin/vtadmin_vtproto.pb.go @@ -454,6 +454,8 @@ func (m *ApplySchemaRequest) CloneVT() *ApplySchemaRequest { } r := new(ApplySchemaRequest) r.ClusterId = m.ClusterId + r.Sql = m.Sql + r.CallerId = m.CallerId r.Request = m.Request.CloneVT() if len(m.unknownFields) > 0 { r.unknownFields = make([]byte, len(m.unknownFields)) @@ -4038,6 +4040,20 @@ func (m *ApplySchemaRequest) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= size i = protohelpers.EncodeVarint(dAtA, i, uint64(size)) i-- + dAtA[i] = 0x22 + } + if len(m.CallerId) > 0 { + i -= len(m.CallerId) + copy(dAtA[i:], m.CallerId) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.CallerId))) + i-- + dAtA[i] = 0x1a + } + if len(m.Sql) > 0 { + i -= len(m.Sql) + copy(dAtA[i:], m.Sql) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.Sql))) + i-- dAtA[i] = 0x12 } if len(m.ClusterId) > 0 { @@ -10321,6 +10337,14 @@ func (m *ApplySchemaRequest) SizeVT() (n int) { if l > 0 { n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) } + l = len(m.Sql) + if l > 0 { + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + l = len(m.CallerId) + if l > 0 { + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } if m.Request != nil { l = m.Request.SizeVT() n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) @@ -15873,6 +15897,70 @@ func (m *ApplySchemaRequest) UnmarshalVT(dAtA []byte) error { m.ClusterId = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Sql", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Sql = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field CallerId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.CallerId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 4: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Request", wireType) } diff --git a/go/vt/vtadmin/api.go b/go/vt/vtadmin/api.go index cef8816504a..4f91459d9ed 100644 --- a/go/vt/vtadmin/api.go +++ b/go/vt/vtadmin/api.go @@ -59,6 +59,7 @@ import ( "vitess.io/vitess/go/vt/vtadmin/rbac" "vitess.io/vitess/go/vt/vtadmin/sort" "vitess.io/vitess/go/vt/vtadmin/vtadminproto" + "vitess.io/vitess/go/vt/vtctl/grpcvtctldserver" "vitess.io/vitess/go/vt/vtctl/workflow" "vitess.io/vitess/go/vt/vtenv" "vitess.io/vitess/go/vt/vterrors" @@ -488,6 +489,31 @@ func (api *API) ApplySchema(ctx context.Context, req *vtadminpb.ApplySchemaReque return nil, err } + // Parser with default options. New() itself initializes with default MySQL version. + parser, err := sqlparser.New(sqlparser.Options{ + TruncateUILen: 512, + TruncateErrLen: 0, + }) + if err != nil { + return nil, err + } + + // Split the sql statement received from request. + sqlParts, err := parser.SplitStatementToPieces(req.Sql) + if err != nil { + return nil, err + } + + req.Request.Sql = sqlParts + + // Set the callerID if not empty. + if req.CallerId != "" { + req.Request.CallerId = &vtrpcpb.CallerID{Principal: req.CallerId} + } + + // Set the default wait replicas timeout. + req.Request.WaitReplicasTimeout = protoutil.DurationToProto(grpcvtctldserver.DefaultWaitReplicasTimeout) + return c.ApplySchema(ctx, req.Request) } diff --git a/go/vt/vtadmin/http/schema_migrations.go b/go/vt/vtadmin/http/schema_migrations.go index e0207989648..3da6026fe9f 100644 --- a/go/vt/vtadmin/http/schema_migrations.go +++ b/go/vt/vtadmin/http/schema_migrations.go @@ -34,19 +34,26 @@ func ApplySchema(ctx context.Context, r Request, api *API) *JSONResponse { decoder := json.NewDecoder(r.Body) defer r.Body.Close() - var req vtctldatapb.ApplySchemaRequest - if err := decoder.Decode(&req); err != nil { + var body struct { + Sql string `json:"sql"` + CallerId string `json:"caller_id"` + Request vtctldatapb.ApplySchemaRequest `json:"request"` + } + + if err := decoder.Decode(&body); err != nil { return NewJSONResponse(nil, &errors.BadRequest{ Err: err, }) } vars := mux.Vars(r.Request) - req.Keyspace = vars["keyspace"] + body.Request.Keyspace = vars["keyspace"] resp, err := api.server.ApplySchema(ctx, &vtadminpb.ApplySchemaRequest{ ClusterId: vars["cluster_id"], - Request: &req, + Sql: body.Sql, + CallerId: body.CallerId, + Request: &body.Request, }) return NewJSONResponse(resp, err) diff --git a/proto/vtadmin.proto b/proto/vtadmin.proto index 78f086ec345..963d1fa5779 100644 --- a/proto/vtadmin.proto +++ b/proto/vtadmin.proto @@ -388,7 +388,11 @@ message WorkflowSwitchTrafficRequest { message ApplySchemaRequest { string cluster_id = 1; - vtctldata.ApplySchemaRequest request = 2; + // Request.Sql will be overriden by this Sql field. + string sql = 2; + // Request.CallerId will be overriden by this CallerId field. + string caller_id = 3; + vtctldata.ApplySchemaRequest request = 4; } message CancelSchemaMigrationRequest { diff --git a/web/vtadmin/src/api/http.ts b/web/vtadmin/src/api/http.ts index 3f75330d240..674df961ef0 100644 --- a/web/vtadmin/src/api/http.ts +++ b/web/vtadmin/src/api/http.ts @@ -1068,3 +1068,41 @@ export const showVDiff = async ({ clusterID, request }: ShowVDiffParams) => { return vtadmin.VDiffShowResponse.create(result); }; + +export const fetchSchemaMigrations = async (request: vtadmin.IGetSchemaMigrationsRequest) => { + const { result } = await vtfetch(`/api/migrations/`, { + body: JSON.stringify(request), + method: 'post', + }); + + const err = vtadmin.GetSchemaMigrationsResponse.verify(result); + if (err) throw Error(err); + + return vtadmin.GetSchemaMigrationsResponse.create(result); +}; + +export interface ApplySchemaParams { + clusterID: string; + keyspace: string; + callerID: string; + sql: string; + request: vtctldata.IApplySchemaRequest; +} + +export const applySchema = async ({ clusterID, keyspace, callerID, sql, request }: ApplySchemaParams) => { + const body = { + sql, + caller_id: callerID, + request, + }; + + const { result } = await vtfetch(`/api/migration/${clusterID}/${keyspace}`, { + body: JSON.stringify(body), + method: 'post', + }); + + const err = vtctldata.ApplySchemaResponse.verify(result); + if (err) throw Error(err); + + return vtctldata.ApplySchemaResponse.create(result); +}; diff --git a/web/vtadmin/src/components/App.tsx b/web/vtadmin/src/components/App.tsx index ef27a35dc95..3bb41ea35f0 100644 --- a/web/vtadmin/src/components/App.tsx +++ b/web/vtadmin/src/components/App.tsx @@ -45,6 +45,8 @@ import { Transactions } from './routes/Transactions'; import { Transaction } from './routes/transaction/Transaction'; import { CreateReshard } from './routes/createWorkflow/CreateReshard'; import { CreateMaterialize } from './routes/createWorkflow/CreateMaterialize'; +import { SchemaMigrations } from './routes/SchemaMigrations'; +import { CreateSchemaMigration } from './routes/createSchemaMigration/CreateSchemaMigration'; export const App = () => { return ( @@ -140,6 +142,16 @@ export const App = () => { + + + + + {!isReadOnlyMode() && ( + + + + )} + diff --git a/web/vtadmin/src/components/NavRail.tsx b/web/vtadmin/src/components/NavRail.tsx index 9f9e1bf1681..b30cd165684 100644 --- a/web/vtadmin/src/components/NavRail.tsx +++ b/web/vtadmin/src/components/NavRail.tsx @@ -65,6 +65,9 @@ export const NavRail = () => {
    +
  • + +
  • diff --git a/web/vtadmin/src/components/routes/createWorkflow/ErrorDialog.tsx b/web/vtadmin/src/components/dialog/ErrorDialog.tsx similarity index 94% rename from web/vtadmin/src/components/routes/createWorkflow/ErrorDialog.tsx rename to web/vtadmin/src/components/dialog/ErrorDialog.tsx index 25ac5dedb0b..087876e4cd2 100644 --- a/web/vtadmin/src/components/routes/createWorkflow/ErrorDialog.tsx +++ b/web/vtadmin/src/components/dialog/ErrorDialog.tsx @@ -14,8 +14,8 @@ * limitations under the License. */ import React from 'react'; -import Dialog from '../../dialog/Dialog'; -import { Icon, Icons } from '../../Icon'; +import Dialog from './Dialog'; +import { Icon, Icons } from '../Icon'; export interface ErrorDialogProps { errorTitle?: string; diff --git a/web/vtadmin/src/components/routes/SchemaMigrations.tsx b/web/vtadmin/src/components/routes/SchemaMigrations.tsx new file mode 100644 index 00000000000..1761d26de49 --- /dev/null +++ b/web/vtadmin/src/components/routes/SchemaMigrations.tsx @@ -0,0 +1,195 @@ +/** + * Copyright 2024 The Vitess Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { useEffect, useState } from 'react'; +import { useKeyspaces, useSchemaMigrations } from '../../hooks/api'; +import { DataCell } from '../dataTable/DataCell'; +import { DataTable } from '../dataTable/DataTable'; +import { ContentContainer } from '../layout/ContentContainer'; +import { WorkspaceHeader } from '../layout/WorkspaceHeader'; +import { WorkspaceTitle } from '../layout/WorkspaceTitle'; +import { QueryLoadingPlaceholder } from '../placeholders/QueryLoadingPlaceholder'; +import { useDocumentTitle } from '../../hooks/useDocumentTitle'; +import { vtadmin } from '../../proto/vtadmin'; +import { Select } from '../inputs/Select'; +import { ShardLink } from '../links/ShardLink'; +import { formatDateTime } from '../../util/time'; +import { ReadOnlyGate } from '../ReadOnlyGate'; +import { formatSchemaMigrationStatus } from '../../util/schemaMigrations'; +import { Link } from 'react-router-dom'; +import { TabletLink } from '../links/TabletLink'; +import { formatAlias } from '../../util/tablets'; +import { useURLQuery } from '../../hooks/useURLQuery'; + +const COLUMNS = ['UUID', 'Status', 'DDL Action', 'Timestamps', 'Stage', 'Progress']; + +export const SchemaMigrations = () => { + useDocumentTitle('Schema Migrations'); + + const { query, replaceQuery } = useURLQuery(); + const urlKeyspace = query['keyspace']; + const urlCluster = query['cluster']; + + const keyspacesQuery = useKeyspaces(); + const { data: keyspaces = [], ...ksQuery } = keyspacesQuery; + + const [selectedKeyspace, setSelectedKeypsace] = useState(); + + const request: vtadmin.IGetSchemaMigrationsRequest = { + cluster_requests: [ + { + cluster_id: selectedKeyspace && selectedKeyspace.cluster?.id, + request: { + keyspace: selectedKeyspace && selectedKeyspace.keyspace?.name, + }, + }, + ], + }; + + const schemaMigrationsQuery = useSchemaMigrations(request, { + enabled: !!selectedKeyspace, + }); + + const schemaMigrations = schemaMigrationsQuery.data ? schemaMigrationsQuery.data.schema_migrations : []; + + const handleKeyspaceChange = (ks: vtadmin.Keyspace | null | undefined) => { + setSelectedKeypsace(ks); + + if (ks) { + replaceQuery({ keyspace: ks.keyspace?.name, cluster: ks.cluster?.id }); + } else { + replaceQuery({ keyspace: undefined, cluster: undefined }); + } + }; + + useEffect(() => { + if (urlKeyspace && urlCluster) { + const keyspace = keyspaces.find( + (ks) => ks.cluster?.id === String(urlCluster) && ks.keyspace?.name === String(urlKeyspace) + ); + + if (keyspace) { + setSelectedKeypsace(keyspace); + } else if (!ksQuery.isLoading) { + replaceQuery({ keyspace: undefined, cluster: undefined }); + } + } else { + setSelectedKeypsace(undefined); + } + }, [urlKeyspace, urlCluster, keyspaces, ksQuery.isLoading, replaceQuery]); + + const renderRows = (rows: vtadmin.ISchemaMigration[]) => { + return rows.map((row) => { + const migrationInfo = row.schema_migration; + + if (!migrationInfo) return <>; + + return ( + + +
    {migrationInfo.uuid}
    +
    + Tablet{' '} + + {formatAlias(migrationInfo.tablet)} + +
    +
    + Shard{' '} + + {`${migrationInfo.keyspace}/${migrationInfo.shard}`} + +
    +
    + +
    {formatSchemaMigrationStatus(migrationInfo)}
    +
    + {migrationInfo.ddl_action ? migrationInfo.ddl_action : '-'} + + {migrationInfo.added_at && ( +
    + Added + {formatDateTime(migrationInfo.added_at?.seconds)} +
    + )} + {migrationInfo.requested_at && ( +
    + Requested + {formatDateTime(migrationInfo.requested_at?.seconds)} +
    + )} + {migrationInfo.started_at && ( +
    + Started + {formatDateTime(migrationInfo.started_at?.seconds)} +
    + )} + {migrationInfo.completed_at && ( +
    + Completed + {formatDateTime(migrationInfo.completed_at?.seconds)} +
    + )} +
    + {migrationInfo.stage ? migrationInfo.stage : '-'} + {migrationInfo.progress ? `${migrationInfo.progress}%` : '-'} + + ); + }); + }; + + return ( +
    + +
    + Schema Migrations + +
    + + Create Schema Migration Request + +
    +
    +
    +
    + + +
    + ks?.keyspace?.name || ''} + items={clusterKeyspaces} + label="Keyspace" + onChange={(ks) => setFormData({ ...formData, keyspace: ks?.keyspace?.name || '' })} + placeholder={keyspacesQuery.isLoading ? 'Loading keyspaces...' : 'Select a keyspace'} + renderItem={(ks) => `${ks?.keyspace?.name}`} + selectedItem={selectedKeyspace} + /> + +
    + +