From d55777eab7b098668271a578fd73f5bd17330659 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juho=20M=C3=A4kinen?= Date: Wed, 2 Oct 2024 09:11:56 +1000 Subject: [PATCH] chore: configurable provisioners in ftl-provisioner (#2925) Adds a TOML config file for configuring which provisioner plugin to use for which resource. There is a hard coded `noop` provisioner doing nothing. Otherwise a binary with name `ftl-provisioner-` is used as plugin. Next, I will use Localstack to test that the Cloudformation plugin actually creates a DB. --------- Co-authored-by: github-actions[bot] --- backend/provisioner/deployment/deployment.go | 9 +- .../deployment/noop_provisioner.go | 19 ++++ .../deployment/plugin_provisioner.go | 71 ++++++++++++ backend/provisioner/deployment/provisioner.go | 107 +++++++----------- .../provisioner_integration_test.go | 9 +- backend/provisioner/service.go | 48 ++++++-- backend/provisioner/testdata/go/echo/echo.go | 4 + frontend/cli/cmd_serve.go | 6 +- internal/integration/harness.go | 24 ++-- 9 files changed, 211 insertions(+), 86 deletions(-) create mode 100644 backend/provisioner/deployment/noop_provisioner.go create mode 100644 backend/provisioner/deployment/plugin_provisioner.go diff --git a/backend/provisioner/deployment/deployment.go b/backend/provisioner/deployment/deployment.go index 9867757899..dd675cf1aa 100644 --- a/backend/provisioner/deployment/deployment.go +++ b/backend/provisioner/deployment/deployment.go @@ -106,8 +106,13 @@ func (d *Deployment) Progress(ctx context.Context) (bool, error) { return true, err } } - err := next.Progress(ctx) - return d.next().Ok(), err + if next.state != TaskStateDone { + err := next.Progress(ctx) + if err != nil { + return true, err + } + } + return d.next().Ok(), nil } type DeploymentState struct { diff --git a/backend/provisioner/deployment/noop_provisioner.go b/backend/provisioner/deployment/noop_provisioner.go new file mode 100644 index 0000000000..692c58b3e0 --- /dev/null +++ b/backend/provisioner/deployment/noop_provisioner.go @@ -0,0 +1,19 @@ +package deployment + +import ( + "context" + + "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner" +) + +type NoopProvisioner struct{} + +func (n *NoopProvisioner) Provision(ctx context.Context, module string, desired []*provisioner.ResourceContext, existing []*provisioner.Resource) (string, error) { + return "", nil +} + +func (n *NoopProvisioner) State(ctx context.Context, token string, desired []*provisioner.Resource) (TaskState, []*provisioner.Resource, error) { + return TaskStateDone, desired, nil +} + +var _ Provisioner = (*NoopProvisioner)(nil) diff --git a/backend/provisioner/deployment/plugin_provisioner.go b/backend/provisioner/deployment/plugin_provisioner.go new file mode 100644 index 0000000000..31a3dc6ed9 --- /dev/null +++ b/backend/provisioner/deployment/plugin_provisioner.go @@ -0,0 +1,71 @@ +package deployment + +import ( + "context" + "fmt" + + "connectrpc.com/connect" + + "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner" + "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner/provisionerconnect" + "github.com/TBD54566975/ftl/common/plugin" + "github.com/TBD54566975/ftl/internal/log" +) + +// PluginProvisioner delegates provisioning to an external plugin +type PluginProvisioner struct { + cmdCtx context.Context + client *plugin.Plugin[provisionerconnect.ProvisionerPluginServiceClient] +} + +var _ Provisioner = (*PluginProvisioner)(nil) + +func NewPluginProvisioner(ctx context.Context, name string) (*PluginProvisioner, error) { + client, cmdCtx, err := plugin.Spawn( + ctx, + log.Debug, + "ftl-provisioner-"+name, + ".", + "ftl-provisioner-"+name, + provisionerconnect.NewProvisionerPluginServiceClient, + ) + if err != nil { + return nil, fmt.Errorf("error spawning plugin: %w", err) + } + + return &PluginProvisioner{ + cmdCtx: cmdCtx, + client: client, + }, nil +} + +func (p *PluginProvisioner) Provision(ctx context.Context, module string, desired []*provisioner.ResourceContext, existing []*provisioner.Resource) (string, error) { + resp, err := p.client.Client.Provision(ctx, connect.NewRequest(&provisioner.ProvisionRequest{ + DesiredResources: desired, + ExistingResources: existing, + FtlClusterId: "ftl", + Module: module, + })) + if err != nil { + return "", fmt.Errorf("error calling plugin: %w", err) + } + if resp.Msg.Status != provisioner.ProvisionResponse_SUBMITTED { + return resp.Msg.ProvisioningToken, nil + } + return "", nil +} + +func (p *PluginProvisioner) State(ctx context.Context, token string, desired []*provisioner.Resource) (TaskState, []*provisioner.Resource, error) { + resp, err := p.client.Client.Status(ctx, connect.NewRequest(&provisioner.StatusRequest{ + ProvisioningToken: token, + })) + if err != nil { + return "", nil, fmt.Errorf("error getting status from plugin: %w", err) + } + if failed, ok := resp.Msg.Status.(*provisioner.StatusResponse_Failed); ok { + return TaskStateFailed, nil, fmt.Errorf("provisioning failed: %s", failed.Failed.ErrorMessage) + } else if success, ok := resp.Msg.Status.(*provisioner.StatusResponse_Success); ok { + return TaskStateDone, success.Success.UpdatedResources, nil + } + return TaskStateRunning, nil, nil +} diff --git a/backend/provisioner/deployment/provisioner.go b/backend/provisioner/deployment/provisioner.go index 1ae17cc287..3b89ad4304 100644 --- a/backend/provisioner/deployment/provisioner.go +++ b/backend/provisioner/deployment/provisioner.go @@ -4,13 +4,8 @@ import ( "context" "fmt" - "connectrpc.com/connect" - "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner" - "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner/provisionerconnect" "github.com/TBD54566975/ftl/backend/schema" - "github.com/TBD54566975/ftl/common/plugin" - "github.com/TBD54566975/ftl/internal/log" ) // ResourceType is a type of resource used to configure provisioners @@ -28,6 +23,29 @@ type Provisioner interface { State(ctx context.Context, token string, desired []*provisioner.Resource) (TaskState, []*provisioner.Resource, error) } +// ProvisionerPluginConfig is a map of provisioner name to resources it supports +type ProvisionerPluginConfig struct { + // The default provisioner to use for all resources not matched here + Default string `toml:"default"` + Plugins []struct { + Name string `toml:"name"` + Resources []ResourceType `toml:"resources"` + } `toml:"plugins"` +} + +func (cfg *ProvisionerPluginConfig) Validate() error { + registeredResources := map[ResourceType]bool{} + for _, plugin := range cfg.Plugins { + for _, r := range plugin.Resources { + if registeredResources[r] { + return fmt.Errorf("resource type %s is already registered. Trying to re-register for %s", r, plugin.Name) + } + registeredResources[r] = true + } + } + return nil +} + type provisionerConfig struct { provisioner Provisioner types []ResourceType @@ -35,9 +53,30 @@ type provisionerConfig struct { // ProvisionerRegistry contains all known resource handlers in the order they should be executed type ProvisionerRegistry struct { + Default Provisioner Provisioners []*provisionerConfig } +func NewProvisionerRegistry(ctx context.Context, cfg *ProvisionerPluginConfig) (*ProvisionerRegistry, error) { + result := &ProvisionerRegistry{} + if err := cfg.Validate(); err != nil { + return nil, fmt.Errorf("error validating provisioner config: %w", err) + } + for _, plugin := range cfg.Plugins { + switch plugin.Name { + case "noop": + result.Register(&NoopProvisioner{}, plugin.Resources...) + default: + provisioner, err := NewPluginProvisioner(ctx, plugin.Name) + if err != nil { + return nil, fmt.Errorf("error creating provisioner plugin %s: %w", plugin.Name, err) + } + result.Register(provisioner, plugin.Resources...) + } + } + return result, nil +} + // Register to the registry, to be executed after all the previously added handlers func (reg *ProvisionerRegistry) Register(handler Provisioner, types ...ResourceType) { reg.Provisioners = append(reg.Provisioners, &provisionerConfig{ @@ -112,61 +151,3 @@ func typeOf(r *provisioner.Resource) ResourceType { } return ResourceTypeUnknown } - -// PluginProvisioner delegates provisioning to an external plugin -type PluginProvisioner struct { - cmdCtx context.Context - client *plugin.Plugin[provisionerconnect.ProvisionerPluginServiceClient] -} - -func NewPluginProvisioner(ctx context.Context, name, dir, exe string) (*PluginProvisioner, error) { - client, cmdCtx, err := plugin.Spawn( - ctx, - log.Debug, - name, - dir, - exe, - provisionerconnect.NewProvisionerPluginServiceClient, - ) - if err != nil { - return nil, fmt.Errorf("error spawning plugin: %w", err) - } - - return &PluginProvisioner{ - cmdCtx: cmdCtx, - client: client, - }, nil -} - -func (p *PluginProvisioner) Provision(ctx context.Context, module string, desired []*provisioner.ResourceContext, existing []*provisioner.Resource) (string, error) { - resp, err := p.client.Client.Provision(ctx, connect.NewRequest(&provisioner.ProvisionRequest{ - DesiredResources: desired, - ExistingResources: existing, - FtlClusterId: "ftl", - Module: module, - })) - if err != nil { - return "", fmt.Errorf("error calling plugin: %w", err) - } - if resp.Msg.Status != provisioner.ProvisionResponse_SUBMITTED { - return resp.Msg.ProvisioningToken, nil - } - return "", nil -} - -func (p *PluginProvisioner) State(ctx context.Context, token string, desired []*provisioner.Resource) (TaskState, []*provisioner.Resource, error) { - resp, err := p.client.Client.Status(ctx, connect.NewRequest(&provisioner.StatusRequest{ - ProvisioningToken: token, - })) - if err != nil { - return "", nil, fmt.Errorf("error getting status from plugin: %w", err) - } - if failed, ok := resp.Msg.Status.(*provisioner.StatusResponse_Failed); ok { - return TaskStateFailed, nil, fmt.Errorf("provisioning failed: %s", failed.Failed.ErrorMessage) - } else if success, ok := resp.Msg.Status.(*provisioner.StatusResponse_Success); ok { - return TaskStateDone, success.Success.UpdatedResources, nil - } - return TaskStateRunning, nil, nil -} - -var _ Provisioner = (*PluginProvisioner)(nil) diff --git a/backend/provisioner/provisioner_integration_test.go b/backend/provisioner/provisioner_integration_test.go index c5fc202ec9..f37af0bb23 100644 --- a/backend/provisioner/provisioner_integration_test.go +++ b/backend/provisioner/provisioner_integration_test.go @@ -9,9 +9,14 @@ import ( "github.com/alecthomas/assert/v2" ) -func TestDeploymentThroughProvisioner(t *testing.T) { +func TestDeploymentThroughNoopProvisioner(t *testing.T) { in.Run(t, - in.WithProvisioner(), + in.WithProvisioner(` + default = "noop" + plugins = [ + { name = "noop", resources = ["postgres"] }, + ] + `), in.CopyModule("echo"), in.Deploy("echo"), in.Call("echo", "echo", "Bob", func(t testing.TB, response string) { diff --git a/backend/provisioner/service.go b/backend/provisioner/service.go index f1ed8a553e..4e140d59d1 100644 --- a/backend/provisioner/service.go +++ b/backend/provisioner/service.go @@ -1,11 +1,15 @@ package provisioner import ( + "bufio" "context" "fmt" + "io" "net/url" + "os" "connectrpc.com/connect" + "github.com/BurntSushi/toml" "github.com/alecthomas/kong" "golang.org/x/sync/errgroup" @@ -19,34 +23,42 @@ import ( "github.com/TBD54566975/ftl/internal/rpc" ) +// CommonProvisionerConfig is shared config between the production controller and development server. +type CommonProvisionerConfig struct { + PluginConfigFile *os.File `name:"provisioner-plugin-config" help:"Path to the plugin configuration file." env:"FTL_PROVISIONER_PLUGIN_CONFIG_FILE"` +} + type Config struct { Bind *url.URL `help:"Socket to bind to." default:"http://127.0.0.1:8893" env:"FTL_PROVISIONER_BIND"` - Advertise *url.URL `help:"Endpoint the Provisioner should advertise (must be unique across the cluster, defaults to --bind if omitted)." env:"FTL_PROVISIONER_ADVERTISE"` ControllerEndpoint *url.URL `name:"ftl-endpoint" help:"Controller endpoint." env:"FTL_ENDPOINT" default:"http://127.0.0.1:8892"` + CommonProvisionerConfig } func (c *Config) SetDefaults() { if err := kong.ApplyDefaults(c); err != nil { panic(err) } - if c.Advertise == nil { - c.Advertise = c.Bind - } } type Service struct { controllerClient ftlv1connect.ControllerServiceClient // TODO: Store in a resource graph currentResources map[string][]*provisioner.Resource - registry deployment.ProvisionerRegistry + registry *deployment.ProvisionerRegistry } var _ provisionerconnect.ProvisionerServiceHandler = (*Service)(nil) -func New(ctx context.Context, config Config, controllerClient ftlv1connect.ControllerServiceClient, devel bool) (*Service, error) { +func New(ctx context.Context, config Config, controllerClient ftlv1connect.ControllerServiceClient, pluginConfig *deployment.ProvisionerPluginConfig) (*Service, error) { + registry, err := deployment.NewProvisionerRegistry(ctx, pluginConfig) + if err != nil { + return nil, fmt.Errorf("error creating provisioner registry: %w", err) + } + return &Service{ controllerClient: controllerClient, currentResources: map[string][]*provisioner.Resource{}, + registry: registry, }, nil } @@ -126,12 +138,20 @@ func Start(ctx context.Context, config Config, devel bool) error { controllerClient := rpc.Dial(ftlv1connect.NewControllerServiceClient, config.ControllerEndpoint.String(), log.Error) - svc, err := New(ctx, config, controllerClient, devel) + pluginConfig := &deployment.ProvisionerPluginConfig{Default: "noop"} + if config.PluginConfigFile != nil { + pc, err := readPluginConfig(config.PluginConfigFile) + if err != nil { + return fmt.Errorf("error reading plugin configuration: %w", err) + } + pluginConfig = pc + } + + svc, err := New(ctx, config, controllerClient, pluginConfig) if err != nil { return err } logger.Debugf("Provisioner available at: %s", config.Bind) - logger.Debugf("Advertising as %s", config.Advertise) logger.Debugf("Using FTL endpoint: %s", config.ControllerEndpoint) g, ctx := errgroup.WithContext(ctx) @@ -147,6 +167,18 @@ func Start(ctx context.Context, config Config, devel bool) error { return nil } +func readPluginConfig(file *os.File) (*deployment.ProvisionerPluginConfig, error) { + result := deployment.ProvisionerPluginConfig{} + bytes, err := io.ReadAll(bufio.NewReader(file)) + if err != nil { + return nil, fmt.Errorf("error reading plugin configuration: %w", err) + } + if err := toml.Unmarshal(bytes, &result); err != nil { + return nil, fmt.Errorf("error parsing plugin configuration: %w", err) + } + return &result, nil +} + // Deployment client calls to ftl-controller func (s *Service) GetArtefactDiffs(ctx context.Context, req *connect.Request[ftlv1.GetArtefactDiffsRequest]) (*connect.Response[ftlv1.GetArtefactDiffsResponse], error) { diff --git a/backend/provisioner/testdata/go/echo/echo.go b/backend/provisioner/testdata/go/echo/echo.go index 6ce84c47b6..8e5afd05e1 100644 --- a/backend/provisioner/testdata/go/echo/echo.go +++ b/backend/provisioner/testdata/go/echo/echo.go @@ -4,8 +4,12 @@ package echo import ( "context" "fmt" + + "github.com/TBD54566975/ftl/go-runtime/ftl" ) +var db = ftl.PostgresDatabase("echodb") + // Echo returns a greeting with the current time. // //ftl:verb export diff --git a/frontend/cli/cmd_serve.go b/frontend/cli/cmd_serve.go index 3ba4bed509..d0ab0c7a64 100644 --- a/frontend/cli/cmd_serve.go +++ b/frontend/cli/cmd_serve.go @@ -52,6 +52,7 @@ type serveCmd struct { ObservabilityConfig observability.Config `embed:"" prefix:"o11y-"` DatabaseImage string `help:"The container image to start for the database" default:"postgres:15.8" env:"FTL_DATABASE_IMAGE" hidden:""` controller.CommonConfig + provisioner.CommonProvisionerConfig } const ftlContainerName = "ftl-db-1" @@ -191,8 +192,9 @@ func (s *serveCmd) run(ctx context.Context, projConfig projectconfig.Config, ini for i := range s.Provisioners { config := provisioner.Config{ - Bind: provisionerAddresses[i], - ControllerEndpoint: controllerAddresses[i%len(controllerAddresses)], + Bind: provisionerAddresses[i], + ControllerEndpoint: controllerAddresses[i%len(controllerAddresses)], + CommonProvisionerConfig: s.CommonProvisionerConfig, } config.SetDefaults() diff --git a/internal/integration/harness.go b/internal/integration/harness.go index 318e1abd26..82b49c0ea3 100644 --- a/internal/integration/harness.go +++ b/internal/integration/harness.go @@ -124,8 +124,9 @@ func WithoutController() Option { // WithProvisioner is a Run* option that starts the provisioner service. // if set, all deployments are done through the provisioner -func WithProvisioner() Option { +func WithProvisioner(config string) Option { return func(o *options) { + o.provisionerConfig = config o.startProvisioner = true // provisioner always needs a controller to talk to o.startController = true @@ -133,14 +134,15 @@ func WithProvisioner() Option { } type options struct { - languages []string - testDataDir string - ftlConfigPath string - startController bool - startProvisioner bool - requireJava bool - envars map[string]string - kube bool + languages []string + testDataDir string + ftlConfigPath string + startController bool + startProvisioner bool + provisionerConfig string + requireJava bool + envars map[string]string + kube bool } // Run an integration test. @@ -245,7 +247,11 @@ func run(t *testing.T, actionsOrOptions ...ActionOrOption) { Infof("Starting ftl cluster") args := []string{filepath.Join(binDir, "ftl"), "serve", "--recreate"} if opts.startProvisioner { + configFile := filepath.Join(tmpDir, "provisioner-plugin-config.toml") + os.WriteFile(configFile, []byte(opts.provisionerConfig), 0644) + args = append(args, "--provisioners=1") + args = append(args, "--provisioner-plugin-config="+configFile) } ctx = startProcess(ctx, t, args...) }