Skip to content

Commit

Permalink
⚙️ reduce the workerpool Task function
Browse files Browse the repository at this point in the history
We really don't need to do everything inside the workerpool, do we?

Signed-off-by: Salim Afiune Maya <[email protected]>
  • Loading branch information
afiune committed Dec 12, 2024
1 parent e785c76 commit e3a58fd
Showing 1 changed file with 28 additions and 27 deletions.
55 changes: 28 additions & 27 deletions explorer/scan/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,45 +168,46 @@ func discoverAssets(rootAssetWithRuntime *AssetWithRuntime, resolvedRootAsset *i
return
}

pool := workerpool.New[bool](workers)
pool := workerpool.New[*AssetWithRuntime](workers)
pool.Start()
defer pool.Close()

// for all discovered assets, we apply mondoo-specific labels and annotations that come from the root asset
for _, a := range rootAssetWithRuntime.Runtime.Provider.Connection.Inventory.Spec.Assets {
pool.Submit(func() (bool, error) {
// create runtime for root asset
assetWithRuntime, err := createRuntimeForAsset(a, upstream, recording)
for _, asset := range rootAssetWithRuntime.Runtime.Provider.Connection.Inventory.Spec.Assets {
pool.Submit(func() (*AssetWithRuntime, error) {
assetWithRuntime, err := createRuntimeForAsset(asset, upstream, recording)
if err != nil {
log.Error().Err(err).Str("asset", a.Name).Msg("unable to create runtime for asset")
discoveredAssets.AddError(a, err)
return false, err
log.Error().Err(err).Str("asset", asset.GetName()).Msg("unable to create runtime for asset")
discoveredAssets.AddError(asset, err)
}
return assetWithRuntime, nil
})
}

// If no asset was returned and no error, then we observed a duplicate asset with a
// runtime that already exists.
if assetWithRuntime == nil {
return false, nil
}
// Wait for the workers to finish processing
pool.Wait()

// Get all assets with runtimes from the pool
for _, assetWithRuntime := range pool.GetResults() {
// If asset is nil, then we observed a duplicate asset with a
// runtime that already exists.
if assetWithRuntime == nil {
continue
}

resolvedAsset := assetWithRuntime.Runtime.Provider.Connection.Asset
if len(resolvedAsset.PlatformIds) > 0 {
prepareAsset(resolvedAsset, resolvedRootAsset, runtimeLabels)
resolvedAsset := assetWithRuntime.Runtime.Provider.Connection.Asset
if len(resolvedAsset.PlatformIds) > 0 {
prepareAsset(resolvedAsset, resolvedRootAsset, runtimeLabels)

// If the asset has been already added, we should close its runtime
if !discoveredAssets.Add(resolvedAsset, assetWithRuntime.Runtime) {
assetWithRuntime.Runtime.Close()
}
} else {
discoverAssets(assetWithRuntime, resolvedRootAsset, discoveredAssets, runtimeLabels, upstream, recording)
// If the asset has been already added, we should close its runtime
if !discoveredAssets.Add(resolvedAsset, assetWithRuntime.Runtime) {
assetWithRuntime.Runtime.Close()
}
return true, nil
})
} else {
discoverAssets(assetWithRuntime, resolvedRootAsset, discoveredAssets, runtimeLabels, upstream, recording)
assetWithRuntime.Runtime.Close()
}
}

// Wait for the workers to finish processing
pool.Wait()
}

func createRuntimeForAsset(asset *inventory.Asset, upstream *upstream.UpstreamConfig, recording llx.Recording) (*AssetWithRuntime, error) {
Expand Down

0 comments on commit e3a58fd

Please sign in to comment.