Skip to content

Commit

Permalink
Attempt at fixing the scan runtime problem. (#1649)
Browse files Browse the repository at this point in the history
Not necessarily the fix, but an attempt to get a proper solution for
this.

There are 2 problems I am trying to solve here:
1. Do not rely on two separate `assets` and `runtime` lists to get the
scan to work. This will crash if we do not have an equal number of
assets and runtimes. Instead, use a private struct to tie the two
together.
2. Running filters (queries) on a provider uses the currently attached
asset, see
https://github.com/mondoohq/cnquery/blob/main/providers/runtime.go#L419.
At this moment we can only attach one asset, found under
`r.Provider.Connection.Asset`. To ensure this works for all discovered
assets, we need to make sure there's a proper runtime for each of those
assets. I am not sure this is the right fix, as we are now creating a
separate runtime for each asset.
  • Loading branch information
preslavgerchev authored Sep 7, 2023
1 parent 233b0b4 commit 25b6c12
Showing 1 changed file with 61 additions and 23 deletions.
84 changes: 61 additions & 23 deletions explorer/scan/local_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ import (
"time"

"go.mondoo.com/cnquery/providers-sdk/v1/inventory/manager"
"go.mondoo.com/cnquery/providers-sdk/v1/plugin"

"github.com/mattn/go-isatty"
"github.com/rs/zerolog/log"
"github.com/segmentio/ksuid"
"github.com/spf13/viper"
"go.mondoo.com/cnquery"
"go.mondoo.com/cnquery/cli/config"
"go.mondoo.com/cnquery/cli/progress"
"go.mondoo.com/cnquery/explorer"
"go.mondoo.com/cnquery/explorer/executor"
Expand All @@ -29,14 +31,18 @@ import (
"go.mondoo.com/cnquery/mrn"
"go.mondoo.com/cnquery/providers"
"go.mondoo.com/cnquery/providers-sdk/v1/inventory"
"go.mondoo.com/cnquery/providers-sdk/v1/plugin"
"go.mondoo.com/cnquery/providers-sdk/v1/upstream"
"go.mondoo.com/cnquery/utils/multierr"
"go.mondoo.com/ranger-rpc/codes"
"go.mondoo.com/ranger-rpc/status"
"google.golang.org/protobuf/proto"
)

// assetWithRuntime ties a discovered asset to the runtime that was
// created and connected for it. Keeping the pair in one struct replaces
// the previous two parallel slices, which could crash when their lengths
// diverged (see this commit's description).
type assetWithRuntime struct {
asset *inventory.Asset
runtime *providers.Runtime
}

type LocalScanner struct {
ctx context.Context
fetcher *fetcher
Expand Down Expand Up @@ -155,38 +161,69 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
}
assetList := im.GetAssets()

var assets []*inventory.Asset
var runtimes []*providers.Runtime
var assets []*assetWithRuntime
var assetCandidates []*inventory.Asset

// we connect and perform discovery for each asset in the job inventory
for i := range assetList {
asset := assetList[i]
resolvedAsset, err := im.ResolveAsset(asset)
if err != nil {
return nil, false, err
}

runtime := providers.Coordinator.NewRuntime()
runtime.DetectProvider(resolvedAsset)

if err := runtime.Connect(&plugin.ConnectReq{
Features: cnquery.GetFeatures(ctx),
Asset: resolvedAsset,
Upstream: upstream,
}); err != nil {
return nil, false, err
}
log.Debug().Msgf("adding %d asset(s)", len(runtime.Provider.Connection.Inventory.Spec.Assets))
assets = append(assets, runtime.Provider.Connection.Inventory.Spec.Assets...)

inventorySpec := runtime.Provider.Connection
if inventorySpec.Inventory != nil &&
inventorySpec.Inventory.Spec != nil &&
inventorySpec.Inventory.Spec.Assets != nil {
log.Debug().Msgf("adding %d discovered asset(s)", len(runtime.Provider.Connection.Inventory.Spec.Assets))
assetCandidates = append(assetCandidates, inventorySpec.Inventory.Spec.Assets...)
} else {
assetCandidates = append(assetCandidates, runtime.Provider.Connection.Asset)
}
// TODO: we want to keep better track of errors, since there may be
// multiple assets coming in. It's annoying to abort the scan if we get one
// error at this stage.

// we grab the asset from the connection, because it contains all the
// detected metadata (and IDs)
// assets = append(assets, runtime.Provider.Connection.Asset)
runtimes = append(runtimes, runtime)
}

// for each asset candidate, we initialize a new runtime and connect to it.
for _, asset := range assetCandidates {
runtime := providers.Coordinator.NewRuntime()
// Make sure the provider for the asset is present
if err := runtime.DetectProvider(asset); err != nil {
return nil, false, err
}

err := runtime.Connect(&plugin.ConnectReq{
Features: config.Features,
Asset: asset,
Upstream: upstream,
})
if err != nil {
return nil, false, err
}
assets = append(assets, &assetWithRuntime{
asset: asset,
runtime: runtime,
})
}

justAssets := []*inventory.Asset{}
for _, asset := range assets {
justAssets = append(justAssets, asset.asset)
}
// sync assets
if upstream != nil && upstream.ApiEndpoint != "" && !upstream.Incognito {
log.Info().Msg("synchronize assets")
Expand All @@ -199,9 +236,10 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
if err != nil {
return nil, false, err
}

resp, err := services.SynchronizeAssets(ctx, &explorer.SynchronizeAssetsReq{
SpaceMrn: client.SpaceMrn,
List: assets,
List: justAssets,
})
if err != nil {
return nil, false, err
Expand All @@ -215,28 +253,28 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up

// attach the asset details to the assets list
for i := range assets {
log.Debug().Str("asset", assets[i].Name).Strs("platform-ids", assets[i].PlatformIds).Msg("update asset")
platformMrn := assets[i].PlatformIds[0]
assets[i].Mrn = platformAssetMapping[platformMrn].AssetMrn
assets[i].Url = platformAssetMapping[platformMrn].Url
log.Debug().Str("asset", assets[i].asset.Name).Strs("platform-ids", assets[i].asset.PlatformIds).Msg("update asset")
platformMrn := assets[i].asset.PlatformIds[0]
assets[i].asset.Mrn = platformAssetMapping[platformMrn].AssetMrn
assets[i].asset.Url = platformAssetMapping[platformMrn].Url
}
} else {
// ensure we have non-empty asset MRNs
for i := range assets {
cur := assets[i]
if cur.Mrn == "" {
if cur.asset.Mrn == "" {
randID := "//" + explorer.SERVICE_NAME + "/" + explorer.MRN_RESOURCE_ASSET + "/" + ksuid.New().String()
x, err := mrn.NewMRN(randID)
if err != nil {
return nil, false, multierr.Wrap(err, "failed to generate a random asset MRN")
}
cur.Mrn = x.String()
cur.asset.Mrn = x.String()
}
}
}

// plan scan jobs
reporter := NewAggregateReporter(assets)
reporter := NewAggregateReporter(justAssets)
// if a bundle was provided check that it matches the filter, bundles can also be downloaded
// later therefore we do not want to stop execution here
if job.Bundle != nil && job.Bundle.FilterQueryPacks(job.QueryPackFilters) {
Expand All @@ -248,11 +286,11 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
for i := range assets {
// this shouldn't happen, but might
// it normally indicates a bug in the provider
if presentAsset, present := progressBarElements[assets[i].PlatformIds[0]]; present {
return nil, false, fmt.Errorf("asset %s and %s have the same platform id %s", presentAsset, assets[i].Name, assets[i].PlatformIds[0])
if presentAsset, present := progressBarElements[assets[i].asset.PlatformIds[0]]; present {
return nil, false, fmt.Errorf("asset %s and %s have the same platform id %s", presentAsset, assets[i].asset.Name, assets[i].asset.PlatformIds[0])
}
progressBarElements[assets[i].PlatformIds[0]] = assets[i].Name
orderedKeys = append(orderedKeys, assets[i].PlatformIds[0])
progressBarElements[assets[i].asset.PlatformIds[0]] = assets[i].asset.Name
orderedKeys = append(orderedKeys, assets[i].asset.PlatformIds[0])
}
var multiprogress progress.MultiProgress
if isatty.IsTerminal(os.Stdout.Fd()) && !strings.EqualFold(logger.GetLevel(), "debug") && !strings.EqualFold(logger.GetLevel(), "trace") {
Expand All @@ -272,8 +310,8 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
go func() {
defer scanGroup.Done()
for i := range assets {
asset := assets[i]
runtime := runtimes[i]
asset := assets[i].asset
runtime := assets[i].runtime

// Make sure the context has not been canceled in the meantime. Note that this approach works only for single threaded execution. If we have more than 1 thread calling this function,
// we need to solve this at a different level.
Expand Down

0 comments on commit 25b6c12

Please sign in to comment.