Skip to content

Commit

Permalink
vtadmin onlineddl endpoints (#15114)
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Mason authored Feb 6, 2024
1 parent 45070de commit f432a95
Show file tree
Hide file tree
Showing 16 changed files with 14,928 additions and 6,567 deletions.
7,288 changes: 4,057 additions & 3,231 deletions go/vt/proto/vtadmin/vtadmin.pb.go

Large diffs are not rendered by default.

286 changes: 286 additions & 0 deletions go/vt/proto/vtadmin/vtadmin_grpc.pb.go

Large diffs are not rendered by default.

3,029 changes: 2,511 additions & 518 deletions go/vt/proto/vtadmin/vtadmin_vtproto.pb.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions go/vt/proto/vtctlservice/vtctlservice_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

228 changes: 228 additions & 0 deletions go/vt/vtadmin/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,13 @@ func (api *API) Handler() http.Handler {
router.HandleFunc("/keyspace/{cluster_id}/{name}/validate/schema", httpAPI.Adapt(vtadminhttp.ValidateSchemaKeyspace)).Name("API.ValidateSchemaKeyspace").Methods("PUT", "OPTIONS")
router.HandleFunc("/keyspace/{cluster_id}/{name}/validate/version", httpAPI.Adapt(vtadminhttp.ValidateVersionKeyspace)).Name("API.ValidateVersionKeyspace").Methods("PUT", "OPTIONS")
router.HandleFunc("/keyspaces", httpAPI.Adapt(vtadminhttp.GetKeyspaces)).Name("API.GetKeyspaces")
router.HandleFunc("/migration/{cluster_id}/{keyspace}", httpAPI.Adapt(vtadminhttp.ApplySchema)).Name("API.ApplySchema").Methods("POST")
router.HandleFunc("/migration/{cluster_id}/{keyspace}/cancel", httpAPI.Adapt(vtadminhttp.CancelSchemaMigration)).Name("API.CancelSchemaMigration").Methods("PUT", "OPTIONS")
router.HandleFunc("/migration/{cluster_id}/{keyspace}/cleanup", httpAPI.Adapt(vtadminhttp.CleanupSchemaMigration)).Name("API.CleanupSchemaMigration").Methods("PUT", "OPTIONS")
router.HandleFunc("/migration/{cluster_id}/{keyspace}/complete", httpAPI.Adapt(vtadminhttp.CompleteSchemaMigration)).Name("API.CompleteSchemaMigration").Methods("PUT", "OPTIONS")
router.HandleFunc("/migration/{cluster_id}/{keyspace}/launch", httpAPI.Adapt(vtadminhttp.LaunchSchemaMigration)).Name("API.LaunchSchemaMigration").Methods("PUT", "OPTIONS")
router.HandleFunc("/migration/{cluster_id}/{keyspace}/retry", httpAPI.Adapt(vtadminhttp.RetrySchemaMigration)).Name("API.RetrySchemaMigration").Methods("PUT", "OPTIONS")
router.HandleFunc("/migrations/", httpAPI.Adapt(vtadminhttp.GetSchemaMigrations)).Name("API.GetSchemaMigrations")
router.HandleFunc("/schema/{table}", httpAPI.Adapt(vtadminhttp.FindSchema)).Name("API.FindSchema")
router.HandleFunc("/schema/{cluster_id}/{keyspace}/{table}", httpAPI.Adapt(vtadminhttp.GetSchema)).Name("API.GetSchema")
router.HandleFunc("/schemas", httpAPI.Adapt(vtadminhttp.GetSchemas)).Name("API.GetSchemas")
Expand Down Expand Up @@ -438,6 +445,82 @@ func (api *API) EjectDynamicCluster(key string, value any) {
api.clusters = append(api.clusters[:clusterIndex], api.clusters[clusterIndex+1:]...)
}

// ApplySchema is part of the vtadminpb.VTAdminServer interface.
func (api *API) ApplySchema(ctx context.Context, req *vtadminpb.ApplySchemaRequest) (*vtctldatapb.ApplySchemaResponse, error) {
span, ctx := trace.NewSpan(ctx, "API.ApplySchema")
defer span.Finish()

span.Annotate("cluster_id", req.ClusterId)

if !api.authz.IsAuthorized(ctx, req.ClusterId, rbac.SchemaMigrationResource, rbac.CreateAction) {
return nil, fmt.Errorf("%w: cannot create schema migration in %s", errors.ErrUnauthorized, req.ClusterId)
}

c, err := api.getClusterForRequest(req.ClusterId)
if err != nil {
return nil, err
}

return c.ApplySchema(ctx, req.Request)
}

// CancelSchemaMigration is part of the vtadminpb.VTAdminServer interface.
func (api *API) CancelSchemaMigration(ctx context.Context, req *vtadminpb.CancelSchemaMigrationRequest) (*vtctldatapb.CancelSchemaMigrationResponse, error) {
span, ctx := trace.NewSpan(ctx, "API.CancelSchemaMigration")
defer span.Finish()

span.Annotate("cluster_id", req.ClusterId)

if !api.authz.IsAuthorized(ctx, req.ClusterId, rbac.SchemaMigrationResource, rbac.CancelAction) {
return nil, fmt.Errorf("%w: cannot cancel schema migration in %s", errors.ErrUnauthorized, req.ClusterId)
}

c, err := api.getClusterForRequest(req.ClusterId)
if err != nil {
return nil, err
}

return c.CancelSchemaMigration(ctx, req.Request)
}

// CleanupSchemaMigration is part of the vtadminpb.VTAdminServer interface.
func (api *API) CleanupSchemaMigration(ctx context.Context, req *vtadminpb.CleanupSchemaMigrationRequest) (*vtctldatapb.CleanupSchemaMigrationResponse, error) {
span, ctx := trace.NewSpan(ctx, "API.CleanupSchemaMigration")
defer span.Finish()

span.Annotate("cluster_id", req.ClusterId)

if !api.authz.IsAuthorized(ctx, req.ClusterId, rbac.SchemaMigrationResource, rbac.CleanupSchemaMigrationAction) {
return nil, fmt.Errorf("%w: cannot cleanup schema migration in %s", errors.ErrUnauthorized, req.ClusterId)
}

c, err := api.getClusterForRequest(req.ClusterId)
if err != nil {
return nil, err
}

return c.CleanupSchemaMigration(ctx, req.Request)
}

// CompleteSchemaMigration is part of the vtadminpb.VTAdminServer interface.
func (api *API) CompleteSchemaMigration(ctx context.Context, req *vtadminpb.CompleteSchemaMigrationRequest) (*vtctldatapb.CompleteSchemaMigrationResponse, error) {
span, ctx := trace.NewSpan(ctx, "API.CompleteSchemaMigration")
defer span.Finish()

span.Annotate("cluster_id", req.ClusterId)

if !api.authz.IsAuthorized(ctx, req.ClusterId, rbac.SchemaMigrationResource, rbac.CompleteSchemaMigrationAction) {
return nil, fmt.Errorf("%w: cannot complete schema migration in %s", errors.ErrUnauthorized, req.ClusterId)
}

c, err := api.getClusterForRequest(req.ClusterId)
if err != nil {
return nil, err
}

return c.CompleteSchemaMigration(ctx, req.Request)
}

// CreateKeyspace is part of the vtadminpb.VTAdminServer interface.
func (api *API) CreateKeyspace(ctx context.Context, req *vtadminpb.CreateKeyspaceRequest) (*vtadminpb.CreateKeyspaceResponse, error) {
span, ctx := trace.NewSpan(ctx, "API.CreateKeyspace")
Expand Down Expand Up @@ -1021,6 +1104,113 @@ func (api *API) GetSchemas(ctx context.Context, req *vtadminpb.GetSchemasRequest
}, nil
}

// GetSchemaMigrations is part of the vtadminpb.VTAdminServer interface.
func (api *API) GetSchemaMigrations(ctx context.Context, req *vtadminpb.GetSchemaMigrationsRequest) (*vtadminpb.GetSchemaMigrationsResponse, error) {
span, ctx := trace.NewSpan(ctx, "API.GetSchemaMigrations")
defer span.Finish()

clusterIDs := make([]string, 0, len(req.ClusterRequests))
requestsByCluster := make(map[string][]*vtctldatapb.GetSchemaMigrationsRequest, len(req.ClusterRequests))

for _, r := range req.ClusterRequests {
clusterIDs = append(clusterIDs, r.ClusterId)
requestsByCluster[r.ClusterId] = append(requestsByCluster[r.ClusterId], r.Request)
}

clusters, _ := api.getClustersForRequest(clusterIDs)

var (
m sync.Mutex
wg sync.WaitGroup
rec concurrency.AllErrorRecorder
results = make([]*vtadminpb.SchemaMigration, 0, len(req.ClusterRequests))
)

m.Lock()
for _, c := range clusters {
if len(requestsByCluster[c.ID]) == 0 {
wg.Add(1)
go func(ctx context.Context, c *cluster.Cluster) {
defer wg.Done()

span, ctx := trace.NewSpan(ctx, "API.getClusterKeyspaces")
defer span.Finish()

span.Annotate("cluster_id", c.ID)

if !api.authz.IsAuthorized(ctx, c.ID, rbac.SchemaMigrationResource, rbac.GetAction) {
return
}

keyspaces, err := c.GetKeyspaces(ctx)
if err != nil {
rec.RecordError(err)
return
}

m.Lock()
defer m.Unlock()

for _, ks := range keyspaces {
requestsByCluster[c.ID] = append(requestsByCluster[c.ID], &vtctldatapb.GetSchemaMigrationsRequest{
Keyspace: ks.Keyspace.Name,
})
}
}(ctx, c)
}
}
m.Unlock()

wg.Wait()
if rec.HasErrors() {
return nil, rec.Error()
}

for _, c := range clusters {
if requestsByCluster[c.ID] == nil {
continue
}

for _, r := range requestsByCluster[c.ID] {
wg.Add(1)

go func(ctx context.Context, c *cluster.Cluster, r *vtctldatapb.GetSchemaMigrationsRequest) {
defer wg.Done()

span, ctx := trace.NewSpan(ctx, "API.getClusterSchemaMigrations")
defer span.Finish()

span.Annotate("cluster_id", c.ID)

if !api.authz.IsAuthorized(ctx, c.ID, rbac.SchemaMigrationResource, rbac.GetAction) {
return
}

migrations, err := c.GetSchemaMigrations(ctx, r)
if err != nil {
rec.RecordError(err)
return
}

m.Lock()
defer m.Unlock()

results = append(results, migrations...)
}(ctx, c, r)
}
}

wg.Wait()

if rec.HasErrors() {
return nil, rec.Error()
}

return &vtadminpb.GetSchemaMigrationsResponse{
SchemaMigrations: results,
}, nil
}

// GetShardReplicationPositions is part of the vtadminpb.VTAdminServer interface.
func (api *API) GetShardReplicationPositions(ctx context.Context, req *vtadminpb.GetShardReplicationPositionsRequest) (*vtadminpb.GetShardReplicationPositionsResponse, error) {
span, ctx := trace.NewSpan(ctx, "API.GetShardReplicationPositions")
Expand Down Expand Up @@ -1521,6 +1711,25 @@ func (api *API) GetWorkflows(ctx context.Context, req *vtadminpb.GetWorkflowsReq
}, nil
}

// LaunchSchemaMigration is part of the vtadminpb.VTAdminServer interface.
func (api *API) LaunchSchemaMigration(ctx context.Context, req *vtadminpb.LaunchSchemaMigrationRequest) (*vtctldatapb.LaunchSchemaMigrationResponse, error) {
span, ctx := trace.NewSpan(ctx, "API.LaunchSchemaMigration")
defer span.Finish()

span.Annotate("cluster_id", req.ClusterId)

if !api.authz.IsAuthorized(ctx, req.ClusterId, rbac.SchemaMigrationResource, rbac.LaunchSchemaMigrationAction) {
return nil, fmt.Errorf("%w: cannot launch schema migration in %s", errors.ErrUnauthorized, req.ClusterId)
}

c, err := api.getClusterForRequest(req.ClusterId)
if err != nil {
return nil, err
}

return c.LaunchSchemaMigration(ctx, req.Request)
}

// PingTablet is part of the vtadminpb.VTAdminServer interface.
func (api *API) PingTablet(ctx context.Context, req *vtadminpb.PingTabletRequest) (*vtadminpb.PingTabletResponse, error) {
span, ctx := trace.NewSpan(ctx, "API.PingTablet")
Expand Down Expand Up @@ -1728,6 +1937,25 @@ func (api *API) ReloadSchemaShard(ctx context.Context, req *vtadminpb.ReloadSche
}, nil
}

// RetrySchemaMigration is part of the vtadminpb.VTAdminServer interface.
func (api *API) RetrySchemaMigration(ctx context.Context, req *vtadminpb.RetrySchemaMigrationRequest) (*vtctldatapb.RetrySchemaMigrationResponse, error) {
span, ctx := trace.NewSpan(ctx, "API.RetrySchemaMigration")
defer span.Finish()

span.Annotate("cluster_id", req.ClusterId)

if !api.authz.IsAuthorized(ctx, req.ClusterId, rbac.SchemaMigrationResource, rbac.RetryAction) {
return nil, fmt.Errorf("%w: cannot retry schema migration in %s", errors.ErrUnauthorized, req.ClusterId)
}

c, err := api.getClusterForRequest(req.ClusterId)
if err != nil {
return nil, err
}

return c.RetrySchemaMigration(ctx, req.Request)
}

// RunHealthCheck is part of the vtadminpb.VTAdminServer interface.
func (api *API) RunHealthCheck(ctx context.Context, req *vtadminpb.RunHealthCheckRequest) (*vtadminpb.RunHealthCheckResponse, error) {
span, ctx := trace.NewSpan(ctx, "API.RunHealthCheck")
Expand Down
Loading

0 comments on commit f432a95

Please sign in to comment.