Skip to content

Commit

Permalink
implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Nov 22, 2024
1 parent e4b721f commit ec9f01d
Show file tree
Hide file tree
Showing 12 changed files with 550 additions and 126 deletions.
311 changes: 218 additions & 93 deletions backend/protos/xyz/block/ftl/v1beta1/provisioner/resource.pb.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ message Resource {
PostgresResource postgres = 102;
MysqlResource mysql = 103;
ModuleResource module = 104;
MigrationResource migration = 105;
}
}

Expand All @@ -41,6 +42,12 @@ message MysqlResource {
MysqlResourceOutput output = 1;
}

message MigrationResource {
message MigrationResourceOutput {}
MigrationResourceOutput output = 1;
string digest = 2;
}

message ModuleResource {
message ModuleResourceOutput {
string deployment_key = 1;
Expand Down
128 changes: 128 additions & 0 deletions backend/provisioner/migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package provisioner

import (
"archive/tar"
"context"
"database/sql"
"fmt"
"io"
"net/url"
"os"
"path/filepath"

"github.com/amacneil/dbmate/v2/pkg/dbmate"
_ "github.com/go-sql-driver/mysql"

"github.com/TBD54566975/ftl/backend/controller/artefacts"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/sha256"
)

// NewMigrationProvisioner creates a new provisioner that provisions database migrations
func NewMigrationProvisioner(registryConfig artefacts.RegistryConfig) *InMemProvisioner {
return NewEmbeddedProvisioner(map[ResourceType]InMemResourceProvisionerFn{
ResourceTypeMigration: provisionMigration(registryConfig),
})
}

func provisionMigration(registryConfig artefacts.RegistryConfig) func(ctx context.Context, rc *provisioner.ResourceContext, module, id string) (*provisioner.Resource, error) {
return func(ctx context.Context, rc *provisioner.ResourceContext, module, id string) (*provisioner.Resource, error) {
migration, ok := rc.Resource.Resource.(*provisioner.Resource_Migration)
if !ok {
return nil, fmt.Errorf("unexpected resource type: %T", rc.Resource.Resource)
}
if len(rc.Dependencies) != 1 {
return nil, fmt.Errorf("migrations must have exaclyt one dependency, found %v", rc.Dependencies)
}
registry, err := artefacts.NewOCIRegistryStorage(registryConfig)
if err != nil {
return nil, fmt.Errorf("failed to create OCI registry storage: %w", err)
}
parseSHA256, err := sha256.ParseSHA256(rc.Resource.GetMigration().Digest)
if err != nil {
return nil, fmt.Errorf("failed to parse digest %w", err)
}
download, err := registry.Download(ctx, parseSHA256)
if err != nil {
return nil, fmt.Errorf("failed to download migration: %w", err)
}
dir, err := extractTarToTempDir(download)
if err != nil {
return nil, fmt.Errorf("failed to extract tar: %w", err)
}
dsn := ""
driver := ""

resource := rc.Dependencies[0].Resource
switch res := resource.(type) {
case *provisioner.Resource_Postgres:
dsn = res.Postgres.GetOutput().GetWriteDsn()
driver = "pgx"
case *provisioner.Resource_Mysql:
dsn = res.Mysql.GetOutput().GetWriteDsn()
driver = "mysql"
}
u, err := url.Parse(dsn)
if err != nil {
return nil, fmt.Errorf("invalid DSN: %w", err)
}

conn, err := sql.Open(driver, dsn)
if err != nil {
return nil, fmt.Errorf("failed to connect to database: %w", err)
}
defer conn.Close()

db := dbmate.New(u)
db.AutoDumpSchema = false
db.Log = log.FromContext(ctx).Scope("migrate").WriterAt(log.Info)
db.MigrationsDir = []string{dir}
err = db.CreateAndMigrate()
if err != nil {
return nil, fmt.Errorf("failed to create and migrate database: %w", err)
}
migration.Migration = &provisioner.MigrationResource{
Output: &provisioner.MigrationResource_MigrationResourceOutput{},
}
return rc.Resource, nil
}
}

func extractTarToTempDir(tarReader io.Reader) (tempDir string, err error) {
// Create a new tar reader
tr := tar.NewReader(tarReader)

// Create a temporary directory
tempDir, err = os.MkdirTemp("", "extracted")
if err != nil {
return "", fmt.Errorf("failed to create temporary directory: %w", err)
}

// Extract files from the tar archive
for {
header, err := tr.Next()
if err == io.EOF {
break // End of tar archive
}
if err != nil {
return "", fmt.Errorf("failed to read tar header: %w", err)
}

// Construct the full path for the file
targetPath := filepath.Join(tempDir, header.Name)

// Create the file
file, err := os.OpenFile(targetPath, os.O_CREATE|os.O_WRONLY, os.FileMode(header.Mode))
if err != nil {
return "", fmt.Errorf("failed to create file: %w", err)
}
defer file.Close()

// Copy the file content
if _, err := io.Copy(file, tr); err != nil {
return "", fmt.Errorf("failed to copy file content: %w", err)
}
}
return tempDir, nil
}
23 changes: 19 additions & 4 deletions backend/provisioner/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func resourceEqual(desired, existing *provisioner.Resource) bool {
&provisioner.MysqlResource_MysqlResourceOutput{},
&provisioner.PostgresResource_PostgresResourceOutput{},
&provisioner.ModuleResource_ModuleResourceOutput{},
&provisioner.MigrationResource_MigrationResourceOutput{},
),
)
}
Expand All @@ -201,6 +202,7 @@ func ExtractResources(msg *ftlv1.CreateDeploymentRequest) (*ResourceGraph, error
if err != nil {
return nil, fmt.Errorf("invalid module schema for module %s: %w", msg.Schema.Name, err)
}
edges := make([]*ResourceEdge, 0)

for _, decl := range module.Decls {
if db, ok := decl.(*schema.Database); ok {
Expand All @@ -218,6 +220,20 @@ func ExtractResources(msg *ftlv1.CreateDeploymentRequest) (*ResourceGraph, error
default:
return nil, fmt.Errorf("unknown db type: %s", db.Type)
}
for _, metadata := range db.Metadata {
if migration, ok := metadata.(*schema.MetadataMigration); ok {
id := decl.GetName() + "-migration-" + migration.Digest
deps = append(deps, &provisioner.Resource{
ResourceId: id,
Resource: &provisioner.Resource_Migration{Migration: &provisioner.MigrationResource{Digest: migration.Digest}},
})
edges = append(edges, &ResourceEdge{
from: id,
to: decl.GetName(),
})
}

}
}
}

Expand All @@ -231,12 +247,11 @@ func ExtractResources(msg *ftlv1.CreateDeploymentRequest) (*ResourceGraph, error
},
},
}
edges := make([]*ResourceEdge, len(deps))
for i, dep := range deps {
edges[i] = &ResourceEdge{
for _, dep := range deps {
edges = append(edges, &ResourceEdge{
from: root.ResourceId,
to: dep.ResourceId,
}
})
}

result := &ResourceGraph{
Expand Down
11 changes: 7 additions & 4 deletions backend/provisioner/resource_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import (
type ResourceType string

const (
ResourceTypeUnknown ResourceType = "unknown"
ResourceTypePostgres ResourceType = "postgres"
ResourceTypeMysql ResourceType = "mysql"
ResourceTypeModule ResourceType = "module"
ResourceTypeUnknown ResourceType = "unknown"
ResourceTypePostgres ResourceType = "postgres"
ResourceTypeMysql ResourceType = "mysql"
ResourceTypeModule ResourceType = "module"
ResourceTypeMigration ResourceType = "migration"
)

// TypeOf returns the resource type of the given resource
Expand All @@ -23,6 +24,8 @@ func TypeOf(r *provisioner.Resource) ResourceType {
return ResourceTypeMysql
case *provisioner.Resource_Postgres:
return ResourceTypePostgres
case *provisioner.Resource_Migration:
return ResourceTypeMigration
default:
return ResourceTypeUnknown
}
Expand Down
10 changes: 10 additions & 0 deletions examples/go/mysql/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,14 @@ require (
github.com/block/scaffolder v1.3.0 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/containerd/stargz-snapshotter/estargz v0.14.3 // indirect
github.com/danieljoos/wincred v1.2.2 // indirect
github.com/deckarep/golang-set/v2 v2.6.0 // indirect
github.com/distribution/reference v0.5.0 // indirect
github.com/docker/cli v27.1.1+incompatible // indirect
github.com/docker/distribution v2.8.2+incompatible // indirect
github.com/docker/docker v27.3.1+incompatible // indirect
github.com/docker/docker-credential-helpers v0.7.0 // indirect
github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
Expand All @@ -46,6 +50,7 @@ require (
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/go-containerregistry v0.20.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 // indirect
github.com/hashicorp/cronexpr v1.1.2 // indirect
Expand All @@ -57,8 +62,10 @@ require (
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/morikuni/aec v1.0.0 // indirect
Expand All @@ -67,8 +74,10 @@ require (
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/puzpuzpuz/xsync/v3 v3.4.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/swaggest/jsonschema-go v0.3.72 // indirect
github.com/swaggest/refl v1.3.0 // indirect
github.com/vbatts/tar-split v0.11.3 // indirect
github.com/zalando/go-keyring v0.2.6 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.57.0 // indirect
go.opentelemetry.io/otel v1.32.0 // indirect
Expand All @@ -93,6 +102,7 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 // indirect
google.golang.org/grpc v1.68.0 // indirect
google.golang.org/protobuf v1.35.2 // indirect
oras.land/oras-go/v2 v2.5.0 // indirect
)

replace github.com/TBD54566975/ftl => ../../..
Loading

0 comments on commit ec9f01d

Please sign in to comment.