diff --git a/examples/java/echo/src/main/java/ftl/echo/Echo.java b/examples/java/echo/src/main/java/ftl/echo/Echo.java index 988d91637a..7195437154 100644 --- a/examples/java/echo/src/main/java/ftl/echo/Echo.java +++ b/examples/java/echo/src/main/java/ftl/echo/Echo.java @@ -9,7 +9,6 @@ public class Echo { @Export @Verb - @VerbName("bro.kens") public EchoResponse echo(EchoRequest req, TimeClient time) { var response = time.time(); return new EchoResponse("Hello, " + req.name().orElse("anonymous") + "! The time is " + response.toString() + "."); diff --git a/frontend/cli/cmd_serve.go b/frontend/cli/cmd_serve.go index d3af85d7cd..ff86681a3d 100644 --- a/frontend/cli/cmd_serve.go +++ b/frontend/cli/cmd_serve.go @@ -68,13 +68,12 @@ func (s *serveCmd) Run( controllerClient ftlv1connect.ControllerServiceClient, provisionerClient provisionerconnect.ProvisionerServiceClient, schemaClient ftlv1connect.SchemaServiceClient, - devModeEndpoints <-chan scaling.DevModeEndpoints, ) error { bindAllocator, err := bind.NewBindAllocator(s.Bind, 2) if err != nil { return fmt.Errorf("could not create bind allocator: %w", err) } - return s.run(ctx, projConfig, cm, sm, optional.None[chan bool](), false, bindAllocator, controllerClient, provisionerClient, schemaClient, devModeEndpoints) + return s.run(ctx, projConfig, cm, sm, optional.None[chan bool](), false, bindAllocator, controllerClient, provisionerClient, schemaClient, nil) } //nolint:maintidx diff --git a/internal/integration/actions.go b/internal/integration/actions.go index cee0a49681..db3f8d1d7e 100644 --- a/internal/integration/actions.go +++ b/internal/integration/actions.go @@ -274,6 +274,31 @@ func Wait(module string) Action { } } +// WaitWithTimeout for the given module to deploy. +func WaitWithTimeout(module string, timeout time.Duration) Action { + return func(t testing.TB, ic TestContext) { + Infof("Waiting for %s to become ready", module) + deadline := time.After(timeout) + tick := time.NewTicker(time.Millisecond * 100) + defer tick.Stop() + for { + select { + case <-deadline: + t.Fatalf("deployment of module %q not found", module) + return + case <-tick.C: + status, err := ic.Controller.Status(ic, connect.NewRequest(&ftlv1.StatusRequest{})) + assert.NoError(t, err) + for _, deployment := range status.Msg.Deployments { + if deployment.Name == module { + return + } + } + } + } + } +} + func Sleep(duration time.Duration) Action { return func(t testing.TB, ic TestContext) { Infof("Sleeping for %s", duration) @@ -414,6 +439,18 @@ func VerifySchema(check func(ctx context.Context, t testing.TB, sch *schemapb.Sc } } +// VerifyControllerStatus lets you test the current controller status +func VerifyControllerStatus(check func(ctx context.Context, t testing.TB, status *ftlv1.StatusResponse)) Action { + return func(t testing.TB, ic TestContext) { + sch, err := ic.Controller.Status(ic, connect.NewRequest(&ftlv1.StatusRequest{})) + if err != nil { + t.Errorf("failed to get schema: %v", err) + return + } + check(ic.Context, t, sch.Msg) + } +} + // VerifySchemaVerb lets you test the current schema for a specific verb func VerifySchemaVerb(module string, verb string, check func(ctx context.Context, t testing.TB, schema *schemapb.Schema, verb *schemapb.Verb)) Action { return func(t testing.TB, ic TestContext) { diff --git a/internal/integration/harness.go b/internal/integration/harness.go index 8761954467..882947be3e 100644 --- a/internal/integration/harness.go +++ b/internal/integration/harness.go @@ -161,11 +161,19 @@ func WithProvisionerConfig(config string) Option { } } +// WithDevMode starts the server using FTL dev, so modules are deployed automatically on change +func WithDevMode() Option { + return func(o *options) { + o.devMode = true + } +} + type options struct { languages []string testDataDir string ftlConfigPath string startController bool + devMode bool startProvisioner bool provisionerConfig string requireJava bool @@ -296,7 +304,12 @@ func run(t *testing.T, actionsOrOptions ...ActionOrOption) { if opts.startController { Infof("Starting ftl cluster") - args := []string{filepath.Join(binDir, "ftl"), "serve", "--recreate"} + command := "serve" + if opts.devMode { + command = "dev" + } + + args := []string{filepath.Join(binDir, "ftl"), command, "--recreate"} if !opts.console { args = append(args, "--no-console") } @@ -309,7 +322,7 @@ func run(t *testing.T, actionsOrOptions ...ActionOrOption) { args = append(args, "--provisioner-plugin-config="+configFile) } } - ctx = startProcess(ctx, t, args...) + ctx = startProcess(ctx, t, tmpDir, opts.devMode, args...) } if opts.startController || opts.kube { controller = rpc.Dial(ftlv1connect.NewControllerServiceClient, "http://localhost:8892", log.Debug) @@ -335,6 +348,7 @@ func run(t *testing.T, actionsOrOptions ...ActionOrOption) { realT: t, Language: language, kubeNamespace: kubeNamespace, + devMode: opts.devMode, kubeClient: optional.Ptr(kubeClient), } defer dumpKubePods(ctx, ic.kubeClient, ic.kubeNamespace) @@ -415,6 +429,7 @@ type TestContext struct { // Set if the test is running on kubernetes kubeClient optional.Option[kubernetes.Clientset] kubeNamespace string + devMode bool Controller ftlv1connect.ControllerServiceClient Provisioner provisionerconnect.ProvisionerServiceClient @@ -501,10 +516,13 @@ func (l *logWriter) Write(p []byte) (n int, err error) { } // startProcess runs a binary in the background and terminates it when the test completes. -func startProcess(ctx context.Context, t testing.TB, args ...string) context.Context { +func startProcess(ctx context.Context, t testing.TB, tempDir string, devMode bool, args ...string) context.Context { t.Helper() ctx, cancel := context.WithCancel(ctx) cmd := ftlexec.Command(ctx, log.Debug, "..", args[0], args[1:]...) + if devMode { + cmd.Dir = tempDir + } err := cmd.Start() assert.NoError(t, err) terminated := make(chan bool) diff --git a/jvm-runtime/ftl-runtime/common/deployment/src/main/java/xyz/block/ftl/deployment/ModuleProcessor.java b/jvm-runtime/ftl-runtime/common/deployment/src/main/java/xyz/block/ftl/deployment/ModuleProcessor.java index dc38f66d4e..5bc8cae3f8 100644 --- a/jvm-runtime/ftl-runtime/common/deployment/src/main/java/xyz/block/ftl/deployment/ModuleProcessor.java +++ b/jvm-runtime/ftl-runtime/common/deployment/src/main/java/xyz/block/ftl/deployment/ModuleProcessor.java @@ -21,6 +21,7 @@ import org.tomlj.TomlParseResult; import io.quarkus.arc.deployment.AdditionalBeanBuildItem; +import io.quarkus.deployment.IsDevelopment; import io.quarkus.deployment.annotations.BuildProducer; import io.quarkus.deployment.annotations.BuildStep; import io.quarkus.deployment.annotations.ExecutionTime; @@ -30,6 +31,7 @@ import io.quarkus.deployment.builditem.CombinedIndexBuildItem; import io.quarkus.deployment.builditem.FeatureBuildItem; import io.quarkus.deployment.builditem.RunTimeConfigBuilderBuildItem; +import io.quarkus.deployment.builditem.ShutdownContextBuildItem; import io.quarkus.deployment.builditem.SystemPropertyBuildItem; import io.quarkus.deployment.pkg.builditem.OutputTargetBuildItem; import io.quarkus.grpc.deployment.BindableServiceBuildItem; @@ -199,4 +201,10 @@ void openSocket(BuildProducer virtual, socket.produce(RequireSocketHttpBuildItem.MARKER); virtual.produce(RequireVirtualHttpBuildItem.MARKER); } + + @Record(ExecutionTime.RUNTIME_INIT) + @BuildStep(onlyIf = IsDevelopment.class) + void hotReload(ShutdownContextBuildItem shutdownContextBuildItem, FTLRecorder recorder) { + recorder.startReloadTimer(shutdownContextBuildItem); + } } diff --git a/jvm-runtime/ftl-runtime/common/runtime/pom.xml b/jvm-runtime/ftl-runtime/common/runtime/pom.xml index 48a3bdeb5d..fbc857589b 100644 --- a/jvm-runtime/ftl-runtime/common/runtime/pom.xml +++ b/jvm-runtime/ftl-runtime/common/runtime/pom.xml @@ -16,6 +16,10 @@ io.quarkus quarkus-arc + + io.quarkus + quarkus-development-mode-spi + io.quarkus quarkus-grpc diff --git a/jvm-runtime/ftl-runtime/common/runtime/src/main/java/xyz/block/ftl/runtime/FTLRecorder.java b/jvm-runtime/ftl-runtime/common/runtime/src/main/java/xyz/block/ftl/runtime/FTLRecorder.java index a3a88a46c4..1ec9cf0ca1 100644 --- a/jvm-runtime/ftl-runtime/common/runtime/src/main/java/xyz/block/ftl/runtime/FTLRecorder.java +++ b/jvm-runtime/ftl-runtime/common/runtime/src/main/java/xyz/block/ftl/runtime/FTLRecorder.java @@ -3,6 +3,8 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.util.List; +import java.util.Timer; +import java.util.TimerTask; import java.util.function.BiFunction; import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext; @@ -11,6 +13,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.quarkus.arc.Arc; +import io.quarkus.runtime.ShutdownContext; import io.quarkus.runtime.annotations.Recorder; import xyz.block.ftl.runtime.http.FTLHttpHandler; import xyz.block.ftl.runtime.http.HTTPVerbInvoker; @@ -152,4 +155,20 @@ public Object extractParameter(ResteasyReactiveRequestContext context) { throw new RuntimeException(e); } } + + public void startReloadTimer(ShutdownContext shutdownContext) { + Timer t = new Timer(); + t.schedule(new TimerTask() { + @Override + public void run() { + HotReloadSetup.doScan(); + } + }, 1000, 1000); + shutdownContext.addShutdownTask(new Runnable() { + @Override + public void run() { + t.cancel(); + } + }); + } } diff --git a/jvm-runtime/ftl-runtime/common/runtime/src/main/java/xyz/block/ftl/runtime/HotReloadSetup.java b/jvm-runtime/ftl-runtime/common/runtime/src/main/java/xyz/block/ftl/runtime/HotReloadSetup.java new file mode 100644 index 0000000000..14dc297017 --- /dev/null +++ b/jvm-runtime/ftl-runtime/common/runtime/src/main/java/xyz/block/ftl/runtime/HotReloadSetup.java @@ -0,0 +1,24 @@ +package xyz.block.ftl.runtime; + +import io.quarkus.dev.spi.HotReplacementContext; +import io.quarkus.dev.spi.HotReplacementSetup; + +public class HotReloadSetup implements HotReplacementSetup { + + static volatile HotReplacementContext context; + + @Override + public void setupHotDeployment(HotReplacementContext hrc) { + context = hrc; + } + + static void doScan() { + if (context != null) { + try { + context.doScan(false); + } catch (Exception e) { + // ignore + } + } + } +} diff --git a/jvm-runtime/ftl-runtime/common/runtime/src/main/resources/META-INF/services/io.quarkus.dev.spi.HotReplacementSetup b/jvm-runtime/ftl-runtime/common/runtime/src/main/resources/META-INF/services/io.quarkus.dev.spi.HotReplacementSetup new file mode 100644 index 0000000000..f02b221da3 --- /dev/null +++ b/jvm-runtime/ftl-runtime/common/runtime/src/main/resources/META-INF/services/io.quarkus.dev.spi.HotReplacementSetup @@ -0,0 +1 @@ +xyz.block.ftl.runtime.HotReloadSetup \ No newline at end of file diff --git a/jvm-runtime/jvm_integration_test.go b/jvm-runtime/jvm_integration_test.go index 7f37dcf81d..ada0d70422 100644 --- a/jvm-runtime/jvm_integration_test.go +++ b/jvm-runtime/jvm_integration_test.go @@ -12,6 +12,7 @@ import ( "github.com/alecthomas/repr" + ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema" "github.com/TBD54566975/ftl/go-runtime/ftl" in "github.com/TBD54566975/ftl/internal/integration" @@ -19,17 +20,55 @@ import ( ) func TestLifecycleJVM(t *testing.T) { + deployment := "" in.Run(t, in.WithLanguages("java", "kotlin"), + in.WithDevMode(), in.GitInit(), in.Exec("rm", "ftl-project.toml"), in.Exec("ftl", "init", "test", "."), in.IfLanguage("java", in.Exec("ftl", "new", "java", ".", "echo")), in.IfLanguage("kotlin", in.Exec("ftl", "new", "kotlin", ".", "echo")), - in.Deploy("echo"), + in.WaitWithTimeout("echo", time.Minute), + in.VerifyControllerStatus(func(ctx context.Context, t testing.TB, status *ftlv1.StatusResponse) { + assert.Equal(t, 1, len(status.Deployments)) + deployment = status.Deployments[0].Key + }), in.Call("echo", "echo", "Bob", func(t testing.TB, response string) { assert.Equal(t, "Hello, Bob!", response) }), + // Now test hot reload + in.IfLanguage("java", in.EditFile("echo", func(content []byte) []byte { + return []byte(strings.ReplaceAll(string(content), "Hello", "Bye")) + }, "src/main/java/com/example/EchoVerb.java")), + in.IfLanguage("kotlin", in.EditFile("echo", func(content []byte) []byte { + return []byte(strings.ReplaceAll(string(content), "Hello", "Bye")) + }, "src/main/kotlin/com/example/EchoVerb.kt")), + in.Sleep(time.Second*2), // Annoyingly quarkus limits to one restart check every 2s, which is fine for normal dev, but a pain for these tests + in.Call("echo", "echo", "Bob", func(t testing.TB, response string) { + assert.Equal(t, "Bye, Bob!", response) + }), + in.VerifyControllerStatus(func(ctx context.Context, t testing.TB, status *ftlv1.StatusResponse) { + // Non structurally changing edits should not trigger a new deployment. + t.Logf("status %v", status) + assert.Equal(t, 1, len(status.Deployments)) + assert.Equal(t, deployment, status.Deployments[0].Key) + }), + // Structural change should result in a new deployment + in.IfLanguage("java", in.EditFile("echo", func(content []byte) []byte { + return []byte(strings.ReplaceAll(string(content), "@Export", "")) + }, "src/main/java/com/example/EchoVerb.java")), + in.IfLanguage("kotlin", in.EditFile("echo", func(content []byte) []byte { + return []byte(strings.ReplaceAll(string(content), "@Export", "")) + }, "src/main/kotlin/com/example/EchoVerb.kt")), + in.Call("echo", "echo", "Bob", func(t testing.TB, response string) { + assert.Equal(t, "Bye, Bob!", response) + }), + in.VerifyControllerStatus(func(ctx context.Context, t testing.TB, status *ftlv1.StatusResponse) { + // Non structurally changing edits should not trigger a new deployment. + assert.Equal(t, 1, len(status.Deployments)) + assert.NotEqual(t, deployment, status.Deployments[0].Key) + }), ) } diff --git a/jvm-runtime/plugin/common/jvmcommon.go b/jvm-runtime/plugin/common/jvmcommon.go index ad00f332de..e6871d92dc 100644 --- a/jvm-runtime/plugin/common/jvmcommon.go +++ b/jvm-runtime/plugin/common/jvmcommon.go @@ -6,7 +6,6 @@ import ( "context" "fmt" "io/fs" - "net/http" "os" "path/filepath" "regexp" @@ -228,7 +227,7 @@ func (s *Service) handleDevModeRequest(ctx context.Context, req *connect.Request }() schemaChangeTicker := time.NewTicker(100 * time.Millisecond) - forceReloadTicker := time.NewTicker(time.Second) + defer schemaChangeTicker.Stop() for { select { case <-ctx.Done(): @@ -236,22 +235,6 @@ func (s *Service) handleDevModeRequest(ctx context.Context, req *connect.Request case bc := <-events: buildCtx = bc.buildCtx case <-schemaChangeTicker.C: - select { - // We only force a reload every second, but we check for schema changes every 100ms - case <-forceReloadTicker.C: - if !first { - // Force a hot reload via HTTP request - req, err := http.NewRequestWithContext(ctx, http.MethodHead, bind, nil) // #nosec - // We don't do anything on error here - if err == nil { - resp, err := http.DefaultClient.Do(req) - if err == nil { - resp.Body.Close() - } - } - } - default: - } changed := false file, err := os.ReadFile(errorFile) @@ -276,16 +259,8 @@ func (s *Service) handleDevModeRequest(ctx context.Context, req *connect.Request } buildErrs, err := loadProtoErrors(buildCtx.Config) if err != nil { - err = stream.Send(&langpb.BuildEvent{Event: &langpb.BuildEvent_BuildFailure{ - BuildFailure: &langpb.BuildFailure{ - IsAutomaticRebuild: !first, - ContextId: buildCtx.ID, - Errors: &langpb.ErrorList{Errors: []*langpb.Error{{Msg: err.Error(), Level: langpb.Error_ERROR, Type: langpb.Error_FTL}}}, - }}}) - if err != nil { - return fmt.Errorf("could not send build event: %w", err) - } - first = false + // This is likely a transient error + logger.Errorf(err, "failed to load build errors") continue } if builderrors.ContainsTerminalError(langpb.ErrorsFromProto(buildErrs)) { @@ -305,16 +280,8 @@ func (s *Service) handleDevModeRequest(ctx context.Context, req *connect.Request moduleProto, err := readSchema(buildCtx) if err != nil { - err = stream.Send(&langpb.BuildEvent{Event: &langpb.BuildEvent_BuildFailure{ - BuildFailure: &langpb.BuildFailure{ - IsAutomaticRebuild: !first, - ContextId: buildCtx.ID, - Errors: &langpb.ErrorList{Errors: []*langpb.Error{{Msg: err.Error(), Level: langpb.Error_ERROR, Type: langpb.Error_FTL}}}, - }}}) - if err != nil { - return fmt.Errorf("could not send build event: %w", err) - } - first = false + // This is likely a transient error + logger.Errorf(err, "failed to schema") continue } @@ -378,7 +345,7 @@ func build(ctx context.Context, bctx buildContext, autoRebuild bool) (*langpb.Bu } logger.Infof("Using build command '%s'", config.Build) command := exec.Command(ctx, log.Debug, config.Dir, "bash", "-c", config.Build) - err = command.RunBuffered(ctx) + err = command.Run() if err != nil { return &langpb.BuildEvent{Event: &langpb.BuildEvent_BuildFailure{&langpb.BuildFailure{ IsAutomaticRebuild: autoRebuild,