Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

⭐ implement StoreResults for resources data #3096

Merged
merged 6 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion apps/cnquery/cmd/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,10 @@ func (c *cnqueryPlugin) RunQuery(conf *run.RunQueryConfig, runtime *providers.Ru
}
defer func() {
// prevent the recording from being closed multiple times
connectAssetRuntime.Recording = providers.NullRecording{}
err = connectAssetRuntime.SetRecording(providers.NullRecording{})
if err != nil {
log.Error().Err(err).Msg("failed to set the recording layer to null")
}
sh.Close()
}()

Expand Down
4 changes: 1 addition & 3 deletions apps/cnquery/cmd/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,9 +308,7 @@ func RunScan(config *scanConfig) (*explorer.ReportCollection, error) {
if config.runtime.UpstreamConfig != nil {
opts = append(opts, scan.WithUpstream(config.runtime.UpstreamConfig))
}
if config.runtime.Recording != nil {
opts = append(opts, scan.WithRecording(config.runtime.Recording))
}
opts = append(opts, scan.WithRecording(config.runtime.Recording()))

scanner := scan.NewLocalScanner(opts...)
ctx := cnquery.SetFeatures(context.Background(), config.Features)
Expand Down
599 changes: 311 additions & 288 deletions explorer/cnquery_explorer.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions explorer/cnquery_explorer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ message UpdateAssetJobsReq {
message StoreResultsReq {
string asset_mrn = 1;
map<string, cnquery.llx.Result> data = 3;
map<string, cnquery.llx.ResourceRecording> resources = 4;
}

// Retrieve data for a given set of entities which was previously stored
Expand Down
2 changes: 1 addition & 1 deletion explorer/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (e *instance) snapshotResults() map[string]*llx.Result {
return results
}

func (e *instance) StoreData() error {
func (e *instance) StoreQueryData() error {
if e.collector == nil {
return errors.New("cannot store data, no collector provided")
}
Expand Down
30 changes: 25 additions & 5 deletions explorer/scan/local_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type LocalScanner struct {
ctx context.Context
fetcher *fetcher
upstream *upstream.UpstreamConfig
recording providers.Recording
recording llx.Recording
}

type ScannerOption func(*LocalScanner)
Expand All @@ -58,7 +58,7 @@ func WithUpstream(u *upstream.UpstreamConfig) func(s *LocalScanner) {
}
}

func WithRecording(r providers.Recording) func(s *LocalScanner) {
func WithRecording(r llx.Recording) func(s *LocalScanner) {
return func(s *LocalScanner) {
s.recording = r
}
Expand Down Expand Up @@ -267,14 +267,18 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
return nil, false, err
}
}
runtime.SetRecording(candidate.runtime.Recording)
err = runtime.SetRecording(candidate.runtime.Recording())
if err != nil {
log.Error().Err(err).Msg("unable to set recording for asset (pre-connect)")
continue
}

err = runtime.Connect(&plugin.ConnectReq{
Features: config.Features,
Asset: candidate.asset,
Upstream: upstream,
})
candidate.asset = runtime.Provider.Connection.Asset // to ensure we get all the information the connect call gave us

if err != nil {
log.Error().Err(err).Str("asset", candidate.asset.Name).Msg("unable to connect to asset")
continue
Expand Down Expand Up @@ -692,11 +696,27 @@ func (s *localAssetScanner) runQueryPack() (*AssetReport, error) {
return nil, err
}

err = e.StoreData()
err = e.StoreQueryData()
if err != nil {
return nil, err
}

if cnquery.GetFeatures(s.job.Ctx).IsActive(cnquery.StoreResourcesData) {
recording := s.Runtime.Recording()
data, ok := recording.GetAssetData(s.job.Asset.Mrn)
if !ok {
log.Debug().Msg("not storing resource data for this asset, nothing available")
} else {
_, err = conductor.StoreResults(context.Background(), &explorer.StoreResultsReq{
AssetMrn: s.job.Asset.Mrn,
Resources: data,
})
if err != nil {
return nil, err
}
}
}

ar := &AssetReport{
Mrn: s.job.Asset.Mrn,
Bundle: assetBundle,
Expand Down
5 changes: 3 additions & 2 deletions feature_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions featureflags.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ const (
// start: v8.x
// end: v9.0
ErrorsAsFailures

// StoreResourcesData feature flag
// desc: Stores recording-like data with upstream
// start: v10.x
// end: tbd (candidate: v11.0)
StoreResourcesData
)

// FeaturesValue is a map from feature name to feature flag
Expand Down
Loading
Loading