From 538d609f9f3e881fd0a6761692fbf5f64918612b Mon Sep 17 00:00:00 2001 From: Ivan Milchev Date: Wed, 14 Feb 2024 12:20:17 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=A7=B9=20remove=20logic=20for=20ephemeral?= =?UTF-8?q?=20providers]=20(#3304)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ivan Milchev --- providers/coordinator.go | 70 +++++++++++----------------------------- providers/mock.go | 2 +- providers/runtime.go | 43 ++++++------------------ 3 files changed, 29 insertions(+), 86 deletions(-) diff --git a/providers/coordinator.go b/providers/coordinator.go index fea0969bd9..63d7c31841 100644 --- a/providers/coordinator.go +++ b/providers/coordinator.go @@ -27,15 +27,13 @@ import ( var BuiltinCoreID = coreconf.Config.ID var Coordinator = coordinator{ - RunningByID: map[string]*RunningProvider{}, - RunningEphemeral: map[*RunningProvider]struct{}{}, - runtimes: map[string]*Runtime{}, + RunningByID: map[string]*RunningProvider{}, + runtimes: map[string]*Runtime{}, } type coordinator struct { - Providers Providers - RunningByID map[string]*RunningProvider - RunningEphemeral map[*RunningProvider]struct{} + Providers Providers + RunningByID map[string]*RunningProvider unprocessedRuntimes []*Runtime runtimes map[string]*Runtime @@ -157,7 +155,7 @@ type UpdateProvidersConfig struct { RefreshInterval int } -func (c *coordinator) Start(id string, isEphemeral bool, update UpdateProvidersConfig) (*RunningProvider, error) { +func (c *coordinator) Start(id string, update UpdateProvidersConfig) (*RunningProvider, error) { if x, ok := builtinProviders[id]; ok { // We don't warn for core providers, which are the only providers // built into the binary (for now). @@ -252,11 +250,7 @@ func (c *coordinator) Start(id string, isEphemeral bool, update UpdateProvidersC } c.mutex.Lock() - if isEphemeral { - c.RunningEphemeral[res] = struct{}{} - } else { - c.RunningByID[res.ID] = res - } + c.RunningByID[res.ID] = res c.mutex.Unlock() return res, nil @@ -332,17 +326,16 @@ func (c *coordinator) tryProviderUpdate(provider *Provider, update UpdateProvide } func (c *coordinator) NewRuntime() *Runtime { - return c.newRuntime(false) + return c.newRuntime() } -func (c *coordinator) newRuntime(isEphemeral bool) *Runtime { +func (c *coordinator) newRuntime() *Runtime { res := &Runtime{ coordinator: c, providers: map[string]*ConnectedProvider{}, schema: newExtensibleSchema(), recording: NullRecording{}, shutdownTimeout: defaultShutdownTimeout, - isEphemeral: isEphemeral, } res.schema.runtime = res @@ -355,14 +348,12 @@ func (c *coordinator) newRuntime(isEphemeral bool) *Runtime { // overkill. res.schema.unsafeRefresh() - if !isEphemeral { - c.mutex.Lock() - c.unprocessedRuntimes = append(c.unprocessedRuntimes, res) - c.runtimeCnt++ - cnt := c.runtimeCnt - c.mutex.Unlock() - log.Debug().Msg("Started a new runtime (" + strconv.Itoa(cnt) + " total)") - } + c.mutex.Lock() + c.unprocessedRuntimes = append(c.unprocessedRuntimes, res) + c.runtimeCnt++ + cnt := c.runtimeCnt + c.mutex.Unlock() + log.Debug().Msg("Started a new runtime (" + strconv.Itoa(cnt) + " total)") return res } @@ -394,18 +385,6 @@ func (c *coordinator) RuntimeFor(asset *inventory.Asset, parent *Runtime) (*Runt return res, res.DetectProvider(asset) } -// EphemeralRuntimeFor an asset, creates a new ephemeral runtime and connectors. -// These are designed to be thrown away at the end of their use. -// Note: at the time of writing they may still share auxiliary providers with -// other runtimes, e.g. if provider X spawns another provider Y, the latter -// may be a shared provider. The majority of memory load should be on the -// primary provider (eg X in this case) so that it can effectively clear -// its memory at the end of its use. -func (c *coordinator) EphemeralRuntimeFor(asset *inventory.Asset) (*Runtime, error) { - res := c.newRuntime(true) - return res, res.DetectProvider(asset) -} - // Only call this with a mutex lock around it! func (c *coordinator) unsafeRefreshRuntimes() { var remaining []*Runtime @@ -447,20 +426,16 @@ func (c *coordinator) unsafeSetAssetRuntime(asset *inventory.Asset, runtime *Run return found } -func (c *coordinator) Stop(provider *RunningProvider, isEphemeral bool) error { +func (c *coordinator) Stop(provider *RunningProvider) error { if provider == nil { return nil } c.mutex.Lock() defer c.mutex.Unlock() - if isEphemeral { - delete(c.RunningEphemeral, provider) - } else { - found := c.RunningByID[provider.ID] - if found != nil { - delete(c.RunningByID, provider.ID) - } + found := c.RunningByID[provider.ID] + if found != nil { + delete(c.RunningByID, provider.ID) } return provider.Shutdown() @@ -469,15 +444,6 @@ func (c *coordinator) Stop(provider *RunningProvider, isEphemeral bool) error { func (c *coordinator) Shutdown() { c.mutex.Lock() - for cur := range c.RunningEphemeral { - if err := cur.Shutdown(); err != nil { - log.Warn().Err(err).Str("provider", cur.Name).Msg("failed to shut down provider") - } - cur.isClosed = true - cur.Client.Kill() - } - c.RunningEphemeral = map[*RunningProvider]struct{}{} - for _, runtime := range c.RunningByID { if err := runtime.Shutdown(); err != nil { log.Warn().Err(err).Str("provider", runtime.Name).Msg("failed to shut down provider") diff --git a/providers/mock.go b/providers/mock.go index 79a5819518..1b7782b910 100644 --- a/providers/mock.go +++ b/providers/mock.go @@ -67,7 +67,7 @@ func (s *mockProviderService) Connect(req *plugin.ConnectReq, callback plugin.Pr for i := range asset.Connections { conf := asset.Connections[i] - provider, err := s.runtime.addProvider(conf.ProviderID, false) + provider, err := s.runtime.addProvider(conf.ProviderID) if err != nil { return nil, errors.Wrap(err, "failed to init provider for connection in recording") } diff --git a/providers/runtime.go b/providers/runtime.go index 928099327c..304a822c07 100644 --- a/providers/runtime.go +++ b/providers/runtime.go @@ -39,7 +39,6 @@ type Runtime struct { // schema aggregates all resources executable on this asset schema extensibleSchema isClosed bool - isEphemeral bool close sync.Once shutdownTimeout time.Duration } @@ -77,13 +76,6 @@ func (r *Runtime) tryShutdown() shutdownResult { log.Error().Msg("failed to disconnect from provider " + provider.Instance.Name) } } - - // Ephemeral runtimes have their primary provider be ephemeral, i.e. non-shared. - // All other providers are shared and will not be shut down from within the provider. - if r.isEphemeral { - err := r.coordinator.Stop(r.Provider.Instance, true) - return shutdownResult{Error: err} - } return shutdownResult{} } @@ -132,14 +124,7 @@ func (r *Runtime) AssetMRN() string { // UseProvider sets the main provider for this runtime. func (r *Runtime) UseProvider(id string) error { - // We transfer isEphemeral here because: - // 1. If the runtime is not ephemeral, it behaves as a shared provider by - // default. - // 2. If the runtime is ephemeral, we only want the main provider to be - // ephemeral. All other providers by default are shared. - // (Note: In the future we plan to have an isolated runtime mode, - // where even other providers are ephemeral, ie not shared) - res, err := r.addProvider(id, r.isEphemeral) + res, err := r.addProvider(id) if err != nil { return err } @@ -153,25 +138,17 @@ func (r *Runtime) AddConnectedProvider(c *ConnectedProvider) { r.schema.Add(c.Instance.Name, c.Instance.Schema) } -func (r *Runtime) addProvider(id string, isEphemeral bool) (*ConnectedProvider, error) { +func (r *Runtime) addProvider(id string) (*ConnectedProvider, error) { var running *RunningProvider - var err error - if isEphemeral { - running, err = r.coordinator.Start(id, true, r.AutoUpdate) + + // TODO: we need to detect only the shared running providers + running = r.coordinator.RunningByID[id] + if running == nil { + var err error + running, err = r.coordinator.Start(id, r.AutoUpdate) if err != nil { return nil, err } - - } else { - // TODO: we need to detect only the shared running providers - running = r.coordinator.RunningByID[id] - if running == nil { - var err error - running, err = r.coordinator.Start(id, false, r.AutoUpdate) - if err != nil { - return nil, err - } - } } res := &ConnectedProvider{Instance: running} @@ -617,7 +594,7 @@ func (r *Runtime) lookupResourceProvider(resource string) (*ConnectedProvider, * return nil, nil, errors.New("incorrect provider for asset, not adding " + info.Provider) } - res, err := r.addProvider(info.Provider, false) + res, err := r.addProvider(info.Provider) if err != nil { return nil, nil, multierr.Wrap(err, "failed to start provider '"+info.Provider+"'") } @@ -647,7 +624,7 @@ func (r *Runtime) lookupFieldProvider(resource string, field string) (*Connected return provider, resourceInfo, fieldInfo, provider.ConnectionError } - res, err := r.addProvider(fieldInfo.Provider, false) + res, err := r.addProvider(fieldInfo.Provider) if err != nil { return nil, nil, nil, multierr.Wrap(err, "failed to start provider '"+fieldInfo.Provider+"'") }