Skip to content

Commit

Permalink
feat: MySQL support
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Nov 19, 2024
1 parent 8148db9 commit 6eb0235
Show file tree
Hide file tree
Showing 38 changed files with 398 additions and 159 deletions.
22 changes: 18 additions & 4 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ type Config struct {
}

func (c *Config) SetDefaults() {
if err := kong.ApplyDefaults(c, kong.Vars{"dsn": dsn.DSN("ftl")}); err != nil {
if err := kong.ApplyDefaults(c, kong.Vars{"dsn": dsn.PostgresDSN("ftl")}); err != nil {
panic(err)
}
if c.Advertise == nil {
Expand Down Expand Up @@ -747,6 +747,21 @@ func (s *Service) GetModuleContext(ctx context.Context, req *connect.Request[ftl
// Initialize checksum to -1; a zero checksum does occur when the context contains no settings
lastChecksum := int64(-1)

dbTypes := map[string]modulecontext.DBType{}
deps, err := s.dal.GetActiveDeployments(ctx)
if err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not get deployments: %w", err))
}
for _, dep := range deps {
if dep.Module == name {
for _, decl := range dep.Schema.Decls {
if db, ok := decl.(*schema.Database); ok {
dbTypes[db.Name] = modulecontext.DBTypeFromString(db.Type)
}
}
break
}
}
for {
h := sha.New()

Expand All @@ -758,11 +773,10 @@ func (s *Service) GetModuleContext(ctx context.Context, req *connect.Request[ftl
if err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not get secrets: %w", err))
}
databases, err := modulecontext.DatabasesFromSecrets(ctx, name, secrets)
databases, err := modulecontext.DatabasesFromSecrets(ctx, name, secrets, dbTypes)
if err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not get databases: %w", err))
}

if err := hashConfigurationMap(h, configs); err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not detect change on configs: %w", err))
}
Expand Down Expand Up @@ -1091,7 +1105,7 @@ func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftl
if err := s.sm.Set(ctx, configuration.NewRef(module.Name, key), db.Runtime.DSN); err != nil {
return nil, fmt.Errorf("could not set database secret %s: %w", key, err)
}
logger.Infof("Database declaration: %s -> %s", db.Name, db.Runtime.DSN)
logger.Infof("Database declaration: %s -> %s type %s", db.Name, db.Runtime.DSN, db.Type)
}
}

Expand Down
13 changes: 11 additions & 2 deletions backend/controller/dsn/dsn.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,20 @@ func Host(host string) Option {
}
}

// DSN returns a DSN string for connecting to the FTL Controller PG database.
func DSN(dbName string, options ...Option) string {
// PostgresDSN returns a PostgresDSN string for connecting to the FTL Controller PG database.
func PostgresDSN(dbName string, options ...Option) string {
opts := &dsnOptions{port: 15432, host: "127.0.0.1"}
for _, opt := range options {
opt(opts)
}
return fmt.Sprintf("postgres://%s:%d/%s?sslmode=disable&user=postgres&password=secret", opts.host, opts.port, dbName)
}

// MySQLDSN returns a MySQLDSN string for connecting to the local MySQL database.
func MySQLDSN(dbName string, options ...Option) string {
opts := &dsnOptions{port: 13306, host: "127.0.0.1"}
for _, opt := range options {
opt(opts)
}
return fmt.Sprintf("root:secret@tcp(%s:%d)/%s?allowNativePasswords=True", opts.host, opts.port, dbName)
}
2 changes: 1 addition & 1 deletion backend/controller/sql/sqltest/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func OpenForTesting(ctx context.Context, t testing.TB) *sql.DB {
assert.NoError(t, err)
t.Cleanup(func() { _ = release() }) //nolint:errcheck

conn, err := databasetesting.CreateForDevel(ctx, dsn.DSN("ftl-test"), true)
conn, err := databasetesting.CreateForDevel(ctx, dsn.PostgresDSN("ftl-test"), true)
assert.NoError(t, err)
t.Cleanup(func() { assert.NoError(t, conn.Close()) })
return conn
Expand Down
66 changes: 35 additions & 31 deletions backend/protos/xyz/block/ftl/v1/module.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/protos/xyz/block/ftl/v1/module.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ message ModuleContextResponse {

enum DBType {
POSTGRES = 0;
MYSQL = 1;
}

message DSN {
Expand Down
20 changes: 17 additions & 3 deletions backend/provisioner/controller_provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ func NewControllerProvisioner(client ftlv1connect.ControllerServiceClient) *InMe
logger.Infof("provisioning module: %s", module)

for _, dep := range rc.Dependencies {
if psql, ok := dep.Resource.(*provisioner.Resource_Postgres); ok {
if psql.Postgres == nil || psql.Postgres.Output == nil {
switch r := dep.Resource.(type) {
case *provisioner.Resource_Postgres:
if r.Postgres == nil || r.Postgres.Output == nil {
return nil, fmt.Errorf("postgres resource has not been provisioned")
}

Expand All @@ -35,8 +36,21 @@ func NewControllerProvisioner(client ftlv1connect.ControllerServiceClient) *InMe
return nil, fmt.Errorf("failed to find database declaration: %w", err)
}
decl.Runtime = &schemapb.DatabaseRuntime{
Dsn: psql.Postgres.Output.WriteDsn,
Dsn: r.Postgres.Output.WriteDsn,
}
case *provisioner.Resource_Mysql:
if r.Mysql == nil || r.Mysql.Output == nil {
return nil, fmt.Errorf("mysql resource has not been provisioned")
}

decl, err := findDBDecl(dep.ResourceId, mod.Module.Schema)
if err != nil {
return nil, fmt.Errorf("failed to find database declaration: %w", err)
}
decl.Runtime = &schemapb.DatabaseRuntime{
Dsn: r.Mysql.Output.WriteDsn,
}
default:
}
}

Expand Down
131 changes: 90 additions & 41 deletions backend/provisioner/dev_provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (

"github.com/XSAM/otelsql"

_ "github.com/go-sql-driver/mysql"

"github.com/TBD54566975/ftl/backend/controller/dsn"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner"
"github.com/TBD54566975/ftl/internal/dev"
Expand All @@ -14,54 +16,101 @@ import (
)

// NewDevProvisioner creates a new provisioner that provisions resources locally when running FTL in dev mode
func NewDevProvisioner(postgresPort int) *InMemProvisioner {
var postgresDSN string
func NewDevProvisioner(postgresPort int, mysqlPort int) *InMemProvisioner {
return NewEmbeddedProvisioner(map[ResourceType]InMemResourceProvisionerFn{
ResourceTypePostgres: func(ctx context.Context, rc *provisioner.ResourceContext, module, id string) (*provisioner.Resource, error) {
pg, ok := rc.Resource.Resource.(*provisioner.Resource_Postgres)
if !ok {
panic(fmt.Errorf("unexpected resource type: %T", rc.Resource.Resource))
}
logger := log.FromContext(ctx)
logger.Infof("provisioning postgres database: %s_%s", module, id)
ResourceTypePostgres: provisionPostgres(postgresPort),
ResourceTypeMysql: provisionMysql(mysqlPort),
})
}

dbName := strcase.ToLowerCamel(module) + "_" + strcase.ToLowerCamel(id)
func provisionMysql(mysqlPort int) InMemResourceProvisionerFn {
return func(ctx context.Context, rc *provisioner.ResourceContext, module, id string) (*provisioner.Resource, error) {
mysql, ok := rc.Resource.Resource.(*provisioner.Resource_Mysql)
if !ok {
panic(fmt.Errorf("unexpected resource type: %T", rc.Resource.Resource))
}
logger := log.FromContext(ctx)
logger.Infof("provisioning mysql database: %s_%s", module, id)

if postgresDSN == "" {
// We assume that the DB hsas already been started when running in dev mode
pdsn, err := dev.WaitForDBReady(ctx, postgresPort)
if err != nil {
return nil, fmt.Errorf("failed to wait for postgres to be ready: %w", err)
}
postgresDSN = pdsn
}
conn, err := otelsql.Open("pgx", postgresDSN)
dbName := strcase.ToLowerCamel(module) + "_" + strcase.ToLowerCamel(id)

// We assume that the DB hsas already been started when running in dev mode
mysqlDSN, err := dev.SetupMySQL(ctx, "mysql:8.4.3", mysqlPort, false)
if err != nil {
return nil, fmt.Errorf("failed to wait for mysql to be ready: %w", err)
}
conn, err := otelsql.Open("mysql", mysqlDSN)
if err != nil {
return nil, fmt.Errorf("failed to connect to mysql: %w", err)
}
defer conn.Close()

res, err := conn.Query("SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = ?", dbName)
if err != nil {
return nil, fmt.Errorf("failed to query database: %w", err)
}
defer res.Close()
if !res.Next() {
_, err = conn.ExecContext(ctx, "CREATE DATABASE "+dbName)
if err != nil {
return nil, fmt.Errorf("failed to connect to postgres: %w", err)
return nil, fmt.Errorf("failed to create database: %w", err)
}
defer conn.Close()
}

if mysql.Mysql == nil {
mysql.Mysql = &provisioner.MysqlResource{}
}
dsn := dsn.MySQLDSN(dbName, dsn.Port(mysqlPort))
mysql.Mysql.Output = &provisioner.MysqlResource_MysqlResourceOutput{
WriteDsn: dsn,
ReadDsn: dsn,
}
return rc.Resource, nil
}
}

func provisionPostgres(postgresPort int) func(ctx context.Context, rc *provisioner.ResourceContext, module string, id string) (*provisioner.Resource, error) {
return func(ctx context.Context, rc *provisioner.ResourceContext, module, id string) (*provisioner.Resource, error) {
pg, ok := rc.Resource.Resource.(*provisioner.Resource_Postgres)
if !ok {
panic(fmt.Errorf("unexpected resource type: %T", rc.Resource.Resource))
}
logger := log.FromContext(ctx)
logger.Infof("provisioning postgres database: %s_%s", module, id)

dbName := strcase.ToLowerCamel(module) + "_" + strcase.ToLowerCamel(id)

// We assume that the DB hsas already been started when running in dev mode
postgresDSN, err := dev.WaitForPostgresReady(ctx, postgresPort)
if err != nil {
return nil, fmt.Errorf("failed to wait for postgres to be ready: %w", err)
}
conn, err := otelsql.Open("pgx", postgresDSN)
if err != nil {
return nil, fmt.Errorf("failed to connect to postgres: %w", err)
}
defer conn.Close()

res, err := conn.Query("SELECT * FROM pg_catalog.pg_database WHERE datname=$1", dbName)
res, err := conn.Query("SELECT * FROM pg_catalog.pg_database WHERE datname=$1", dbName)
if err != nil {
return nil, fmt.Errorf("failed to query database: %w", err)
}
defer res.Close()
if !res.Next() {
_, err = conn.ExecContext(ctx, "CREATE DATABASE "+dbName)
if err != nil {
return nil, fmt.Errorf("failed to query database: %w", err)
}
defer res.Close()
if !res.Next() {
_, err = conn.ExecContext(ctx, "CREATE DATABASE "+dbName)
if err != nil {
return nil, fmt.Errorf("failed to create database: %w", err)
}
return nil, fmt.Errorf("failed to create database: %w", err)
}
}

if pg.Postgres == nil {
pg.Postgres = &provisioner.PostgresResource{}
}
dsn := dsn.DSN(dbName, dsn.Port(postgresPort))
pg.Postgres.Output = &provisioner.PostgresResource_PostgresResourceOutput{
WriteDsn: dsn,
ReadDsn: dsn,
}
return rc.Resource, nil
},
})
if pg.Postgres == nil {
pg.Postgres = &provisioner.PostgresResource{}
}
dsn := dsn.PostgresDSN(dbName, dsn.Port(postgresPort))
pg.Postgres.Output = &provisioner.PostgresResource_PostgresResourceOutput{
WriteDsn: dsn,
ReadDsn: dsn,
}
return rc.Resource, nil
}
}
3 changes: 3 additions & 0 deletions backend/provisioner/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ func replaceOutputs(to []*provproto.Resource, from []*provproto.Resource) error
switch r := r.Resource.(type) {
case *provproto.Resource_Mysql:
if mysqlFrom, ok := existing.Resource.(*provproto.Resource_Mysql); ok && mysqlFrom.Mysql != nil {
if r.Mysql == nil {
r.Mysql = &provproto.MysqlResource{}
}
r.Mysql.Output = mysqlFrom.Mysql.Output
}
case *provproto.Resource_Postgres:
Expand Down
2 changes: 1 addition & 1 deletion cmd/ftl-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func main() {
kong.Vars{
"version": ftl.Version,
"timestamp": time.Unix(t, 0).Format(time.RFC3339),
"dsn": dsn.DSN("ftl"),
"dsn": dsn.PostgresDSN("ftl"),
},
)
cli.ControllerConfig.SetDefaults()
Expand Down
Loading

0 comments on commit 6eb0235

Please sign in to comment.