Skip to content

Commit

Permalink
refactor: switch buildengine to schemaeventsource
Browse files Browse the repository at this point in the history
  • Loading branch information
alecthomas committed Nov 27, 2024
1 parent deb9b75 commit 5afa7b9
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 79 deletions.
5 changes: 3 additions & 2 deletions frontend/cli/cmd_build.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/internal/buildengine"
"github.com/TBD54566975/ftl/internal/projectconfig"
"github.com/TBD54566975/ftl/internal/schema/schemaeventsource"
)

type buildCmd struct {
Expand All @@ -19,7 +20,7 @@ type buildCmd struct {
func (b *buildCmd) Run(
ctx context.Context,
controllerClient ftlv1connect.ControllerServiceClient,
schemaClient ftlv1connect.SchemaServiceClient,
schemaSourceFactory func() schemaeventsource.EventSource,
projConfig projectconfig.Config,
) error {
if len(b.Dirs) == 0 {
Expand All @@ -35,7 +36,7 @@ func (b *buildCmd) Run(
engine, err := buildengine.New(
ctx,
controllerClient,
schemaClient,
schemaSourceFactory(),
projConfig,
b.Dirs,
buildengine.BuildEnv(b.BuildEnv),
Expand Down
6 changes: 3 additions & 3 deletions frontend/cli/cmd_deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"fmt"
"time"

"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner/provisionerconnect"
"github.com/TBD54566975/ftl/internal/buildengine"
"github.com/TBD54566975/ftl/internal/projectconfig"
"github.com/TBD54566975/ftl/internal/schema/schemaeventsource"
)

type deployCmd struct {
Expand All @@ -22,7 +22,7 @@ func (d *deployCmd) Run(
ctx context.Context,
projConfig projectconfig.Config,
provisionerClient provisionerconnect.ProvisionerServiceClient,
schemaClient ftlv1connect.SchemaServiceClient,
schemaSourceFactory func() schemaeventsource.EventSource,
) error {
// Cancel build engine context to ensure all language plugins are killed.
var cancel context.CancelFunc
Expand All @@ -34,7 +34,7 @@ func (d *deployCmd) Run(
}
defer cancel()
engine, err := buildengine.New(
ctx, provisionerClient, schemaClient, projConfig, d.Build.Dirs,
ctx, provisionerClient, schemaSourceFactory(), projConfig, d.Build.Dirs,
buildengine.BuildEnv(d.Build.BuildEnv),
buildengine.Parallelism(d.Build.Parallelism),
)
Expand Down
2 changes: 1 addition & 1 deletion frontend/cli/cmd_dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (d *devCmd) Run(
})
}

engine, err := buildengine.New(ctx, client, schemaClient, projConfig, d.Build.Dirs, opts...)
engine, err := buildengine.New(ctx, client, schemaEventSourceFactory(), projConfig, d.Build.Dirs, opts...)
if err != nil {
return err
}
Expand Down
128 changes: 65 additions & 63 deletions internal/buildengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,24 @@ import (
"strings"
"time"

"connectrpc.com/connect"
"github.com/alecthomas/types/optional"
"github.com/alecthomas/types/pubsub"
"github.com/jpillora/backoff"
"github.com/puzpuzpuz/xsync/v3"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"

"github.com/TBD54566975/ftl/backend/controller/scaling"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/internal/buildengine/languageplugin"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/moduleconfig"
"github.com/TBD54566975/ftl/internal/projectconfig"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/TBD54566975/ftl/internal/schema/schemaeventsource"
"github.com/TBD54566975/ftl/internal/slices"
"github.com/TBD54566975/ftl/internal/watch"
)

type schemaChange struct {
ChangeType ftlv1.DeploymentChangeType
*schema.Module
}

// moduleMeta is a wrapper around a module that includes the last build's start time.
type moduleMeta struct {
module Module
Expand Down Expand Up @@ -176,13 +169,13 @@ type rebuildRequest struct {
// Engine for building a set of modules.
type Engine struct {
client DeployClient
schemaClient SchemaClient
schemaSource schemaeventsource.EventSource
moduleMetas *xsync.MapOf[string, moduleMeta]
projectConfig projectconfig.Config
moduleDirs []string
watcher *watch.Watcher // only watches for module toml changes
controllerSchema *xsync.MapOf[string, *schema.Module]
schemaChanges *pubsub.Topic[schemaChange]
schemaChanges *pubsub.Topic[schemaeventsource.Event]
cancel func()
parallelism int
modulesToBuild *xsync.MapOf[string, bool]
Expand Down Expand Up @@ -244,21 +237,21 @@ func WithStartTime(startTime time.Time) Option {
func New(
ctx context.Context,
client DeployClient,
schemaClient SchemaClient,
schemaSource schemaeventsource.EventSource,
projectConfig projectconfig.Config,
moduleDirs []string,
options ...Option,
) (*Engine, error) {
ctx = rpc.ContextWithClient(ctx, client)
e := &Engine{
client: client,
schemaClient: schemaClient,
schemaSource: schemaSource,
projectConfig: projectConfig,
moduleDirs: moduleDirs,
moduleMetas: xsync.NewMapOf[string, moduleMeta](),
watcher: watch.NewWatcher("ftl.toml"),
controllerSchema: xsync.NewMapOf[string, *schema.Module](),
schemaChanges: pubsub.New[schemaChange](),
schemaChanges: pubsub.New[schemaeventsource.Event](),
pluginEvents: make(chan languageplugin.PluginEvent, 128),
parallelism: runtime.NumCPU(),
modulesToBuild: xsync.NewMapOf[string, bool](),
Expand Down Expand Up @@ -311,47 +304,54 @@ func New(
if client == nil {
return e, nil
}
schemaSync := e.startSchemaSync(ctx)
go rpc.RetryStreamingServerStream(ctx, "build-engine", backoff.Backoff{Max: time.Second}, &ftlv1.PullSchemaRequest{}, schemaClient.PullSchema, schemaSync, rpc.AlwaysRetry())
e.startSchemaSync(ctx)
return e, nil
}

// Sync module schema changes from the FTL controller, as well as from manual
// updates, and merge them into a single schema map.
func (e *Engine) startSchemaSync(ctx context.Context) func(ctx context.Context, msg *ftlv1.PullSchemaResponse) error {
func (e *Engine) startSchemaSync(ctx context.Context) {
logger := log.FromContext(ctx)
// Blocking schema sync from the controller.
psch, err := e.schemaClient.GetSchema(ctx, connect.NewRequest(&ftlv1.GetSchemaRequest{}))
if err == nil {
sch, err := schema.FromProto(psch.Msg.Schema)
if err == nil {
for _, module := range sch.Modules {
e.controllerSchema.Store(module.Name, module)
if !e.schemaSource.Live() {
logger.Debugf("Schema source is not live, skipping initial sync.")
} else {
initialSync:
for {
select {
case <-ctx.Done():
return

case event := <-e.schemaSource.Events():
e.processEvent(event)
if !event.More() {
break initialSync
}
}
} else {
logger.Debugf("Failed to parse schema from controller: %s", err)
}
} else {
logger.Debugf("Failed to get schema from controller: %s", err)
}

// Sync module schema changes from the controller into the schema event source.
return func(ctx context.Context, msg *ftlv1.PullSchemaResponse) error {
switch msg.ChangeType {
case ftlv1.DeploymentChangeType_DEPLOYMENT_ADDED, ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGED:
sch, err := schema.ModuleFromProto(msg.Schema)
if err != nil {
return err
}
e.controllerSchema.Store(sch.Name, sch)
e.schemaChanges.Publish(schemaChange{ChangeType: msg.ChangeType, Module: sch})
go func() {
for {
select {
case <-ctx.Done():
return

case ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED:
e.controllerSchema.Delete(msg.ModuleName)
e.schemaChanges.Publish(schemaChange{ChangeType: msg.ChangeType, Module: nil})
case event := <-e.schemaSource.Events():
e.processEvent(event)
}
}
return nil
}()
}

func (e *Engine) processEvent(event schemaeventsource.Event) {
switch event := event.(type) {
case schemaeventsource.EventUpsert:
e.controllerSchema.Store(event.Module.Name, event.Module)

case schemaeventsource.EventRemove:
e.controllerSchema.Delete(event.Module.Name)
}
e.schemaChanges.Publish(event)
}

// Close stops the Engine's schema sync.
Expand Down Expand Up @@ -497,7 +497,7 @@ func (e *Engine) Dev(ctx context.Context, period time.Duration) error {
func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration) error {
logger := log.FromContext(ctx)

schemaChanges := make(chan schemaChange, 128)
schemaChanges := make(chan schemaeventsource.Event, 128)
e.schemaChanges.Subscribe(schemaChanges)
defer func() {
e.schemaChanges.Unsubscribe(schemaChanges)
Expand Down Expand Up @@ -589,32 +589,34 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration

_ = e.BuildAndDeploy(ctx, 1, true, event.Config.Module) //nolint:errcheck
}
case change := <-schemaChanges:
if change.ChangeType == ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED {
case event := <-schemaChanges:
switch event := event.(type) {
case schemaeventsource.EventRemove:
continue
}
existingHash, ok := moduleHashes[change.Name]
if !ok {
existingHash = []byte{}
}
case schemaeventsource.EventUpsert:
existingHash, ok := moduleHashes[event.Module.Name]
if !ok {
existingHash = []byte{}
}

hash, err := computeModuleHash(change.Module)
if err != nil {
logger.Errorf(err, "compute hash for %s failed", change.Name)
continue
}
hash, err := computeModuleHash(event.Module)
if err != nil {
logger.Errorf(err, "compute hash for %s failed", event.Module.Name)
continue
}

if bytes.Equal(hash, existingHash) {
logger.Tracef("schema for %s has not changed", change.Name)
continue
}
if bytes.Equal(hash, existingHash) {
logger.Tracef("schema for %s has not changed", event.Module.Name)
continue
}

moduleHashes[change.Name] = hash
moduleHashes[event.Module.Name] = hash

dependentModuleNames := e.getDependentModuleNames(change.Name)
if len(dependentModuleNames) > 0 {
logger.Infof("%s's schema changed; processing %s", change.Name, strings.Join(dependentModuleNames, ", "))
_ = e.BuildAndDeploy(ctx, 1, true, dependentModuleNames...) //nolint:errcheck
dependentModuleNames := e.getDependentModuleNames(event.Module.Name)
if len(dependentModuleNames) > 0 {
logger.Infof("%s's schema changed; processing %s", event.Module.Name, strings.Join(dependentModuleNames, ", "))
_ = e.BuildAndDeploy(ctx, 1, true, dependentModuleNames...) //nolint:errcheck
}
}

case event := <-e.rebuildRequests:
Expand Down
3 changes: 2 additions & 1 deletion internal/buildengine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/projectconfig"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/TBD54566975/ftl/internal/schema/schemaeventsource"
)

func TestGraph(t *testing.T) {
Expand All @@ -20,7 +21,7 @@ func TestGraph(t *testing.T) {
Path: filepath.Join(t.TempDir(), "ftl-project.toml"),
Name: "test",
}
engine, err := buildengine.New(ctx, nil, nil, projConfig, []string{"testdata/alpha", "testdata/other", "testdata/another"})
engine, err := buildengine.New(ctx, nil, schemaeventsource.NewUnattached(), projConfig, []string{"testdata/alpha", "testdata/other", "testdata/another"})
assert.NoError(t, err)

defer engine.Close()
Expand Down
Loading

0 comments on commit 5afa7b9

Please sign in to comment.