diff --git a/explorer/scan/local_scanner.go b/explorer/scan/local_scanner.go index 259fbfeb6d..be4bfd58a7 100644 --- a/explorer/scan/local_scanner.go +++ b/explorer/scan/local_scanner.go @@ -235,10 +235,11 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up for i := range assetCandidates { candidate := assetCandidates[i] - runtime, err := providers.Coordinator.RuntimeFor(candidate.asset, candidate.runtime) + runtime, err := providers.Coordinator.EphemeralRuntimeFor(candidate.asset) if err != nil { return nil, false, err } + runtime.SetRecording(candidate.runtime.Recording) err = runtime.Connect(&plugin.ConnectReq{ Features: config.Features, diff --git a/providers/coordinator.go b/providers/coordinator.go index 9a1bb47b15..a6d6550cbd 100644 --- a/providers/coordinator.go +++ b/providers/coordinator.go @@ -24,13 +24,15 @@ import ( var BuiltinCoreID = coreconf.Config.ID var Coordinator = coordinator{ - Running: []*RunningProvider{}, - runtimes: map[string]*Runtime{}, + RunningByID: map[string]*RunningProvider{}, + RunningEphemeral: map[*RunningProvider]struct{}{}, + runtimes: map[string]*Runtime{}, } type coordinator struct { - Providers Providers - Running []*RunningProvider + Providers Providers + RunningByID map[string]*RunningProvider + RunningEphemeral map[*RunningProvider]struct{} unprocessedRuntimes []*Runtime runtimes map[string]*Runtime @@ -101,7 +103,7 @@ type UpdateProvidersConfig struct { RefreshInterval int } -func (c *coordinator) Start(id string, update UpdateProvidersConfig) (*RunningProvider, error) { +func (c *coordinator) Start(id string, isEphemeral bool, update UpdateProvidersConfig) (*RunningProvider, error) { if x, ok := builtinProviders[id]; ok { // We don't warn for core providers, which are the only providers // built into the binary (for now). 
@@ -189,7 +191,11 @@ func (c *coordinator) Start(id string, update UpdateProvidersConfig) (*RunningPr } c.mutex.Lock() - c.Running = append(c.Running, res) + if isEphemeral { + c.RunningEphemeral[res] = struct{}{} + } else { + c.RunningByID[res.ID] = res + } c.mutex.Unlock() return res, nil @@ -265,6 +271,10 @@ func (c *coordinator) tryProviderUpdate(provider *Provider, update UpdateProvide } func (c *coordinator) NewRuntime() *Runtime { + return c.newRuntime(false) +} + +func (c *coordinator) newRuntime(isEphemeral bool) *Runtime { res := &Runtime{ coordinator: c, providers: map[string]*ConnectedProvider{}, @@ -276,19 +286,21 @@ func (c *coordinator) NewRuntime() *Runtime { }, Recording: NullRecording{}, shutdownTimeout: defaultShutdownTimeout, + isEphemeral: isEphemeral, } res.schema.runtime = res // TODO: do this dynamically in the future res.schema.loadAllSchemas() - c.mutex.Lock() - c.unprocessedRuntimes = append(c.unprocessedRuntimes, res) - c.runtimeCnt++ - cnt := c.runtimeCnt - c.mutex.Unlock() - - log.Debug().Msg("Started a new runtime (" + strconv.Itoa(cnt) + " total)") + if !isEphemeral { + c.mutex.Lock() + c.unprocessedRuntimes = append(c.unprocessedRuntimes, res) + c.runtimeCnt++ + cnt := c.runtimeCnt + c.mutex.Unlock() + log.Debug().Msg("Started a new runtime (" + strconv.Itoa(cnt) + " total)") + } return res } @@ -320,6 +332,18 @@ func (c *coordinator) RuntimeFor(asset *inventory.Asset, parent *Runtime) (*Runt return res, res.DetectProvider(asset) } +// EphemeralRuntimeFor an asset, creates a new ephemeral runtime and connectors. +// These are designed to be thrown away at the end of their use. +// Note: at the time of writing they may still share auxiliary providers with +// other runtimes, e.g. if provider X spawns another provider Y, the latter +// may be a shared provider. The majority of memory load should be on the +// primary provider (eg X in this case) so that it can effectively clear +// its memory at the end of its use. 
+func (c *coordinator) EphemeralRuntimeFor(asset *inventory.Asset) (*Runtime, error) { + res := c.newRuntime(true) + return res, res.DetectProvider(asset) +} + // Only call this with a mutex lock around it! func (c *coordinator) unsafeRefreshRuntimes() { var remaining []*Runtime @@ -361,31 +385,47 @@ func (c *coordinator) unsafeSetAssetRuntime(asset *inventory.Asset, runtime *Run return found } -func (c *coordinator) Close(p *RunningProvider) { - if !p.isClosed { - p.isClosed = true - if p.Client != nil { - p.Client.Kill() - } +func (c *coordinator) Stop(provider *RunningProvider, isEphemeral bool) error { + if provider == nil { + return nil } - c.mutex.Lock() - for i := range c.Running { - if c.Running[i] == p { - c.Running = append(c.Running[0:i], c.Running[i+1:]...) - break + c.mutex.Lock() + defer c.mutex.Unlock() + + if isEphemeral { + delete(c.RunningEphemeral, provider) + } else { + found := c.RunningByID[provider.ID] + if found != nil { + delete(c.RunningByID, provider.ID) } } - c.mutex.Unlock() + + return provider.Shutdown() } func (c *coordinator) Shutdown() { c.mutex.Lock() - for i := range c.Running { - cur := c.Running[i] + + for cur := range c.RunningEphemeral { + if err := cur.Shutdown(); err != nil { + log.Warn().Err(err).Str("provider", cur.Name).Msg("failed to shut down provider") + } cur.isClosed = true cur.Client.Kill() } + c.RunningEphemeral = map[*RunningProvider]struct{}{} + + for _, runtime := range c.RunningByID { + if err := runtime.Shutdown(); err != nil { + log.Warn().Err(err).Str("provider", runtime.Name).Msg("failed to shut down provider") + } + runtime.isClosed = true + runtime.Client.Kill() + } + c.RunningByID = map[string]*RunningProvider{} + c.mutex.Unlock() } diff --git a/providers/mock.go b/providers/mock.go index 55d56879e7..500ca24b9b 100644 --- a/providers/mock.go +++ b/providers/mock.go @@ -63,7 +63,7 @@ func (s *mockProviderService) Connect(req *plugin.ConnectReq, callback plugin.Pr for i := range asset.Connections { conf =
asset.Connections[i] - provider, err := s.runtime.addProvider(conf.ProviderID) + provider, err := s.runtime.addProvider(conf.ProviderID, false) if err != nil { return nil, errors.Wrap(err, "failed to init provider for connection in recording") } diff --git a/providers/runtime.go b/providers/runtime.go index d9bd8a8cbe..3c839a9ab1 100644 --- a/providers/runtime.go +++ b/providers/runtime.go @@ -37,6 +37,7 @@ type Runtime struct { // schema aggregates all resources executable on this asset schema extensibleSchema isClosed bool + isEphemeral bool close sync.Once shutdownTimeout time.Duration } @@ -58,14 +59,14 @@ type shutdownResult struct { } func (r *Runtime) tryShutdown() shutdownResult { - var errs multierr.Errors - for _, provider := range r.providers { - errs.Add(provider.Instance.Shutdown()) + // Ephemeral runtimes have their primary provider be ephemeral, i.e. non-shared. + // All other providers are shared and will not be shut down from within the provider. + if r.isEphemeral && r.Provider != nil { + err := r.coordinator.Stop(r.Provider.Instance, true) + return shutdownResult{Error: err} } - return shutdownResult{ - Error: errs.Deduplicate(), - } + return shutdownResult{} } func (r *Runtime) Close() { @@ -107,7 +108,14 @@ func (r *Runtime) AssetMRN() string { // UseProvider sets the main provider for this runtime. func (r *Runtime) UseProvider(id string) error { - res, err := r.addProvider(id) + // We transfer isEphemeral here because: + // 1. If the runtime is not ephemeral, it behaves as a shared provider by + // default. + // 2. If the runtime is ephemeral, we only want the main provider to be + // ephemeral. All other providers by default are shared.
+ // (Note: In the future we plan to have an isolated runtime mode, + // where even other providers are ephemeral, ie not shared) + res, err := r.addProvider(id, r.isEphemeral) if err != nil { return err } @@ -121,21 +129,27 @@ func (r *Runtime) AddConnectedProvider(c *ConnectedProvider) { r.schema.Add(c.Instance.Name, c.Instance.Schema) } -func (r *Runtime) addProvider(id string) (*ConnectedProvider, error) { +func (r *Runtime) addProvider(id string, isEphemeral bool) (*ConnectedProvider, error) { var running *RunningProvider - for _, p := range r.coordinator.Running { - if p.ID == id { - running = p - break - } - } - - if running == nil { - var err error - running, err = r.coordinator.Start(id, r.AutoUpdate) + var err error + if isEphemeral { + running, err = r.coordinator.Start(id, true, r.AutoUpdate) if err != nil { return nil, err } + + } else { + // TODO: we need to detect only the shared running providers + r.coordinator.mutex.Lock() + running = r.coordinator.RunningByID[id] + r.coordinator.mutex.Unlock() + if running == nil { + var err error + running, err = r.coordinator.Start(id, false, r.AutoUpdate) + if err != nil { + return nil, err + } + } } res := &ConnectedProvider{Instance: running} @@ -525,7 +537,7 @@ func (r *Runtime) lookupResourceProvider(resource string) (*ConnectedProvider, * return provider, info, nil } - res, err := r.addProvider(info.Provider) + res, err := r.addProvider(info.Provider, false) if err != nil { return nil, nil, multierr.Wrap(err, "failed to start provider '"+info.Provider+"'") } @@ -556,7 +568,7 @@ func (r *Runtime) lookupFieldProvider(resource string, field string) (*Connected return provider, resourceInfo, fieldInfo, nil } - res, err := r.addProvider(fieldInfo.Provider) + res, err := r.addProvider(fieldInfo.Provider, false) if err != nil { return nil, nil, nil, multierr.Wrap(err, "failed to start provider '"+fieldInfo.Provider+"'") }