From 9296515c2cedcf08c8714ffd451b98ceed354341 Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Fri, 15 Nov 2024 17:59:48 +1100 Subject: [PATCH] simplify build context update logic --- .../languageplugin/plugin_integration_test.go | 4 +- jvm-runtime/plugin/common/java_plugin_test.go | 8 - jvm-runtime/plugin/common/jvmcommon.go | 185 ++---------------- 3 files changed, 21 insertions(+), 176 deletions(-) diff --git a/internal/buildengine/languageplugin/plugin_integration_test.go b/internal/buildengine/languageplugin/plugin_integration_test.go index c33ff579e1..303f2d3bee 100644 --- a/internal/buildengine/languageplugin/plugin_integration_test.go +++ b/internal/buildengine/languageplugin/plugin_integration_test.go @@ -66,7 +66,7 @@ const ( func TestBuilds(t *testing.T) { sch := generateInitialSchema(t) in.Run(t, - in.WithLanguages("go", "java"), + in.WithLanguages("go"), in.WithoutController(), in.CopyModule(MODULE_NAME), startPlugin(), @@ -188,7 +188,7 @@ func TestBuildsWhenAlreadyLocked(t *testing.T) { sch := generateInitialSchema(t) in.Run(t, - in.WithLanguages("go", "java"), + in.WithLanguages("go"), in.WithoutController(), in.CopyModule(MODULE_NAME), startPlugin(), diff --git a/jvm-runtime/plugin/common/java_plugin_test.go b/jvm-runtime/plugin/common/java_plugin_test.go index 2199fd6748..de91c77beb 100644 --- a/jvm-runtime/plugin/common/java_plugin_test.go +++ b/jvm-runtime/plugin/common/java_plugin_test.go @@ -21,12 +21,6 @@ func TestExtractModuleDepsKotlin(t *testing.T) { } func TestJavaConfigDefaults(t *testing.T) { - watch := []string{ - "pom.xml", - "src/**", - "build/generated", - "target/generated-sources", - } for _, tt := range []struct { language string dir string @@ -40,7 +34,6 @@ func TestJavaConfigDefaults(t *testing.T) { DevModeBuild: optional.Some("mvn quarkus:dev"), DeployDir: "target", GeneratedSchemaDir: optional.Some("src/main/ftl-module-schema"), - Watch: watch, LanguageConfig: map[string]any{ "build-tool": "maven", }, @@ -54,7 +47,6 @@ func TestJavaConfigDefaults(t *testing.T) { DevModeBuild: optional.Some("mvn quarkus:dev"), DeployDir: "target", GeneratedSchemaDir: optional.Some("src/main/ftl-module-schema"), - Watch: watch, LanguageConfig: map[string]any{ "build-tool": "maven", }, diff --git a/jvm-runtime/plugin/common/jvmcommon.go b/jvm-runtime/plugin/common/jvmcommon.go index c19507b6d9..8fef640ac0 100644 --- a/jvm-runtime/plugin/common/jvmcommon.go +++ b/jvm-runtime/plugin/common/jvmcommon.go @@ -41,26 +41,16 @@ import ( "github.com/TBD54566975/ftl/internal/schema" "github.com/TBD54566975/ftl/internal/sha256" islices "github.com/TBD54566975/ftl/internal/slices" - "github.com/TBD54566975/ftl/internal/watch" ) const BuildLockTimeout = time.Minute const SchemaFile = "schema.pb" const ErrorFile = "errors.pb" -//sumtype:decl -type updateEvent interface{ updateEvent() } - type buildContextUpdatedEvent struct { buildCtx buildContext } -func (buildContextUpdatedEvent) updateEvent() {} - -type filesUpdatedEvent struct{} - -func (filesUpdatedEvent) updateEvent() {} - // buildContext contains contextual information needed to build. type buildContext struct { ID string @@ -84,7 +74,7 @@ func buildContextFromProto(proto *langpb.BuildContext) (buildContext, error) { } type Service struct { - updatesTopic *pubsub.Topic[updateEvent] + updatesTopic *pubsub.Topic[buildContextUpdatedEvent] acceptsContextUpdates atomic.Value[bool] scaffoldFiles *zip.Reader } @@ -93,7 +83,7 @@ var _ langconnect.LanguageServiceHandler = &Service{} func New(scaffoldFiles *zip.Reader) *Service { return &Service{ - updatesTopic: pubsub.New[updateEvent](), + updatesTopic: pubsub.New[buildContextUpdatedEvent](), scaffoldFiles: scaffoldFiles, } } @@ -175,73 +165,30 @@ func (s *Service) Build(ctx context.Context, req *connect.Request[langpb.BuildRe if req.Msg.RebuildAutomatically { return s.handleDevModeRequest(ctx, req, stream) } - events := make(chan updateEvent, 32) - s.updatesTopic.Subscribe(events) - defer s.updatesTopic.Unsubscribe(events) - - // cancel context when stream ends so that watcher can be stopped - ctx, cancel := context.WithCancel(ctx) - defer cancel() buildCtx, err := buildContextFromProto(req.Msg.BuildContext) if err != nil { return err } - watchPatterns, err := relativeWatchPatterns(buildCtx.Config.Dir, buildCtx.Config.Watch) - if err != nil { - return err - } - - watcher := watch.NewWatcher(watchPatterns...) - if req.Msg.RebuildAutomatically { - s.acceptsContextUpdates.Store(true) - defer s.acceptsContextUpdates.Store(false) - - if err := watchFiles(ctx, watcher, buildCtx, events); err != nil { - return err - } - } - // Initial build - if err := buildAndSend(ctx, stream, buildCtx, false, watcher.GetTransaction(buildCtx.Config.Dir)); err != nil { + if err := buildAndSend(ctx, stream, buildCtx, false); err != nil { return err } - if !req.Msg.RebuildAutomatically { - return nil - } - // Watch for changes and build as needed - for { - select { - case e := <-events: - var isAutomaticRebuild bool - buildCtx, isAutomaticRebuild = buildContextFromPendingEvents(ctx, buildCtx, events, e) - if isAutomaticRebuild { - err = stream.Send(&langpb.BuildEvent{ - Event: &langpb.BuildEvent_AutoRebuildStarted{ - AutoRebuildStarted: &langpb.AutoRebuildStarted{ - ContextId: buildCtx.ID, - }, - }, - }) - if err != nil { - return fmt.Errorf("could not send auto rebuild started event: %w", err) - } - } - if err = buildAndSend(ctx, stream, buildCtx, isAutomaticRebuild, watcher.GetTransaction(buildCtx.Config.Dir)); err != nil { - return err - } - case <-ctx.Done(): - log.FromContext(ctx).Infof("Build call ending - ctx cancelled") - return nil - } - } + return nil } func (s *Service) handleDevModeRequest(ctx context.Context, req *connect.Request[langpb.BuildRequest], stream *connect.ServerStream[langpb.BuildEvent]) error { logger := log.FromContext(ctx) + // cancel context when stream ends so that watcher can be stopped + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + events := make(chan buildContextUpdatedEvent, 32) + s.updatesTopic.Subscribe(events) + defer s.updatesTopic.Unsubscribe(events) first := true buildCtx, err := buildContextFromProto(req.Msg.BuildContext) if err != nil { @@ -263,7 +210,6 @@ func (s *Service) handleDevModeRequest(ctx context.Context, req *connect.Request errorHash := sha256.SHA256{} schemaHash := sha256.SHA256{} - ctx, cancel := context.WithCancel(context.Background()) ctx = log.ContextWithLogger(ctx, logger) bind := fmt.Sprintf("http://localhost:%d", address.Port) go func() { @@ -287,6 +233,8 @@ func (s *Service) handleDevModeRequest(ctx context.Context, req *connect.Request select { case <-ctx.Done(): return nil + case bc := <-events: + buildCtx = bc.buildCtx case <-schemaChangeTicker.C: select { // We only force a reload every second, but we check for schema changes every 100ms @@ -387,21 +335,13 @@ func (s *Service) handleDevModeRequest(ctx context.Context, req *connect.Request } } -func build(ctx context.Context, bctx buildContext, autoRebuild bool, transaction watch.ModifyFilesTransaction) (*langpb.BuildEvent, error) { +func build(ctx context.Context, bctx buildContext, autoRebuild bool) (*langpb.BuildEvent, error) { logger := log.FromContext(ctx) release, err := flock.Acquire(ctx, bctx.Config.BuildLock, BuildLockTimeout) if err != nil { return nil, fmt.Errorf("could not acquire build lock: %w", err) } defer release() //nolint:errcheck - if err := transaction.Begin(); err != nil { - return nil, fmt.Errorf("could not start a file transaction: %w", err) - } - defer func() { - if terr := transaction.End(); terr != nil { - logger.Errorf(terr, "failed to end file transaction") - } - }() deps, err := extractDependencies(bctx.Config.Module, bctx.Config.Dir) if err != nil { @@ -426,7 +366,7 @@ func build(ctx context.Context, bctx buildContext, autoRebuild bool, transaction return nil, fmt.Errorf("failed to build module %q: %w", config.Module, err) } if javaConfig.BuildTool == JavaBuildToolMaven { - if err := setPOMProperties(ctx, config.Dir, transaction); err != nil { + if err := setPOMProperties(ctx, config.Dir); err != nil { // This is not a critical error, things will probably work fine // TBH updating the pom is maybe not the best idea anyway logger.Warnf("unable to update ftl.version in %s: %s", config.Dir, err.Error()) @@ -512,84 +452,12 @@ func (s *Service) BuildContextUpdated(ctx context.Context, req *connect.Request[ return connect.NewResponse(&langpb.BuildContextUpdatedResponse{}), nil } -func watchFiles(ctx context.Context, watcher *watch.Watcher, buildCtx buildContext, events chan updateEvent) error { - watchTopic, err := watcher.Watch(ctx, time.Second, []string{buildCtx.Config.Dir}) - if err != nil { - return fmt.Errorf("could not watch for file changes: %w", err) - } - log.FromContext(ctx).Debugf("Watching for file changes: %s", buildCtx.Config.Dir) - watchEvents := make(chan watch.WatchEvent, 32) - watchTopic.Subscribe(watchEvents) - - // We need watcher to calculate file hashes before we do initial build so we can detect changes - select { - case e := <-watchEvents: - _, ok := e.(watch.WatchEventModuleAdded) - if !ok { - return fmt.Errorf("expected module added event, got: %T", e) - } - case <-time.After(3 * time.Second): - return fmt.Errorf("expected module added event, got no event") - case <-ctx.Done(): - return fmt.Errorf("context done: %w", ctx.Err()) - } - - go func() { - for { - select { - case e := <-watchEvents: - if change, ok := e.(watch.WatchEventModuleChanged); ok { - log.FromContext(ctx).Infof("Found file changes: %s", change) - events <- filesUpdatedEvent{} - } - - case <-ctx.Done(): - return - } - } - }() - return nil -} - -// buildContextFromPendingEvents processes all pending events to determine the latest context and whether the build is automatic. -func buildContextFromPendingEvents(ctx context.Context, buildCtx buildContext, events chan updateEvent, firstEvent updateEvent) (newBuildCtx buildContext, isAutomaticRebuild bool) { - allEvents := []updateEvent{firstEvent} - // find any other events in the queue - for { - select { - case e := <-events: - allEvents = append(allEvents, e) - case <-ctx.Done(): - return buildCtx, false - default: - // No more events waiting to be processed - hasExplicitBuilt := false - for _, e := range allEvents { - switch e := e.(type) { - case buildContextUpdatedEvent: - buildCtx = e.buildCtx - hasExplicitBuilt = true - case filesUpdatedEvent: - } - - } - switch e := firstEvent.(type) { - case buildContextUpdatedEvent: - buildCtx = e.buildCtx - hasExplicitBuilt = true - case filesUpdatedEvent: - } - return buildCtx, !hasExplicitBuilt - } - } -} - // buildAndSend builds the module and sends the build event to the stream. // // Build errors are sent over the stream as a BuildFailure event. // This function only returns an error if events could not be send over the stream. -func buildAndSend(ctx context.Context, stream *connect.ServerStream[langpb.BuildEvent], buildCtx buildContext, isAutomaticRebuild bool, transaction watch.ModifyFilesTransaction) error { - buildEvent, err := build(ctx, buildCtx, isAutomaticRebuild, transaction) +func buildAndSend(ctx context.Context, stream *connect.ServerStream[langpb.BuildEvent], buildCtx buildContext, isAutomaticRebuild bool) error { + buildEvent, err := build(ctx, buildCtx, isAutomaticRebuild) if err != nil { buildEvent = buildFailure(buildCtx, isAutomaticRebuild, builderrors.Error{ Type: builderrors.FTL, @@ -617,18 +485,6 @@ func buildFailure(buildCtx buildContext, isAutomaticRebuild bool, errs ...builde } } -func relativeWatchPatterns(moduleDir string, watchPaths []string) ([]string, error) { - relativePaths := make([]string, len(watchPaths)) - for i, path := range watchPaths { - relative, err := filepath.Rel(moduleDir, path) - if err != nil { - return nil, fmt.Errorf("could create relative path for watch pattern: %w", err) - } - relativePaths[i] = relative - } - return relativePaths, nil -} - const JavaBuildToolMaven string = "maven" const JavaBuildToolGradle string = "gradle" @@ -648,9 +504,7 @@ func loadJavaConfig(languageConfig any, language string) (JavaConfig, error) { func (s *Service) ModuleConfigDefaults(ctx context.Context, req *connect.Request[langpb.ModuleConfigDefaultsRequest]) (*connect.Response[langpb.ModuleConfigDefaultsResponse], error) { defaults := langpb.ModuleConfigDefaultsResponse{ GeneratedSchemaDir: ptr("src/main/ftl-module-schema"), - // Watch defaults to files related to maven and gradle - Watch: []string{"pom.xml", "src/**", "build/generated", "target/generated-sources"}, - LanguageConfig: &structpb.Struct{Fields: map[string]*structpb.Value{}}, + LanguageConfig: &structpb.Struct{Fields: map[string]*structpb.Value{}}, } dir := req.Msg.Dir pom := filepath.Join(dir, "pom.xml") @@ -776,7 +630,7 @@ func extractKotlinFTLImports(self, dir string) ([]string, error) { // setPOMProperties updates the ftl.version properties in the // pom.xml file in the given base directory. -func setPOMProperties(ctx context.Context, baseDir string, transaction watch.ModifyFilesTransaction) error { +func setPOMProperties(ctx context.Context, baseDir string) error { logger := log.FromContext(ctx) ftlVersion := ftl.Version // If we are running in dev mode, ftl.Version will be "dev" @@ -821,7 +675,6 @@ func setPOMProperties(ctx context.Context, baseDir string, transaction watch.Mod if err != nil { return fmt.Errorf("unable to write %s: %w", pomFile, err) } - err = transaction.ModifiedFiles(pomFile) if err != nil { return fmt.Errorf("could not mark %s as modified: %w", pomFile, err) }