From 62e4399f2259295e8ed4e796d9ed18b1cdba2265 Mon Sep 17 00:00:00 2001
From: Dominik Richter
Date: Mon, 2 Oct 2023 23:48:30 -0700
Subject: [PATCH] =?UTF-8?q?=E2=AD=90=20support=20ephemeral=20vs=20normal?=
 =?UTF-8?q?=20runtimes=20(#2050)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Ephemeral runtimes are throw-away runtimes that come with their own
plugin runner (and execution space). They can be discarded once they
have been used, while other runtimes and plugins remain shared.

This is particularly useful when scanning targets in discover mode,
where potentially many runtimes/plugins need to be instantiated on the
fly and are only required for the lifetime of the scan.

Signed-off-by: Dominik Richter
---
 explorer/scan/local_scanner.go |  3 +-
 providers/coordinator.go       | 93 ++++++++++++++++++++++++----------
 providers/mock.go              |  2 +-
 providers/runtime.go           | 52 +++++++++++--------
 4 files changed, 101 insertions(+), 49 deletions(-)

diff --git a/explorer/scan/local_scanner.go b/explorer/scan/local_scanner.go
index 259fbfeb6d..be4bfd58a7 100644
--- a/explorer/scan/local_scanner.go
+++ b/explorer/scan/local_scanner.go
@@ -235,10 +235,11 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
 	for i := range assetCandidates {
 		candidate := assetCandidates[i]
 
-		runtime, err := providers.Coordinator.RuntimeFor(candidate.asset, candidate.runtime)
+		runtime, err := providers.Coordinator.EphemeralRuntimeFor(candidate.asset)
 		if err != nil {
 			return nil, false, err
 		}
+		runtime.SetRecording(candidate.runtime.Recording)
 
 		err = runtime.Connect(&plugin.ConnectReq{
 			Features: config.Features,
diff --git a/providers/coordinator.go b/providers/coordinator.go
index 9a1bb47b15..a6d6550cbd 100644
--- a/providers/coordinator.go
+++ b/providers/coordinator.go
@@ -24,13 +24,15 @@ import (
 var BuiltinCoreID = coreconf.Config.ID
 
 var Coordinator = coordinator{
-	Running:  []*RunningProvider{},
-	runtimes: map[string]*Runtime{},
+	RunningByID:      map[string]*RunningProvider{},
+	RunningEphemeral: map[*RunningProvider]struct{}{},
+	runtimes:         map[string]*Runtime{},
 }
 
 type coordinator struct {
-	Providers Providers
-	Running   []*RunningProvider
+	Providers        Providers
+	RunningByID      map[string]*RunningProvider
+	RunningEphemeral map[*RunningProvider]struct{}
 
 	unprocessedRuntimes []*Runtime
 	runtimes            map[string]*Runtime
@@ -101,7 +103,7 @@ type UpdateProvidersConfig struct {
 	RefreshInterval int
 }
 
-func (c *coordinator) Start(id string, update UpdateProvidersConfig) (*RunningProvider, error) {
+func (c *coordinator) Start(id string, isEphemeral bool, update UpdateProvidersConfig) (*RunningProvider, error) {
 	if x, ok := builtinProviders[id]; ok {
 		// We don't warn for core providers, which are the only providers
 		// built into the binary (for now).
@@ -189,7 +191,11 @@ func (c *coordinator) Start(id string, update UpdateProvidersConfig) (*RunningPr
 	}
 
 	c.mutex.Lock()
-	c.Running = append(c.Running, res)
+	if isEphemeral {
+		c.RunningEphemeral[res] = struct{}{}
+	} else {
+		c.RunningByID[res.ID] = res
+	}
 	c.mutex.Unlock()
 
 	return res, nil
@@ -265,6 +271,10 @@ func (c *coordinator) tryProviderUpdate(provider *Provider, update UpdateProvide
 }
 
 func (c *coordinator) NewRuntime() *Runtime {
+	return c.newRuntime(false)
+}
+
+func (c *coordinator) newRuntime(isEphemeral bool) *Runtime {
 	res := &Runtime{
 		coordinator:     c,
 		providers:       map[string]*ConnectedProvider{},
@@ -276,19 +286,21 @@ func (c *coordinator) NewRuntime() *Runtime {
 		},
 		Recording:       NullRecording{},
 		shutdownTimeout: defaultShutdownTimeout,
+		isEphemeral:     isEphemeral,
 	}
 	res.schema.runtime = res
 
 	// TODO: do this dynamically in the future
 	res.schema.loadAllSchemas()
 
-	c.mutex.Lock()
-	c.unprocessedRuntimes = append(c.unprocessedRuntimes, res)
-	c.runtimeCnt++
-	cnt := c.runtimeCnt
-	c.mutex.Unlock()
-
-	log.Debug().Msg("Started a new runtime (" + strconv.Itoa(cnt) + " total)")
+	if !isEphemeral {
+		c.mutex.Lock()
+		c.unprocessedRuntimes = append(c.unprocessedRuntimes, res)
+		c.runtimeCnt++
+		cnt := c.runtimeCnt
+		c.mutex.Unlock()
+		log.Debug().Msg("Started a new runtime (" + strconv.Itoa(cnt) + " total)")
+	}
 
 	return res
 }
@@ -320,6 +332,18 @@ func (c *coordinator) RuntimeFor(asset *inventory.Asset, parent *Runtime) (*Runt
 	return res, res.DetectProvider(asset)
 }
 
+// EphemeralRuntimeFor creates a new ephemeral runtime and connectors for an asset.
+// These are designed to be thrown away at the end of their use.
+// Note: at the time of writing they may still share auxiliary providers with
+// other runtimes, e.g. if provider X spawns another provider Y, the latter
+// may be a shared provider. The majority of memory load should be on the
+// primary provider (e.g. X in this case) so that it can effectively clear
+// its memory at the end of its use.
+func (c *coordinator) EphemeralRuntimeFor(asset *inventory.Asset) (*Runtime, error) {
+	res := c.newRuntime(true)
+	return res, res.DetectProvider(asset)
+}
+
 // Only call this with a mutex lock around it!
 func (c *coordinator) unsafeRefreshRuntimes() {
 	var remaining []*Runtime
@@ -361,31 +385,46 @@ func (c *coordinator) unsafeSetAssetRuntime(asset *inventory.Asset, runtime *Run
 	return found
 }
 
-func (c *coordinator) Close(p *RunningProvider) {
-	if !p.isClosed {
-		p.isClosed = true
-		if p.Client != nil {
-			p.Client.Kill()
-		}
+func (c *coordinator) Stop(provider *RunningProvider, isEphemeral bool) error {
+	if provider == nil {
+		return nil
 	}
-
 	c.mutex.Lock()
-	for i := range c.Running {
-		if c.Running[i] == p {
-			c.Running = append(c.Running[0:i], c.Running[i+1:]...)
-			break
+	defer c.mutex.Unlock()
+
+	if isEphemeral {
+		delete(c.RunningEphemeral, provider)
+	} else {
+		found := c.RunningByID[provider.ID]
+		if found != nil {
+			delete(c.RunningByID, provider.ID)
 		}
 	}
-	c.mutex.Unlock()
+
+	return provider.Shutdown()
 }
 
 func (c *coordinator) Shutdown() {
 	c.mutex.Lock()
-	for i := range c.Running {
-		cur := c.Running[i]
+
+	for cur := range c.RunningEphemeral {
+		if err := cur.Shutdown(); err != nil {
+			log.Warn().Err(err).Str("provider", cur.Name).Msg("failed to shut down provider")
+		}
 		cur.isClosed = true
 		cur.Client.Kill()
 	}
+	c.RunningEphemeral = map[*RunningProvider]struct{}{}
+
+	for _, runtime := range c.RunningByID {
+		if err := runtime.Shutdown(); err != nil {
+			log.Warn().Err(err).Str("provider", runtime.Name).Msg("failed to shut down provider")
+		}
+		runtime.isClosed = true
+		runtime.Client.Kill()
+	}
+	c.RunningByID = map[string]*RunningProvider{}
+
 	c.mutex.Unlock()
 }
 
diff --git a/providers/mock.go b/providers/mock.go
index 55d56879e7..500ca24b9b 100644
--- a/providers/mock.go
+++ b/providers/mock.go
@@ -63,7 +63,7 @@ func (s *mockProviderService) Connect(req *plugin.ConnectReq, callback plugin.Pr
 	for i := range asset.Connections {
 		conf := asset.Connections[i]
 
-		provider, err := s.runtime.addProvider(conf.ProviderID)
+		provider, err := s.runtime.addProvider(conf.ProviderID, false)
 		if err != nil {
 			return nil, errors.Wrap(err, "failed to init provider for connection in recording")
 		}
diff --git a/providers/runtime.go b/providers/runtime.go
index d9bd8a8cbe..3c839a9ab1 100644
--- a/providers/runtime.go
+++ b/providers/runtime.go
@@ -37,6 +37,7 @@ type Runtime struct {
 	// schema aggregates all resources executable on this asset
 	schema          extensibleSchema
 	isClosed        bool
+	isEphemeral     bool
 	close           sync.Once
 	shutdownTimeout time.Duration
 }
@@ -58,14 +59,14 @@ type shutdownResult struct {
 }
 
 func (r *Runtime) tryShutdown() shutdownResult {
-	var errs multierr.Errors
-	for _, provider := range r.providers {
-		errs.Add(provider.Instance.Shutdown())
+	// Ephemeral runtimes have their primary provider be ephemeral, i.e. non-shared.
+	// All other providers are shared and will not be shut down from within the provider.
+	if r.isEphemeral {
+		err := r.coordinator.Stop(r.Provider.Instance, true)
+		return shutdownResult{Error: err}
 	}
 
-	return shutdownResult{
-		Error: errs.Deduplicate(),
-	}
+	return shutdownResult{}
 }
 
 func (r *Runtime) Close() {
@@ -107,7 +108,14 @@ func (r *Runtime) AssetMRN() string {
 
 // UseProvider sets the main provider for this runtime.
 func (r *Runtime) UseProvider(id string) error {
-	res, err := r.addProvider(id)
+	// We transfer isEphemeral here because:
+	// 1. If the runtime is not ephemeral, it behaves as a shared provider by
+	//    default.
+	// 2. If the runtime is ephemeral, we only want the main provider to be
+	//    ephemeral. All other providers by default are shared.
+	//    (Note: In the future we plan to have an isolated runtime mode,
+	//    where even other providers are ephemeral, i.e. not shared)
+	res, err := r.addProvider(id, r.isEphemeral)
 	if err != nil {
 		return err
 	}
@@ -121,21 +129,25 @@ func (r *Runtime) AddConnectedProvider(c *ConnectedProvider) {
 	r.schema.Add(c.Instance.Name, c.Instance.Schema)
 }
 
-func (r *Runtime) addProvider(id string) (*ConnectedProvider, error) {
+func (r *Runtime) addProvider(id string, isEphemeral bool) (*ConnectedProvider, error) {
 	var running *RunningProvider
-	for _, p := range r.coordinator.Running {
-		if p.ID == id {
-			running = p
-			break
-		}
-	}
-
-	if running == nil {
-		var err error
-		running, err = r.coordinator.Start(id, r.AutoUpdate)
+	var err error
+	if isEphemeral {
+		running, err = r.coordinator.Start(id, true, r.AutoUpdate)
 		if err != nil {
 			return nil, err
 		}
+
+	} else {
+		// TODO: we need to detect only the shared running providers
+		running = r.coordinator.RunningByID[id]
+		if running == nil {
+			var err error
+			running, err = r.coordinator.Start(id, false, r.AutoUpdate)
+			if err != nil {
+				return nil, err
+			}
+		}
 	}
 
 	res := &ConnectedProvider{Instance: running}
@@ -525,7 +537,7 @@ func (r *Runtime) lookupResourceProvider(resource string) (*ConnectedProvider, *
 		return provider, info, nil
 	}
 
-	res, err := r.addProvider(info.Provider)
+	res, err := r.addProvider(info.Provider, false)
 	if err != nil {
 		return nil, nil, multierr.Wrap(err, "failed to start provider '"+info.Provider+"'")
 	}
@@ -556,7 +568,7 @@ func (r *Runtime) lookupFieldProvider(resource string, field string) (*Connected
 		return provider, resourceInfo, fieldInfo, nil
 	}
 
-	res, err := r.addProvider(fieldInfo.Provider)
+	res, err := r.addProvider(fieldInfo.Provider, false)
 	if err != nil {
 		return nil, nil, nil, multierr.Wrap(err, "failed to start provider '"+fieldInfo.Provider+"'")
 	}
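
Usage sketch (not part of the commit above): the hunks only show the coordinator and runtime plumbing, so the following minimal Go sketch illustrates how discovery-style callers are expected to drive an ephemeral runtime, modeled on the explorer/scan/local_scanner.go change. The package name, helper name, and import paths are assumptions; only EphemeralRuntimeFor, SetRecording, Connect, and Close come from this patch.

package example // hypothetical package, for illustration only

import (
	// Import paths are assumed from the cnquery repository layout.
	"go.mondoo.com/cnquery/providers"
	"go.mondoo.com/cnquery/providers-sdk/v1/inventory"
	"go.mondoo.com/cnquery/providers-sdk/v1/plugin"
)

// scanEphemeral spins up one throw-away runtime for a discovered asset.
// The runtime's primary provider is ephemeral, so closing the runtime
// releases it without touching providers shared with other runtimes.
func scanEphemeral(asset *inventory.Asset, parent *providers.Runtime, req *plugin.ConnectReq) error {
	runtime, err := providers.Coordinator.EphemeralRuntimeFor(asset)
	if err != nil {
		return err
	}
	// Close shuts down only this runtime's ephemeral primary provider
	// (via coordinator.Stop); shared providers keep running.
	defer runtime.Close()

	// The recording is handed over explicitly instead of inheriting the
	// whole parent runtime, mirroring the local_scanner.go change.
	runtime.SetRecording(parent.Recording)

	return runtime.Connect(req)
}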