Skip to content

Commit

Permalink
refactor: language plugins use plugin.Spawn
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Nov 8, 2024
1 parent 469be0d commit 8ea679c
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 91 deletions.
16 changes: 10 additions & 6 deletions frontend/cli/cmd_new.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,7 @@ func prepareNewCmd(ctx context.Context, k *kong.Kong, args []string) (optionalPl
return optionalPlugin, fmt.Errorf("could not find new command")
}

bindAllocator, err := bindAllocatorWithoutController()
if err != nil {
return optionalPlugin, err
}

plugin, err := languageplugin.New(ctx, bindAllocator, language, "new")
plugin, err := languageplugin.New(ctx, pluginDir(), language, "new")
if err != nil {
return optionalPlugin, fmt.Errorf("could not create plugin for %v: %w", language, err)
}
Expand All @@ -85,6 +80,15 @@ func prepareNewCmd(ctx context.Context, k *kong.Kong, args []string) (optionalPl
return optional.Some(plugin), nil
}

// pluginDir returns the directory to base the plugin working directory off of.
// It tries to find the root directory of the project, or uses "." as a fallback.
func pluginDir() string {
if configPath, ok := projectconfig.DefaultConfigPath().Get(); ok {
return filepath.Dir(configPath)
}
return "."
}

func (i newCmd) Run(ctx context.Context, ktctx *kong.Context, config projectconfig.Config, plugin *languageplugin.LanguagePlugin) error {
name, path, err := validateModule(i.Dir, i.Name)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/buildengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1039,7 +1039,7 @@ func (e *Engine) gatherSchemas(
}

func (e *Engine) newModuleMeta(ctx context.Context, config moduleconfig.UnvalidatedModuleConfig) (moduleMeta, error) {
plugin, err := languageplugin.New(ctx, e.bindAllocator, config.Language, config.Module)
plugin, err := languageplugin.New(ctx, config.Dir, config.Language, config.Module)
if err != nil {
return moduleMeta{}, fmt.Errorf("could not create plugin for %s: %w", config.Module, err)
}
Expand Down
10 changes: 2 additions & 8 deletions internal/buildengine/languageplugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (

langpb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/language"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
"github.com/TBD54566975/ftl/internal/bind"
"github.com/TBD54566975/ftl/internal/builderrors"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/moduleconfig"
Expand Down Expand Up @@ -92,16 +91,11 @@ type BuildContext struct {
var ErrPluginNotRunning = errors.New("language plugin no longer running")

// PluginFromConfig creates a new language plugin from the given config.
func New(ctx context.Context, bindAllocator *bind.BindAllocator, language, name string) (p *LanguagePlugin, err error) {
bind, err := bindAllocator.Next()
if err != nil {
return nil, fmt.Errorf("failed to allocate port for external plugin: %w", err)
}
impl, err := newClientImpl(ctx, bind, language, name)
func New(ctx context.Context, dir, language, name string) (p *LanguagePlugin, err error) {
impl, err := newClientImpl(ctx, dir, language, name)
if err != nil {
return nil, err
}

return newPluginForTesting(ctx, impl), nil
}

Expand Down
97 changes: 29 additions & 68 deletions internal/buildengine/languageplugin/plugin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package languageplugin
import (
"context"
"fmt"
"net/url"
"syscall"

"connectrpc.com/connect"
Expand All @@ -12,6 +11,7 @@ import (

langpb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/language"
langconnect "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/language/languagepbconnect"
"github.com/TBD54566975/ftl/common/plugin"
"github.com/TBD54566975/ftl/internal/exec"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/rpc"
Expand All @@ -38,103 +38,64 @@ type pluginClient interface {
var _ pluginClient = &pluginClientImpl{}

type pluginClientImpl struct {
cmd *exec.Cmd
client langconnect.LanguageServiceClient
plugin *plugin.Plugin[langconnect.LanguageServiceClient]

// channel gets closed when the plugin exits
cmdError chan error
}

func newClientImpl(ctx context.Context, bind *url.URL, language, name string) (*pluginClientImpl, error) {
impl := &pluginClientImpl{
client: rpc.Dial(langconnect.NewLanguageServiceClient, bind.String(), log.Error),
}
err := impl.start(ctx, bind, language, name)
func newClientImpl(ctx context.Context, dir, language, name string) (*pluginClientImpl, error) {
impl := &pluginClientImpl{}
err := impl.start(ctx, dir, language, name)
if err != nil {
return nil, err
}
return impl, nil
}

// Start launches the plugin and blocks until the plugin is ready.
func (p *pluginClientImpl) start(ctx context.Context, bind *url.URL, language, name string) error {
logger := log.FromContext(ctx).Scope(name)

func (p *pluginClientImpl) start(ctx context.Context, dir, language, name string) error {
cmdName := "ftl-language-" + language
p.cmd = exec.Command(ctx, log.Debug, ".", cmdName, "--bind", bind.String())
p.cmd.Env = append(p.cmd.Env, "FTL_NAME="+name)
_, err := exec.LookPath(cmdName)
cmdPath, err := exec.LookPath(cmdName)
if err != nil {
return fmt.Errorf("failed to find plugin for %s: %w", language, err)
}

// Send the plugin's stderr to the logger.
p.cmd.Stderr = nil
pipe, err := p.cmd.StderrPipe()
plugin, _, err := plugin.Spawn(ctx,
log.FromContext(ctx).GetLevel(),
name,
dir,
cmdPath,
langconnect.NewLanguageServiceClient,
plugin.WithEnvars("FTL_Name="+name),
)
if err != nil {
return fmt.Errorf("could not create stderr pipe for %s: %w", name, err)
return fmt.Errorf("failed to spawn plugin for %s: %w", name, err)
}
go func() {
err := log.JSONStreamer(pipe, logger, log.Error)
if err != nil {
logger.Errorf(err, "Error streaming plugin logs.")
}
}()
p.plugin = plugin

// run the plugin and wait for it to finish executing
err = p.cmd.Start()
if err != nil {
return fmt.Errorf("failed to start plugin: %w", err)
}
runCtx, cancel := context.WithCancel(ctx)
p.cmdError = make(chan error)
pingErr := make(chan error)
go func() {
err := p.cmd.Wait()
err := p.plugin.Cmd.Wait()
if err != nil {
p.cmdError <- fmt.Errorf("language plugin failed: %w", err)
} else {
p.cmdError <- fmt.Errorf("language plugin ended with status 0")
}
cancel()
close(p.cmdError)
}()
go func() {
// wait for the plugin to be ready
if err := p.ping(runCtx); err != nil {
cancel()
pingErr <- fmt.Errorf("failed to ping plugin")
}
close(pingErr)
}()

// Wait for ping result, or for the plugin to exit. Which ever happens first.
select {
case err := <-p.cmdError:
if err != nil {
return err
}
return fmt.Errorf("plugin exited with status 0 before ping was registered")
case err := <-pingErr:
if err != nil {
return fmt.Errorf("failed to start plugin: %w", err)
}
return nil
case <-ctx.Done():
return fmt.Errorf("failed to start plugin: %w", ctx.Err())
}
return nil
}

func (p *pluginClientImpl) ping(ctx context.Context) error {
err := rpc.Wait(ctx, backoff.Backoff{}, launchTimeout, p.client)
err := rpc.Wait(ctx, backoff.Backoff{}, launchTimeout, p.plugin.Client)
if err != nil {
return connect.NewError(connect.CodeUnavailable, fmt.Errorf("failed to connect to runner: %w", err))
}
return nil
}

func (p *pluginClientImpl) kill() error {
if err := p.cmd.Kill(syscall.SIGINT); err != nil {
if err := p.plugin.Cmd.Kill(syscall.SIGINT); err != nil {
return err //nolint:wrapcheck
}
return nil
Expand All @@ -148,7 +109,7 @@ func (p *pluginClientImpl) getCreateModuleFlags(ctx context.Context, req *connec
if err := p.checkCmdIsAlive(); err != nil {
return nil, err
}
resp, err := p.client.GetCreateModuleFlags(ctx, req)
resp, err := p.plugin.Client.GetCreateModuleFlags(ctx, req)
if err != nil {
return nil, err //nolint:wrapcheck
}
Expand All @@ -159,7 +120,7 @@ func (p *pluginClientImpl) moduleConfigDefaults(ctx context.Context, req *connec
if err := p.checkCmdIsAlive(); err != nil {
return nil, err
}
resp, err := p.client.ModuleConfigDefaults(ctx, req)
resp, err := p.plugin.Client.ModuleConfigDefaults(ctx, req)
if err != nil {
return nil, err //nolint:wrapcheck
}
Expand All @@ -170,7 +131,7 @@ func (p *pluginClientImpl) createModule(ctx context.Context, req *connect.Reques
if err := p.checkCmdIsAlive(); err != nil {
return nil, err
}
resp, err := p.client.CreateModule(ctx, req)
resp, err := p.plugin.Client.CreateModule(ctx, req)
if err != nil {
return nil, err //nolint:wrapcheck
}
Expand All @@ -181,7 +142,7 @@ func (p *pluginClientImpl) getDependencies(ctx context.Context, req *connect.Req
if err := p.checkCmdIsAlive(); err != nil {
return nil, err
}
resp, err := p.client.GetDependencies(ctx, req)
resp, err := p.plugin.Client.GetDependencies(ctx, req)
if err != nil {
return nil, err //nolint:wrapcheck
}
Expand All @@ -192,7 +153,7 @@ func (p *pluginClientImpl) generateStubs(ctx context.Context, req *connect.Reque
if err := p.checkCmdIsAlive(); err != nil {
return nil, err
}
resp, err := p.client.GenerateStubs(ctx, req)
resp, err := p.plugin.Client.GenerateStubs(ctx, req)
if err != nil {
return nil, err //nolint:wrapcheck
}
Expand All @@ -203,7 +164,7 @@ func (p *pluginClientImpl) syncStubReferences(ctx context.Context, req *connect.
if err := p.checkCmdIsAlive(); err != nil {
return nil, err
}
resp, err := p.client.SyncStubReferences(ctx, req)
resp, err := p.plugin.Client.SyncStubReferences(ctx, req)
if err != nil {
return nil, err //nolint:wrapcheck
}
Expand All @@ -214,7 +175,7 @@ func (p *pluginClientImpl) build(ctx context.Context, req *connect.Request[langp
if err := p.checkCmdIsAlive(); err != nil {
return nil, nil, err
}
stream, err := p.client.Build(ctx, req)
stream, err := p.plugin.Client.Build(ctx, req)
if err != nil {
return nil, nil, err //nolint:wrapcheck
}
Expand Down Expand Up @@ -242,7 +203,7 @@ func (p *pluginClientImpl) buildContextUpdated(ctx context.Context, req *connect
if err := p.checkCmdIsAlive(); err != nil {
return nil, err
}
resp, err := p.client.BuildContextUpdated(ctx, req)
resp, err := p.plugin.Client.BuildContextUpdated(ctx, req)
if err != nil {
return nil, err //nolint:wrapcheck
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func startPlugin() in.Action {

bindURL, err = bindAllocator.Next()
assert.NoError(t, err)
client, err = newClientImpl(ic.Context, bindURL, ic.Language, "test")
client, err = newClientImpl(ic.Context, ic.WorkingDir(), ic.Language, "test")
assert.NoError(t, err)
}
}
Expand Down
8 changes: 1 addition & 7 deletions jvm-runtime/plugin/common/java_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ package common

import (
"context"
"net/url"
"os"
"path/filepath"
"testing"

"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/internal/bind"
"github.com/TBD54566975/ftl/internal/buildengine/languageplugin"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/moduleconfig"
Expand Down Expand Up @@ -69,11 +67,7 @@ func TestJavaConfigDefaults(t *testing.T) {
dir, err := filepath.Abs(tt.dir)
assert.NoError(t, err)

baseBind, err := url.Parse("http://127.0.0.1:8893")
assert.NoError(t, err)
allocator, err := bind.NewBindAllocator(baseBind, 0)
assert.NoError(t, err)
plugin, err := languageplugin.New(ctx, allocator, "java", "test")
plugin, err := languageplugin.New(ctx, t.TempDir(), "java", "test")
assert.NoError(t, err)
t.Cleanup(func() {
_ = plugin.Kill() //nolint:errcheck
Expand Down

0 comments on commit 8ea679c

Please sign in to comment.