Skip to content

Commit

Permalink
🧹 remove logic for ephemeral providers] (#3304)
Browse files Browse the repository at this point in the history
Signed-off-by: Ivan Milchev <[email protected]>
  • Loading branch information
imilchev authored Feb 14, 2024
1 parent 9179ae5 commit 538d609
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 86 deletions.
70 changes: 18 additions & 52 deletions providers/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,13 @@ import (
var BuiltinCoreID = coreconf.Config.ID

var Coordinator = coordinator{
RunningByID: map[string]*RunningProvider{},
RunningEphemeral: map[*RunningProvider]struct{}{},
runtimes: map[string]*Runtime{},
RunningByID: map[string]*RunningProvider{},
runtimes: map[string]*Runtime{},
}

type coordinator struct {
Providers Providers
RunningByID map[string]*RunningProvider
RunningEphemeral map[*RunningProvider]struct{}
Providers Providers
RunningByID map[string]*RunningProvider

unprocessedRuntimes []*Runtime
runtimes map[string]*Runtime
Expand Down Expand Up @@ -157,7 +155,7 @@ type UpdateProvidersConfig struct {
RefreshInterval int
}

func (c *coordinator) Start(id string, isEphemeral bool, update UpdateProvidersConfig) (*RunningProvider, error) {
func (c *coordinator) Start(id string, update UpdateProvidersConfig) (*RunningProvider, error) {
if x, ok := builtinProviders[id]; ok {
// We don't warn for core providers, which are the only providers
// built into the binary (for now).
Expand Down Expand Up @@ -252,11 +250,7 @@ func (c *coordinator) Start(id string, isEphemeral bool, update UpdateProvidersC
}

c.mutex.Lock()
if isEphemeral {
c.RunningEphemeral[res] = struct{}{}
} else {
c.RunningByID[res.ID] = res
}
c.RunningByID[res.ID] = res
c.mutex.Unlock()

return res, nil
Expand Down Expand Up @@ -332,17 +326,16 @@ func (c *coordinator) tryProviderUpdate(provider *Provider, update UpdateProvide
}

func (c *coordinator) NewRuntime() *Runtime {
return c.newRuntime(false)
return c.newRuntime()
}

func (c *coordinator) newRuntime(isEphemeral bool) *Runtime {
func (c *coordinator) newRuntime() *Runtime {
res := &Runtime{
coordinator: c,
providers: map[string]*ConnectedProvider{},
schema: newExtensibleSchema(),
recording: NullRecording{},
shutdownTimeout: defaultShutdownTimeout,
isEphemeral: isEphemeral,
}
res.schema.runtime = res

Expand All @@ -355,14 +348,12 @@ func (c *coordinator) newRuntime(isEphemeral bool) *Runtime {
// overkill.
res.schema.unsafeRefresh()

if !isEphemeral {
c.mutex.Lock()
c.unprocessedRuntimes = append(c.unprocessedRuntimes, res)
c.runtimeCnt++
cnt := c.runtimeCnt
c.mutex.Unlock()
log.Debug().Msg("Started a new runtime (" + strconv.Itoa(cnt) + " total)")
}
c.mutex.Lock()
c.unprocessedRuntimes = append(c.unprocessedRuntimes, res)
c.runtimeCnt++
cnt := c.runtimeCnt
c.mutex.Unlock()
log.Debug().Msg("Started a new runtime (" + strconv.Itoa(cnt) + " total)")

return res
}
Expand Down Expand Up @@ -394,18 +385,6 @@ func (c *coordinator) RuntimeFor(asset *inventory.Asset, parent *Runtime) (*Runt
return res, res.DetectProvider(asset)
}

// EphemeralRuntimeFor an asset, creates a new ephemeral runtime and connectors.
// These are designed to be thrown away at the end of their use.
// Note: at the time of writing they may still share auxiliary providers with
// other runtimes, e.g. if provider X spawns another provider Y, the latter
// may be a shared provider. The majority of memory load should be on the
// primary provider (eg X in this case) so that it can effectively clear
// its memory at the end of its use.
func (c *coordinator) EphemeralRuntimeFor(asset *inventory.Asset) (*Runtime, error) {
res := c.newRuntime(true)
return res, res.DetectProvider(asset)
}

// Only call this with a mutex lock around it!
func (c *coordinator) unsafeRefreshRuntimes() {
var remaining []*Runtime
Expand Down Expand Up @@ -447,20 +426,16 @@ func (c *coordinator) unsafeSetAssetRuntime(asset *inventory.Asset, runtime *Run
return found
}

func (c *coordinator) Stop(provider *RunningProvider, isEphemeral bool) error {
func (c *coordinator) Stop(provider *RunningProvider) error {
if provider == nil {
return nil
}
c.mutex.Lock()
defer c.mutex.Unlock()

if isEphemeral {
delete(c.RunningEphemeral, provider)
} else {
found := c.RunningByID[provider.ID]
if found != nil {
delete(c.RunningByID, provider.ID)
}
found := c.RunningByID[provider.ID]
if found != nil {
delete(c.RunningByID, provider.ID)
}

return provider.Shutdown()
Expand All @@ -469,15 +444,6 @@ func (c *coordinator) Stop(provider *RunningProvider, isEphemeral bool) error {
func (c *coordinator) Shutdown() {
c.mutex.Lock()

for cur := range c.RunningEphemeral {
if err := cur.Shutdown(); err != nil {
log.Warn().Err(err).Str("provider", cur.Name).Msg("failed to shut down provider")
}
cur.isClosed = true
cur.Client.Kill()
}
c.RunningEphemeral = map[*RunningProvider]struct{}{}

for _, runtime := range c.RunningByID {
if err := runtime.Shutdown(); err != nil {
log.Warn().Err(err).Str("provider", runtime.Name).Msg("failed to shut down provider")
Expand Down
2 changes: 1 addition & 1 deletion providers/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (s *mockProviderService) Connect(req *plugin.ConnectReq, callback plugin.Pr
for i := range asset.Connections {
conf := asset.Connections[i]

provider, err := s.runtime.addProvider(conf.ProviderID, false)
provider, err := s.runtime.addProvider(conf.ProviderID)
if err != nil {
return nil, errors.Wrap(err, "failed to init provider for connection in recording")
}
Expand Down
43 changes: 10 additions & 33 deletions providers/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ type Runtime struct {
// schema aggregates all resources executable on this asset
schema extensibleSchema
isClosed bool
isEphemeral bool
close sync.Once
shutdownTimeout time.Duration
}
Expand Down Expand Up @@ -77,13 +76,6 @@ func (r *Runtime) tryShutdown() shutdownResult {
log.Error().Msg("failed to disconnect from provider " + provider.Instance.Name)
}
}

// Ephemeral runtimes have their primary provider be ephemeral, i.e. non-shared.
// All other providers are shared and will not be shut down from within the provider.
if r.isEphemeral {
err := r.coordinator.Stop(r.Provider.Instance, true)
return shutdownResult{Error: err}
}
return shutdownResult{}
}

Expand Down Expand Up @@ -132,14 +124,7 @@ func (r *Runtime) AssetMRN() string {

// UseProvider sets the main provider for this runtime.
func (r *Runtime) UseProvider(id string) error {
// We transfer isEphemeral here because:
// 1. If the runtime is not ephemeral, it behaves as a shared provider by
// default.
// 2. If the runtime is ephemeral, we only want the main provider to be
// ephemeral. All other providers by default are shared.
// (Note: In the future we plan to have an isolated runtime mode,
// where even other providers are ephemeral, ie not shared)
res, err := r.addProvider(id, r.isEphemeral)
res, err := r.addProvider(id)
if err != nil {
return err
}
Expand All @@ -153,25 +138,17 @@ func (r *Runtime) AddConnectedProvider(c *ConnectedProvider) {
r.schema.Add(c.Instance.Name, c.Instance.Schema)
}

func (r *Runtime) addProvider(id string, isEphemeral bool) (*ConnectedProvider, error) {
func (r *Runtime) addProvider(id string) (*ConnectedProvider, error) {
var running *RunningProvider
var err error
if isEphemeral {
running, err = r.coordinator.Start(id, true, r.AutoUpdate)

// TODO: we need to detect only the shared running providers
running = r.coordinator.RunningByID[id]
if running == nil {
var err error
running, err = r.coordinator.Start(id, r.AutoUpdate)
if err != nil {
return nil, err
}

} else {
// TODO: we need to detect only the shared running providers
running = r.coordinator.RunningByID[id]
if running == nil {
var err error
running, err = r.coordinator.Start(id, false, r.AutoUpdate)
if err != nil {
return nil, err
}
}
}

res := &ConnectedProvider{Instance: running}
Expand Down Expand Up @@ -617,7 +594,7 @@ func (r *Runtime) lookupResourceProvider(resource string) (*ConnectedProvider, *
return nil, nil, errors.New("incorrect provider for asset, not adding " + info.Provider)
}

res, err := r.addProvider(info.Provider, false)
res, err := r.addProvider(info.Provider)
if err != nil {
return nil, nil, multierr.Wrap(err, "failed to start provider '"+info.Provider+"'")
}
Expand Down Expand Up @@ -647,7 +624,7 @@ func (r *Runtime) lookupFieldProvider(resource string, field string) (*Connected
return provider, resourceInfo, fieldInfo, provider.ConnectionError
}

res, err := r.addProvider(fieldInfo.Provider, false)
res, err := r.addProvider(fieldInfo.Provider)
if err != nil {
return nil, nil, nil, multierr.Wrap(err, "failed to start provider '"+fieldInfo.Provider+"'")
}
Expand Down

0 comments on commit 538d609

Please sign in to comment.