Skip to content

Commit

Permalink
⭐ support ephemeral vs normal runtimes (#2050)
Browse files Browse the repository at this point in the history
Ephemeral runtimes are used as throw-away runtimes, that come with their
own plugin runner (and execution space). These runtimes can be thrown
away after they are used, while other runtimes and plugins remain
shared.

This is particularly useful when scanning targets with the discover
mode, where potentially many runtimes/plugins need to be instantiated on
the fly and are only required for the lifetime of the scan.

Signed-off-by: Dominik Richter <[email protected]>
  • Loading branch information
arlimus authored Oct 3, 2023
1 parent a4118a5 commit 62e4399
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 49 deletions.
3 changes: 2 additions & 1 deletion explorer/scan/local_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,11 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
for i := range assetCandidates {
candidate := assetCandidates[i]

runtime, err := providers.Coordinator.RuntimeFor(candidate.asset, candidate.runtime)
runtime, err := providers.Coordinator.EphemeralRuntimeFor(candidate.asset)
if err != nil {
return nil, false, err
}
runtime.SetRecording(candidate.runtime.Recording)

err = runtime.Connect(&plugin.ConnectReq{
Features: config.Features,
Expand Down
93 changes: 66 additions & 27 deletions providers/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ import (
var BuiltinCoreID = coreconf.Config.ID

var Coordinator = coordinator{
Running: []*RunningProvider{},
runtimes: map[string]*Runtime{},
RunningByID: map[string]*RunningProvider{},
RunningEphemeral: map[*RunningProvider]struct{}{},
runtimes: map[string]*Runtime{},
}

type coordinator struct {
Providers Providers
Running []*RunningProvider
Providers Providers
RunningByID map[string]*RunningProvider
RunningEphemeral map[*RunningProvider]struct{}

unprocessedRuntimes []*Runtime
runtimes map[string]*Runtime
Expand Down Expand Up @@ -101,7 +103,7 @@ type UpdateProvidersConfig struct {
RefreshInterval int
}

func (c *coordinator) Start(id string, update UpdateProvidersConfig) (*RunningProvider, error) {
func (c *coordinator) Start(id string, isEphemeral bool, update UpdateProvidersConfig) (*RunningProvider, error) {
if x, ok := builtinProviders[id]; ok {
// We don't warn for core providers, which are the only providers
// built into the binary (for now).
Expand Down Expand Up @@ -189,7 +191,11 @@ func (c *coordinator) Start(id string, update UpdateProvidersConfig) (*RunningPr
}

c.mutex.Lock()
c.Running = append(c.Running, res)
if isEphemeral {
c.RunningEphemeral[res] = struct{}{}
} else {
c.RunningByID[res.ID] = res
}
c.mutex.Unlock()

return res, nil
Expand Down Expand Up @@ -265,6 +271,10 @@ func (c *coordinator) tryProviderUpdate(provider *Provider, update UpdateProvide
}

func (c *coordinator) NewRuntime() *Runtime {
return c.newRuntime(false)
}

func (c *coordinator) newRuntime(isEphemeral bool) *Runtime {
res := &Runtime{
coordinator: c,
providers: map[string]*ConnectedProvider{},
Expand All @@ -276,19 +286,21 @@ func (c *coordinator) NewRuntime() *Runtime {
},
Recording: NullRecording{},
shutdownTimeout: defaultShutdownTimeout,
isEphemeral: isEphemeral,
}
res.schema.runtime = res

// TODO: do this dynamically in the future
res.schema.loadAllSchemas()

c.mutex.Lock()
c.unprocessedRuntimes = append(c.unprocessedRuntimes, res)
c.runtimeCnt++
cnt := c.runtimeCnt
c.mutex.Unlock()

log.Debug().Msg("Started a new runtime (" + strconv.Itoa(cnt) + " total)")
if !isEphemeral {
c.mutex.Lock()
c.unprocessedRuntimes = append(c.unprocessedRuntimes, res)
c.runtimeCnt++
cnt := c.runtimeCnt
c.mutex.Unlock()
log.Debug().Msg("Started a new runtime (" + strconv.Itoa(cnt) + " total)")
}

return res
}
Expand Down Expand Up @@ -320,6 +332,18 @@ func (c *coordinator) RuntimeFor(asset *inventory.Asset, parent *Runtime) (*Runt
return res, res.DetectProvider(asset)
}

// EphemeralRuntimeFor an asset, creates a new ephemeral runtime and connectors.
// These are designed to be thrown away at the end of their use.
// Note: at the time of writing they may still share auxiliary providers with
// other runtimes, e.g. if provider X spawns another provider Y, the latter
// may be a shared provider. The majority of memory load should be on the
// primary provider (eg X in this case) so that it can effectively clear
// its memory at the end of its use.
func (c *coordinator) EphemeralRuntimeFor(asset *inventory.Asset) (*Runtime, error) {
res := c.newRuntime(true)
return res, res.DetectProvider(asset)
}

// Only call this with a mutex lock around it!
func (c *coordinator) unsafeRefreshRuntimes() {
var remaining []*Runtime
Expand Down Expand Up @@ -361,31 +385,46 @@ func (c *coordinator) unsafeSetAssetRuntime(asset *inventory.Asset, runtime *Run
return found
}

func (c *coordinator) Close(p *RunningProvider) {
if !p.isClosed {
p.isClosed = true
if p.Client != nil {
p.Client.Kill()
}
func (c *coordinator) Stop(provider *RunningProvider, isEphemeral bool) error {
if provider == nil {
return nil
}

c.mutex.Lock()
for i := range c.Running {
if c.Running[i] == p {
c.Running = append(c.Running[0:i], c.Running[i+1:]...)
break
defer c.mutex.Unlock()

if isEphemeral {
delete(c.RunningEphemeral, provider)
} else {
found := c.RunningByID[provider.ID]
if found != nil {
delete(c.RunningByID, provider.ID)
}
}
c.mutex.Unlock()

return provider.Shutdown()
}

func (c *coordinator) Shutdown() {
c.mutex.Lock()
for i := range c.Running {
cur := c.Running[i]

for cur := range c.RunningEphemeral {
if err := cur.Shutdown(); err != nil {
log.Warn().Err(err).Str("provider", cur.Name).Msg("failed to shut down provider")
}
cur.isClosed = true
cur.Client.Kill()
}
c.RunningEphemeral = map[*RunningProvider]struct{}{}

for _, runtime := range c.RunningByID {
if err := runtime.Shutdown(); err != nil {
log.Warn().Err(err).Str("provider", runtime.Name).Msg("failed to shut down provider")
}
runtime.isClosed = true
runtime.Client.Kill()
}
c.RunningByID = map[string]*RunningProvider{}

c.mutex.Unlock()
}

Expand Down
2 changes: 1 addition & 1 deletion providers/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (s *mockProviderService) Connect(req *plugin.ConnectReq, callback plugin.Pr
for i := range asset.Connections {
conf := asset.Connections[i]

provider, err := s.runtime.addProvider(conf.ProviderID)
provider, err := s.runtime.addProvider(conf.ProviderID, false)
if err != nil {
return nil, errors.Wrap(err, "failed to init provider for connection in recording")
}
Expand Down
52 changes: 32 additions & 20 deletions providers/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Runtime struct {
// schema aggregates all resources executable on this asset
schema extensibleSchema
isClosed bool
isEphemeral bool
close sync.Once
shutdownTimeout time.Duration
}
Expand All @@ -58,14 +59,14 @@ type shutdownResult struct {
}

func (r *Runtime) tryShutdown() shutdownResult {
var errs multierr.Errors
for _, provider := range r.providers {
errs.Add(provider.Instance.Shutdown())
// Ephemeral runtimes have their primary provider be ephemeral, i.e. non-shared.
// All other providers are shared and will not be shut down from within the provider.
if r.isEphemeral {
err := r.coordinator.Stop(r.Provider.Instance, true)
return shutdownResult{Error: err}
}

return shutdownResult{
Error: errs.Deduplicate(),
}
return shutdownResult{}
}

func (r *Runtime) Close() {
Expand Down Expand Up @@ -107,7 +108,14 @@ func (r *Runtime) AssetMRN() string {

// UseProvider sets the main provider for this runtime.
func (r *Runtime) UseProvider(id string) error {
res, err := r.addProvider(id)
// We transfer isEphemeral here because:
// 1. If the runtime is not ephemeral, it behaves as a shared provider by
// default.
// 2. If the runtime is ephemeral, we only want the main provider to be
// ephemeral. All other providers by default are shared.
// (Note: In the future we plan to have an isolated runtime mode,
// where even other providers are ephemeral, ie not shared)
res, err := r.addProvider(id, r.isEphemeral)
if err != nil {
return err
}
Expand All @@ -121,21 +129,25 @@ func (r *Runtime) AddConnectedProvider(c *ConnectedProvider) {
r.schema.Add(c.Instance.Name, c.Instance.Schema)
}

func (r *Runtime) addProvider(id string) (*ConnectedProvider, error) {
func (r *Runtime) addProvider(id string, isEphemeral bool) (*ConnectedProvider, error) {
var running *RunningProvider
for _, p := range r.coordinator.Running {
if p.ID == id {
running = p
break
}
}

if running == nil {
var err error
running, err = r.coordinator.Start(id, r.AutoUpdate)
var err error
if isEphemeral {
running, err = r.coordinator.Start(id, true, r.AutoUpdate)
if err != nil {
return nil, err
}

} else {
// TODO: we need to detect only the shared running providers
running = r.coordinator.RunningByID[id]
if running == nil {
var err error
running, err = r.coordinator.Start(id, false, r.AutoUpdate)
if err != nil {
return nil, err
}
}
}

res := &ConnectedProvider{Instance: running}
Expand Down Expand Up @@ -525,7 +537,7 @@ func (r *Runtime) lookupResourceProvider(resource string) (*ConnectedProvider, *
return provider, info, nil
}

res, err := r.addProvider(info.Provider)
res, err := r.addProvider(info.Provider, false)
if err != nil {
return nil, nil, multierr.Wrap(err, "failed to start provider '"+info.Provider+"'")
}
Expand Down Expand Up @@ -556,7 +568,7 @@ func (r *Runtime) lookupFieldProvider(resource string, field string) (*Connected
return provider, resourceInfo, fieldInfo, nil
}

res, err := r.addProvider(fieldInfo.Provider)
res, err := r.addProvider(fieldInfo.Provider, false)
if err != nil {
return nil, nil, nil, multierr.Wrap(err, "failed to start provider '"+fieldInfo.Provider+"'")
}
Expand Down

0 comments on commit 62e4399

Please sign in to comment.