diff --git a/explorer/scan/local_scanner.go b/explorer/scan/local_scanner.go
index 5ff5d04df5..80b342e053 100644
--- a/explorer/scan/local_scanner.go
+++ b/explorer/scan/local_scanner.go
@@ -13,14 +13,12 @@ import (
 	"time"
 
 	"go.mondoo.com/cnquery/providers-sdk/v1/inventory/manager"
-	"go.mondoo.com/cnquery/providers-sdk/v1/plugin"
 
 	"github.com/mattn/go-isatty"
 	"github.com/rs/zerolog/log"
 	"github.com/segmentio/ksuid"
 	"github.com/spf13/viper"
 	"go.mondoo.com/cnquery"
-	"go.mondoo.com/cnquery/cli/config"
 	"go.mondoo.com/cnquery/cli/progress"
 	"go.mondoo.com/cnquery/explorer"
 	"go.mondoo.com/cnquery/explorer/executor"
@@ -31,6 +29,7 @@ import (
 	"go.mondoo.com/cnquery/mrn"
 	"go.mondoo.com/cnquery/providers"
 	"go.mondoo.com/cnquery/providers-sdk/v1/inventory"
+	"go.mondoo.com/cnquery/providers-sdk/v1/plugin"
 	"go.mondoo.com/cnquery/providers-sdk/v1/upstream"
 	"go.mondoo.com/cnquery/utils/multierr"
 	"go.mondoo.com/ranger-rpc/codes"
@@ -38,11 +37,6 @@ import (
 	"google.golang.org/protobuf/proto"
 )
 
-type assetWithRuntime struct {
-	asset   *inventory.Asset
-	runtime *providers.Runtime
-}
-
 type LocalScanner struct {
 	ctx      context.Context
 	fetcher  *fetcher
@@ -180,11 +174,8 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
 	}
 	assetList := im.GetAssets()
 
-	var assets []*assetWithRuntime
-	// note: asset candidate runtimes are the runtime that discovered them
-	var assetCandidates []*assetWithRuntime
-
-	// we connect and perform discovery for each asset in the job inventory
+	var assets []*inventory.Asset
+	var runtimes []*providers.Runtime
 	for i := range assetList {
 		asset := assetList[i]
 		resolvedAsset, err := im.ResolveAsset(asset)
@@ -192,11 +183,8 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
 			return nil, false, err
 		}
 
-		runtime, err := providers.Coordinator.RuntimeFor(asset, providers.DefaultRuntime())
-		if err != nil {
-			log.Error().Err(err).Str("asset", asset.Name).Msg("unable to create runtime for asset")
-			continue
-		}
+		runtime := providers.Coordinator.NewRuntime()
+		runtime.DetectProvider(resolvedAsset)
 
 		runtime.SetRecording(s.recording)
 		if err := runtime.Connect(&plugin.ConnectReq{
@@ -211,12 +199,16 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
 		if err != nil {
 			return nil, false, err
 		}
+		log.Debug().Msgf("adding %d asset(s)", len(processedAssets))
+		assets = append(assets, processedAssets...)
 		for i := range processedAssets {
-			assetCandidates = append(assetCandidates, &assetWithRuntime{
-				asset:   processedAssets[i],
-				runtime: runtime,
-			})
+			runtime := providers.Coordinator.NewRuntime()
+			runtime.DetectProvider(processedAssets[i])
+			runtime.SetRecording(s.recording)
+			runtime.Connect(&plugin.ConnectReq{Asset: processedAssets[i]})
+			runtimes = append(runtimes, runtime)
 		}
+
 		// TODO: we want to keep better track of errors, since there may be
 		// multiple assets coming in. It's annoying to abort the scan if we get one
 		// error at this stage.
@@ -224,41 +216,7 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
 		// we grab the asset from the connection, because it contains all the
 		// detected metadata (and IDs)
 		// assets = append(assets, runtime.Provider.Connection.Asset)
-	}
-
-	// for each asset candidate, we initialize a new runtime and connect to it.
-	for i := range assetCandidates {
-		candidate := assetCandidates[i]
-
-		runtime, err := providers.Coordinator.RuntimeFor(candidate.asset, candidate.runtime)
-		if err != nil {
-			return nil, false, err
-		}
-
-		err = runtime.Connect(&plugin.ConnectReq{
-			Features: config.Features,
-			Asset:    candidate.asset,
-			Upstream: upstream,
-		})
-		if err != nil {
-			log.Error().Err(err).Str("asset", candidate.asset.Name).Msg("unable to connect to asset")
-			continue
-		}
-
-		assets = append(assets, &assetWithRuntime{
-			asset:   candidate.asset,
-			runtime: runtime,
-		})
-	}
-
-	if len(assets) == 0 {
-		return nil, false, nil
-	}
-
-	justAssets := []*inventory.Asset{}
-	for _, asset := range assets {
-		asset.asset.KindString = asset.asset.GetPlatform().Kind
-		justAssets = append(justAssets, asset.asset)
+		// runtimes = append(runtimes, runtime)
 	}
 
 	// sync assets
@@ -273,10 +231,9 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
 		if err != nil {
 			return nil, false, err
 		}
-
 		resp, err := services.SynchronizeAssets(ctx, &explorer.SynchronizeAssetsReq{
 			SpaceMrn: client.SpaceMrn,
-			List:     justAssets,
+			List:     assets,
 		})
 		if err != nil {
 			return nil, false, err
@@ -290,28 +247,28 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
 		}
 		// attach the asset details to the assets list
 		for i := range assets {
-			log.Debug().Str("asset", assets[i].asset.Name).Strs("platform-ids", assets[i].asset.PlatformIds).Msg("update asset")
-			platformMrn := assets[i].asset.PlatformIds[0]
-			assets[i].asset.Mrn = platformAssetMapping[platformMrn].AssetMrn
-			assets[i].asset.Url = platformAssetMapping[platformMrn].Url
+			log.Debug().Str("asset", assets[i].Name).Strs("platform-ids", assets[i].PlatformIds).Msg("update asset")
+			platformMrn := assets[i].PlatformIds[0]
+			assets[i].Mrn = platformAssetMapping[platformMrn].AssetMrn
+			assets[i].Url = platformAssetMapping[platformMrn].Url
 		}
 	} else {
 		// ensure we have non-empty asset MRNs
 		for i := range assets {
 			cur := assets[i]
-			if cur.asset.Mrn == "" {
+			if cur.Mrn == "" {
 				randID := "//" + explorer.SERVICE_NAME + "/" + explorer.MRN_RESOURCE_ASSET + "/" + ksuid.New().String()
 				x, err := mrn.NewMRN(randID)
 				if err != nil {
 					return nil, false, multierr.Wrap(err, "failed to generate a random asset MRN")
 				}
-				cur.asset.Mrn = x.String()
+				cur.Mrn = x.String()
 			}
 		}
 	}
 
 	// plan scan jobs
-	reporter := NewAggregateReporter(justAssets)
+	reporter := NewAggregateReporter(assets)
 	// if a bundle was provided check that it matches the filter, bundles can also be downloaded
 	// later therefore we do not want to stop execution here
 	if job.Bundle != nil && job.Bundle.FilterQueryPacks(job.QueryPackFilters) {
@@ -323,11 +280,11 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
 	for i := range assets {
 		// this shouldn't happen, but might
 		// it normally indicates a bug in the provider
-		if presentAsset, present := progressBarElements[assets[i].asset.PlatformIds[0]]; present {
-			return nil, false, fmt.Errorf("asset %s and %s have the same platform id %s", presentAsset, assets[i].asset.Name, assets[i].asset.PlatformIds[0])
+		if presentAsset, present := progressBarElements[assets[i].PlatformIds[0]]; present {
+			return nil, false, fmt.Errorf("asset %s and %s have the same platform id %s", presentAsset, assets[i].Name, assets[i].PlatformIds[0])
 		}
-		progressBarElements[assets[i].asset.PlatformIds[0]] = assets[i].asset.Name
-		orderedKeys = append(orderedKeys, assets[i].asset.PlatformIds[0])
+		progressBarElements[assets[i].PlatformIds[0]] = assets[i].Name
+		orderedKeys = append(orderedKeys, assets[i].PlatformIds[0])
 	}
 	var multiprogress progress.MultiProgress
 	if isatty.IsTerminal(os.Stdout.Fd()) && !strings.EqualFold(logger.GetLevel(), "debug") && !strings.EqualFold(logger.GetLevel(), "trace") {
@@ -347,8 +304,8 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
 	go func() {
 		defer scanGroup.Done()
 		for i := range assets {
-			asset := assets[i].asset
-			runtime := assets[i].runtime
+			asset := assets[i]
+			runtime := runtimes[i]
 			// Make sure the context has not been canceled in the meantime. Note that this approach works only for single threaded execution. If we have more than 1 thread calling this function,
 			// we need to solve this at a different level.
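For orientation, here is a rough sketch of the connection flow the new hunks inline into distributeJob: one fresh runtime per discovered asset, with assets and runtimes kept as parallel slices so that assets[i] pairs with runtimes[i] in the scan loop at the end of the diff. The helper name connectDiscoveredAssets is hypothetical and not part of the change; the calls (providers.Coordinator.NewRuntime, DetectProvider, Connect) are taken verbatim from the hunks above, and, like the hunk's inner loop, the sketch drops their return values, something the TODO in the diff already flags as needing better error tracking.

package scan

import (
	"go.mondoo.com/cnquery/providers"
	"go.mondoo.com/cnquery/providers-sdk/v1/inventory"
	"go.mondoo.com/cnquery/providers-sdk/v1/plugin"
)

// connectDiscoveredAssets mirrors the per-asset loop added in distributeJob:
// every processed asset gets its own runtime, and both slices are appended in
// the same order so the same index refers to the same asset in each.
func connectDiscoveredAssets(processed []*inventory.Asset) ([]*inventory.Asset, []*providers.Runtime) {
	assets := []*inventory.Asset{}
	runtimes := []*providers.Runtime{}
	for i := range processed {
		runtime := providers.Coordinator.NewRuntime()
		runtime.DetectProvider(processed[i])
		// the diff also calls runtime.SetRecording(s.recording) at this point
		runtime.Connect(&plugin.ConnectReq{Asset: processed[i]})
		assets = append(assets, processed[i])
		runtimes = append(runtimes, runtime)
	}
	return assets, runtimes
}

Compared with the removed assetWithRuntime pairing, this index-based pairing only holds as long as nothing is ever appended to one slice without the other.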