diff --git a/policy/resolver.go b/policy/resolver.go index 5f2feb774..d162a318f 100644 --- a/policy/resolver.go +++ b/policy/resolver.go @@ -1034,41 +1034,39 @@ func (cache *policyResolverCache) addCheckJobVariants(ctx context.Context, query } func (cache *policyResolverCache) addDataQueryJob(ctx context.Context, query *explorer.Mquery, ownerJob *ReportingJob) { - if len(query.Variants) != 0 { - err := cache.addDataQueryVariants(ctx, query, ownerJob) - if err != nil { - log.Error().Err(err).Str("queryMrn", query.Mrn).Msg("failed to add data query variants") - } - return - } - - uuid := cache.global.relativeChecksum(query.Checksum) + uuid := cache.global.relativeChecksum(query.Mrn) queryJob := cache.global.reportingJobsByUUID[uuid] - // note: the ReportingJob is only a placeholder and is replaced by individual query LLX checksum ReportingJobs if queryJob == nil { queryJob = &ReportingJob{ - Uuid: cache.global.relativeChecksum(query.Checksum), + Uuid: uuid, QrId: query.Mrn, ChildJobs: map[string]*explorer.Impact{}, Datapoints: map[string]bool{}, Type: ReportingJob_DATA_QUERY, - // FIXME: DEPRECATED, remove in v10.0 vv - DeprecatedV8IsData: true, } - cache.global.reportingJobsByUUID[queryJob.Uuid] = queryJob + cache.global.codeIdToMrn[query.CodeId] = append(cache.global.codeIdToMrn[query.CodeId], query.Mrn) + cache.global.reportingJobsByUUID[uuid] = queryJob cache.global.reportingJobsByMsum[query.Checksum] = append(cache.global.reportingJobsByMsum[query.Checksum], queryJob) cache.childJobsByMrn[query.Mrn] = append(cache.childJobsByMrn[query.Mrn], queryJob) } // local aspects for the resolved policy queryJob.Notify = append(queryJob.Notify, ownerJob.Uuid) - - ownerJob.Datapoints[queryJob.Uuid] = true - // we set a placeholder for the execution query, just to indicate it will be added - cache.global.executionQueries[query.Checksum] = nil - cache.global.dataQueries[query.Checksum] = struct{}{} - cache.global.queriesByMsum[query.Checksum] = query + if ownerJob.ChildJobs[queryJob.Uuid] == nil { + ownerJob.ChildJobs[queryJob.Uuid] = query.Impact + } + if len(query.Variants) != 0 { + err := cache.addDataQueryVariants(ctx, query, queryJob) + if err != nil { + log.Error().Err(err).Str("queryMrn", query.Mrn).Msg("failed to add data query variants") + } + } else { + // we set a placeholder for the execution query, just to indicate it will be added + cache.global.executionQueries[query.Checksum] = nil + cache.global.dataQueries[query.Checksum] = struct{}{} + cache.global.queriesByMsum[query.Checksum] = query + } } func (cache *policyResolverCache) addDataQueryVariants(ctx context.Context, query *explorer.Mquery, ownerJob *ReportingJob) error { @@ -1126,13 +1124,6 @@ func (cache *policyResolverCache) modifyCheckJob(check *explorer.Mquery, impact } } -// type propInfo struct { -// prop *explorer.Property -// typ *llx.Primitive -// dataChecksum string -// name string -// } - func (s *LocalServices) jobsToQueries(ctx context.Context, policyMrn string, cache *resolverCache) (*ExecutionJob, *CollectorJob, error) { ctx, span := tracer.Start(ctx, "resolver/jobsToQueries") defer span.End() @@ -1271,36 +1262,19 @@ func (s *LocalServices) jobsToQueries(ctx context.Context, policyMrn string, cac } arr.Items = append(arr.Items, rj.Uuid) - // process all the datapoints of this job - for dp := range executionQuery.Datapoints { - datapointID := executionQuery.Datapoints[dp] - datapointInfo, ok := collectorJob.Datapoints[datapointID] - if !ok { - return nil, nil, errors.New("failed to identity datapoint in collectorjob") - } - - datapointInfo.Notify = append(datapointInfo.Notify, rj.Uuid) - rj.Datapoints[datapointID] = true + if err = connectDatapointsToReportingJob(executionQuery, rj, collectorJob.Datapoints); err != nil { + return nil, nil, err } continue } // (3) Data Queries handling - for _, parentID := range rj.Notify { - parentJob, ok := collectorJob.ReportingJobs[parentID] - if !ok { - return nil, nil, errors.New("failed to connect datapoint to reporting job") - } - - if err = connectDatapointsToReportingJob(executionQuery, parentJob, collectorJob.Datapoints); err != nil { - return nil, nil, err - } - - // we don't need this any longer, since every datapoint now reports into - // the parent job (i.e. reports into the policy instead of the data query) - delete(collectorJob.ReportingJobs, rj.Uuid) - delete(parentJob.Datapoints, rj.Uuid) + // We connect the datapoints to the data query reporting job. Previously we had connected the datapoints + /// to the data query's parent and had removed the data query RJ. We now keep those around to indicate which + // data queries have been ran. + if err = connectDatapointsToReportingJob(executionQuery, rj, collectorJob.Datapoints); err != nil { + return nil, nil, err } } } @@ -1309,9 +1283,8 @@ func (s *LocalServices) jobsToQueries(ctx context.Context, policyMrn string, cac } func connectDatapointsToReportingJob(query *ExecutionQuery, job *ReportingJob, datapoints map[string]*DataQueryInfo) error { - for dp := range query.Datapoints { - datapointID := query.Datapoints[dp] - datapointInfo, ok := datapoints[datapointID] + for _, dpId := range query.Datapoints { + datapointInfo, ok := datapoints[dpId] if !ok { return errors.New("failed to identity datapoint in collectorjob") } @@ -1320,7 +1293,7 @@ func connectDatapointsToReportingJob(query *ExecutionQuery, job *ReportingJob, d if job.Datapoints == nil { job.Datapoints = map[string]bool{} } - job.Datapoints[datapointID] = true + job.Datapoints[dpId] = true } return nil } @@ -1582,24 +1555,20 @@ func (s *LocalServices) jobsToControls(cache *frameworkResolverCache, framework continue } uuid := cache.relativeChecksum(mquery.Mrn) - queryJob := &ReportingJob{ - Uuid: uuid, - QrId: mquery.Mrn, - ChildJobs: map[string]*explorer.Impact{}, - Type: ReportingJob_DATA_QUERY, - } - nuJobs[uuid] = queryJob - for controlMrn := range targets { - controlJob := ensureControlJob(cache, nuJobs, controlMrn, framework, frameworkGroupByControlMrn) - controlJob.ChildJobs[queryJob.Uuid] = nil - queryJob.Notify = append(queryJob.Notify, controlJob.Uuid) + queryJob, ok := job.ReportingJobs[uuid] + if !ok { + queryJob = &ReportingJob{ + Uuid: uuid, + QrId: mquery.Mrn, + ChildJobs: map[string]*explorer.Impact{}, + Type: ReportingJob_DATA_QUERY, + } err := connectDatapointsToReportingJob(execQuery, queryJob, job.Datapoints) if err != nil { return err } - } - + nuJobs[uuid] = queryJob continue case ResolvedFrameworkNodeTypeControl: diff --git a/policy/resolver_test.go b/policy/resolver_test.go index ec551fe8c..149cb1614 100644 --- a/policy/resolver_test.go +++ b/policy/resolver_test.go @@ -222,7 +222,7 @@ policies: }) require.NoError(t, err) require.NotNil(t, rp) - require.Len(t, rp.CollectorJob.ReportingJobs, 4) + require.Len(t, rp.CollectorJob.ReportingJobs, 5) ignoreJob := rp.CollectorJob.ReportingJobs["lTbmPQz/DwA="] require.NotNil(t, ignoreJob) childJob := ignoreJob.ChildJobs["DmPNGpL6IXo="]