Skip to content

Commit

Permalink
fix: concurrent map writes panic in engine (#1286)
Browse files Browse the repository at this point in the history
  • Loading branch information
wesbillman authored Apr 17, 2024
1 parent cff05fe commit ec9eb1a
Showing 1 changed file with 39 additions and 24 deletions.
63 changes: 39 additions & 24 deletions buildengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (b BuildStartedListenerFunc) OnBuildStarted(project Project) { b(project) }
// Engine for building a set of modules.
type Engine struct {
client ftlv1connect.ControllerServiceClient
projectMetas map[ProjectKey]projectMeta
projectMetas *xsync.MapOf[ProjectKey, projectMeta]
moduleDirs []string
externalDirs []string
controllerSchema *xsync.MapOf[string, *schema.Module]
Expand Down Expand Up @@ -87,7 +87,7 @@ func New(ctx context.Context, client ftlv1connect.ControllerServiceClient, modul
client: client,
moduleDirs: moduleDirs,
externalDirs: externalDirs,
projectMetas: map[ProjectKey]projectMeta{},
projectMetas: xsync.NewMapOf[ProjectKey, projectMeta](),
controllerSchema: xsync.NewMapOf[string, *schema.Module](),
schemaChanges: pubsub.New[schemaChange](),
parallelism: runtime.NumCPU(),
Expand All @@ -109,7 +109,7 @@ func New(ctx context.Context, client ftlv1connect.ControllerServiceClient, modul
if err != nil {
return nil, err
}
e.projectMetas[project.Config().Key] = projectMeta{project: project}
e.projectMetas.Store(project.Config().Key, projectMeta{project: project})
e.projectsToBuild.Store(project.Config().Key, true)
}

Expand Down Expand Up @@ -172,7 +172,10 @@ func (e *Engine) Close() error {
func (e *Engine) Graph(projects ...ProjectKey) (map[string][]string, error) {
out := map[string][]string{}
if len(projects) == 0 {
projects = maps.Keys(e.projectMetas)
e.projectMetas.Range(func(key ProjectKey, _ projectMeta) bool {
projects = append(projects, key)
return true
})
}
for _, key := range projects {
if err := e.buildGraph(string(key), out); err != nil {
Expand All @@ -184,7 +187,7 @@ func (e *Engine) Graph(projects ...ProjectKey) (map[string][]string, error) {

func (e *Engine) buildGraph(key string, out map[string][]string) error {
var deps []string
if meta, ok := e.projectMetas[ProjectKey(key)]; ok {
if meta, ok := e.projectMetas.Load(ProjectKey(key)); ok {
deps = meta.project.Config().Dependencies
} else if sch, ok := e.controllerSchema.Load(key); ok {
deps = sch.Imports()
Expand Down Expand Up @@ -272,8 +275,8 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration
switch event := event.(type) {
case WatchEventProjectAdded:
config := event.Project.Config()
if _, exists := e.projectMetas[config.Key]; !exists {
e.projectMetas[config.Key] = projectMeta{project: event.Project}
if _, exists := e.projectMetas.Load(config.Key); !exists {
e.projectMetas.Store(config.Key, projectMeta{project: event.Project})
err := e.buildAndDeploy(ctx, 1, true, config.Key)
if err != nil {
logger.Errorf(err, "deploy %s failed", config.Key)
Expand All @@ -287,13 +290,18 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration
logger.Errorf(err, "terminate %s failed", module.Module)
}
}
delete(e.projectMetas, config.Key)
e.projectMetas.Delete(config.Key)
case WatchEventProjectChanged:
config := event.Project.Config()

lastBuildTime := e.projectMetas[config.Key].lastBuildStartTime
if event.Time.Before(lastBuildTime) {
logger.Warnf("Skipping build and deploy; event time %v is before the last build time %v", event.Time, lastBuildTime)
meta, ok := e.projectMetas.Load(config.Key)
if !ok {
logger.Warnf("project %q not found", config.Key)
continue
}

if event.Time.Before(meta.lastBuildStartTime) {
logger.Debugf("Skipping build and deploy; event time %v is before the last build time %v", event.Time, meta.lastBuildStartTime)
continue // Skip this event as it's outdated
}
err := e.buildAndDeploy(ctx, 1, true, config.Key)
Expand Down Expand Up @@ -349,20 +357,24 @@ func computeModuleHash(module *schema.Module) ([]byte, error) {

func (e *Engine) getDependentProjectKeys(name string) []ProjectKey {
dependentProjectKeys := map[ProjectKey]bool{}
for k, meta := range e.projectMetas {
e.projectMetas.Range(func(key ProjectKey, meta projectMeta) bool {
for _, dep := range meta.project.Config().Dependencies {
if dep == name {
dependentProjectKeys[k] = true
dependentProjectKeys[key] = true
}
}
}
return true
})
return maps.Keys(dependentProjectKeys)
}

func (e *Engine) buildAndDeploy(ctx context.Context, replicas int32, waitForDeployOnline bool, projects ...ProjectKey) error {
logger := log.FromContext(ctx)
if len(projects) == 0 {
projects = maps.Keys(e.projectMetas)
e.projectMetas.Range(func(key ProjectKey, meta projectMeta) bool {
projects = append(projects, key)
return true
})
}

buildGroup := errgroup.Group{}
Expand Down Expand Up @@ -412,10 +424,13 @@ type buildCallback func(ctx context.Context, project Project) error
func (e *Engine) buildWithCallback(ctx context.Context, callback buildCallback, projects ...ProjectKey) error {
mustBuild := map[ProjectKey]bool{}
if len(projects) == 0 {
projects = maps.Keys(e.projectMetas)
e.projectMetas.Range(func(key ProjectKey, meta projectMeta) bool {
projects = append(projects, key)
return true
})
}
for _, key := range projects {
meta, ok := e.projectMetas[key]
meta, ok := e.projectMetas.Load(key)
if !ok {
return fmt.Errorf("project %q not found", key)
}
Expand All @@ -425,7 +440,7 @@ func (e *Engine) buildWithCallback(ctx context.Context, callback buildCallback,
if err != nil {
return err
}
e.projectMetas[key] = projectMeta{project: project}
e.projectMetas.Store(key, projectMeta{project: project})
mustBuild[key] = true
}
graph, err := e.Graph(projects...)
Expand Down Expand Up @@ -488,7 +503,7 @@ func (e *Engine) tryBuild(ctx context.Context, mustBuild map[ProjectKey]bool, ke
return e.mustSchema(ctx, key, builtModules, schemas)
}

meta, ok := e.projectMetas[key]
meta, ok := e.projectMetas.Load(key)
if !ok {
return fmt.Errorf("project %q not found", key)
}
Expand All @@ -501,10 +516,10 @@ func (e *Engine) tryBuild(ctx context.Context, mustBuild map[ProjectKey]bool, ke
}

meta.lastBuildStartTime = time.Now()
e.projectMetas[key] = meta
e.projectMetas.Store(key, meta)
err := e.build(ctx, key, builtModules, schemas)
if err == nil && callback != nil {
return callback(ctx, e.projectMetas[key].project)
return callback(ctx, meta.project)
}

return err
Expand All @@ -523,7 +538,7 @@ func (e *Engine) mustSchema(ctx context.Context, key ProjectKey, builtModules ma
//
// Assumes that all dependencies have been built and are available in "built".
func (e *Engine) build(ctx context.Context, key ProjectKey, builtModules map[string]*schema.Module, schemas chan<- *schema.Module) error {
meta, ok := e.projectMetas[key]
meta, ok := e.projectMetas.Load(key)
if !ok {
return fmt.Errorf("project %q not found", key)
}
Expand Down Expand Up @@ -557,14 +572,14 @@ func (e *Engine) gatherSchemas(
project Project,
out map[string]*schema.Module,
) error {
latestModule, ok := e.projectMetas[project.Config().Key]
latestModule, ok := e.projectMetas.Load(project.Config().Key)
if !ok {
latestModule = projectMeta{project: project}
}
for _, dep := range latestModule.project.Config().Dependencies {
out[dep] = moduleSchemas[dep]
if dep != "builtin" {
depModule, ok := e.projectMetas[ProjectKey(dep)]
depModule, ok := e.projectMetas.Load(ProjectKey(dep))
// TODO: should we be gathering schemas from dependencies without a project?
// This can happen if the schema is loaded from the controller
if ok {
Expand Down

0 comments on commit ec9eb1a

Please sign in to comment.