diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
index 6778ec7e0de..7b9b54a19bb 100644
--- a/.github/CODEOWNERS
+++ b/.github/CODEOWNERS
@@ -28,6 +28,7 @@ go.sum @ajm188 @deepthi @harshit-gangal @mattlord @rohit-nayak-ps @systay @froui
 /go/test/endtoend/onlineddl @rohit-nayak-ps @shlomi-noach
 /go/test/endtoend/messaging @mattlord @rohit-nayak-ps @derekperkins
 /go/test/endtoend/schemadiff @shlomi-noach @mattlord
+/go/test/endtoend/transaction @harshit-gangal @systay @frouioui @GuptaManan100
 /go/test/endtoend/*throttler* @shlomi-noach @mattlord @timvaillancourt
 /go/test/endtoend/vtgate @harshit-gangal @systay @frouioui
 /go/test/endtoend/vtorc @deepthi @shlomi-noach @GuptaManan100 @timvaillancourt
diff --git a/.github/workflows/vtop_example.yml b/.github/workflows/vtop_example.yml
new file mode 100644
index 00000000000..fb5ae87c101
--- /dev/null
+++ b/.github/workflows/vtop_example.yml
@@ -0,0 +1,97 @@
+name: vtop_example
+on: [push, pull_request]
+concurrency:
+  group: format('{0}-{1}', ${{ github.ref }}, 'vtop_example')
+  cancel-in-progress: true
+
+jobs:
+  build:
+    name: VTop Example
+    runs-on: self-hosted
+
+    steps:
+      - name: Skip CI
+        run: |
+          if [[ "${{contains( github.event.pull_request.labels.*.name, 'Skip CI')}}" == "true" ]]; then
+            echo "skipping CI due to the 'Skip CI' label"
+            exit 1
+          fi
+
+      - name: Check if workflow needs to be skipped
+        id: skip-workflow
+        run: |
+          skip='false'
+          if [[ "${{github.event.pull_request}}" ==  "" ]] && [[ "${{github.ref}}" != "refs/heads/main" ]] && [[ ! "${{github.ref}}" =~ ^refs/heads/release-[0-9]+\.[0-9]$ ]] && [[ ! "${{github.ref}}" =~ "refs/tags/.*" ]]; then
+            skip='true'
+          fi
+          echo Skip ${skip}
+          echo "skip-workflow=${skip}" >> $GITHUB_OUTPUT
+          
+          PR_DATA=$(curl -s\
+            -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \
+            -H "Accept: application/vnd.github.v3+json" \
+            "https://api.github.com/repos/${{ github.repository }}/pulls/${{ github.event.pull_request.number }}")
+          draft=$(echo "$PR_DATA" | jq .draft -r)
+          echo "is_draft=${draft}" >> $GITHUB_OUTPUT
+
+      - name: Check out code
+        if: steps.skip-workflow.outputs.skip-workflow == 'false'
+        uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
+
+      - name: Check for changes in relevant files
+        if: steps.skip-workflow.outputs.skip-workflow == 'false'
+        uses: dorny/paths-filter@ebc4d7e9ebcb0b1eb21480bb8f43113e996ac77a # v3.0.1
+        id: changes
+        with:
+          token: ''
+          filters: |
+            end_to_end:
+              - 'go/**/*.go'
+              - 'go/vt/sidecardb/**/*.sql'
+              - 'test.go'
+              - 'Makefile'
+              - 'build.env'
+              - 'go.[sumod]'
+              - 'proto/*.proto'
+              - 'tools/**'
+              - 'config/**'
+              - 'bootstrap.sh'
+              - 'examples/**'
+              - 'test/**'
+              - '.github/workflows/vtop_example.yml'
+
+      - name: Set up Go
+        if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
+        uses: actions/setup-go@0a12ed9d6a96ab950c8f026ed9f722fe0da7ef32 # v5.0.2
+        with:
+          go-version-file: go.mod
+
+      - name: Tune the OS
+        if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
+        run: |
+          echo '1024 65535' | sudo tee -a /proc/sys/net/ipv4/ip_local_port_range
+
+      - name: Get dependencies
+        if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
+        run: |
+          # Install everything we need, and configure
+          sudo apt-get install -y eatmydata make
+          go mod download
+
+      # needed for vtctldclient
+      - name: Build vitess
+        if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
+        run: |
+          make build
+
+      - name: Install kubectl & kind
+        if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
+        run: |
+          make install_kubectl_kind
+
+      - name: vtop_example
+        if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
+        timeout-minutes: 60
+        run: |
+          source build.env
+          eatmydata -- go run test.go -docker=false -skip-build -print-log -follow -retry=1 -timeout=60m vtop_example
\ No newline at end of file
diff --git a/changelog/19.0/19.0.8/changelog.md b/changelog/19.0/19.0.8/changelog.md
new file mode 100644
index 00000000000..97995e779b7
--- /dev/null
+++ b/changelog/19.0/19.0.8/changelog.md
@@ -0,0 +1,29 @@
+# Changelog of Vitess v19.0.8
+
+### Bug fixes 
+#### Topology
+ * [release-19.0] Close zookeeper topo connection on disconnect (#17136) [#17191](https://github.com/vitessio/vitess/pull/17191) 
+#### VTTablet
+ * [release-19.0] Fix deadlock in messager and health streamer (#17230) [#17233](https://github.com/vitessio/vitess/pull/17233)
+ * [release-19.0] Fix potential deadlock in health streamer (#17261) [#17268](https://github.com/vitessio/vitess/pull/17268)
+### CI/Build 
+#### Build/CI
+ * [release-19.0] Specify Ubuntu 24.04 for all jobs (#17278) [#17280](https://github.com/vitessio/vitess/pull/17280) 
+#### Cluster management
+ * [release-19.0] Fix flakiness in `TestListenerShutdown` (#17024) [#17187](https://github.com/vitessio/vitess/pull/17187) 
+#### General
+ * [release-19.0] Upgrade the Golang version to `go1.22.9` [#17214](https://github.com/vitessio/vitess/pull/17214)
+### Enhancement 
+#### Query Serving
+ * [release-19.0] Fix to prevent stopping buffering prematurely (#17013) [#17203](https://github.com/vitessio/vitess/pull/17203)
+### Internal Cleanup 
+#### Build/CI
+ * [release-19.0] Change the name of the vitess-tester repository (#16917) [#17028](https://github.com/vitessio/vitess/pull/17028)
+### Release 
+#### General
+ * [release-19.0] Bump to `v19.0.8-SNAPSHOT` after the `v19.0.7` release [#17158](https://github.com/vitessio/vitess/pull/17158)
+ * [release-19.0] Code Freeze for `v19.0.8` [#17310](https://github.com/vitessio/vitess/pull/17310)
+### Testing 
+#### Build/CI
+ * [release-19.0] Flakes: Address flakiness in TestZkConnClosedOnDisconnect (#17194) [#17195](https://github.com/vitessio/vitess/pull/17195)
+
diff --git a/changelog/19.0/19.0.8/release_notes.md b/changelog/19.0/19.0.8/release_notes.md
new file mode 100644
index 00000000000..ffb601fba4f
--- /dev/null
+++ b/changelog/19.0/19.0.8/release_notes.md
@@ -0,0 +1,7 @@
+# Release of Vitess v19.0.8
+The entire changelog for this release can be found [here](https://github.com/vitessio/vitess/blob/main/changelog/19.0/19.0.8/changelog.md).
+
+The release includes 11 merged Pull Requests.
+
+Thanks to all our contributors: @app/vitess-bot, @frouioui, @vitess-bot
+
diff --git a/changelog/19.0/README.md b/changelog/19.0/README.md
index 008c92c2aec..5893d3b1f4c 100644
--- a/changelog/19.0/README.md
+++ b/changelog/19.0/README.md
@@ -1,4 +1,8 @@
 ## v19.0
+* **[19.0.8](19.0.8)**
+	* [Changelog](19.0.8/changelog.md)
+	* [Release Notes](19.0.8/release_notes.md)
+
 * **[19.0.7](19.0.7)**
 	* [Changelog](19.0.7/changelog.md)
 	* [Release Notes](19.0.7/release_notes.md)
diff --git a/changelog/20.0/20.0.4/changelog.md b/changelog/20.0/20.0.4/changelog.md
new file mode 100644
index 00000000000..ec4af560368
--- /dev/null
+++ b/changelog/20.0/20.0.4/changelog.md
@@ -0,0 +1,27 @@
+# Changelog of Vitess v20.0.4
+
+### Bug fixes 
+#### Query Serving
+ * [release-20.0] Use proper keyspace when updating the query graph of a reference DML (#17226) [#17257](https://github.com/vitessio/vitess/pull/17257) 
+#### Topology
+ * [release-20.0] Close zookeeper topo connection on disconnect (#17136) [#17192](https://github.com/vitessio/vitess/pull/17192) 
+#### VTTablet
+ * [release-20.0] Fix deadlock in messager and health streamer (#17230) [#17234](https://github.com/vitessio/vitess/pull/17234)
+ * [release-20.0] Fix potential deadlock in health streamer (#17261) [#17269](https://github.com/vitessio/vitess/pull/17269)
+### CI/Build 
+#### Build/CI
+ * [release-20.0] Specify Ubuntu 24.04 for all jobs (#17278) [#17281](https://github.com/vitessio/vitess/pull/17281) 
+#### Cluster management
+ * [release-20.0] Fix flakiness in `TestListenerShutdown` (#17024) [#17188](https://github.com/vitessio/vitess/pull/17188) 
+#### General
+ * [release-20.0] Upgrade the Golang version to `go1.22.9` [#17212](https://github.com/vitessio/vitess/pull/17212)
+### Enhancement 
+#### Query Serving
+ * [release-20.0] Fix to prevent stopping buffering prematurely (#17013) [#17204](https://github.com/vitessio/vitess/pull/17204)
+### Internal Cleanup 
+#### Build/CI
+ * [release-20.0] Change the name of the vitess-tester repository (#16917) [#17029](https://github.com/vitessio/vitess/pull/17029)
+### Testing 
+#### Build/CI
+ * [release-20.0] Flakes: Address flakiness in TestZkConnClosedOnDisconnect (#17194) [#17196](https://github.com/vitessio/vitess/pull/17196)
+
diff --git a/changelog/20.0/20.0.4/release_notes.md b/changelog/20.0/20.0.4/release_notes.md
new file mode 100644
index 00000000000..42dc5b2b8a3
--- /dev/null
+++ b/changelog/20.0/20.0.4/release_notes.md
@@ -0,0 +1,7 @@
+# Release of Vitess v20.0.4
+The entire changelog for this release can be found [here](https://github.com/vitessio/vitess/blob/main/changelog/20.0/20.0.4/changelog.md).
+
+The release includes 10 merged Pull Requests.
+
+Thanks to all our contributors: @app/vitess-bot, @frouioui
+
diff --git a/changelog/20.0/README.md b/changelog/20.0/README.md
index f41ea711fb8..2fe6e3d9d61 100644
--- a/changelog/20.0/README.md
+++ b/changelog/20.0/README.md
@@ -1,4 +1,8 @@
 ## v20.0
+* **[20.0.4](20.0.4)**
+	* [Changelog](20.0.4/changelog.md)
+	* [Release Notes](20.0.4/release_notes.md)
+
 * **[20.0.3](20.0.3)**
 	* [Changelog](20.0.3/changelog.md)
 	* [Release Notes](20.0.3/release_notes.md)
diff --git a/changelog/21.0/21.0.0/summary.md b/changelog/21.0/21.0.0/summary.md
index 512aa45a12f..1c34f5ad81a 100644
--- a/changelog/21.0/21.0.0/summary.md
+++ b/changelog/21.0/21.0.0/summary.md
@@ -9,6 +9,7 @@
         - [Deprecated VTTablet Flags](#vttablet-flags)
         - [Deletion of deprecated metrics](#metric-deletion)
         - [Deprecated Metrics](#deprecations-metrics)
+    - **[RPC Changes](#rpc-changes)**
     - **[Traffic Mirroring](#traffic-mirroring)**
     - **[Atomic Distributed Transaction Support](#atomic-transaction)**
     - **[New VTGate Shutdown Behavior](#new-vtgate-shutdown-behavior)**
@@ -77,6 +78,12 @@ The following metrics are now deprecated and will be deleted in a future release
 | `vttablet` |   `QueryCacheHits`    |   `QueryEnginePlanCacheHits`    |
 | `vttablet` |  `QueryCacheMisses`   |  `QueryEnginePlanCacheMisses`   |
 
+### <a id="rpc-changes"/>RPC Changes</a>
+
+These are the RPC changes made in this release - 
+1. `ReadReparentJournalInfo` RPC has been added in TabletManagerClient interface, that is going to be used in EmergencyReparentShard for better errant GTID detection.
+2. `PrimaryStatus` RPC in TabletManagerClient interface has been updated to also return the server UUID of the primary. This is going to be used in the vttablets so that they can do their own errant GTID detection in `SetReplicationSource`.
+
 ### <a id="traffic-mirroring"/>Traffic Mirroring</a>
 
 Traffic mirroring is intended to help reduce some of the uncertainty inherent to `MoveTables SwitchTraffic`. When
diff --git a/changelog/21.0/21.0.1/changelog.md b/changelog/21.0/21.0.1/changelog.md
new file mode 100644
index 00000000000..1410591c3d5
--- /dev/null
+++ b/changelog/21.0/21.0.1/changelog.md
@@ -0,0 +1,57 @@
+# Changelog of Vitess v21.0.1
+
+### Bug fixes 
+#### Backup and Restore
+ * [release-21.0] Fix how we cancel the context in the builtin backup engine (#17285) [#17291](https://github.com/vitessio/vitess/pull/17291)
+ * [release-21.0] S3: optional endpoint resolver and correct retrier [#17307](https://github.com/vitessio/vitess/pull/17307) 
+#### Cluster management
+ * [release-21.0] Fix panic in vttablet when closing topo server twice (#17094) [#17122](https://github.com/vitessio/vitess/pull/17122) 
+#### Online DDL
+ * [release-21.0] Online DDL: fix defer function, potential connection pool exhaustion (#17207) [#17210](https://github.com/vitessio/vitess/pull/17210) 
+#### Query Serving
+ * [release-21.0] bugfix: treat EXPLAIN like SELECT (#17054) [#17058](https://github.com/vitessio/vitess/pull/17058)
+ * [release-21.0] Delegate Column Availability Checks to MySQL for Single-Route Queries (#17077) [#17087](https://github.com/vitessio/vitess/pull/17087)
+ * [release-21.0] bugfix: Handle CTEs with columns named in the CTE def (#17179) [#17181](https://github.com/vitessio/vitess/pull/17181)
+ * [release-21.0] Use proper keyspace when updating the query graph of a reference DML (#17226) [#17258](https://github.com/vitessio/vitess/pull/17258) 
+#### Topology
+ * [release-21.0] Close zookeeper topo connection on disconnect (#17136) [#17193](https://github.com/vitessio/vitess/pull/17193) 
+#### VReplication
+ * [release-21.0] VReplication: Qualify and SQL escape tables in created AutoIncrement VSchema definitions (#17174) [#17176](https://github.com/vitessio/vitess/pull/17176) 
+#### VTTablet
+ * [release-21.0] Fix deadlock in messager and health streamer (#17230) [#17235](https://github.com/vitessio/vitess/pull/17235)
+ * [release-21.0] Fix potential deadlock in health streamer (#17261) [#17270](https://github.com/vitessio/vitess/pull/17270)
+### CI/Build 
+#### Build/CI
+ * [release-21.0] Specify Ubuntu 24.04 for all jobs (#17278) [#17282](https://github.com/vitessio/vitess/pull/17282) 
+#### Cluster management
+ * [release-21.0] Fix flakiness in `TestListenerShutdown` (#17024) [#17189](https://github.com/vitessio/vitess/pull/17189) 
+#### General
+ * [release-21.0] Upgrade the Golang version to `go1.23.3` [#17211](https://github.com/vitessio/vitess/pull/17211)
+### Dependencies 
+#### Java
+ * [release-21.0] java package updates for grpc and protobuf and release plugins (#17100) [#17105](https://github.com/vitessio/vitess/pull/17105)
+### Documentation 
+#### Documentation
+ * [Direct PR][release-21.0] Add RPC changes segment in the summary doc [#17034](https://github.com/vitessio/vitess/pull/17034)
+### Enhancement 
+#### Online DDL
+ * [release-21.0] Improve Schema Engine's TablesWithSize80 query (#17066) [#17091](https://github.com/vitessio/vitess/pull/17091) 
+#### Query Serving
+ * [release-21.0] Fix to prevent stopping buffering prematurely (#17013) [#17205](https://github.com/vitessio/vitess/pull/17205) 
+#### VReplication
+ * [release-21.0] Binlog: Improve ZstdInMemoryDecompressorMaxSize management (#17220) [#17241](https://github.com/vitessio/vitess/pull/17241)
+### Internal Cleanup 
+#### Build/CI
+ * [release-21.0] Change the name of the vitess-tester repository (#16917) [#17030](https://github.com/vitessio/vitess/pull/17030)
+### Regression 
+#### Backup and Restore
+ * [release-21.0] Fix unreachable errors when taking a backup (#17062) [#17112](https://github.com/vitessio/vitess/pull/17112)
+### Release 
+#### General
+ * [release-21.0] Bump to `v21.0.1-SNAPSHOT` after the `v21.0.0` release [#17098](https://github.com/vitessio/vitess/pull/17098)
+### Testing 
+#### Build/CI
+ * [release-21.0] Flakes: Address flakiness in TestZkConnClosedOnDisconnect (#17194) [#17197](https://github.com/vitessio/vitess/pull/17197) 
+#### Query Serving
+ * [release-21.0] fix: flaky test on twopc transaction (#17068) [#17070](https://github.com/vitessio/vitess/pull/17070)
+
diff --git a/changelog/21.0/21.0.1/release_notes.md b/changelog/21.0/21.0.1/release_notes.md
new file mode 100644
index 00000000000..e42d16379a6
--- /dev/null
+++ b/changelog/21.0/21.0.1/release_notes.md
@@ -0,0 +1,7 @@
+# Release of Vitess v21.0.1
+The entire changelog for this release can be found [here](https://github.com/vitessio/vitess/blob/main/changelog/21.0/21.0.1/changelog.md).
+
+The release includes 25 merged Pull Requests.
+
+Thanks to all our contributors: @GuptaManan100, @app/vitess-bot, @frouioui, @vitess-bot
+
diff --git a/changelog/21.0/README.md b/changelog/21.0/README.md
index a77e98bcaba..f3a98feb55a 100644
--- a/changelog/21.0/README.md
+++ b/changelog/21.0/README.md
@@ -1,4 +1,8 @@
 ## v21.0
+* **[21.0.1](21.0.1)**
+	* [Changelog](21.0.1/changelog.md)
+	* [Release Notes](21.0.1/release_notes.md)
+
 * **[21.0.0](21.0.0)**
 	* [Changelog](21.0.0/changelog.md)
 	* [Release Notes](21.0.0/release_notes.md)
diff --git a/examples/operator/101_initial_cluster.yaml b/examples/operator/101_initial_cluster.yaml
index c26219254f1..a4f9f447e21 100644
--- a/examples/operator/101_initial_cluster.yaml
+++ b/examples/operator/101_initial_cluster.yaml
@@ -15,7 +15,7 @@ spec:
     vtbackup: vitess/lite:latest
     vtorc: vitess/lite:latest
     mysqld:
-      mysql80Compatible: vitess/lite:latest
+      mysql80Compatible: mysql:8.0.30
     mysqldExporter: prom/mysqld-exporter:v0.11.0
   cells:
   - name: zone1
@@ -155,23 +155,6 @@ stringData:
     # Vitess defaults
     ###############################################################################
 
-    # Vitess-internal database.
-    CREATE DATABASE IF NOT EXISTS _vt;
-    # Note that definitions of local_metadata and shard_metadata should be the same
-    # as in production which is defined in go/vt/mysqlctl/metadata_tables.go.
-    CREATE TABLE IF NOT EXISTS _vt.local_metadata (
-      name VARCHAR(255) NOT NULL,
-      value VARCHAR(255) NOT NULL,
-      db_name VARBINARY(255) NOT NULL,
-      PRIMARY KEY (db_name, name)
-      ) ENGINE=InnoDB;
-    CREATE TABLE IF NOT EXISTS _vt.shard_metadata (
-      name VARCHAR(255) NOT NULL,
-      value MEDIUMBLOB NOT NULL,
-      db_name VARBINARY(255) NOT NULL,
-      PRIMARY KEY (db_name, name)
-      ) ENGINE=InnoDB;
-
     # Admin user with all privileges.
     CREATE USER 'vt_dba'@'localhost';
     GRANT ALL ON *.* TO 'vt_dba'@'localhost';
@@ -200,12 +183,10 @@ stringData:
       ON *.* TO 'vt_allprivs'@'localhost';
 
     # User for slave replication connections.
-    # TODO: Should we set a password on this since it allows remote connections?
     CREATE USER 'vt_repl'@'%';
     GRANT REPLICATION SLAVE ON *.* TO 'vt_repl'@'%';
 
-    # User for Vitess filtered replication (binlog player).
-    # Same permissions as vt_app.
+    # User for Vitess VReplication (base vstreamers and vplayer).
     CREATE USER 'vt_filtered'@'localhost';
     GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, RELOAD, PROCESS, FILE,
       REFERENCES, INDEX, ALTER, SHOW DATABASES, CREATE TEMPORARY TABLES,
@@ -213,6 +194,13 @@ stringData:
       SHOW VIEW, CREATE ROUTINE, ALTER ROUTINE, CREATE USER, EVENT, TRIGGER
       ON *.* TO 'vt_filtered'@'localhost';
 
+    # User for general MySQL monitoring.
+    CREATE USER 'vt_monitoring'@'localhost';
+    GRANT SELECT, PROCESS, SUPER, REPLICATION CLIENT, RELOAD
+    ON *.* TO 'vt_monitoring'@'localhost';
+    GRANT SELECT, UPDATE, DELETE, DROP
+    ON performance_schema.* TO 'vt_monitoring'@'localhost';
+
     # custom sql is used to add custom scripts like creating users/passwords. We use it in our tests
     # {{custom_sql}}
 
diff --git a/go/test/endtoend/transaction/twopc/twopc_test.go b/go/test/endtoend/transaction/twopc/twopc_test.go
index 033d93f8792..df064fb16cd 100644
--- a/go/test/endtoend/transaction/twopc/twopc_test.go
+++ b/go/test/endtoend/transaction/twopc/twopc_test.go
@@ -1395,8 +1395,6 @@ func TestReadTransactionStatus(t *testing.T) {
 		"insert into twopc_t1(id, col) values(6, 4)",
 		"insert into twopc_t1(id, col) values(9, 4)",
 	})
-	// Allow enough time for the commit to have started.
-	time.Sleep(1 * time.Second)
 
 	// Create a tablet manager client and use it to read the transaction state.
 	tmc := grpctmclient.NewClient()
@@ -1405,12 +1403,24 @@ func TestReadTransactionStatus(t *testing.T) {
 	defer cancel()
 
 	primaryTablet := getTablet(clusterInstance.Keyspaces[0].Shards[2].FindPrimaryTablet().GrpcPort)
+	// Wait for the transaction to show up in the unresolved list.
 	var unresTransaction *querypb.TransactionMetadata
-	for _, shard := range clusterInstance.Keyspaces[0].Shards {
-		urtRes, err := tmc.GetUnresolvedTransactions(ctx, getTablet(shard.FindPrimaryTablet().GrpcPort), 1)
-		require.NoError(t, err)
-		if len(urtRes) > 0 {
-			unresTransaction = urtRes[0]
+	timeout := time.After(10 * time.Second)
+	for {
+		for _, shard := range clusterInstance.Keyspaces[0].Shards {
+			urtRes, err := tmc.GetUnresolvedTransactions(ctx, getTablet(shard.FindPrimaryTablet().GrpcPort), 1)
+			require.NoError(t, err)
+			if len(urtRes) > 0 {
+				unresTransaction = urtRes[0]
+			}
+		}
+		if unresTransaction != nil {
+			break
+		}
+		select {
+		case <-timeout:
+			require.Fail(t, "timed out waiting for unresolved transaction")
+		default:
 		}
 	}
 	require.NotNil(t, unresTransaction)
diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go
index f3cbe5364a0..5aa759f6f7a 100644
--- a/go/vt/mysqlctl/builtinbackupengine.go
+++ b/go/vt/mysqlctl/builtinbackupengine.go
@@ -604,7 +604,15 @@ func (be *BuiltinBackupEngine) backupFiles(
 	wg := sync.WaitGroup{}
 
 	ctxCancel, cancel := context.WithCancel(ctx)
-	defer cancel()
+	defer func() {
+		// We may still have operations in flight that require a valid context, such as adding files to S3.
+		// Unless we encountered an error, we should not cancel the context, this is taken care of later
+		// in the process. If we encountered an error however, we can safely cancel the context as we should
+		// no longer work on anything and exit fast.
+		if finalErr != nil {
+			cancel()
+		}
+	}()
 
 	for i := range fes {
 		wg.Add(1)
@@ -1037,7 +1045,15 @@ func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreP
 	wg := sync.WaitGroup{}
 
 	ctxCancel, cancel := context.WithCancel(ctx)
-	defer cancel()
+	defer func() {
+		// We may still have operations in flight that require a valid context, such as adding files to S3.
+		// Unless we encountered an error, we should not cancel the context. This is taken care of later
+		// in the process. If we encountered an error however, we can safely cancel the context as we should
+		// no longer work on anything and exit fast.
+		if err != nil {
+			cancel()
+		}
+	}()
 
 	for i := range fes {
 		wg.Add(1)
diff --git a/go/vt/proto/vtadmin/vtadmin.pb.go b/go/vt/proto/vtadmin/vtadmin.pb.go
index e85385ec409..8b6a6997c8d 100644
--- a/go/vt/proto/vtadmin/vtadmin.pb.go
+++ b/go/vt/proto/vtadmin/vtadmin.pb.go
@@ -1203,8 +1203,12 @@ type ApplySchemaRequest struct {
 	sizeCache     protoimpl.SizeCache
 	unknownFields protoimpl.UnknownFields
 
-	ClusterId string                        `protobuf:"bytes,1,opt,name=cluster_id,json=clusterId,proto3" json:"cluster_id,omitempty"`
-	Request   *vtctldata.ApplySchemaRequest `protobuf:"bytes,2,opt,name=request,proto3" json:"request,omitempty"`
+	ClusterId string `protobuf:"bytes,1,opt,name=cluster_id,json=clusterId,proto3" json:"cluster_id,omitempty"`
+	// Request.Sql will be overriden by this Sql field.
+	Sql string `protobuf:"bytes,2,opt,name=sql,proto3" json:"sql,omitempty"`
+	// Request.CallerId will be overriden by this CallerId field.
+	CallerId string                        `protobuf:"bytes,3,opt,name=caller_id,json=callerId,proto3" json:"caller_id,omitempty"`
+	Request  *vtctldata.ApplySchemaRequest `protobuf:"bytes,4,opt,name=request,proto3" json:"request,omitempty"`
 }
 
 func (x *ApplySchemaRequest) Reset() {
@@ -1244,6 +1248,20 @@ func (x *ApplySchemaRequest) GetClusterId() string {
 	return ""
 }
 
+func (x *ApplySchemaRequest) GetSql() string {
+	if x != nil {
+		return x.Sql
+	}
+	return ""
+}
+
+func (x *ApplySchemaRequest) GetCallerId() string {
+	if x != nil {
+		return x.CallerId
+	}
+	return ""
+}
+
 func (x *ApplySchemaRequest) GetRequest() *vtctldata.ApplySchemaRequest {
 	if x != nil {
 		return x.Request
@@ -7731,11 +7749,14 @@ var file_vtadmin_proto_rawDesc = []byte{
 	0x2e, 0x76, 0x74, 0x63, 0x74, 0x6c, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x66,
 	0x6c, 0x6f, 0x77, 0x53, 0x77, 0x69, 0x74, 0x63, 0x68, 0x54, 0x72, 0x61, 0x66, 0x66, 0x69, 0x63,
 	0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x52, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
-	0x22, 0x6c, 0x0a, 0x12, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52,
-	0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65,
-	0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x75, 0x73,
-	0x74, 0x65, 0x72, 0x49, 0x64, 0x12, 0x37, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
-	0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x76, 0x74, 0x63, 0x74, 0x6c, 0x64, 0x61,
+	0x22, 0x9b, 0x01, 0x0a, 0x12, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61,
+	0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x6c, 0x75, 0x73, 0x74,
+	0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x75,
+	0x73, 0x74, 0x65, 0x72, 0x49, 0x64, 0x12, 0x10, 0x0a, 0x03, 0x73, 0x71, 0x6c, 0x18, 0x02, 0x20,
+	0x01, 0x28, 0x09, 0x52, 0x03, 0x73, 0x71, 0x6c, 0x12, 0x1b, 0x0a, 0x09, 0x63, 0x61, 0x6c, 0x6c,
+	0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x63, 0x61, 0x6c,
+	0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x37, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
+	0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x76, 0x74, 0x63, 0x74, 0x6c, 0x64, 0x61,
 	0x74, 0x61, 0x2e, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65,
 	0x71, 0x75, 0x65, 0x73, 0x74, 0x52, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x80,
 	0x01, 0x0a, 0x1c, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x4d,
diff --git a/go/vt/proto/vtadmin/vtadmin_vtproto.pb.go b/go/vt/proto/vtadmin/vtadmin_vtproto.pb.go
index bc0746b7b8a..82cca2cea06 100644
--- a/go/vt/proto/vtadmin/vtadmin_vtproto.pb.go
+++ b/go/vt/proto/vtadmin/vtadmin_vtproto.pb.go
@@ -454,6 +454,8 @@ func (m *ApplySchemaRequest) CloneVT() *ApplySchemaRequest {
 	}
 	r := new(ApplySchemaRequest)
 	r.ClusterId = m.ClusterId
+	r.Sql = m.Sql
+	r.CallerId = m.CallerId
 	r.Request = m.Request.CloneVT()
 	if len(m.unknownFields) > 0 {
 		r.unknownFields = make([]byte, len(m.unknownFields))
@@ -4038,6 +4040,20 @@ func (m *ApplySchemaRequest) MarshalToSizedBufferVT(dAtA []byte) (int, error) {
 		i -= size
 		i = protohelpers.EncodeVarint(dAtA, i, uint64(size))
 		i--
+		dAtA[i] = 0x22
+	}
+	if len(m.CallerId) > 0 {
+		i -= len(m.CallerId)
+		copy(dAtA[i:], m.CallerId)
+		i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.CallerId)))
+		i--
+		dAtA[i] = 0x1a
+	}
+	if len(m.Sql) > 0 {
+		i -= len(m.Sql)
+		copy(dAtA[i:], m.Sql)
+		i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.Sql)))
+		i--
 		dAtA[i] = 0x12
 	}
 	if len(m.ClusterId) > 0 {
@@ -10321,6 +10337,14 @@ func (m *ApplySchemaRequest) SizeVT() (n int) {
 	if l > 0 {
 		n += 1 + l + protohelpers.SizeOfVarint(uint64(l))
 	}
+	l = len(m.Sql)
+	if l > 0 {
+		n += 1 + l + protohelpers.SizeOfVarint(uint64(l))
+	}
+	l = len(m.CallerId)
+	if l > 0 {
+		n += 1 + l + protohelpers.SizeOfVarint(uint64(l))
+	}
 	if m.Request != nil {
 		l = m.Request.SizeVT()
 		n += 1 + l + protohelpers.SizeOfVarint(uint64(l))
@@ -15873,6 +15897,70 @@ func (m *ApplySchemaRequest) UnmarshalVT(dAtA []byte) error {
 			m.ClusterId = string(dAtA[iNdEx:postIndex])
 			iNdEx = postIndex
 		case 2:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Sql", wireType)
+			}
+			var stringLen uint64
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return protohelpers.ErrIntOverflow
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				stringLen |= uint64(b&0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			intStringLen := int(stringLen)
+			if intStringLen < 0 {
+				return protohelpers.ErrInvalidLength
+			}
+			postIndex := iNdEx + intStringLen
+			if postIndex < 0 {
+				return protohelpers.ErrInvalidLength
+			}
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.Sql = string(dAtA[iNdEx:postIndex])
+			iNdEx = postIndex
+		case 3:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field CallerId", wireType)
+			}
+			var stringLen uint64
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return protohelpers.ErrIntOverflow
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				stringLen |= uint64(b&0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			intStringLen := int(stringLen)
+			if intStringLen < 0 {
+				return protohelpers.ErrInvalidLength
+			}
+			postIndex := iNdEx + intStringLen
+			if postIndex < 0 {
+				return protohelpers.ErrInvalidLength
+			}
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.CallerId = string(dAtA[iNdEx:postIndex])
+			iNdEx = postIndex
+		case 4:
 			if wireType != 2 {
 				return fmt.Errorf("proto: wrong wireType = %d for field Request", wireType)
 			}
diff --git a/go/vt/vtadmin/api.go b/go/vt/vtadmin/api.go
index cef8816504a..4f91459d9ed 100644
--- a/go/vt/vtadmin/api.go
+++ b/go/vt/vtadmin/api.go
@@ -59,6 +59,7 @@ import (
 	"vitess.io/vitess/go/vt/vtadmin/rbac"
 	"vitess.io/vitess/go/vt/vtadmin/sort"
 	"vitess.io/vitess/go/vt/vtadmin/vtadminproto"
+	"vitess.io/vitess/go/vt/vtctl/grpcvtctldserver"
 	"vitess.io/vitess/go/vt/vtctl/workflow"
 	"vitess.io/vitess/go/vt/vtenv"
 	"vitess.io/vitess/go/vt/vterrors"
@@ -488,6 +489,31 @@ func (api *API) ApplySchema(ctx context.Context, req *vtadminpb.ApplySchemaReque
 		return nil, err
 	}
 
+	// Parser with default options. New() itself initializes with default MySQL version.
+	parser, err := sqlparser.New(sqlparser.Options{
+		TruncateUILen:  512,
+		TruncateErrLen: 0,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	// Split the sql statement received from request.
+	sqlParts, err := parser.SplitStatementToPieces(req.Sql)
+	if err != nil {
+		return nil, err
+	}
+
+	req.Request.Sql = sqlParts
+
+	// Set the callerID if not empty.
+	if req.CallerId != "" {
+		req.Request.CallerId = &vtrpcpb.CallerID{Principal: req.CallerId}
+	}
+
+	// Set the default wait replicas timeout.
+	req.Request.WaitReplicasTimeout = protoutil.DurationToProto(grpcvtctldserver.DefaultWaitReplicasTimeout)
+
 	return c.ApplySchema(ctx, req.Request)
 }
 
diff --git a/go/vt/vtadmin/http/schema_migrations.go b/go/vt/vtadmin/http/schema_migrations.go
index e0207989648..3da6026fe9f 100644
--- a/go/vt/vtadmin/http/schema_migrations.go
+++ b/go/vt/vtadmin/http/schema_migrations.go
@@ -34,19 +34,26 @@ func ApplySchema(ctx context.Context, r Request, api *API) *JSONResponse {
 	decoder := json.NewDecoder(r.Body)
 	defer r.Body.Close()
 
-	var req vtctldatapb.ApplySchemaRequest
-	if err := decoder.Decode(&req); err != nil {
+	var body struct {
+		Sql      string                         `json:"sql"`
+		CallerId string                         `json:"caller_id"`
+		Request  vtctldatapb.ApplySchemaRequest `json:"request"`
+	}
+
+	if err := decoder.Decode(&body); err != nil {
 		return NewJSONResponse(nil, &errors.BadRequest{
 			Err: err,
 		})
 	}
 
 	vars := mux.Vars(r.Request)
-	req.Keyspace = vars["keyspace"]
+	body.Request.Keyspace = vars["keyspace"]
 
 	resp, err := api.server.ApplySchema(ctx, &vtadminpb.ApplySchemaRequest{
 		ClusterId: vars["cluster_id"],
-		Request:   &req,
+		Sql:       body.Sql,
+		CallerId:  body.CallerId,
+		Request:   &body.Request,
 	})
 
 	return NewJSONResponse(resp, err)
diff --git a/go/vt/vtgate/debugenv.go b/go/vt/vtgate/debugenv.go
index 4fa989c69a3..7213353432d 100644
--- a/go/vt/vtgate/debugenv.go
+++ b/go/vt/vtgate/debugenv.go
@@ -22,9 +22,10 @@ import (
 	"html"
 	"net/http"
 	"strconv"
-	"text/template"
 	"time"
 
+	"github.com/google/safehtml/template"
+
 	"vitess.io/vitess/go/acl"
 	"vitess.io/vitess/go/vt/discovery"
 	"vitess.io/vitess/go/vt/log"
diff --git a/go/vt/vtgate/querylogz.go b/go/vt/vtgate/querylogz.go
index 7c72e950d4a..05d301f28be 100644
--- a/go/vt/vtgate/querylogz.go
+++ b/go/vt/vtgate/querylogz.go
@@ -20,15 +20,15 @@ import (
 	"net/http"
 	"strconv"
 	"strings"
-	"text/template"
 	"time"
 
-	"vitess.io/vitess/go/vt/vtgate/logstats"
+	"github.com/google/safehtml/template"
 
 	"vitess.io/vitess/go/acl"
 	"vitess.io/vitess/go/vt/log"
 	"vitess.io/vitess/go/vt/logz"
 	"vitess.io/vitess/go/vt/sqlparser"
+	"vitess.io/vitess/go/vt/vtgate/logstats"
 )
 
 var (
diff --git a/go/vt/vtgate/querylogz_test.go b/go/vt/vtgate/querylogz_test.go
index 3cecb983b3f..9236b2ac840 100644
--- a/go/vt/vtgate/querylogz_test.go
+++ b/go/vt/vtgate/querylogz_test.go
@@ -35,7 +35,7 @@ import (
 
 func TestQuerylogzHandlerFormatting(t *testing.T) {
 	req, _ := http.NewRequest("GET", "/querylogz?timeout=10&limit=1", nil)
-	logStats := logstats.NewLogStats(context.Background(), "Execute", "select name from test_table limit 1000", "suuid", nil)
+	logStats := logstats.NewLogStats(context.Background(), "Execute", "select name, 'inject <script>alert();</script>' from test_table limit 1000", "suuid", nil)
 	logStats.StmtType = "select"
 	logStats.RowsAffected = 1000
 	logStats.ShardQueries = 1
@@ -64,7 +64,7 @@ func TestQuerylogzHandlerFormatting(t *testing.T) {
 		`<td>0.002</td>`,
 		`<td>0.003</td>`,
 		`<td>select</td>`,
-		`<td>select name from test_table limit 1000</td>`,
+		regexp.QuoteMeta(`<td>select name,​ &#39;inject &lt;script&gt;alert()​;&lt;/script&gt;&#39; from test_table limit 1000</td>`),
 		`<td>1</td>`,
 		`<td>1000</td>`,
 		`<td></td>`,
@@ -94,7 +94,7 @@ func TestQuerylogzHandlerFormatting(t *testing.T) {
 		`<td>0.002</td>`,
 		`<td>0.003</td>`,
 		`<td>select</td>`,
-		`<td>select name from test_table limit 1000</td>`,
+		regexp.QuoteMeta(`<td>select name,​ &#39;inject &lt;script&gt;alert()​;&lt;/script&gt;&#39; from test_table limit 1000</td>`),
 		`<td>1</td>`,
 		`<td>1000</td>`,
 		`<td></td>`,
@@ -124,7 +124,7 @@ func TestQuerylogzHandlerFormatting(t *testing.T) {
 		`<td>0.002</td>`,
 		`<td>0.003</td>`,
 		`<td>select</td>`,
-		`<td>select name from test_table limit 1000</td>`,
+		regexp.QuoteMeta(`<td>select name,​ &#39;inject &lt;script&gt;alert()​;&lt;/script&gt;&#39; from test_table limit 1000</td>`),
 		`<td>1</td>`,
 		`<td>1000</td>`,
 		`<td></td>`,
diff --git a/go/vt/vtorc/inst/instance_dao.go b/go/vt/vtorc/inst/instance_dao.go
index e15958334f5..d1421dbc91d 100644
--- a/go/vt/vtorc/inst/instance_dao.go
+++ b/go/vt/vtorc/inst/instance_dao.go
@@ -362,7 +362,7 @@ Cleanup:
 		// Add replication group ancestry UUID as well. Otherwise, VTOrc thinks there are errant GTIDs in group
 		// members and its replicas, even though they are not.
 		instance.AncestryUUID = strings.Trim(instance.AncestryUUID, ",")
-		err = detectErrantGTIDs(tabletAlias, instance, tablet)
+		err = detectErrantGTIDs(instance, tablet)
 	}
 
 	latency.Stop("instance")
@@ -390,13 +390,18 @@ Cleanup:
 }
 
 // detectErrantGTIDs detects the errant GTIDs on an instance.
-func detectErrantGTIDs(tabletAlias string, instance *Instance, tablet *topodatapb.Tablet) (err error) {
+func detectErrantGTIDs(instance *Instance, tablet *topodatapb.Tablet) (err error) {
 	// If the tablet is not replicating from anyone, then it could be the previous primary.
 	// We should check for errant GTIDs by finding the difference with the shard's current primary.
 	if instance.primaryExecutedGtidSet == "" && instance.SourceHost == "" {
 		var primaryInstance *Instance
 		primaryAlias, _, _ := ReadShardPrimaryInformation(tablet.Keyspace, tablet.Shard)
 		if primaryAlias != "" {
+			// Check if the current tablet is the primary.
+			// If it is, then we don't need to run errant gtid detection on it.
+			if primaryAlias == instance.InstanceAlias {
+				return nil
+			}
 			primaryInstance, _, _ = ReadInstance(primaryAlias)
 		}
 		// Only run errant GTID detection, if we are sure that the data read of the current primary
@@ -434,7 +439,7 @@ func detectErrantGTIDs(tabletAlias string, instance *Instance, tablet *topodatap
 			if err == nil {
 				var gtidCount int64
 				gtidCount, err = replication.GTIDCount(instance.GtidErrant)
-				currentErrantGTIDCount.Set(tabletAlias, gtidCount)
+				currentErrantGTIDCount.Set(instance.InstanceAlias, gtidCount)
 			}
 		}
 	}
diff --git a/go/vt/vtorc/inst/instance_dao_test.go b/go/vt/vtorc/inst/instance_dao_test.go
index 35f5a208b10..cc3217442ed 100644
--- a/go/vt/vtorc/inst/instance_dao_test.go
+++ b/go/vt/vtorc/inst/instance_dao_test.go
@@ -854,7 +854,7 @@ func TestDetectErrantGTIDs(t *testing.T) {
 	primaryTablet := &topodatapb.Tablet{
 		Alias: &topodatapb.TabletAlias{
 			Cell: "zone-1",
-			Uid:  100,
+			Uid:  101,
 		},
 		Keyspace: keyspaceName,
 		Shard:    shardName,
@@ -881,7 +881,8 @@ func TestDetectErrantGTIDs(t *testing.T) {
 				require.NoError(t, err)
 			}
 
-			err = detectErrantGTIDs(topoproto.TabletAliasString(tablet.Alias), tt.instance, tablet)
+			tt.instance.InstanceAlias = topoproto.TabletAliasString(tablet.Alias)
+			err = detectErrantGTIDs(tt.instance, tablet)
 			if tt.wantErr {
 				require.Error(t, err)
 				return
@@ -891,3 +892,47 @@ func TestDetectErrantGTIDs(t *testing.T) {
 		})
 	}
 }
+
+// TestPrimaryErrantGTIDs tests that we don't run Errant GTID detection on the primary tablet itself!
+func TestPrimaryErrantGTIDs(t *testing.T) {
+	// Clear the database after the test. The easiest way to do that is to run all the initialization commands again.
+	defer func() {
+		db.ClearVTOrcDatabase()
+	}()
+	db.ClearVTOrcDatabase()
+	keyspaceName := "ks"
+	shardName := "0"
+	tablet := &topodatapb.Tablet{
+		Alias: &topodatapb.TabletAlias{
+			Cell: "zone-1",
+			Uid:  100,
+		},
+		Keyspace: keyspaceName,
+		Shard:    shardName,
+	}
+	instance := &Instance{
+		SourceHost:      "",
+		ExecutedGtidSet: "230ea8ea-81e3-11e4-972a-e25ec4bd140a:1-10589,8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-34,316d193c-70e5-11e5-adb2-ecf4bb2262ff:1-341",
+		InstanceAlias:   topoproto.TabletAliasString(tablet.Alias),
+	}
+
+	// Save shard record for the primary tablet.
+	err := SaveShard(topo.NewShardInfo(keyspaceName, shardName, &topodatapb.Shard{
+		PrimaryAlias: tablet.Alias,
+	}, nil))
+	require.NoError(t, err)
+
+	// Store the tablet record and the instance.
+	err = SaveTablet(tablet)
+	require.NoError(t, err)
+	err = WriteInstance(instance, true, nil)
+	require.NoError(t, err)
+
+	// After this if we read a new information for the record that updates its
+	// gtid set further, we shouldn't be detecting errant GTIDs on it since it is the primary!
+	// We shouldn't be comparing it with a previous version of itself!
+	instance.ExecutedGtidSet = "230ea8ea-81e3-11e4-972a-e25ec4bd140a:1-10589,8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-34,316d193c-70e5-11e5-adb2-ecf4bb2262ff:1-351"
+	err = detectErrantGTIDs(instance, tablet)
+	require.NoError(t, err)
+	require.EqualValues(t, "", instance.GtidErrant)
+}
diff --git a/go/vt/vttablet/endtoend/config_test.go b/go/vt/vttablet/endtoend/config_test.go
index 4abf5b36c21..c3ad5f8a9db 100644
--- a/go/vt/vttablet/endtoend/config_test.go
+++ b/go/vt/vttablet/endtoend/config_test.go
@@ -36,7 +36,7 @@ import (
 )
 
 func TestPoolSize(t *testing.T) {
-	revert := changeVar(t, "PoolSize", "1")
+	revert := changeVar(t, "ReadPoolSize", "1")
 	defer revert()
 
 	vstart := framework.DebugVars()
@@ -92,7 +92,7 @@ func TestTxPoolSize(t *testing.T) {
 	defer client2.Rollback()
 	verifyIntValue(t, framework.DebugVars(), "FoundRowsPoolAvailable", framework.FetchInt(vstart, "FoundRowsPoolAvailable")-1)
 
-	revert := changeVar(t, "TxPoolSize", "1")
+	revert := changeVar(t, "TransactionPoolSize", "1")
 	defer revert()
 	vend := framework.DebugVars()
 	verifyIntValue(t, vend, "TransactionPoolAvailable", 0)
diff --git a/go/vt/vttablet/tabletserver/debugenv.go b/go/vt/vttablet/tabletserver/debugenv.go
index 54cf09db7d6..6f1ea854ea9 100644
--- a/go/vt/vttablet/tabletserver/debugenv.go
+++ b/go/vt/vttablet/tabletserver/debugenv.go
@@ -23,9 +23,10 @@ import (
 	"html"
 	"net/http"
 	"strconv"
-	"text/template"
 	"time"
 
+	"github.com/google/safehtml/template"
+
 	"vitess.io/vitess/go/acl"
 	"vitess.io/vitess/go/vt/log"
 )
@@ -70,90 +71,131 @@ func debugEnvHandler(tsv *TabletServer, w http.ResponseWriter, r *http.Request)
 		return
 	}
 
+	switch r.Method {
+	case http.MethodPost:
+		handlePost(tsv, w, r)
+	case http.MethodGet:
+		handleGet(tsv, w, r)
+	default:
+		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+	}
+}
+
+func handlePost(tsv *TabletServer, w http.ResponseWriter, r *http.Request) {
+	varname := r.FormValue("varname")
+	value := r.FormValue("value")
+
 	var msg string
-	if r.Method == "POST" {
-		varname := r.FormValue("varname")
-		value := r.FormValue("value")
-		setIntVal := func(f func(int)) {
-			ival, err := strconv.Atoi(value)
-			if err != nil {
-				msg = fmt.Sprintf("Failed setting value for %v: %v", varname, err)
-				return
-			}
-			f(ival)
-			msg = fmt.Sprintf("Setting %v to: %v", varname, value)
+	if varname == "" || value == "" {
+		http.Error(w, "Missing varname or value", http.StatusBadRequest)
+		return
+	}
+
+	setIntVal := func(f func(int)) error {
+		ival, err := strconv.Atoi(value)
+		if err != nil {
+			return fmt.Errorf("invalid int value for %v: %v", varname, err)
 		}
-		setIntValCtx := func(f func(context.Context, int) error) {
-			ival, err := strconv.Atoi(value)
-			if err == nil {
-				err = f(r.Context(), ival)
-				if err == nil {
-					msg = fmt.Sprintf("Setting %v to: %v", varname, value)
-					return
-				}
-			}
-			msg = fmt.Sprintf("Failed setting value for %v: %v", varname, err)
+		f(ival)
+		msg = fmt.Sprintf("Setting %v to: %v", varname, value)
+		return nil
+	}
+
+	setIntValCtx := func(f func(context.Context, int) error) error {
+		ival, err := strconv.Atoi(value)
+		if err == nil {
+			err = f(r.Context(), ival)
 		}
-		setInt64Val := func(f func(int64)) {
-			ival, err := strconv.ParseInt(value, 10, 64)
-			if err != nil {
-				msg = fmt.Sprintf("Failed setting value for %v: %v", varname, err)
-				return
-			}
-			f(ival)
-			msg = fmt.Sprintf("Setting %v to: %v", varname, value)
+		if err != nil {
+			return fmt.Errorf("failed setting value for %v: %v", varname, err)
 		}
-		setDurationVal := func(f func(time.Duration)) {
-			durationVal, err := time.ParseDuration(value)
-			if err != nil {
-				msg = fmt.Sprintf("Failed setting value for %v: %v", varname, err)
-				return
-			}
-			f(durationVal)
-			msg = fmt.Sprintf("Setting %v to: %v", varname, value)
+		msg = fmt.Sprintf("Setting %v to: %v", varname, value)
+		return nil
+	}
+
+	setInt64Val := func(f func(int64)) error {
+		ival, err := strconv.ParseInt(value, 10, 64)
+		if err != nil {
+			return fmt.Errorf("invalid int64 value for %v: %v", varname, err)
 		}
-		setFloat64Val := func(f func(float64)) {
-			fval, err := strconv.ParseFloat(value, 64)
-			if err != nil {
-				msg = fmt.Sprintf("Failed setting value for %v: %v", varname, err)
-				return
-			}
-			f(fval)
-			msg = fmt.Sprintf("Setting %v to: %v", varname, value)
+		f(ival)
+		msg = fmt.Sprintf("Setting %v to: %v", varname, value)
+		return nil
+	}
+
+	setDurationVal := func(f func(time.Duration)) error {
+		durationVal, err := time.ParseDuration(value)
+		if err != nil {
+			return fmt.Errorf("invalid duration value for %v: %v", varname, err)
 		}
-		switch varname {
-		case "PoolSize":
-			setIntValCtx(tsv.SetPoolSize)
-		case "StreamPoolSize":
-			setIntValCtx(tsv.SetStreamPoolSize)
-		case "TxPoolSize":
-			setIntValCtx(tsv.SetTxPoolSize)
-		case "MaxResultSize":
-			setIntVal(tsv.SetMaxResultSize)
-		case "WarnResultSize":
-			setIntVal(tsv.SetWarnResultSize)
-		case "RowStreamerMaxInnoDBTrxHistLen":
-			setInt64Val(func(val int64) { tsv.Config().RowStreamer.MaxInnoDBTrxHistLen = val })
-		case "RowStreamerMaxMySQLReplLagSecs":
-			setInt64Val(func(val int64) { tsv.Config().RowStreamer.MaxMySQLReplLagSecs = val })
-		case "UnhealthyThreshold":
-			setDurationVal(func(d time.Duration) { tsv.Config().Healthcheck.UnhealthyThreshold = d })
-			setDurationVal(tsv.hs.SetUnhealthyThreshold)
-			setDurationVal(tsv.sm.SetUnhealthyThreshold)
-		case "ThrottleMetricThreshold":
-			setFloat64Val(tsv.SetThrottleMetricThreshold)
-		case "Consolidator":
-			tsv.SetConsolidatorMode(value)
-			msg = fmt.Sprintf("Setting %v to: %v", varname, value)
+		f(durationVal)
+		msg = fmt.Sprintf("Setting %v to: %v", varname, value)
+		return nil
+	}
+
+	setFloat64Val := func(f func(float64)) error {
+		fval, err := strconv.ParseFloat(value, 64)
+		if err != nil {
+			return fmt.Errorf("invalid float64 value for %v: %v", varname, err)
 		}
+		f(fval)
+		msg = fmt.Sprintf("Setting %v to: %v", varname, value)
+		return nil
+	}
+
+	var err error
+	switch varname {
+	case "ReadPoolSize":
+		err = setIntValCtx(tsv.SetPoolSize)
+	case "StreamPoolSize":
+		err = setIntValCtx(tsv.SetStreamPoolSize)
+	case "TransactionPoolSize":
+		err = setIntValCtx(tsv.SetTxPoolSize)
+	case "MaxResultSize":
+		err = setIntVal(tsv.SetMaxResultSize)
+	case "WarnResultSize":
+		err = setIntVal(tsv.SetWarnResultSize)
+	case "RowStreamerMaxInnoDBTrxHistLen":
+		err = setInt64Val(func(val int64) { tsv.Config().RowStreamer.MaxInnoDBTrxHistLen = val })
+	case "RowStreamerMaxMySQLReplLagSecs":
+		err = setInt64Val(func(val int64) { tsv.Config().RowStreamer.MaxMySQLReplLagSecs = val })
+	case "UnhealthyThreshold":
+		err = setDurationVal(func(d time.Duration) { tsv.Config().Healthcheck.UnhealthyThreshold = d })
+	case "ThrottleMetricThreshold":
+		err = setFloat64Val(tsv.SetThrottleMetricThreshold)
+	case "Consolidator":
+		tsv.SetConsolidatorMode(value)
+		msg = fmt.Sprintf("Setting %v to: %v", varname, value)
+	}
+
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusBadRequest)
+		return
 	}
 
+	vars := getVars(tsv)
+	sendResponse(r, w, vars, msg)
+}
+
+func handleGet(tsv *TabletServer, w http.ResponseWriter, r *http.Request) {
+	vars := getVars(tsv)
+	sendResponse(r, w, vars, "")
+}
+
+func sendResponse(r *http.Request, w http.ResponseWriter, vars []envValue, msg string) {
+	format := r.FormValue("format")
+	if format == "json" {
+		respondWithJSON(w, vars, msg)
+		return
+	}
+	respondWithHTML(w, vars, msg)
+}
+
+func getVars(tsv *TabletServer) []envValue {
 	var vars []envValue
-	vars = addVar(vars, "PoolSize", tsv.PoolSize)
+	vars = addVar(vars, "ReadPoolSize", tsv.PoolSize)
 	vars = addVar(vars, "StreamPoolSize", tsv.StreamPoolSize)
-	vars = addVar(vars, "TxPoolSize", tsv.TxPoolSize)
-	vars = addVar(vars, "QueryCacheCapacity", tsv.QueryPlanCacheCap) // QueryCacheCapacity is deprecated in v21, it is replaced by QueryEnginePlanCacheCapacity
-	vars = addVar(vars, "QueryEnginePlanCacheCapacity", tsv.QueryPlanCacheCap)
+	vars = addVar(vars, "TransactionPoolSize", tsv.TxPoolSize)
 	vars = addVar(vars, "MaxResultSize", tsv.MaxResultSize)
 	vars = addVar(vars, "WarnResultSize", tsv.WarnResultSize)
 	vars = addVar(vars, "RowStreamerMaxInnoDBTrxHistLen", func() int64 { return tsv.Config().RowStreamer.MaxInnoDBTrxHistLen })
@@ -165,18 +207,22 @@ func debugEnvHandler(tsv *TabletServer, w http.ResponseWriter, r *http.Request)
 		Value: tsv.ConsolidatorMode(),
 	})
 
-	format := r.FormValue("format")
-	if format == "json" {
-		mvars := make(map[string]string)
-		for _, v := range vars {
-			mvars[v.Name] = v.Value
-		}
-		w.Header().Set("Content-Type", "application/json")
-		_ = json.NewEncoder(w).Encode(mvars)
-		return
+	return vars
+}
+
+func respondWithJSON(w http.ResponseWriter, vars []envValue, msg string) {
+	mvars := make(map[string]string)
+	for _, v := range vars {
+		mvars[v.Name] = v.Value
 	}
+	if msg != "" {
+		mvars["ResponseMessage"] = msg
+	}
+	w.Header().Set("Content-Type", "application/json")
+	_ = json.NewEncoder(w).Encode(mvars)
+}
 
-	// gridTable is reused from twopcz.go.
+func respondWithHTML(w http.ResponseWriter, vars []envValue, msg string) {
 	w.Write(gridTable)
 	w.Write([]byte("<h3>Internal Variables</h3>\n"))
 	if msg != "" {
diff --git a/go/vt/vttablet/tabletserver/querylogz.go b/go/vt/vttablet/tabletserver/querylogz.go
index 33341d1641b..09f375aa329 100644
--- a/go/vt/vttablet/tabletserver/querylogz.go
+++ b/go/vt/vttablet/tabletserver/querylogz.go
@@ -20,9 +20,10 @@ import (
 	"net/http"
 	"strconv"
 	"strings"
-	"text/template"
 	"time"
 
+	"github.com/google/safehtml/template"
+
 	"vitess.io/vitess/go/acl"
 	"vitess.io/vitess/go/vt/log"
 	"vitess.io/vitess/go/vt/logz"
diff --git a/go/vt/vttablet/tabletserver/querylogz_test.go b/go/vt/vttablet/tabletserver/querylogz_test.go
index 25f03c762c7..ee26437f330 100644
--- a/go/vt/vttablet/tabletserver/querylogz_test.go
+++ b/go/vt/vttablet/tabletserver/querylogz_test.go
@@ -37,7 +37,7 @@ func TestQuerylogzHandler(t *testing.T) {
 	req, _ := http.NewRequest("GET", "/querylogz?timeout=10&limit=1", nil)
 	logStats := tabletenv.NewLogStats(context.Background(), "Execute")
 	logStats.PlanType = planbuilder.PlanSelect.String()
-	logStats.OriginalSQL = "select name from test_table limit 1000"
+	logStats.OriginalSQL = "select name, 'inject <script>alert();</script>' from test_table limit 1000"
 	logStats.RowsAffected = 1000
 	logStats.NumberOfQueries = 1
 	logStats.StartTime, _ = time.Parse("Jan 2 15:04:05", "Nov 29 13:33:09")
@@ -64,7 +64,7 @@ func TestQuerylogzHandler(t *testing.T) {
 		`<td>0.001</td>`,
 		`<td>1e-08</td>`,
 		`<td>Select</td>`,
-		`<td>select name from test_table limit 1000</td>`,
+		regexp.QuoteMeta(`<td>select name,​ &#39;inject &lt;script&gt;alert()​;&lt;/script&gt;&#39; from test_table limit 1000</td>`),
 		`<td>1</td>`,
 		`<td>none</td>`,
 		`<td>1000</td>`,
@@ -95,7 +95,7 @@ func TestQuerylogzHandler(t *testing.T) {
 		`<td>0.001</td>`,
 		`<td>1e-08</td>`,
 		`<td>Select</td>`,
-		`<td>select name from test_table limit 1000</td>`,
+		regexp.QuoteMeta(`<td>select name,​ &#39;inject &lt;script&gt;alert()​;&lt;/script&gt;&#39; from test_table limit 1000</td>`),
 		`<td>1</td>`,
 		`<td>none</td>`,
 		`<td>1000</td>`,
@@ -126,7 +126,7 @@ func TestQuerylogzHandler(t *testing.T) {
 		`<td>0.001</td>`,
 		`<td>1e-08</td>`,
 		`<td>Select</td>`,
-		`<td>select name from test_table limit 1000</td>`,
+		regexp.QuoteMeta(`<td>select name,​ &#39;inject &lt;script&gt;alert()​;&lt;/script&gt;&#39; from test_table limit 1000</td>`),
 		`<td>1</td>`,
 		`<td>none</td>`,
 		`<td>1000</td>`,
diff --git a/proto/vtadmin.proto b/proto/vtadmin.proto
index 78f086ec345..963d1fa5779 100644
--- a/proto/vtadmin.proto
+++ b/proto/vtadmin.proto
@@ -388,7 +388,11 @@ message WorkflowSwitchTrafficRequest {
 
 message ApplySchemaRequest {
     string cluster_id = 1;
-    vtctldata.ApplySchemaRequest request = 2;
+    // Request.Sql will be overriden by this Sql field.
+    string sql = 2;
+    // Request.CallerId will be overriden by this CallerId field.
+    string caller_id = 3;
+    vtctldata.ApplySchemaRequest request = 4;
 }
 
 message CancelSchemaMigrationRequest {
diff --git a/test/config.json b/test/config.json
index c911232ce74..1e278546c7a 100644
--- a/test/config.json
+++ b/test/config.json
@@ -1238,6 +1238,17 @@
 			"RetryMax": 1,
 			"Tags": []
 		},
+		"vtop_example": {
+			"File": "",
+			"Args": [],
+			"Command": [
+				"test/vtop_example.sh"
+			],
+			"Manual": false,
+			"Shard": "",
+			"RetryMax": 1,
+			"Tags": []
+		},
 		"vtorc_primary_failure": {
 			"File": "unused.go",
 			"Args": ["vitess.io/vitess/go/test/endtoend/vtorc/primaryfailure"],
diff --git a/test/vtop_example.sh b/test/vtop_example.sh
index 5ff90a2be7e..c537c0f844c 100755
--- a/test/vtop_example.sh
+++ b/test/vtop_example.sh
@@ -482,11 +482,12 @@ EOF
   waitForKeyspaceToBeServing customer 80- 1
 }
 
+kind delete cluster --name kind || true
 
 # Build the docker image for vitess/lite using the local code
 docker build -f docker/lite/Dockerfile -t vitess/lite:pr .
 # Build the docker image for vitess/vtadmin using the local code
-docker build -f docker/binaries/vtadmin/Dockerfile --build-arg VT_BASE_VER=pr -t vitess/vtadmin:pr .
+docker build -f docker/binaries/vtadmin/Dockerfile --build-arg VT_BASE_VER=pr -t vitess/vtadmin:pr ./docker/binaries/vtadmin
 
 # Print the docker images available
 docker image ls
diff --git a/tools/get_kubectl_kind.sh b/tools/get_kubectl_kind.sh
index 57df414fdd8..169b120aaa0 100755
--- a/tools/get_kubectl_kind.sh
+++ b/tools/get_kubectl_kind.sh
@@ -12,7 +12,7 @@ source build.env
 mkdir -p "$VTROOT/bin"
 cd "$VTROOT/bin"
 
-KUBE_VERSION="${KUBE_VERSION:-v1.21.1}"
+KUBE_VERSION="${KUBE_VERSION:-v1.31.0}"
 KUBERNETES_RELEASE_URL="${KUBERNETES_RELEASE_URL:-https://dl.k8s.io}"
 
 # Download kubectl if needed.
@@ -28,7 +28,7 @@ ln -sf "kubectl-${KUBE_VERSION}" kubectl
 if ! command -v kind &> /dev/null
 then
     echo "Downloading kind..."
-    curl -L https://kind.sigs.k8s.io/dl/v0.12.0/kind-linux-amd64 > "kind"
+    curl -L https://kind.sigs.k8s.io/dl/v0.22.0/kind-linux-amd64 > "kind"
     chmod +x "kind"
     echo "Installed kind"
 else
diff --git a/tools/map-shard-for-value/Makefile b/tools/map-shard-for-value/Makefile
new file mode 100644
index 00000000000..61bc88ac0ed
--- /dev/null
+++ b/tools/map-shard-for-value/Makefile
@@ -0,0 +1,22 @@
+# Copyright 2024 The Vitess Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+build:
+	go build map-shard-for-value.go
+
+test:
+	echo "1\n-1\n99" | go run map-shard-for-value.go --total_shards=4 --vindex=xxhash
+
+clean:
+	rm -f map-shard-for-value
diff --git a/tools/map-shard-for-value/map-shard-for-value.go b/tools/map-shard-for-value/map-shard-for-value.go
new file mode 100755
index 00000000000..18a092d1371
--- /dev/null
+++ b/tools/map-shard-for-value/map-shard-for-value.go
@@ -0,0 +1,207 @@
+/*
+Copyright 2024 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+	http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+	"bufio"
+	"context"
+	"encoding/hex"
+	"fmt"
+	"log"
+	"os"
+	"strconv"
+	"strings"
+
+	flag "github.com/spf13/pflag"
+
+	"vitess.io/vitess/go/vt/topo"
+
+	"vitess.io/vitess/go/sqltypes"
+	"vitess.io/vitess/go/vt/key"
+	"vitess.io/vitess/go/vt/proto/topodata"
+	"vitess.io/vitess/go/vt/vtgate/vindexes"
+)
+
+/*
+ * This tool reads a list of values from stdin and prints the
+ * corresponding keyspace ID and shard for each value. It uses the given vindex
+ * and shard ranges to determine the shard. The vindex is expected to be a
+ * single-column vindex. The shard ranges are specified as a comma-separated
+ * list of key ranges, example "-80,80-".
+ * If you have uniformly distributed shards, you can specify the total number
+ * of shards using the -total_shards flag, and the tool will generate the shard ranges
+ * using the same logic as the Vitess operator does (using the key.GenerateShardRanges() function).
+ *
+ * Example usage:
+ * echo "1\n2\n3" | go run shard-from-id.go -vindex=hash -shards=-80,80-
+ *
+ * Currently tested only for integer values and hash/xxhash vindexes.
+ */
+
+func mapShard(allShards []*topodata.ShardReference, ksid key.DestinationKeyspaceID) (string, error) {
+	foundShard := ""
+	addShard := func(shard string) error {
+		foundShard = shard
+		return nil
+	}
+	if err := ksid.Resolve(allShards, addShard); err != nil {
+		return "", fmt.Errorf("failed to resolve keyspace ID: %v:: %s", ksid.String(), err)
+	}
+
+	if foundShard == "" {
+		return "", fmt.Errorf("no shard found for keyspace ID: %v", ksid)
+	}
+	return foundShard, nil
+}
+
+func selectShard(vindex vindexes.Vindex, value sqltypes.Value, allShards []*topodata.ShardReference) (string, key.DestinationKeyspaceID, error) {
+	ctx := context.Background()
+
+	destinations, err := vindexes.Map(ctx, vindex, nil, [][]sqltypes.Value{{value}})
+	if err != nil {
+		return "", nil, fmt.Errorf("failed to map value to keyspace ID: %w", err)
+	}
+
+	if len(destinations) != 1 {
+		return "", nil, fmt.Errorf("unexpected number of destinations: %d", len(destinations))
+	}
+
+	ksid, ok := destinations[0].(key.DestinationKeyspaceID)
+	if !ok {
+		return "", nil, fmt.Errorf("unexpected destination type: %T", destinations[0])
+	}
+
+	foundShard, err := mapShard(allShards, ksid)
+	if err != nil {
+		return "", nil, fmt.Errorf("failed to map shard, original value %v, keyspace id %s: %w", value, ksid, err)
+	}
+	return foundShard, ksid, nil
+}
+
+func getValue(valueStr, valueType string) (sqltypes.Value, error) {
+	var value sqltypes.Value
+
+	switch valueType {
+	case "int":
+		valueInt, err := strconv.ParseInt(valueStr, 10, 64)
+		if err != nil {
+			return value, fmt.Errorf("failed to parse int value: %w", err)
+		}
+		value = sqltypes.NewInt64(valueInt)
+	case "uint":
+		valueUint, err := strconv.ParseUint(valueStr, 10, 64)
+		if err != nil {
+			return value, fmt.Errorf("failed to parse uint value: %w", err)
+		}
+		value = sqltypes.NewUint64(valueUint)
+	case "string":
+		value = sqltypes.NewVarChar(valueStr)
+	default:
+		return value, fmt.Errorf("unsupported value type: %s", valueType)
+	}
+
+	return value, nil
+}
+
+func getShardMap(shardsCSV *string) []*topodata.ShardReference {
+	var allShards []*topodata.ShardReference
+
+	for _, shard := range strings.Split(*shardsCSV, ",") {
+		_, keyRange, err := topo.ValidateShardName(shard)
+		if err != nil {
+			log.Fatalf("invalid shard range: %s", shard)
+		}
+		allShards = append(allShards, &topodata.ShardReference{
+			Name:     shard,
+			KeyRange: keyRange,
+		})
+	}
+	return allShards
+}
+
+type output struct {
+	Value      string
+	KeyspaceID string
+	Shard      string
+}
+
+func processValues(scanner *bufio.Scanner, shardsCSV *string, vindexName string, valueType string) ([]output, error) {
+	allShards := getShardMap(shardsCSV)
+
+	vindex, err := vindexes.CreateVindex(vindexName, vindexName, nil)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create vindex: %v", err)
+	}
+	var outputs []output
+	for scanner.Scan() {
+		valueStr := scanner.Text()
+		if valueStr == "" {
+			continue
+		}
+		value, err := getValue(valueStr, valueType)
+		if err != nil {
+			return nil, fmt.Errorf("failed to get value for: %v, value_type %s:: %v", valueStr, valueType, err)
+		}
+		shard, ksid, err := selectShard(vindex, value, allShards)
+		if err != nil {
+			// ignore errors so that we can go ahead with the computation for other values
+			continue
+		}
+		outputs = append(outputs, output{Value: valueStr, KeyspaceID: hex.EncodeToString(ksid), Shard: shard})
+	}
+	return outputs, nil
+}
+
+func printOutput(outputs []output) {
+	fmt.Println("value,keyspaceID,shard")
+	for _, output := range outputs {
+		fmt.Printf("%s,%s,%s\n", output.Value, output.KeyspaceID, output.Shard)
+	}
+}
+
+func main() {
+	// Explicitly configuring the logger since it was flaky in displaying logs locally without this.
+	log.SetOutput(os.Stderr)
+	log.SetFlags(log.LstdFlags)
+	log.SetPrefix("LOG: ")
+
+	vindexName := flag.String("vindex", "xxhash", "name of the vindex")
+	shardsCSV := flag.String("shards", "", "comma-separated list of shard ranges")
+	totalShards := flag.Int("total_shards", 0, "total number of uniformly distributed shards")
+	valueType := flag.String("value_type", "int", "type of the value (int, uint, or string)")
+	flag.Parse()
+
+	if *totalShards > 0 {
+		if *shardsCSV != "" {
+			log.Fatalf("cannot specify both total_shards and shards")
+		}
+		shardArr, err := key.GenerateShardRanges(*totalShards)
+		if err != nil {
+			log.Fatalf("failed to generate shard ranges: %v", err)
+		}
+		*shardsCSV = strings.Join(shardArr, ",")
+	}
+	if *shardsCSV == "" {
+		log.Fatal("shards or total_shards must be specified")
+	}
+	scanner := bufio.NewScanner(os.Stdin)
+	outputs, err := processValues(scanner, shardsCSV, *vindexName, *valueType)
+	if err != nil {
+		log.Fatalf("failed to process values: %v", err)
+	}
+	printOutput(outputs)
+}
diff --git a/tools/map-shard-for-value/map-shard-for-value.md b/tools/map-shard-for-value/map-shard-for-value.md
new file mode 100644
index 00000000000..17daf7f5fe5
--- /dev/null
+++ b/tools/map-shard-for-value/map-shard-for-value.md
@@ -0,0 +1,47 @@
+## Map Shard for Value Tool
+
+### Overview
+
+The `map-shard-for-value` tool maps a given value to a specific shard. This tool helps in determining
+which shard a particular value belongs to, based on the vindex algorithm and shard ranges.
+
+### Features
+- 
+
+- Allows specifying the vindex type (e.g., `hash`, `xxhash`).
+- Allows specifying the shard list of (for uniformly distributed shard ranges) the total number of shards to generate.
+- Designed as a _filter_: Reads input values from `stdin` and outputs the corresponding shard information, so it can be
+  used to map values from a file or another program.
+
+### Usage
+
+```sh
+make build
+```
+
+```sh
+echo "1\n-1\n99" | ./map-shard-for-value --total_shards=4 --vindex=xxhash
+value,keyspaceID,shard
+1,d46405367612b4b7,c0-
+-1,d8e2a6a7c8c7623d,c0-
+99,200533312244abca,-40
+
+echo "1\n-1\n99" | ./map-shard-for-value --vindex=hash --shards="-80,80-"
+value,keyspaceID,shard
+1,166b40b44aba4bd6,-80
+-1,355550b2150e2451,-80
+99,2c40ad56f4593c47,-80
+```
+
+#### Flags
+
+- `--vindex`: Specifies the name of the vindex to use (e.g., `hash`, `xxhash`) (default `xxhash`)
+
+One (and only one) of these is required:
+
+- `--shards`: Comma-separated list of shard ranges
+- `--total_shards`: Total number of shards, only if shards are uniformly distributed
+
+Optional:
+- `--value_type`: Type of the value to map, one of int, uint, string (default `int`)
+
diff --git a/tools/map-shard-for-value/map-shard-for-value_test.go b/tools/map-shard-for-value/map-shard-for-value_test.go
new file mode 100644
index 00000000000..ca014818bb9
--- /dev/null
+++ b/tools/map-shard-for-value/map-shard-for-value_test.go
@@ -0,0 +1,90 @@
+/*
+Copyright 2024 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+	http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+	"bufio"
+	"fmt"
+	"strings"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestProcess(t *testing.T) {
+	type testCase struct {
+		name       string
+		shardsCSV  string
+		vindexType string
+		values     []int
+		valueType  string
+		expected   []output
+	}
+	testCases := []testCase{
+		{
+			name:       "hash,2 shards",
+			shardsCSV:  "-80,80-",
+			vindexType: "hash",
+			values:     []int{1, 99},
+			valueType:  "int",
+			expected: []output{
+				{
+					Value:      "1",
+					KeyspaceID: "166b40b44aba4bd6",
+					Shard:      "-80",
+				},
+				{
+					Value:      "99",
+					KeyspaceID: "2c40ad56f4593c47",
+					Shard:      "-80",
+				},
+			},
+		},
+		{
+			name:       "xxhash,4 shards",
+			shardsCSV:  "-40,40-80,80-c0,c0-",
+			vindexType: "xxhash",
+			values:     []int{1, 99},
+			valueType:  "int",
+			expected: []output{
+				{
+					Value:      "1",
+					KeyspaceID: "d46405367612b4b7",
+					Shard:      "c0-",
+				},
+				{
+					Value:      "99",
+					KeyspaceID: "200533312244abca",
+					Shard:      "-40",
+				},
+			},
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			var input strings.Builder
+			for _, num := range tc.values {
+				fmt.Fprintf(&input, "%d\n", num)
+			}
+			reader := strings.NewReader(input.String())
+			scanner := bufio.NewScanner(reader)
+			got, err := processValues(scanner, &tc.shardsCSV, tc.vindexType, tc.valueType)
+			require.NoError(t, err)
+			require.EqualValues(t, tc.expected, got)
+		})
+	}
+}
diff --git a/web/vtadmin/src/api/http.ts b/web/vtadmin/src/api/http.ts
index 3f75330d240..674df961ef0 100644
--- a/web/vtadmin/src/api/http.ts
+++ b/web/vtadmin/src/api/http.ts
@@ -1068,3 +1068,41 @@ export const showVDiff = async ({ clusterID, request }: ShowVDiffParams) => {
 
     return vtadmin.VDiffShowResponse.create(result);
 };
+
+export const fetchSchemaMigrations = async (request: vtadmin.IGetSchemaMigrationsRequest) => {
+    const { result } = await vtfetch(`/api/migrations/`, {
+        body: JSON.stringify(request),
+        method: 'post',
+    });
+
+    const err = vtadmin.GetSchemaMigrationsResponse.verify(result);
+    if (err) throw Error(err);
+
+    return vtadmin.GetSchemaMigrationsResponse.create(result);
+};
+
+export interface ApplySchemaParams {
+    clusterID: string;
+    keyspace: string;
+    callerID: string;
+    sql: string;
+    request: vtctldata.IApplySchemaRequest;
+}
+
+export const applySchema = async ({ clusterID, keyspace, callerID, sql, request }: ApplySchemaParams) => {
+    const body = {
+        sql,
+        caller_id: callerID,
+        request,
+    };
+
+    const { result } = await vtfetch(`/api/migration/${clusterID}/${keyspace}`, {
+        body: JSON.stringify(body),
+        method: 'post',
+    });
+
+    const err = vtctldata.ApplySchemaResponse.verify(result);
+    if (err) throw Error(err);
+
+    return vtctldata.ApplySchemaResponse.create(result);
+};
diff --git a/web/vtadmin/src/components/App.tsx b/web/vtadmin/src/components/App.tsx
index ef27a35dc95..3bb41ea35f0 100644
--- a/web/vtadmin/src/components/App.tsx
+++ b/web/vtadmin/src/components/App.tsx
@@ -45,6 +45,8 @@ import { Transactions } from './routes/Transactions';
 import { Transaction } from './routes/transaction/Transaction';
 import { CreateReshard } from './routes/createWorkflow/CreateReshard';
 import { CreateMaterialize } from './routes/createWorkflow/CreateMaterialize';
+import { SchemaMigrations } from './routes/SchemaMigrations';
+import { CreateSchemaMigration } from './routes/createSchemaMigration/CreateSchemaMigration';
 
 export const App = () => {
     return (
@@ -140,6 +142,16 @@ export const App = () => {
                             <Workflow />
                         </Route>
 
+                        <Route exact path="/migrations">
+                            <SchemaMigrations />
+                        </Route>
+
+                        {!isReadOnlyMode() && (
+                            <Route exact path="/migrations/create">
+                                <CreateSchemaMigration />
+                            </Route>
+                        )}
+
                         <Route path="/transactions">
                             <Transactions />
                         </Route>
diff --git a/web/vtadmin/src/components/NavRail.tsx b/web/vtadmin/src/components/NavRail.tsx
index 9f9e1bf1681..b30cd165684 100644
--- a/web/vtadmin/src/components/NavRail.tsx
+++ b/web/vtadmin/src/components/NavRail.tsx
@@ -65,6 +65,9 @@ export const NavRail = () => {
                 </ul>
 
                 <ul className={style.navList}>
+                    <li>
+                        <NavRailLink hotkey="M" text="Migrations" to="/migrations" />
+                    </li>
                     <li>
                         <NavRailLink hotkey="T" text="Transactions" to="/transactions" />
                     </li>
diff --git a/web/vtadmin/src/components/routes/createWorkflow/ErrorDialog.tsx b/web/vtadmin/src/components/dialog/ErrorDialog.tsx
similarity index 94%
rename from web/vtadmin/src/components/routes/createWorkflow/ErrorDialog.tsx
rename to web/vtadmin/src/components/dialog/ErrorDialog.tsx
index 25ac5dedb0b..087876e4cd2 100644
--- a/web/vtadmin/src/components/routes/createWorkflow/ErrorDialog.tsx
+++ b/web/vtadmin/src/components/dialog/ErrorDialog.tsx
@@ -14,8 +14,8 @@
  * limitations under the License.
  */
 import React from 'react';
-import Dialog from '../../dialog/Dialog';
-import { Icon, Icons } from '../../Icon';
+import Dialog from './Dialog';
+import { Icon, Icons } from '../Icon';
 
 export interface ErrorDialogProps {
     errorTitle?: string;
diff --git a/web/vtadmin/src/components/routes/SchemaMigrations.tsx b/web/vtadmin/src/components/routes/SchemaMigrations.tsx
new file mode 100644
index 00000000000..1761d26de49
--- /dev/null
+++ b/web/vtadmin/src/components/routes/SchemaMigrations.tsx
@@ -0,0 +1,195 @@
+/**
+ * Copyright 2024 The Vitess Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import { useEffect, useState } from 'react';
+import { useKeyspaces, useSchemaMigrations } from '../../hooks/api';
+import { DataCell } from '../dataTable/DataCell';
+import { DataTable } from '../dataTable/DataTable';
+import { ContentContainer } from '../layout/ContentContainer';
+import { WorkspaceHeader } from '../layout/WorkspaceHeader';
+import { WorkspaceTitle } from '../layout/WorkspaceTitle';
+import { QueryLoadingPlaceholder } from '../placeholders/QueryLoadingPlaceholder';
+import { useDocumentTitle } from '../../hooks/useDocumentTitle';
+import { vtadmin } from '../../proto/vtadmin';
+import { Select } from '../inputs/Select';
+import { ShardLink } from '../links/ShardLink';
+import { formatDateTime } from '../../util/time';
+import { ReadOnlyGate } from '../ReadOnlyGate';
+import { formatSchemaMigrationStatus } from '../../util/schemaMigrations';
+import { Link } from 'react-router-dom';
+import { TabletLink } from '../links/TabletLink';
+import { formatAlias } from '../../util/tablets';
+import { useURLQuery } from '../../hooks/useURLQuery';
+
+const COLUMNS = ['UUID', 'Status', 'DDL Action', 'Timestamps', 'Stage', 'Progress'];
+
+export const SchemaMigrations = () => {
+    useDocumentTitle('Schema Migrations');
+
+    const { query, replaceQuery } = useURLQuery();
+    const urlKeyspace = query['keyspace'];
+    const urlCluster = query['cluster'];
+
+    const keyspacesQuery = useKeyspaces();
+    const { data: keyspaces = [], ...ksQuery } = keyspacesQuery;
+
+    const [selectedKeyspace, setSelectedKeypsace] = useState<vtadmin.Keyspace | null | undefined>();
+
+    const request: vtadmin.IGetSchemaMigrationsRequest = {
+        cluster_requests: [
+            {
+                cluster_id: selectedKeyspace && selectedKeyspace.cluster?.id,
+                request: {
+                    keyspace: selectedKeyspace && selectedKeyspace.keyspace?.name,
+                },
+            },
+        ],
+    };
+
+    const schemaMigrationsQuery = useSchemaMigrations(request, {
+        enabled: !!selectedKeyspace,
+    });
+
+    const schemaMigrations = schemaMigrationsQuery.data ? schemaMigrationsQuery.data.schema_migrations : [];
+
+    const handleKeyspaceChange = (ks: vtadmin.Keyspace | null | undefined) => {
+        setSelectedKeypsace(ks);
+
+        if (ks) {
+            replaceQuery({ keyspace: ks.keyspace?.name, cluster: ks.cluster?.id });
+        } else {
+            replaceQuery({ keyspace: undefined, cluster: undefined });
+        }
+    };
+
+    useEffect(() => {
+        if (urlKeyspace && urlCluster) {
+            const keyspace = keyspaces.find(
+                (ks) => ks.cluster?.id === String(urlCluster) && ks.keyspace?.name === String(urlKeyspace)
+            );
+
+            if (keyspace) {
+                setSelectedKeypsace(keyspace);
+            } else if (!ksQuery.isLoading) {
+                replaceQuery({ keyspace: undefined, cluster: undefined });
+            }
+        } else {
+            setSelectedKeypsace(undefined);
+        }
+    }, [urlKeyspace, urlCluster, keyspaces, ksQuery.isLoading, replaceQuery]);
+
+    const renderRows = (rows: vtadmin.ISchemaMigration[]) => {
+        return rows.map((row) => {
+            const migrationInfo = row.schema_migration;
+
+            if (!migrationInfo) return <></>;
+
+            return (
+                <tr key={migrationInfo.uuid}>
+                    <DataCell>
+                        <div>{migrationInfo.uuid}</div>
+                        <div className="text-sm text-secondary">
+                            Tablet{' '}
+                            <TabletLink alias={formatAlias(migrationInfo.tablet)} clusterID={row.cluster?.id}>
+                                {formatAlias(migrationInfo.tablet)}
+                            </TabletLink>
+                        </div>
+                        <div className="text-sm text-secondary">
+                            Shard{' '}
+                            <ShardLink
+                                clusterID={row.cluster?.id}
+                                keyspace={migrationInfo.keyspace}
+                                shard={migrationInfo.shard}
+                            >
+                                {`${migrationInfo.keyspace}/${migrationInfo.shard}`}
+                            </ShardLink>
+                        </div>
+                    </DataCell>
+                    <DataCell>
+                        <div>{formatSchemaMigrationStatus(migrationInfo)}</div>
+                    </DataCell>
+                    <DataCell>{migrationInfo.ddl_action ? migrationInfo.ddl_action : '-'}</DataCell>
+                    <DataCell className="items-end flex flex-col">
+                        {migrationInfo.added_at && (
+                            <div className="text-sm font-sans whitespace-nowrap">
+                                <span className="text-secondary">Added </span>
+                                {formatDateTime(migrationInfo.added_at?.seconds)}
+                            </div>
+                        )}
+                        {migrationInfo.requested_at && (
+                            <div className="text-sm font-sans whitespace-nowrap">
+                                <span className="text-secondary">Requested </span>
+                                {formatDateTime(migrationInfo.requested_at?.seconds)}
+                            </div>
+                        )}
+                        {migrationInfo.started_at && (
+                            <div className="text-sm font-sans whitespace-nowrap">
+                                <span className="text-secondary">Started </span>
+                                {formatDateTime(migrationInfo.started_at?.seconds)}
+                            </div>
+                        )}
+                        {migrationInfo.completed_at && (
+                            <div className="text-sm font-sans whitespace-nowrap">
+                                <span className="text-secondary">Completed </span>
+                                {formatDateTime(migrationInfo.completed_at?.seconds)}
+                            </div>
+                        )}
+                    </DataCell>
+                    <DataCell>{migrationInfo.stage ? migrationInfo.stage : '-'}</DataCell>
+                    <DataCell>{migrationInfo.progress ? `${migrationInfo.progress}%` : '-'}</DataCell>
+                </tr>
+            );
+        });
+    };
+
+    return (
+        <div>
+            <WorkspaceHeader className="mb-0">
+                <div className="flex items-top justify-between">
+                    <WorkspaceTitle>Schema Migrations</WorkspaceTitle>
+                    <ReadOnlyGate>
+                        <div>
+                            <Link className="btn btn-secondary btn-md" to="/migrations/create">
+                                Create Schema Migration Request
+                            </Link>
+                        </div>
+                    </ReadOnlyGate>
+                </div>
+            </WorkspaceHeader>
+
+            <ContentContainer>
+                <div className="max-w-[740px]">
+                    <Select
+                        className="block grow-1 min-w-[400px]"
+                        disabled={keyspacesQuery.isLoading}
+                        inputClassName="block w-full"
+                        items={keyspaces}
+                        label="Keyspace"
+                        onChange={(ks) => handleKeyspaceChange(ks)}
+                        placeholder={
+                            keyspacesQuery.isLoading
+                                ? 'Loading keyspaces...'
+                                : 'Select a keyspace to view schema migrations'
+                        }
+                        renderItem={(ks) => `${ks?.keyspace?.name} (${ks?.cluster?.id})`}
+                        selectedItem={selectedKeyspace}
+                    />
+                </div>
+                <DataTable columns={COLUMNS} data={schemaMigrations} renderRows={renderRows} />
+                <QueryLoadingPlaceholder query={schemaMigrationsQuery} />
+            </ContentContainer>
+        </div>
+    );
+};
diff --git a/web/vtadmin/src/components/routes/createSchemaMigration/CreateSchemaMigration.module.scss b/web/vtadmin/src/components/routes/createSchemaMigration/CreateSchemaMigration.module.scss
new file mode 100644
index 00000000000..51f5fdca04e
--- /dev/null
+++ b/web/vtadmin/src/components/routes/createSchemaMigration/CreateSchemaMigration.module.scss
@@ -0,0 +1,30 @@
+/**
+ * Copyright 2024 The Vitess Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+.sqlInput {
+    border: solid 2px var(--colorDisabled);
+    border-radius: 6px;
+    display: block;
+    font-family: var(--fontFamilyMonospace);
+    line-height: var(--lineHeightBody);
+    padding: 0.8rem;
+    resize: vertical;
+    width: 100%;
+}
+
+.sqlInput:focus {
+    border-color: var(--colorPrimary);
+    outline: none;
+}
diff --git a/web/vtadmin/src/components/routes/createSchemaMigration/CreateSchemaMigration.tsx b/web/vtadmin/src/components/routes/createSchemaMigration/CreateSchemaMigration.tsx
new file mode 100644
index 00000000000..0f7326d2ae1
--- /dev/null
+++ b/web/vtadmin/src/components/routes/createSchemaMigration/CreateSchemaMigration.tsx
@@ -0,0 +1,270 @@
+/**
+ * Copyright 2024 The Vitess Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import { useEffect, useState } from 'react';
+import { Link, useHistory } from 'react-router-dom';
+
+import style from './CreateSchemaMigration.module.scss';
+import { useApplySchema, useClusters, useKeyspaces } from '../../../hooks/api';
+import { useDocumentTitle } from '../../../hooks/useDocumentTitle';
+import { Label } from '../../inputs/Label';
+import { Select } from '../../inputs/Select';
+import { ContentContainer } from '../../layout/ContentContainer';
+import { NavCrumbs } from '../../layout/NavCrumbs';
+import { WorkspaceHeader } from '../../layout/WorkspaceHeader';
+import { WorkspaceTitle } from '../../layout/WorkspaceTitle';
+import { TextInput } from '../../TextInput';
+import { success } from '../../Snackbar';
+import { FormError } from '../../forms/FormError';
+import { vtadmin } from '../../../proto/vtadmin';
+import ErrorDialog from '../../dialog/ErrorDialog';
+
+interface FormData {
+    clusterID: string;
+    keyspace: string;
+    ddlStrategy: string;
+    sql: string;
+    batchSize: number;
+    callerID: string;
+    migrationContext: string;
+    uuidList: string;
+}
+
+const DEFAULT_FORM_DATA: FormData = {
+    clusterID: '',
+    keyspace: '',
+    // Default DDL Strategy set to "vitess".
+    ddlStrategy: 'vitess',
+    sql: '',
+    batchSize: 0,
+    callerID: '',
+    migrationContext: '',
+    uuidList: '',
+};
+
+const DDL_STRATEGY_HELP_TEXT = `Online DDL strategy, compatible with @@ddl_strategy session variable (default "vitess")`;
+
+const MIGRATION_CONTEXT_HELP_TEXT =
+    'For Online DDL, optionally supply a custom unique string used as context for the migration(s) in this command. By default a unique context is auto-generated by Vitess.';
+
+const CALLER_ID_HELP_TEXT =
+    'Effective caller ID used for the operation and should map to an ACL name which grants this identity the necessary permissions to perform the operation (this is only necessary when strict table ACLs are used)';
+
+export const CreateSchemaMigration = () => {
+    useDocumentTitle('Create Schema Migration Request');
+
+    const history = useHistory();
+
+    const [formData, setFormData] = useState<FormData>(DEFAULT_FORM_DATA);
+
+    const [clusterKeyspaces, setClusterKeyspaces] = useState<vtadmin.Keyspace[]>([]);
+
+    const [errorDialogOpen, setErrorDialogOpen] = useState<boolean>(false);
+
+    const { data: clusters = [], ...clustersQuery } = useClusters();
+
+    const { data: keyspaces = [], ...keyspacesQuery } = useKeyspaces();
+
+    const mutation = useApplySchema(
+        {
+            clusterID: formData.clusterID,
+            keyspace: formData.keyspace,
+            callerID: formData.callerID,
+            sql: formData.sql,
+            request: {
+                ddl_strategy: formData.ddlStrategy,
+                batch_size: formData.batchSize,
+                migration_context: formData.migrationContext,
+                uuid_list: (formData.uuidList && formData.uuidList.split(',').map((uuid) => uuid.trim())) || undefined,
+            },
+        },
+        {
+            onSuccess: () => {
+                success(`Successfully created schema migration request.`, { autoClose: 1600 });
+
+                history.push({
+                    pathname: `/migrations`,
+                    search: `?keyspace=${formData.keyspace}&cluster=${formData.clusterID}`,
+                });
+            },
+            onError: () => {
+                setErrorDialogOpen(true);
+            },
+        }
+    );
+
+    let selectedCluster = null;
+    if (!!formData.clusterID) {
+        selectedCluster = clusters.find((c) => c.id === formData.clusterID);
+    }
+
+    let selectedKeyspace = null;
+    if (!!formData.keyspace) {
+        selectedKeyspace = keyspaces.find((ks) => ks.keyspace?.name === formData.keyspace);
+    }
+
+    const isValid = !!selectedCluster && !!formData.keyspace && !!formData.sql && !!formData.ddlStrategy;
+
+    const isDisabled = !isValid || mutation.isLoading;
+
+    const onSubmit: React.FormEventHandler<HTMLFormElement> = (e) => {
+        e.preventDefault();
+        mutation.mutate();
+    };
+
+    useEffect(() => {
+        // Clear out the selected keyspaces if selected cluster is changed.
+        setFormData((prevFormData) => ({ ...prevFormData, keyspace: '' }));
+        setClusterKeyspaces(keyspaces.filter((ks) => ks.cluster?.id === formData.clusterID));
+    }, [formData.clusterID, keyspaces]);
+
+    useEffect(() => {
+        if (clusters.length === 1) {
+            setFormData((prevFormData) => ({ ...prevFormData, clusterID: clusters[0].id }));
+        }
+    }, [clusters]);
+
+    return (
+        <div>
+            <WorkspaceHeader>
+                <NavCrumbs>
+                    <Link to="/migrations">Schema Migrations</Link>
+                </NavCrumbs>
+
+                <WorkspaceTitle>Create Schema Migration Request</WorkspaceTitle>
+            </WorkspaceHeader>
+
+            <ContentContainer>
+                <form onSubmit={onSubmit}>
+                    <div className="flex flex-row gap-4 flex-wrap mb-2">
+                        <Select
+                            className="block min-w-[300px]"
+                            disabled={keyspacesQuery.isLoading || !selectedCluster}
+                            inputClassName="block w-full"
+                            itemToString={(ks) => ks?.keyspace?.name || ''}
+                            items={clusterKeyspaces}
+                            label="Keyspace"
+                            onChange={(ks) => setFormData({ ...formData, keyspace: ks?.keyspace?.name || '' })}
+                            placeholder={keyspacesQuery.isLoading ? 'Loading keyspaces...' : 'Select a keyspace'}
+                            renderItem={(ks) => `${ks?.keyspace?.name}`}
+                            selectedItem={selectedKeyspace}
+                        />
+                        <Label
+                            className="block grow min-w-[300px]"
+                            label="DDL Strategy"
+                            helpText={DDL_STRATEGY_HELP_TEXT}
+                        >
+                            <TextInput
+                                onChange={(e) => setFormData({ ...formData, ddlStrategy: e.target.value })}
+                                value={formData.ddlStrategy}
+                            />
+                        </Label>
+                    </div>
+
+                    <Label label="SQL" helpText={'Semicolon-delimited, repeatable SQL commands to apply.'}>
+                        <textarea
+                            className={style.sqlInput}
+                            onChange={(e) => setFormData({ ...formData, sql: e.target.value })}
+                            rows={10}
+                            value={formData.sql}
+                        />
+                    </Label>
+
+                    <h3 className="mt-8 mb-2">Advanced</h3>
+
+                    <div className="flex flex-row gap-4 flex-wrap">
+                        <Label
+                            className="block grow min-w-[300px]"
+                            label="Batch Size"
+                            helpText={
+                                'How many queries to batch together. Only applicable when all queries are CREATE TABLE|VIEW'
+                            }
+                        >
+                            <TextInput
+                                onChange={(e) => setFormData({ ...formData, batchSize: Number(e.target.value) })}
+                                type="number"
+                                value={formData.batchSize}
+                            />
+                        </Label>
+                        <Label className="block grow min-w-[300px]" label="Caller ID" helpText={CALLER_ID_HELP_TEXT}>
+                            <TextInput
+                                onChange={(e) => setFormData({ ...formData, callerID: e.target.value })}
+                                value={formData.callerID}
+                            />
+                        </Label>
+                        <Select
+                            className="block grow min-w-[300px]"
+                            disabled={clustersQuery.isLoading}
+                            inputClassName="block w-full"
+                            itemToString={(cluster) => cluster?.name || ''}
+                            items={clusters}
+                            label="Cluster"
+                            onChange={(c) => setFormData({ ...formData, clusterID: c?.id || '' })}
+                            placeholder={clustersQuery.isLoading ? 'Loading clusters...' : 'Select a cluster'}
+                            renderItem={(c) => `${c?.name} (${c?.id})`}
+                            selectedItem={selectedCluster}
+                        />
+                        <Label
+                            className="block grow min-w-[300px]"
+                            label="UUIDs"
+                            helpText={
+                                'Optional, comma-delimited, repeatable, explicit UUIDs for migration. If given, must match number of DDL changes.'
+                            }
+                        >
+                            <TextInput
+                                onChange={(e) => setFormData({ ...formData, uuidList: e.target.value })}
+                                value={formData.uuidList || ''}
+                            />
+                        </Label>
+                        <Label
+                            className="block grow min-w-[300px]"
+                            label="Migration Context"
+                            helpText={MIGRATION_CONTEXT_HELP_TEXT}
+                        >
+                            <TextInput
+                                onChange={(e) => setFormData({ ...formData, migrationContext: e.target.value })}
+                                value={formData.migrationContext || ''}
+                            />
+                        </Label>
+                    </div>
+
+                    {clustersQuery.isError && (
+                        <FormError
+                            error={clustersQuery.error}
+                            title="Couldn't load clusters. Please reload the page to try again."
+                        />
+                    )}
+
+                    <div className="my-8">
+                        <button className="btn" disabled={isDisabled} type="submit">
+                            {mutation.isLoading ? 'Creating...' : 'Create Schema Migration Request'}
+                        </button>
+                    </div>
+                </form>
+
+                {mutation.isError && !mutation.isLoading && (
+                    <ErrorDialog
+                        errorDescription={mutation.error.message}
+                        errorTitle="Error Creating Schema Migration Request"
+                        isOpen={errorDialogOpen}
+                        onClose={() => {
+                            setErrorDialogOpen(false);
+                        }}
+                    />
+                )}
+            </ContentContainer>
+        </div>
+    );
+};
diff --git a/web/vtadmin/src/components/routes/createWorkflow/CreateMaterialize.tsx b/web/vtadmin/src/components/routes/createWorkflow/CreateMaterialize.tsx
index c5d688a1fb7..81447cd0e6d 100644
--- a/web/vtadmin/src/components/routes/createWorkflow/CreateMaterialize.tsx
+++ b/web/vtadmin/src/components/routes/createWorkflow/CreateMaterialize.tsx
@@ -31,7 +31,7 @@ import Toggle from '../../toggle/Toggle';
 import { tabletmanagerdata, vtadmin, vtctldata } from '../../../proto/vtadmin';
 import { MultiSelect } from '../../inputs/MultiSelect';
 import { TABLET_TYPES } from '../../../util/tablets';
-import ErrorDialog from './ErrorDialog';
+import ErrorDialog from '../../dialog/ErrorDialog';
 
 interface FormData {
     clusterID: string;
diff --git a/web/vtadmin/src/components/routes/createWorkflow/CreateMoveTables.tsx b/web/vtadmin/src/components/routes/createWorkflow/CreateMoveTables.tsx
index bca84cda4fa..1852d85b848 100644
--- a/web/vtadmin/src/components/routes/createWorkflow/CreateMoveTables.tsx
+++ b/web/vtadmin/src/components/routes/createWorkflow/CreateMoveTables.tsx
@@ -31,7 +31,7 @@ import Toggle from '../../toggle/Toggle';
 import { vtadmin } from '../../../proto/vtadmin';
 import { MultiSelect } from '../../inputs/MultiSelect';
 import { TABLET_TYPES } from '../../../util/tablets';
-import ErrorDialog from './ErrorDialog';
+import ErrorDialog from '../../dialog/ErrorDialog';
 
 interface FormData {
     clusterID: string;
diff --git a/web/vtadmin/src/components/routes/createWorkflow/CreateReshard.tsx b/web/vtadmin/src/components/routes/createWorkflow/CreateReshard.tsx
index 4977c59e46b..05a33825174 100644
--- a/web/vtadmin/src/components/routes/createWorkflow/CreateReshard.tsx
+++ b/web/vtadmin/src/components/routes/createWorkflow/CreateReshard.tsx
@@ -31,7 +31,7 @@ import Toggle from '../../toggle/Toggle';
 import { tabletmanagerdata, vtadmin } from '../../../proto/vtadmin';
 import { MultiSelect } from '../../inputs/MultiSelect';
 import { TABLET_TYPES } from '../../../util/tablets';
-import ErrorDialog from './ErrorDialog';
+import ErrorDialog from '../../dialog/ErrorDialog';
 
 interface FormData {
     clusterID: string;
diff --git a/web/vtadmin/src/hooks/api.ts b/web/vtadmin/src/hooks/api.ts
index 9261f4f0eb0..18ab3b60a53 100644
--- a/web/vtadmin/src/hooks/api.ts
+++ b/web/vtadmin/src/hooks/api.ts
@@ -95,6 +95,8 @@ import {
     showVDiff,
     ShowVDiffParams,
     createMaterialize,
+    fetchSchemaMigrations,
+    applySchema,
 } from '../api/http';
 import { vtadmin as pb, vtctldata } from '../proto/vtadmin';
 import { formatAlias } from '../util/tablets';
@@ -796,3 +798,25 @@ export const useShowVDiff = (
 ) => {
     return useQuery(['vdiff_show', params], () => showVDiff(params), { ...options });
 };
+
+/**
+ * useSchemaMigrations is a query hook that fetches schema migrations.
+ */
+export const useSchemaMigrations = (
+    request: pb.IGetSchemaMigrationsRequest,
+    options?: UseQueryOptions<pb.GetSchemaMigrationsResponse, Error> | undefined
+) => {
+    return useQuery(['migrations', request], () => fetchSchemaMigrations(request), { ...options });
+};
+
+/**
+ * useApplySchema is a mutation query hook that creates ApplySchema request.
+ */
+export const useApplySchema = (
+    params: Parameters<typeof applySchema>[0],
+    options: UseMutationOptions<Awaited<ReturnType<typeof applySchema>>, Error>
+) => {
+    return useMutation<Awaited<ReturnType<typeof applySchema>>, Error>(() => {
+        return applySchema(params);
+    }, options);
+};
diff --git a/web/vtadmin/src/proto/vtadmin.d.ts b/web/vtadmin/src/proto/vtadmin.d.ts
index 95b1448acef..410aaa644ff 100644
--- a/web/vtadmin/src/proto/vtadmin.d.ts
+++ b/web/vtadmin/src/proto/vtadmin.d.ts
@@ -3720,6 +3720,12 @@ export namespace vtadmin {
         /** ApplySchemaRequest cluster_id */
         cluster_id?: (string|null);
 
+        /** ApplySchemaRequest sql */
+        sql?: (string|null);
+
+        /** ApplySchemaRequest caller_id */
+        caller_id?: (string|null);
+
         /** ApplySchemaRequest request */
         request?: (vtctldata.IApplySchemaRequest|null);
     }
@@ -3736,6 +3742,12 @@ export namespace vtadmin {
         /** ApplySchemaRequest cluster_id. */
         public cluster_id: string;
 
+        /** ApplySchemaRequest sql. */
+        public sql: string;
+
+        /** ApplySchemaRequest caller_id. */
+        public caller_id: string;
+
         /** ApplySchemaRequest request. */
         public request?: (vtctldata.IApplySchemaRequest|null);
 
diff --git a/web/vtadmin/src/proto/vtadmin.js b/web/vtadmin/src/proto/vtadmin.js
index f179934c800..b8ab0c1186a 100644
--- a/web/vtadmin/src/proto/vtadmin.js
+++ b/web/vtadmin/src/proto/vtadmin.js
@@ -7838,6 +7838,8 @@ export const vtadmin = $root.vtadmin = (() => {
          * @memberof vtadmin
          * @interface IApplySchemaRequest
          * @property {string|null} [cluster_id] ApplySchemaRequest cluster_id
+         * @property {string|null} [sql] ApplySchemaRequest sql
+         * @property {string|null} [caller_id] ApplySchemaRequest caller_id
          * @property {vtctldata.IApplySchemaRequest|null} [request] ApplySchemaRequest request
          */
 
@@ -7864,6 +7866,22 @@ export const vtadmin = $root.vtadmin = (() => {
          */
         ApplySchemaRequest.prototype.cluster_id = "";
 
+        /**
+         * ApplySchemaRequest sql.
+         * @member {string} sql
+         * @memberof vtadmin.ApplySchemaRequest
+         * @instance
+         */
+        ApplySchemaRequest.prototype.sql = "";
+
+        /**
+         * ApplySchemaRequest caller_id.
+         * @member {string} caller_id
+         * @memberof vtadmin.ApplySchemaRequest
+         * @instance
+         */
+        ApplySchemaRequest.prototype.caller_id = "";
+
         /**
          * ApplySchemaRequest request.
          * @member {vtctldata.IApplySchemaRequest|null|undefined} request
@@ -7898,8 +7916,12 @@ export const vtadmin = $root.vtadmin = (() => {
                 writer = $Writer.create();
             if (message.cluster_id != null && Object.hasOwnProperty.call(message, "cluster_id"))
                 writer.uint32(/* id 1, wireType 2 =*/10).string(message.cluster_id);
+            if (message.sql != null && Object.hasOwnProperty.call(message, "sql"))
+                writer.uint32(/* id 2, wireType 2 =*/18).string(message.sql);
+            if (message.caller_id != null && Object.hasOwnProperty.call(message, "caller_id"))
+                writer.uint32(/* id 3, wireType 2 =*/26).string(message.caller_id);
             if (message.request != null && Object.hasOwnProperty.call(message, "request"))
-                $root.vtctldata.ApplySchemaRequest.encode(message.request, writer.uint32(/* id 2, wireType 2 =*/18).fork()).ldelim();
+                $root.vtctldata.ApplySchemaRequest.encode(message.request, writer.uint32(/* id 4, wireType 2 =*/34).fork()).ldelim();
             return writer;
         };
 
@@ -7939,6 +7961,14 @@ export const vtadmin = $root.vtadmin = (() => {
                         break;
                     }
                 case 2: {
+                        message.sql = reader.string();
+                        break;
+                    }
+                case 3: {
+                        message.caller_id = reader.string();
+                        break;
+                    }
+                case 4: {
                         message.request = $root.vtctldata.ApplySchemaRequest.decode(reader, reader.uint32());
                         break;
                     }
@@ -7980,6 +8010,12 @@ export const vtadmin = $root.vtadmin = (() => {
             if (message.cluster_id != null && message.hasOwnProperty("cluster_id"))
                 if (!$util.isString(message.cluster_id))
                     return "cluster_id: string expected";
+            if (message.sql != null && message.hasOwnProperty("sql"))
+                if (!$util.isString(message.sql))
+                    return "sql: string expected";
+            if (message.caller_id != null && message.hasOwnProperty("caller_id"))
+                if (!$util.isString(message.caller_id))
+                    return "caller_id: string expected";
             if (message.request != null && message.hasOwnProperty("request")) {
                 let error = $root.vtctldata.ApplySchemaRequest.verify(message.request);
                 if (error)
@@ -8002,6 +8038,10 @@ export const vtadmin = $root.vtadmin = (() => {
             let message = new $root.vtadmin.ApplySchemaRequest();
             if (object.cluster_id != null)
                 message.cluster_id = String(object.cluster_id);
+            if (object.sql != null)
+                message.sql = String(object.sql);
+            if (object.caller_id != null)
+                message.caller_id = String(object.caller_id);
             if (object.request != null) {
                 if (typeof object.request !== "object")
                     throw TypeError(".vtadmin.ApplySchemaRequest.request: object expected");
@@ -8025,10 +8065,16 @@ export const vtadmin = $root.vtadmin = (() => {
             let object = {};
             if (options.defaults) {
                 object.cluster_id = "";
+                object.sql = "";
+                object.caller_id = "";
                 object.request = null;
             }
             if (message.cluster_id != null && message.hasOwnProperty("cluster_id"))
                 object.cluster_id = message.cluster_id;
+            if (message.sql != null && message.hasOwnProperty("sql"))
+                object.sql = message.sql;
+            if (message.caller_id != null && message.hasOwnProperty("caller_id"))
+                object.caller_id = message.caller_id;
             if (message.request != null && message.hasOwnProperty("request"))
                 object.request = $root.vtctldata.ApplySchemaRequest.toObject(message.request, options);
             return object;
diff --git a/web/vtadmin/src/util/schemaMigrations.ts b/web/vtadmin/src/util/schemaMigrations.ts
new file mode 100644
index 00000000000..c405c4dbecf
--- /dev/null
+++ b/web/vtadmin/src/util/schemaMigrations.ts
@@ -0,0 +1,31 @@
+/**
+ * Copyright 2024 The Vitess Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import { invertBy } from 'lodash-es';
+import { vtctldata } from '../proto/vtadmin';
+
+/**
+ * SCHEMA_MIGRATION_STATUS maps numeric schema migration status back to human readable strings.
+ */
+export const SCHEMA_MIGRATION_STATUS = Object.entries(invertBy(vtctldata.SchemaMigration.Status)).reduce(
+    (acc, [k, vs]) => {
+        acc[k] = vs[0];
+        return acc;
+    },
+    {} as { [k: string]: string }
+);
+
+export const formatSchemaMigrationStatus = (schemaMigration: vtctldata.ISchemaMigration) =>
+    schemaMigration.status && SCHEMA_MIGRATION_STATUS[schemaMigration.status];