Skip to content

Commit

Permalink
🧹 make sure we set IsLastBatch only once (#1161)
Browse files Browse the repository at this point in the history
* make sure we set IsLastBatch only once

Signed-off-by: Ivan Milchev <[email protected]>

* combine data and scores when storing results

Signed-off-by: Ivan Milchev <[email protected]>

---------

Signed-off-by: Ivan Milchev <[email protected]>
  • Loading branch information
imilchev authored Mar 8, 2024
1 parent d628ee2 commit 067a377
Showing 1 changed file with 8 additions and 25 deletions.
33 changes: 8 additions & 25 deletions policy/executor/internal/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,9 @@ func (c *BufferedCollector) run() {
}
c.lock.Unlock()

if len(results) > 0 || done {
c.collector.SinkData(results, done)
}

if len(scores) > 0 || done {
c.collector.SinkScore(scores, done)
// If we have something to send or this is the last batch, we do a Sink
if len(scores) > 0 || len(results) > 0 || done {
c.collector.Sink(results, scores, done)
}

results = results[:0]
Expand Down Expand Up @@ -164,39 +161,25 @@ func (c *PolicyServiceCollector) toResult(rr *llx.RawResult) *llx.Result {
return v
}

func (c *PolicyServiceCollector) SinkData(results []*llx.RawResult, isDone bool) {
if len(results) == 0 {
func (c *PolicyServiceCollector) Sink(results []*llx.RawResult, scores []*policy.Score, isDone bool) {
// If we have nothing to send and also this is not the last batch, we just skip
if len(results) == 0 && len(scores) == 0 && !isDone {
return
}
resultsToSend := make(map[string]*llx.Result, len(results))
for _, rr := range results {
resultsToSend[rr.CodeID] = c.toResult(rr)
}
log.Debug().Msg("Sending datapoints")
log.Debug().Msg("Sending datapoints and scores")
_, err := c.resolver.StoreResults(context.Background(), &policy.StoreResultsReq{
AssetMrn: c.assetMrn,
Data: resultsToSend,
IsPreprocessed: true,
IsLastBatch: isDone,
})
if err != nil {
log.Error().Err(err).Msg("failed to send datapoints")
}
}

func (c *PolicyServiceCollector) SinkScore(scores []*policy.Score, isDone bool) {
if len(scores) == 0 {
return
}
log.Debug().Msg("Sending scores")
_, err := c.resolver.StoreResults(context.Background(), &policy.StoreResultsReq{
AssetMrn: c.assetMrn,
Scores: scores,
IsPreprocessed: true,
IsLastBatch: isDone,
})
if err != nil {
log.Error().Err(err).Msg("failed to send scores")
log.Error().Err(err).Msg("failed to send datapoints and scores")
}
}

Expand Down

0 comments on commit 067a377

Please sign in to comment.