Skip to content

Commit

Permalink
implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Nov 22, 2024
1 parent e4b721f commit 3f5877b
Show file tree
Hide file tree
Showing 9 changed files with 508 additions and 125 deletions.
311 changes: 218 additions & 93 deletions backend/protos/xyz/block/ftl/v1beta1/provisioner/resource.pb.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ message Resource {
PostgresResource postgres = 102;
MysqlResource mysql = 103;
ModuleResource module = 104;
MigrationResource migration = 105;
}
}

Expand All @@ -41,6 +42,12 @@ message MysqlResource {
MysqlResourceOutput output = 1;
}

message MigrationResource {
message MigrationResourceOutput {}
MigrationResourceOutput output = 1;
string digest = 2;
}

message ModuleResource {
message ModuleResourceOutput {
string deployment_key = 1;
Expand Down
128 changes: 128 additions & 0 deletions backend/provisioner/migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package provisioner

import (
"archive/tar"
"context"
"database/sql"
"fmt"
"io"
"net/url"
"os"
"path/filepath"

"github.com/amacneil/dbmate/v2/pkg/dbmate"
_ "github.com/go-sql-driver/mysql"

"github.com/TBD54566975/ftl/backend/controller/artefacts"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/sha256"
)

// NewMigrationProvisioner creates a new provisioner that provisions database migrations
func NewMigrationProvisioner(registryConfig artefacts.RegistryConfig) *InMemProvisioner {
return NewEmbeddedProvisioner(map[ResourceType]InMemResourceProvisionerFn{
ResourceTypeMigration: provisionMigration(registryConfig),
})
}

func provisionMigration(registryConfig artefacts.RegistryConfig) func(ctx context.Context, rc *provisioner.ResourceContext, module, id string) (*provisioner.Resource, error) {
return func(ctx context.Context, rc *provisioner.ResourceContext, module, id string) (*provisioner.Resource, error) {
migration, ok := rc.Resource.Resource.(*provisioner.Resource_Migration)
if !ok {
return nil, fmt.Errorf("unexpected resource type: %T", rc.Resource.Resource)
}
if len(rc.Dependencies) != 1 {
return nil, fmt.Errorf("migrations must have exaclyt one dependency, found %v", rc.Dependencies)
}
registry, err := artefacts.NewOCIRegistryStorage(registryConfig)
if err != nil {
return nil, fmt.Errorf("failed to create OCI registry storage: %w", err)
}
parseSHA256, err := sha256.ParseSHA256(rc.Resource.GetMigration().Digest)
if err != nil {
return nil, fmt.Errorf("failed to parse digest %w", err)
}
download, err := registry.Download(ctx, parseSHA256)
if err != nil {
return nil, fmt.Errorf("failed to download migration: %w", err)
}
dir, err := extractTarToTempDir(download)
if err != nil {
return nil, fmt.Errorf("failed to extract tar: %w", err)
}
dsn := ""
driver := ""

resource := rc.Dependencies[0].Resource
switch res := resource.(type) {
case *provisioner.Resource_Postgres:
dsn = res.Postgres.GetOutput().GetWriteDsn()
driver = "pgx"
case *provisioner.Resource_Mysql:
dsn = res.Mysql.GetOutput().GetWriteDsn()
driver = "mysql"
}
u, err := url.Parse(dsn)
if err != nil {
return nil, fmt.Errorf("invalid DSN: %w", err)
}

conn, err := sql.Open(driver, dsn)
if err != nil {
return nil, fmt.Errorf("failed to connect to database: %w", err)
}
defer conn.Close()

db := dbmate.New(u)
db.AutoDumpSchema = false
db.Log = log.FromContext(ctx).Scope("migrate").WriterAt(log.Info)
db.MigrationsDir = []string{dir}
err = db.CreateAndMigrate()
if err != nil {
return nil, fmt.Errorf("failed to create and migrate database: %w", err)
}
migration.Migration = &provisioner.MigrationResource{
Output: &provisioner.MigrationResource_MigrationResourceOutput{},
}
return rc.Resource, nil
}
}

func extractTarToTempDir(tarReader io.Reader) (tempDir string, err error) {
// Create a new tar reader
tr := tar.NewReader(tarReader)

// Create a temporary directory
tempDir, err = os.MkdirTemp("", "extracted")
if err != nil {
return "", fmt.Errorf("failed to create temporary directory: %w", err)
}

// Extract files from the tar archive
for {
header, err := tr.Next()
if err == io.EOF {
break // End of tar archive
}
if err != nil {
return "", fmt.Errorf("failed to read tar header: %w", err)
}

// Construct the full path for the file
targetPath := filepath.Join(tempDir, header.Name)

// Create the file
file, err := os.OpenFile(targetPath, os.O_CREATE|os.O_WRONLY, os.FileMode(header.Mode))
if err != nil {
return "", fmt.Errorf("failed to create file: %w", err)
}
defer file.Close()

// Copy the file content
if _, err := io.Copy(file, tr); err != nil {
return "", fmt.Errorf("failed to copy file content: %w", err)
}
}
return tempDir, nil
}
23 changes: 19 additions & 4 deletions backend/provisioner/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func resourceEqual(desired, existing *provisioner.Resource) bool {
&provisioner.MysqlResource_MysqlResourceOutput{},
&provisioner.PostgresResource_PostgresResourceOutput{},
&provisioner.ModuleResource_ModuleResourceOutput{},
&provisioner.MigrationResource_MigrationResourceOutput{},
),
)
}
Expand All @@ -201,6 +202,7 @@ func ExtractResources(msg *ftlv1.CreateDeploymentRequest) (*ResourceGraph, error
if err != nil {
return nil, fmt.Errorf("invalid module schema for module %s: %w", msg.Schema.Name, err)
}
edges := make([]*ResourceEdge, 0)

for _, decl := range module.Decls {
if db, ok := decl.(*schema.Database); ok {
Expand All @@ -218,6 +220,20 @@ func ExtractResources(msg *ftlv1.CreateDeploymentRequest) (*ResourceGraph, error
default:
return nil, fmt.Errorf("unknown db type: %s", db.Type)
}
for _, metadata := range db.Metadata {
if migration, ok := metadata.(*schema.MetadataMigration); ok {
id := decl.GetName() + "-migration-" + migration.Digest
deps = append(deps, &provisioner.Resource{
ResourceId: id,
Resource: &provisioner.Resource_Migration{Migration: &provisioner.MigrationResource{Digest: migration.Digest}},
})
edges = append(edges, &ResourceEdge{
from: id,
to: decl.GetName(),
})
}

}
}
}

Expand All @@ -231,12 +247,11 @@ func ExtractResources(msg *ftlv1.CreateDeploymentRequest) (*ResourceGraph, error
},
},
}
edges := make([]*ResourceEdge, len(deps))
for i, dep := range deps {
edges[i] = &ResourceEdge{
for _, dep := range deps {
edges = append(edges, &ResourceEdge{
from: root.ResourceId,
to: dep.ResourceId,
}
})
}

result := &ResourceGraph{
Expand Down
11 changes: 7 additions & 4 deletions backend/provisioner/resource_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import (
type ResourceType string

const (
ResourceTypeUnknown ResourceType = "unknown"
ResourceTypePostgres ResourceType = "postgres"
ResourceTypeMysql ResourceType = "mysql"
ResourceTypeModule ResourceType = "module"
ResourceTypeUnknown ResourceType = "unknown"
ResourceTypePostgres ResourceType = "postgres"
ResourceTypeMysql ResourceType = "mysql"
ResourceTypeModule ResourceType = "module"
ResourceTypeMigration ResourceType = "migration"
)

// TypeOf returns the resource type of the given resource
Expand All @@ -23,6 +24,8 @@ func TypeOf(r *provisioner.Resource) ResourceType {
return ResourceTypeMysql
case *provisioner.Resource_Postgres:
return ResourceTypePostgres
case *provisioner.Resource_Migration:
return ResourceTypeMigration
default:
return ResourceTypeUnknown
}
Expand Down
23 changes: 15 additions & 8 deletions frontend/cli/cmd_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,12 +243,19 @@ func (s *serveCommonConfig) run(
provisionerCtx := log.ContextWithLogger(ctx, logger.Scope(scope))

// default local dev provisioner
registry := &provisioner.ProvisionerRegistry{
Provisioners: []*provisioner.ProvisionerBinding{{
Provisioner: provisioner.NewControllerProvisioner(controllerClient),
Types: []provisioner.ResourceType{provisioner.ResourceTypeModule},
ID: "controller",
}},
provisionerRegistry := &provisioner.ProvisionerRegistry{
Provisioners: []*provisioner.ProvisionerBinding{
{
Provisioner: provisioner.NewControllerProvisioner(controllerClient),
Types: []provisioner.ResourceType{provisioner.ResourceTypeModule},
ID: "controller",
},
{
Provisioner: provisioner.NewMigrationProvisioner(registry),
Types: []provisioner.ResourceType{provisioner.ResourceTypeMigration},
ID: "migration",
},
},
Default: &provisioner.ProvisionerBinding{
Provisioner: provisioner.NewDevProvisioner(s.DBPort, s.MysqlPort),
ID: "dev",
Expand All @@ -261,11 +268,11 @@ func (s *serveCommonConfig) run(
if err != nil {
return fmt.Errorf("failed to create provisioner registry: %w", err)
}
registry = r
provisionerRegistry = r
}

wg.Go(func() error {
if err := provisioner.Start(provisionerCtx, config, registry, controllerClient, schemaClient); err != nil {
if err := provisioner.Start(provisionerCtx, config, provisionerRegistry, controllerClient, schemaClient); err != nil {
logger.Errorf(err, "provisioner%d failed: %v", i, err)
return fmt.Errorf("provisioner%d failed: %w", i, err)
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 3f5877b

Please sign in to comment.