diff --git a/explorer/scan/local_scanner.go b/explorer/scan/local_scanner.go
index 139a78464e..21db4cae3b 100644
--- a/explorer/scan/local_scanner.go
+++ b/explorer/scan/local_scanner.go
@@ -13,12 +13,14 @@ import (
 	"time"
 
 	"go.mondoo.com/cnquery/providers-sdk/v1/inventory/manager"
+	"go.mondoo.com/cnquery/providers-sdk/v1/plugin"
 
 	"github.com/mattn/go-isatty"
 	"github.com/rs/zerolog/log"
 	"github.com/segmentio/ksuid"
 	"github.com/spf13/viper"
 	"go.mondoo.com/cnquery"
+	"go.mondoo.com/cnquery/cli/config"
 	"go.mondoo.com/cnquery/cli/progress"
 	"go.mondoo.com/cnquery/explorer"
 	"go.mondoo.com/cnquery/explorer/executor"
@@ -29,7 +31,6 @@ import (
 	"go.mondoo.com/cnquery/mrn"
 	"go.mondoo.com/cnquery/providers"
 	"go.mondoo.com/cnquery/providers-sdk/v1/inventory"
-	"go.mondoo.com/cnquery/providers-sdk/v1/plugin"
 	"go.mondoo.com/cnquery/providers-sdk/v1/upstream"
 	"go.mondoo.com/cnquery/utils/multierr"
 	"go.mondoo.com/ranger-rpc/codes"
@@ -37,6 +38,11 @@ import (
 	"google.golang.org/protobuf/proto"
 )
 
+type assetWithRuntime struct {
+	asset   *inventory.Asset
+	runtime *providers.Runtime
+}
+
 type LocalScanner struct {
 	ctx     context.Context
 	fetcher *fetcher
@@ -155,18 +161,18 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
 	}
 
 	assetList := im.GetAssets()
-	var assets []*inventory.Asset
-	var runtimes []*providers.Runtime
+	var assets []*assetWithRuntime
+	var assetCandidates []*inventory.Asset
+
+	// we connect and perform discovery for each asset in the job inventory
 	for i := range assetList {
 		asset := assetList[i]
 		resolvedAsset, err := im.ResolveAsset(asset)
 		if err != nil {
 			return nil, false, err
 		}
-
 		runtime := providers.Coordinator.NewRuntime()
 		runtime.DetectProvider(resolvedAsset)
-
 		if err := runtime.Connect(&plugin.ConnectReq{
 			Features: cnquery.GetFeatures(ctx),
 			Asset:    resolvedAsset,
@@ -174,9 +180,15 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
 		}); err != nil {
 			return nil, false, err
 		}
-		log.Debug().Msgf("adding %d asset(s)", len(runtime.Provider.Connection.Inventory.Spec.Assets))
-		assets = append(assets, runtime.Provider.Connection.Inventory.Spec.Assets...)
-
+		inventorySpec := runtime.Provider.Connection
+		if inventorySpec.Inventory != nil &&
+			inventorySpec.Inventory.Spec != nil &&
+			inventorySpec.Inventory.Spec.Assets != nil {
+			log.Debug().Msgf("adding %d discovered asset(s)", len(runtime.Provider.Connection.Inventory.Spec.Assets))
+			assetCandidates = append(assetCandidates, inventorySpec.Inventory.Spec.Assets...)
+		} else {
+			assetCandidates = append(assetCandidates, runtime.Provider.Connection.Asset)
+		}
 		// TODO: we want to keep better track of errors, since there may be
 		// multiple assets coming in. It's annoying to abort the scan if we get one
 		// error at this stage.
@@ -184,9 +196,34 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
 		// we grab the asset from the connection, because it contains all the
 		// detected metadata (and IDs)
 		// assets = append(assets, runtime.Provider.Connection.Asset)
-		runtimes = append(runtimes, runtime)
 	}
 
+	// for each asset candidate, we initialize a new runtime and connect to it.
+	for _, asset := range assetCandidates {
+		runtime := providers.Coordinator.NewRuntime()
+		// Make sure the provider for the asset is present
+		if err := runtime.DetectProvider(asset); err != nil {
+			return nil, false, err
+		}
+
+		err := runtime.Connect(&plugin.ConnectReq{
+			Features: config.Features,
+			Asset:    asset,
+			Upstream: upstream,
+		})
+		if err != nil {
+			return nil, false, err
+		}
+		assets = append(assets, &assetWithRuntime{
+			asset:   asset,
+			runtime: runtime,
+		})
+	}
+
+	justAssets := []*inventory.Asset{}
+	for _, asset := range assets {
+		justAssets = append(justAssets, asset.asset)
+	}
 	// sync assets
 	if upstream != nil && upstream.ApiEndpoint != "" && !upstream.Incognito {
 		log.Info().Msg("synchronize assets")
@@ -199,9 +236,10 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
 		if err != nil {
 			return nil, false, err
 		}
+
 		resp, err := services.SynchronizeAssets(ctx, &explorer.SynchronizeAssetsReq{
 			SpaceMrn: client.SpaceMrn,
-			List:     assets,
+			List:     justAssets,
 		})
 		if err != nil {
 			return nil, false, err
@@ -215,28 +253,28 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
 
 		// attach the asset details to the assets list
 		for i := range assets {
-			log.Debug().Str("asset", assets[i].Name).Strs("platform-ids", assets[i].PlatformIds).Msg("update asset")
-			platformMrn := assets[i].PlatformIds[0]
-			assets[i].Mrn = platformAssetMapping[platformMrn].AssetMrn
-			assets[i].Url = platformAssetMapping[platformMrn].Url
+			log.Debug().Str("asset", assets[i].asset.Name).Strs("platform-ids", assets[i].asset.PlatformIds).Msg("update asset")
+			platformMrn := assets[i].asset.PlatformIds[0]
+			assets[i].asset.Mrn = platformAssetMapping[platformMrn].AssetMrn
+			assets[i].asset.Url = platformAssetMapping[platformMrn].Url
 		}
 	} else {
 		// ensure we have non-empty asset MRNs
 		for i := range assets {
 			cur := assets[i]
-			if cur.Mrn == "" {
+			if cur.asset.Mrn == "" {
 				randID := "//" + explorer.SERVICE_NAME + "/" + explorer.MRN_RESOURCE_ASSET + "/" + ksuid.New().String()
 				x, err := mrn.NewMRN(randID)
 				if err != nil {
 					return nil, false, multierr.Wrap(err, "failed to generate a random asset MRN")
 				}
-				cur.Mrn = x.String()
+				cur.asset.Mrn = x.String()
 			}
 		}
 	}
 
 	// plan scan jobs
-	reporter := NewAggregateReporter(assets)
+	reporter := NewAggregateReporter(justAssets)
 	// if a bundle was provided check that it matches the filter, bundles can also be downloaded
 	// later therefore we do not want to stop execution here
 	if job.Bundle != nil && job.Bundle.FilterQueryPacks(job.QueryPackFilters) {
@@ -248,11 +286,11 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
 	for i := range assets {
 		// this shouldn't happen, but might
 		// it normally indicates a bug in the provider
-		if presentAsset, present := progressBarElements[assets[i].PlatformIds[0]]; present {
-			return nil, false, fmt.Errorf("asset %s and %s have the same platform id %s", presentAsset, assets[i].Name, assets[i].PlatformIds[0])
+		if presentAsset, present := progressBarElements[assets[i].asset.PlatformIds[0]]; present {
+			return nil, false, fmt.Errorf("asset %s and %s have the same platform id %s", presentAsset, assets[i].asset.Name, assets[i].asset.PlatformIds[0])
 		}
-		progressBarElements[assets[i].PlatformIds[0]] = assets[i].Name
-		orderedKeys = append(orderedKeys, assets[i].PlatformIds[0])
+		progressBarElements[assets[i].asset.PlatformIds[0]] = assets[i].asset.Name
+		orderedKeys = append(orderedKeys, assets[i].asset.PlatformIds[0])
 	}
 	var multiprogress progress.MultiProgress
 	if isatty.IsTerminal(os.Stdout.Fd()) && !strings.EqualFold(logger.GetLevel(), "debug") && !strings.EqualFold(logger.GetLevel(), "trace") {
@@ -272,8 +310,8 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
 		go func() {
 			defer scanGroup.Done()
 			for i := range assets {
-				asset := assets[i]
-				runtime := runtimes[i]
+				asset := assets[i].asset
+				runtime := assets[i].runtime
 				// Make sure the context has not been canceled in the meantime. Note that this approach works only for single threaded execution. If we have more than 1 thread calling this function,
 				// we need to solve this at a different level.