diff --git a/.vscode/launch.json b/.vscode/launch.json index 85931b3a76..2fab2c0990 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -198,6 +198,18 @@ "shell", "ssh", "user@18.215.249.49", ], }, + { + "name": "scan github org", + "type": "go", + "request": "launch", + "program": "${workspaceRoot}/apps/cnquery/cnquery.go", + "args": [ + "scan", + "github", + "org", "hit-training", + "--log-level", "trace" + ] + }, { "name": "Configure Built-in Providers", "type": "go", diff --git a/explorer/scan/discovery.go b/explorer/scan/discovery.go index be7a67e7e0..92d16e3dea 100644 --- a/explorer/scan/discovery.go +++ b/explorer/scan/discovery.go @@ -6,11 +6,13 @@ package scan import ( "context" "errors" + "sync" "time" "github.com/rs/zerolog/log" "go.mondoo.com/cnquery/v11/cli/config" "go.mondoo.com/cnquery/v11/cli/execruntime" + "go.mondoo.com/cnquery/v11/internal/workerpool" "go.mondoo.com/cnquery/v11/llx" "go.mondoo.com/cnquery/v11/logger" "go.mondoo.com/cnquery/v11/providers" @@ -20,6 +22,9 @@ import ( "go.mondoo.com/cnquery/v11/providers-sdk/v1/upstream" ) +// number of parallel goroutines discovering assets +const workers = 10 + type AssetWithRuntime struct { Asset *inventory.Asset Runtime *providers.Runtime @@ -34,11 +39,15 @@ type DiscoveredAssets struct { platformIds map[string]struct{} Assets []*AssetWithRuntime Errors []*AssetWithError + assetsLock sync.Mutex } // Add adds an asset and its runtime to the discovered assets list. It returns true if the // asset has been added, false if it is a duplicate func (d *DiscoveredAssets) Add(asset *inventory.Asset, runtime *providers.Runtime) bool { + d.assetsLock.Lock() + defer d.assetsLock.Unlock() + isDuplicate := false for _, platformId := range asset.PlatformIds { if _, ok := d.platformIds[platformId]; ok { @@ -161,35 +170,45 @@ func discoverAssets(rootAssetWithRuntime *AssetWithRuntime, resolvedRootAsset *i return } + pool := workerpool.New[bool](workers) + pool.Start() + defer pool.Close() + // for all discovered assets, we apply mondoo-specific labels and annotations that come from the root asset for _, a := range rootAssetWithRuntime.Runtime.Provider.Connection.Inventory.Spec.Assets { - // create runtime for root asset - assetWithRuntime, err := createRuntimeForAsset(a, upstream, recording) - if err != nil { - log.Error().Err(err).Str("asset", a.Name).Msg("unable to create runtime for asset") - discoveredAssets.AddError(a, err) - continue - } + pool.Submit(func() (bool, error) { + // create runtime for root asset + assetWithRuntime, err := createRuntimeForAsset(a, upstream, recording) + if err != nil { + log.Error().Err(err).Str("asset", a.Name).Msg("unable to create runtime for asset") + discoveredAssets.AddError(a, err) + return false, err + } - // If no asset was returned and no error, then we observed a duplicate asset with a - // runtime that already exists. - if assetWithRuntime == nil { - continue - } + // If no asset was returned and no error, then we observed a duplicate asset with a + // runtime that already exists. + if assetWithRuntime == nil { + return false, nil + } - resolvedAsset := assetWithRuntime.Runtime.Provider.Connection.Asset - if len(resolvedAsset.PlatformIds) > 0 { - prepareAsset(resolvedAsset, resolvedRootAsset, runtimeLabels) + resolvedAsset := assetWithRuntime.Runtime.Provider.Connection.Asset + if len(resolvedAsset.PlatformIds) > 0 { + prepareAsset(resolvedAsset, resolvedRootAsset, runtimeLabels) - // If the asset has been already added, we should close its runtime - if !discoveredAssets.Add(resolvedAsset, assetWithRuntime.Runtime) { + // If the asset has been already added, we should close its runtime + if !discoveredAssets.Add(resolvedAsset, assetWithRuntime.Runtime) { + assetWithRuntime.Runtime.Close() + } + } else { + discoverAssets(assetWithRuntime, resolvedRootAsset, discoveredAssets, runtimeLabels, upstream, recording) assetWithRuntime.Runtime.Close() } - } else { - discoverAssets(assetWithRuntime, resolvedRootAsset, discoveredAssets, runtimeLabels, upstream, recording) - assetWithRuntime.Runtime.Close() - } + return true, nil + }) } + + // Wait for the workers to finish processing + pool.Wait() } func createRuntimeForAsset(asset *inventory.Asset, upstream *upstream.UpstreamConfig, recording llx.Recording) (*AssetWithRuntime, error) { diff --git a/providers-sdk/v1/plugin/service.go b/providers-sdk/v1/plugin/service.go index 382efcc90b..ed01ca5511 100644 --- a/providers-sdk/v1/plugin/service.go +++ b/providers-sdk/v1/plugin/service.go @@ -51,11 +51,8 @@ func (s *Service) AddRuntime(conf *inventory.Config, createRuntime func(connId u } // ^^ - s.runtimesLock.Lock() - defer s.runtimesLock.Unlock() - // If a runtime with this ID already exists, then return that - if runtime, ok := s.runtimes[conf.Id]; ok { + if runtime, err := s.GetRuntime(conf.Id); err == nil { return runtime, nil } @@ -66,7 +63,7 @@ func (s *Service) AddRuntime(conf *inventory.Config, createRuntime func(connId u if runtime.Connection != nil { if parentId := runtime.Connection.ParentID(); parentId > 0 { - parentRuntime, err := s.doGetRuntime(parentId) + parentRuntime, err := s.GetRuntime(parentId) if err != nil { return nil, errors.New("parent connection " + strconv.FormatUint(uint64(parentId), 10) + " not found") } @@ -74,10 +71,19 @@ func (s *Service) AddRuntime(conf *inventory.Config, createRuntime func(connId u } } - s.runtimes[conf.Id] = runtime + + // store the new runtime + s.addRuntime(conf.Id, runtime) + return runtime, nil } +func (s *Service) addRuntime(id uint32, runtime *Runtime) { + s.runtimesLock.Lock() + defer s.runtimesLock.Unlock() + s.runtimes[id] = runtime +} + // FIXME: DEPRECATED, remove in v12.0 vv func (s *Service) deprecatedAddRuntime(createRuntime func(connId uint32) (*Runtime, error)) (*Runtime, error) { s.runtimesLock.Lock() @@ -93,7 +99,7 @@ func (s *Service) deprecatedAddRuntime(createRuntime func(connId uint32) (*Runti if runtime.Connection != nil { if parentId := runtime.Connection.ParentID(); parentId > 0 { - parentRuntime, err := s.doGetRuntime(parentId) + parentRuntime, err := s.GetRuntime(parentId) if err != nil { return nil, errors.New("parent connection " + strconv.FormatUint(uint64(parentId), 10) + " not found") } diff --git a/providers/github/connection/connection.go b/providers/github/connection/connection.go index 8e7cdb5942..c974ef0b06 100644 --- a/providers/github/connection/connection.go +++ b/providers/github/connection/connection.go @@ -74,6 +74,7 @@ func NewGithubConnection(id uint32, asset *inventory.Asset) (*GithubConnection, ctx := context.WithValue(context.Background(), github.SleepUntilPrimaryRateLimitResetWhenRateLimited, true) // perform a quick call to verify the token's validity. + // @afiune do we need to validate the token for every connection? can this be a "once" operation? _, resp, err := client.Meta.Zen(ctx) if err != nil { if resp != nil && resp.StatusCode == 401 {