Skip to content

Commit

Permalink
🧹 new runtime per asset
Browse files Browse the repository at this point in the history
  • Loading branch information
vjeffrey committed Oct 2, 2023
1 parent e56eac2 commit 2c73276
Showing 1 changed file with 28 additions and 71 deletions.
99 changes: 28 additions & 71 deletions explorer/scan/local_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,12 @@ import (
"time"

"go.mondoo.com/cnquery/providers-sdk/v1/inventory/manager"
"go.mondoo.com/cnquery/providers-sdk/v1/plugin"

"github.com/mattn/go-isatty"
"github.com/rs/zerolog/log"
"github.com/segmentio/ksuid"
"github.com/spf13/viper"
"go.mondoo.com/cnquery"
"go.mondoo.com/cnquery/cli/config"
"go.mondoo.com/cnquery/cli/progress"
"go.mondoo.com/cnquery/explorer"
"go.mondoo.com/cnquery/explorer/executor"
Expand All @@ -31,18 +29,14 @@ import (
"go.mondoo.com/cnquery/mrn"
"go.mondoo.com/cnquery/providers"
"go.mondoo.com/cnquery/providers-sdk/v1/inventory"
"go.mondoo.com/cnquery/providers-sdk/v1/plugin"
"go.mondoo.com/cnquery/providers-sdk/v1/upstream"
"go.mondoo.com/cnquery/utils/multierr"
"go.mondoo.com/ranger-rpc/codes"
"go.mondoo.com/ranger-rpc/status"
"google.golang.org/protobuf/proto"
)

type assetWithRuntime struct {
asset *inventory.Asset
runtime *providers.Runtime
}

type LocalScanner struct {
ctx context.Context
fetcher *fetcher
Expand Down Expand Up @@ -180,23 +174,17 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
}
assetList := im.GetAssets()

var assets []*assetWithRuntime
// note: asset candidate runtimes are the runtime that discovered them
var assetCandidates []*assetWithRuntime

// we connect and perform discovery for each asset in the job inventory
var assets []*inventory.Asset
var runtimes []*providers.Runtime
for i := range assetList {
asset := assetList[i]
resolvedAsset, err := im.ResolveAsset(asset)
if err != nil {
return nil, false, err
}

runtime, err := providers.Coordinator.RuntimeFor(asset, providers.DefaultRuntime())
if err != nil {
log.Error().Err(err).Str("asset", asset.Name).Msg("unable to create runtime for asset")
continue
}
runtime := providers.Coordinator.NewRuntime()
runtime.DetectProvider(resolvedAsset)
runtime.SetRecording(s.recording)

if err := runtime.Connect(&plugin.ConnectReq{
Expand All @@ -211,54 +199,24 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
if err != nil {
return nil, false, err
}
log.Debug().Msgf("adding %d asset(s)", len(processedAssets))
assets = append(assets, processedAssets...)
for i := range processedAssets {
assetCandidates = append(assetCandidates, &assetWithRuntime{
asset: processedAssets[i],
runtime: runtime,
})
runtime := providers.Coordinator.NewRuntime()
runtime.DetectProvider(processedAssets[i])
runtime.SetRecording(s.recording)
runtime.Connect(&plugin.ConnectReq{Asset: processedAssets[i]})
runtimes = append(runtimes, runtime)
}

// TODO: we want to keep better track of errors, since there may be
// multiple assets coming in. It's annoying to abort the scan if we get one
// error at this stage.

// we grab the asset from the connection, because it contains all the
// detected metadata (and IDs)
// assets = append(assets, runtime.Provider.Connection.Asset)
}

// for each asset candidate, we initialize a new runtime and connect to it.
for i := range assetCandidates {
candidate := assetCandidates[i]

runtime, err := providers.Coordinator.RuntimeFor(candidate.asset, candidate.runtime)
if err != nil {
return nil, false, err
}

err = runtime.Connect(&plugin.ConnectReq{
Features: config.Features,
Asset: candidate.asset,
Upstream: upstream,
})
if err != nil {
log.Error().Err(err).Str("asset", candidate.asset.Name).Msg("unable to connect to asset")
continue
}

assets = append(assets, &assetWithRuntime{
asset: candidate.asset,
runtime: runtime,
})
}

if len(assets) == 0 {
return nil, false, nil
}

justAssets := []*inventory.Asset{}
for _, asset := range assets {
asset.asset.KindString = asset.asset.GetPlatform().Kind
justAssets = append(justAssets, asset.asset)
// runtimes = append(runtimes, runtime)
}

// sync assets
Expand All @@ -273,10 +231,9 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
if err != nil {
return nil, false, err
}

resp, err := services.SynchronizeAssets(ctx, &explorer.SynchronizeAssetsReq{
SpaceMrn: client.SpaceMrn,
List: justAssets,
List: assets,
})
if err != nil {
return nil, false, err
Expand All @@ -290,28 +247,28 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up

// attach the asset details to the assets list
for i := range assets {
log.Debug().Str("asset", assets[i].asset.Name).Strs("platform-ids", assets[i].asset.PlatformIds).Msg("update asset")
platformMrn := assets[i].asset.PlatformIds[0]
assets[i].asset.Mrn = platformAssetMapping[platformMrn].AssetMrn
assets[i].asset.Url = platformAssetMapping[platformMrn].Url
log.Debug().Str("asset", assets[i].Name).Strs("platform-ids", assets[i].PlatformIds).Msg("update asset")
platformMrn := assets[i].PlatformIds[0]
assets[i].Mrn = platformAssetMapping[platformMrn].AssetMrn
assets[i].Url = platformAssetMapping[platformMrn].Url
}
} else {
// ensure we have non-empty asset MRNs
for i := range assets {
cur := assets[i]
if cur.asset.Mrn == "" {
if cur.Mrn == "" {
randID := "//" + explorer.SERVICE_NAME + "/" + explorer.MRN_RESOURCE_ASSET + "/" + ksuid.New().String()
x, err := mrn.NewMRN(randID)
if err != nil {
return nil, false, multierr.Wrap(err, "failed to generate a random asset MRN")
}
cur.asset.Mrn = x.String()
cur.Mrn = x.String()
}
}
}

// plan scan jobs
reporter := NewAggregateReporter(justAssets)
reporter := NewAggregateReporter(assets)
// if a bundle was provided check that it matches the filter, bundles can also be downloaded
// later therefore we do not want to stop execution here
if job.Bundle != nil && job.Bundle.FilterQueryPacks(job.QueryPackFilters) {
Expand All @@ -323,11 +280,11 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
for i := range assets {
// this shouldn't happen, but might
// it normally indicates a bug in the provider
if presentAsset, present := progressBarElements[assets[i].asset.PlatformIds[0]]; present {
return nil, false, fmt.Errorf("asset %s and %s have the same platform id %s", presentAsset, assets[i].asset.Name, assets[i].asset.PlatformIds[0])
if presentAsset, present := progressBarElements[assets[i].PlatformIds[0]]; present {
return nil, false, fmt.Errorf("asset %s and %s have the same platform id %s", presentAsset, assets[i].Name, assets[i].PlatformIds[0])
}
progressBarElements[assets[i].asset.PlatformIds[0]] = assets[i].asset.Name
orderedKeys = append(orderedKeys, assets[i].asset.PlatformIds[0])
progressBarElements[assets[i].PlatformIds[0]] = assets[i].Name
orderedKeys = append(orderedKeys, assets[i].PlatformIds[0])
}
var multiprogress progress.MultiProgress
if isatty.IsTerminal(os.Stdout.Fd()) && !strings.EqualFold(logger.GetLevel(), "debug") && !strings.EqualFold(logger.GetLevel(), "trace") {
Expand All @@ -347,8 +304,8 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
go func() {
defer scanGroup.Done()
for i := range assets {
asset := assets[i].asset
runtime := assets[i].runtime
asset := assets[i]
runtime := runtimes[i]

// Make sure the context has not been canceled in the meantime. Note that this approach works only for single threaded execution. If we have more than 1 thread calling this function,
// we need to solve this at a different level.
Expand Down

0 comments on commit 2c73276

Please sign in to comment.