🧹 new runtime per asset #2043

Closed · wants to merge 1 commit
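In short: instead of pairing each asset with the runtime that discovered it (the assetWithRuntime pairing, whose runtime came from providers.Coordinator.RuntimeFor and was reused across candidates), every asset now gets its own freshly created runtime, tracked in two index-aligned slices. A condensed sketch of the new flow, using only identifiers that appear in the diff below; the surrounding function body, asset resolution, and error handling are elided, so treat it as an illustration rather than the exact code:

    var assets []*inventory.Asset
    var runtimes []*providers.Runtime

    for i := range assetList {
        // ... resolve assetList[i] via the inventory manager ...

        // one dedicated runtime per asset, instead of a shared one
        // obtained from providers.Coordinator.RuntimeFor
        runtime := providers.Coordinator.NewRuntime()
        runtime.DetectProvider(resolvedAsset)
        runtime.SetRecording(s.recording)

        // discovery may yield more assets; each discovered asset also
        // gets its own runtime, keeping assets[i] paired with runtimes[i]
        assets = append(assets, processedAssets...)
        for j := range processedAssets {
            rt := providers.Coordinator.NewRuntime()
            rt.DetectProvider(processedAssets[j])
            rt.SetRecording(s.recording)
            rt.Connect(&plugin.ConnectReq{Asset: processedAssets[j]})
            runtimes = append(runtimes, rt)
        }
    }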
explorer/scan/local_scanner.go (99 changes: 28 additions & 71 deletions)
@@ -13,14 +13,12 @@ import (
     "time"

-    "go.mondoo.com/cnquery/providers-sdk/v1/inventory/manager"
-    "go.mondoo.com/cnquery/providers-sdk/v1/plugin"

     "github.com/mattn/go-isatty"
     "github.com/rs/zerolog/log"
     "github.com/segmentio/ksuid"
     "github.com/spf13/viper"
     "go.mondoo.com/cnquery"
     "go.mondoo.com/cnquery/cli/config"
     "go.mondoo.com/cnquery/cli/progress"
     "go.mondoo.com/cnquery/explorer"
     "go.mondoo.com/cnquery/explorer/executor"
@@ -31,18 +29,14 @@ import (
     "go.mondoo.com/cnquery/mrn"
     "go.mondoo.com/cnquery/providers"
     "go.mondoo.com/cnquery/providers-sdk/v1/inventory"
+    "go.mondoo.com/cnquery/providers-sdk/v1/plugin"
     "go.mondoo.com/cnquery/providers-sdk/v1/upstream"
     "go.mondoo.com/cnquery/utils/multierr"
     "go.mondoo.com/ranger-rpc/codes"
     "go.mondoo.com/ranger-rpc/status"
     "google.golang.org/protobuf/proto"
 )

-type assetWithRuntime struct {
-    asset   *inventory.Asset
-    runtime *providers.Runtime
-}
-
 type LocalScanner struct {
     ctx     context.Context
     fetcher *fetcher
@@ -180,23 +174,17 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
     }
     assetList := im.GetAssets()

-    var assets []*assetWithRuntime
-    // note: asset candidate runtimes are the runtime that discovered them
-    var assetCandidates []*assetWithRuntime
-
-    // we connect and perform discovery for each asset in the job inventory
+    var assets []*inventory.Asset
+    var runtimes []*providers.Runtime
     for i := range assetList {
         asset := assetList[i]
         resolvedAsset, err := im.ResolveAsset(asset)
         if err != nil {
             return nil, false, err
         }

-        runtime, err := providers.Coordinator.RuntimeFor(asset, providers.DefaultRuntime())
-        if err != nil {
-            log.Error().Err(err).Str("asset", asset.Name).Msg("unable to create runtime for asset")
-            continue
-        }
+        runtime := providers.Coordinator.NewRuntime()
+        runtime.DetectProvider(resolvedAsset)
         runtime.SetRecording(s.recording)

         if err := runtime.Connect(&plugin.ConnectReq{
@@ -211,54 +199,24 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
         if err != nil {
             return nil, false, err
         }
+        log.Debug().Msgf("adding %d asset(s)", len(processedAssets))
+        assets = append(assets, processedAssets...)
         for i := range processedAssets {
-            assetCandidates = append(assetCandidates, &assetWithRuntime{
-                asset:   processedAssets[i],
-                runtime: runtime,
-            })
+            runtime := providers.Coordinator.NewRuntime()
+            runtime.DetectProvider(processedAssets[i])
+            runtime.SetRecording(s.recording)
+            runtime.Connect(&plugin.ConnectReq{Asset: processedAssets[i]})
+            runtimes = append(runtimes, runtime)
         }

         // TODO: we want to keep better track of errors, since there may be
         // multiple assets coming in. It's annoying to abort the scan if we get one
         // error at this stage.

-        // we grab the asset from the connection, because it contains all the
-        // detected metadata (and IDs)
-        // assets = append(assets, runtime.Provider.Connection.Asset)
     }

-    // for each asset candidate, we initialize a new runtime and connect to it.
-    for i := range assetCandidates {
-        candidate := assetCandidates[i]
-
-        runtime, err := providers.Coordinator.RuntimeFor(candidate.asset, candidate.runtime)
-        if err != nil {
-            return nil, false, err
-        }
-
-        err = runtime.Connect(&plugin.ConnectReq{
-            Features: config.Features,
-            Asset:    candidate.asset,
-            Upstream: upstream,
-        })
-        if err != nil {
-            log.Error().Err(err).Str("asset", candidate.asset.Name).Msg("unable to connect to asset")
-            continue
-        }
-
-        assets = append(assets, &assetWithRuntime{
-            asset:   candidate.asset,
-            runtime: runtime,
-        })
-    }
-
     if len(assets) == 0 {
         return nil, false, nil
     }

-    justAssets := []*inventory.Asset{}
-    for _, asset := range assets {
-        asset.asset.KindString = asset.asset.GetPlatform().Kind
-        justAssets = append(justAssets, asset.asset)
-        // runtimes = append(runtimes, runtime)
-    }

     // sync assets
@@ -273,10 +231,9 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
         if err != nil {
             return nil, false, err
         }
-
         resp, err := services.SynchronizeAssets(ctx, &explorer.SynchronizeAssetsReq{
             SpaceMrn: client.SpaceMrn,
-            List:     justAssets,
+            List:     assets,
         })
         if err != nil {
             return nil, false, err
@@ -290,28 +247,28 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up

         // attach the asset details to the assets list
         for i := range assets {
-            log.Debug().Str("asset", assets[i].asset.Name).Strs("platform-ids", assets[i].asset.PlatformIds).Msg("update asset")
-            platformMrn := assets[i].asset.PlatformIds[0]
-            assets[i].asset.Mrn = platformAssetMapping[platformMrn].AssetMrn
-            assets[i].asset.Url = platformAssetMapping[platformMrn].Url
+            log.Debug().Str("asset", assets[i].Name).Strs("platform-ids", assets[i].PlatformIds).Msg("update asset")
+            platformMrn := assets[i].PlatformIds[0]
+            assets[i].Mrn = platformAssetMapping[platformMrn].AssetMrn
+            assets[i].Url = platformAssetMapping[platformMrn].Url
         }
     } else {
         // ensure we have non-empty asset MRNs
         for i := range assets {
             cur := assets[i]
-            if cur.asset.Mrn == "" {
+            if cur.Mrn == "" {
                 randID := "//" + explorer.SERVICE_NAME + "/" + explorer.MRN_RESOURCE_ASSET + "/" + ksuid.New().String()
                 x, err := mrn.NewMRN(randID)
                 if err != nil {
                     return nil, false, multierr.Wrap(err, "failed to generate a random asset MRN")
                 }
-                cur.asset.Mrn = x.String()
+                cur.Mrn = x.String()
             }
         }
     }

     // plan scan jobs
-    reporter := NewAggregateReporter(justAssets)
+    reporter := NewAggregateReporter(assets)
     // if a bundle was provided check that it matches the filter, bundles can also be downloaded
     // later therefore we do not want to stop execution here
     if job.Bundle != nil && job.Bundle.FilterQueryPacks(job.QueryPackFilters) {
@@ -323,11 +280,11 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
     for i := range assets {
         // this shouldn't happen, but might
         // it normally indicates a bug in the provider
-        if presentAsset, present := progressBarElements[assets[i].asset.PlatformIds[0]]; present {
-            return nil, false, fmt.Errorf("asset %s and %s have the same platform id %s", presentAsset, assets[i].asset.Name, assets[i].asset.PlatformIds[0])
+        if presentAsset, present := progressBarElements[assets[i].PlatformIds[0]]; present {
+            return nil, false, fmt.Errorf("asset %s and %s have the same platform id %s", presentAsset, assets[i].Name, assets[i].PlatformIds[0])
         }
-        progressBarElements[assets[i].asset.PlatformIds[0]] = assets[i].asset.Name
-        orderedKeys = append(orderedKeys, assets[i].asset.PlatformIds[0])
+        progressBarElements[assets[i].PlatformIds[0]] = assets[i].Name
+        orderedKeys = append(orderedKeys, assets[i].PlatformIds[0])
     }
     var multiprogress progress.MultiProgress
     if isatty.IsTerminal(os.Stdout.Fd()) && !strings.EqualFold(logger.GetLevel(), "debug") && !strings.EqualFold(logger.GetLevel(), "trace") {
@@ -347,8 +304,8 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
     go func() {
         defer scanGroup.Done()
        for i := range assets {
-            asset := assets[i].asset
-            runtime := assets[i].runtime
+            asset := assets[i]
+            runtime := runtimes[i]

             // Make sure the context has not been canceled in the meantime. Note that this approach works only for single threaded execution. If we have more than 1 thread calling this function,
             // we need to solve this at a different level.
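Two details worth a second look, judging from the diff alone: the new per-asset runtime.Connect(&plugin.ConnectReq{Asset: processedAssets[i]}) call in the discovery loop discards its error (the removed candidate loop logged the failure and skipped the asset), and the removed justAssets loop also carried the asset.KindString = asset.GetPlatform().Kind assignment, which has no visible replacement in this diff. Because assets and runtimes are parallel slices consumed as assets[i]/runtimes[i] in the scan goroutine, a failed connect cannot simply be skipped without desynchronizing the two. A sketch of one way to surface the error while keeping the slices aligned (same identifiers as the diff; whether aborting the whole scan is the desired policy is an open question):

    if err := runtime.Connect(&plugin.ConnectReq{Asset: processedAssets[i]}); err != nil {
        // assets already contains processedAssets[i], so skipping the
        // runtimes append here would break the assets[i]/runtimes[i]
        // pairing; abort instead of silently continuing
        return nil, false, err
    }
    runtimes = append(runtimes, runtime)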