Skip to content

Commit

Permalink
⭐️ Add reporting jobs for data queries.
Browse files Browse the repository at this point in the history
Until now, we turned data queries' reporting jobs into datapoints and attached those to the parent RJs. With this change, we instead attach the datapoints to the data queries' own RJs, and we no longer delete those RJs.
This means that we now have scores coming in for data queries, even though those will always be of the unscored type.

Signed-off-by: Preslav <[email protected]>
  • Loading branch information
preslavgerchev committed Dec 13, 2023
1 parent c189bb3 commit 0980b1e
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 69 deletions.
105 changes: 37 additions & 68 deletions policy/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1034,41 +1034,39 @@ func (cache *policyResolverCache) addCheckJobVariants(ctx context.Context, query
}

func (cache *policyResolverCache) addDataQueryJob(ctx context.Context, query *explorer.Mquery, ownerJob *ReportingJob) {
if len(query.Variants) != 0 {
err := cache.addDataQueryVariants(ctx, query, ownerJob)
if err != nil {
log.Error().Err(err).Str("queryMrn", query.Mrn).Msg("failed to add data query variants")
}
return
}

uuid := cache.global.relativeChecksum(query.Checksum)
uuid := cache.global.relativeChecksum(query.Mrn)
queryJob := cache.global.reportingJobsByUUID[uuid]

// note: the ReportingJob is only a placeholder and is replaced by individual query LLX checksum ReportingJobs
if queryJob == nil {
queryJob = &ReportingJob{
Uuid: cache.global.relativeChecksum(query.Checksum),
Uuid: uuid,
QrId: query.Mrn,
ChildJobs: map[string]*explorer.Impact{},
Datapoints: map[string]bool{},
Type: ReportingJob_DATA_QUERY,
// FIXME: DEPRECATED, remove in v10.0 vv
DeprecatedV8IsData: true,
}
cache.global.reportingJobsByUUID[queryJob.Uuid] = queryJob
cache.global.codeIdToMrn[query.CodeId] = append(cache.global.codeIdToMrn[query.CodeId], query.Mrn)
cache.global.reportingJobsByUUID[uuid] = queryJob
cache.global.reportingJobsByMsum[query.Checksum] = append(cache.global.reportingJobsByMsum[query.Checksum], queryJob)
cache.childJobsByMrn[query.Mrn] = append(cache.childJobsByMrn[query.Mrn], queryJob)
}

// local aspects for the resolved policy
queryJob.Notify = append(queryJob.Notify, ownerJob.Uuid)

ownerJob.Datapoints[queryJob.Uuid] = true
// we set a placeholder for the execution query, just to indicate it will be added
cache.global.executionQueries[query.Checksum] = nil
cache.global.dataQueries[query.Checksum] = struct{}{}
cache.global.queriesByMsum[query.Checksum] = query
if ownerJob.ChildJobs[queryJob.Uuid] == nil {
ownerJob.ChildJobs[queryJob.Uuid] = query.Impact
}
if len(query.Variants) != 0 {
err := cache.addDataQueryVariants(ctx, query, queryJob)
if err != nil {
log.Error().Err(err).Str("queryMrn", query.Mrn).Msg("failed to add data query variants")
}
} else {
// we set a placeholder for the execution query, just to indicate it will be added
cache.global.executionQueries[query.Checksum] = nil
cache.global.dataQueries[query.Checksum] = struct{}{}
cache.global.queriesByMsum[query.Checksum] = query
}
}

func (cache *policyResolverCache) addDataQueryVariants(ctx context.Context, query *explorer.Mquery, ownerJob *ReportingJob) error {
Expand Down Expand Up @@ -1126,13 +1124,6 @@ func (cache *policyResolverCache) modifyCheckJob(check *explorer.Mquery, impact
}
}

// type propInfo struct {
// prop *explorer.Property
// typ *llx.Primitive
// dataChecksum string
// name string
// }

func (s *LocalServices) jobsToQueries(ctx context.Context, policyMrn string, cache *resolverCache) (*ExecutionJob, *CollectorJob, error) {
ctx, span := tracer.Start(ctx, "resolver/jobsToQueries")
defer span.End()
Expand Down Expand Up @@ -1271,36 +1262,19 @@ func (s *LocalServices) jobsToQueries(ctx context.Context, policyMrn string, cac
}
arr.Items = append(arr.Items, rj.Uuid)

// process all the datapoints of this job
for dp := range executionQuery.Datapoints {
datapointID := executionQuery.Datapoints[dp]
datapointInfo, ok := collectorJob.Datapoints[datapointID]
if !ok {
return nil, nil, errors.New("failed to identity datapoint in collectorjob")
}

datapointInfo.Notify = append(datapointInfo.Notify, rj.Uuid)
rj.Datapoints[datapointID] = true
if err = connectDatapointsToReportingJob(executionQuery, rj, collectorJob.Datapoints); err != nil {
return nil, nil, err
}

continue
}

// (3) Data Queries handling
for _, parentID := range rj.Notify {
parentJob, ok := collectorJob.ReportingJobs[parentID]
if !ok {
return nil, nil, errors.New("failed to connect datapoint to reporting job")
}

if err = connectDatapointsToReportingJob(executionQuery, parentJob, collectorJob.Datapoints); err != nil {
return nil, nil, err
}

// we don't need this any longer, since every datapoint now reports into
// the parent job (i.e. reports into the policy instead of the data query)
delete(collectorJob.ReportingJobs, rj.Uuid)
delete(parentJob.Datapoints, rj.Uuid)
// We connect the datapoints to the data query reporting job. Previously we had connected the datapoints
// to the data query's parent and had removed the data query RJ. We now keep those around to indicate which
// data queries have been run.
if err = connectDatapointsToReportingJob(executionQuery, rj, collectorJob.Datapoints); err != nil {
return nil, nil, err
}
}
}
Expand All @@ -1309,9 +1283,8 @@ func (s *LocalServices) jobsToQueries(ctx context.Context, policyMrn string, cac
}

func connectDatapointsToReportingJob(query *ExecutionQuery, job *ReportingJob, datapoints map[string]*DataQueryInfo) error {
for dp := range query.Datapoints {
datapointID := query.Datapoints[dp]
datapointInfo, ok := datapoints[datapointID]
for _, dpId := range query.Datapoints {
datapointInfo, ok := datapoints[dpId]
if !ok {
return errors.New("failed to identity datapoint in collectorjob")
}
Expand All @@ -1320,7 +1293,7 @@ func connectDatapointsToReportingJob(query *ExecutionQuery, job *ReportingJob, d
if job.Datapoints == nil {
job.Datapoints = map[string]bool{}
}
job.Datapoints[datapointID] = true
job.Datapoints[dpId] = true
}
return nil
}
Expand Down Expand Up @@ -1582,24 +1555,20 @@ func (s *LocalServices) jobsToControls(cache *frameworkResolverCache, framework
continue
}
uuid := cache.relativeChecksum(mquery.Mrn)
queryJob := &ReportingJob{
Uuid: uuid,
QrId: mquery.Mrn,
ChildJobs: map[string]*explorer.Impact{},
Type: ReportingJob_DATA_QUERY,
}
nuJobs[uuid] = queryJob
for controlMrn := range targets {
controlJob := ensureControlJob(cache, nuJobs, controlMrn, framework, frameworkGroupByControlMrn)
controlJob.ChildJobs[queryJob.Uuid] = nil
queryJob.Notify = append(queryJob.Notify, controlJob.Uuid)
queryJob, ok := job.ReportingJobs[uuid]
if !ok {
queryJob = &ReportingJob{
Uuid: uuid,
QrId: mquery.Mrn,
ChildJobs: map[string]*explorer.Impact{},
Type: ReportingJob_DATA_QUERY,
}
err := connectDatapointsToReportingJob(execQuery, queryJob, job.Datapoints)
if err != nil {
return err
}

}

nuJobs[uuid] = queryJob
continue

case ResolvedFrameworkNodeTypeControl:
Expand Down
2 changes: 1 addition & 1 deletion policy/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ policies:
})
require.NoError(t, err)
require.NotNil(t, rp)
require.Len(t, rp.CollectorJob.ReportingJobs, 4)
require.Len(t, rp.CollectorJob.ReportingJobs, 5)
ignoreJob := rp.CollectorJob.ReportingJobs["lTbmPQz/DwA="]
require.NotNil(t, ignoreJob)
childJob := ignoreJob.ChildJobs["DmPNGpL6IXo="]
Expand Down

0 comments on commit 0980b1e

Please sign in to comment.