Skip to content

Commit

Permalink
RHINENG-8141: improve evaluation performance
Browse files Browse the repository at this point in the history
  • Loading branch information
semantic-release authored and Dugowitch committed Jul 31, 2024
1 parent 4f50a2c commit 692814f
Show file tree
Hide file tree
Showing 12 changed files with 649 additions and 378 deletions.
6 changes: 2 additions & 4 deletions base/database/baseline.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,20 @@ import (
"app/base/utils"
"encoding/json"
"time"

"gorm.io/gorm"
)

type BaselineConfig struct {
// Filter applicable advisories (updates) by the latest publish time.
ToTime time.Time `json:"to_time" example:"2022-12-31T12:00:00-04:00"`
}

func GetBaselineConfig(tx *gorm.DB, system *models.SystemPlatform) *BaselineConfig {
func GetBaselineConfig(system *models.SystemPlatform) *BaselineConfig {
if system.BaselineID == nil {
return nil
}

var jsonb [][]byte
err := tx.Table("baseline").
err := DB.Table("baseline").
Where("id = ? and rh_account_id = ?", system.BaselineID, system.RhAccountID).
Pluck("config", &jsonb).Error
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions base/database/baseline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@ func TestBaselineConfig(t *testing.T) {

// system without baseline
system := models.SystemPlatform{ID: 8, RhAccountID: 1, BaselineID: nil}
baselineConfig := GetBaselineConfig(DB, &system)
baselineConfig := GetBaselineConfig(&system)
assert.Nil(t, baselineConfig)

// system with existing baseline
system = models.SystemPlatform{ID: int64(1), RhAccountID: 1, BaselineID: utils.PtrInt64(1)}
baselineConfig = GetBaselineConfig(DB, &system)
baselineConfig = GetBaselineConfig(&system)
assert.Equal(t, "2010-09-22 00:00:00+00", baselineConfig.ToTime.Format("2006-01-02 15:04:05-07"))

baselineID := CreateBaselineWithConfig(t, "", nil, nil, nil)
// baseline with empty config
system = models.SystemPlatform{ID: 1, RhAccountID: 1, BaselineID: &baselineID}
baselineConfig = GetBaselineConfig(DB, &system)
baselineConfig = GetBaselineConfig(&system)
assert.Nil(t, baselineConfig)
DeleteBaseline(t, baselineID)
}
8 changes: 4 additions & 4 deletions base/database/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ func SystemAdvisoriesByInventoryID(tx *gorm.DB, accountID int, groups map[string
return (joinsT)(joins).apply(tx)
}

func SystemAdvisoriesBySystemID(tx *gorm.DB, accountID int, systemID int64) *gorm.DB {
query := systemAdvisoriesQuery(tx, accountID).Where("sp.id = ?", systemID)
func SystemAdvisoriesBySystemID(accountID int, systemID int64) *gorm.DB {
query := systemAdvisoriesQuery(accountID).Where("sp.id = ?", systemID)
return query
}

Expand All @@ -79,8 +79,8 @@ func AdvisoryMetadata(tx *gorm.DB) *gorm.DB {
return JoinAdvisoryType(tx)
}

func systemAdvisoriesQuery(tx *gorm.DB, accountID int) *gorm.DB {
query := tx.Table("system_advisories sa").Select("sa.*").
func systemAdvisoriesQuery(accountID int) *gorm.DB {
query := DB.Table("system_advisories sa").Select("sa.*").
Joins("join system_platform sp ON sa.rh_account_id = sp.rh_account_id AND sa.system_id = sp.id").
Where("sa.rh_account_id = ? AND sp.rh_account_id = ?", accountID, accountID)
return query
Expand Down
118 changes: 62 additions & 56 deletions evaluator/evaluate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/jinzhu/copier"
"github.com/pkg/errors"
"gorm.io/gorm"
"gorm.io/gorm/clause"

log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -166,7 +165,7 @@ func Evaluate(ctx context.Context, event *mqueue.PlatformEvent, inventoryID, eva
return nil
}

// Runs Evaluate method in Goroutines
// RunEvaluate runs Evaluate method in Goroutines
func runEvaluate(
ctx context.Context,
event mqueue.PlatformEvent, // makes a copy to avoid races
Expand Down Expand Up @@ -204,11 +203,7 @@ func runEvaluate(

func evaluateInDatabase(ctx context.Context, event *mqueue.PlatformEvent, inventoryID string) (
*models.SystemPlatform, *vmaas.UpdatesV3Response, error) {
tx := database.DB.WithContext(base.Context).Begin()
// Don't allow requested TX to hang around locking the rows
defer tx.Rollback()

system, err := tryGetSystem(tx, event.AccountID, inventoryID, event.Timestamp)
system, err := tryGetSystem(event.AccountID, inventoryID, event.Timestamp)
if err != nil {
return nil, nil, errors.Wrap(err, "unable to get system")
}
Expand All @@ -217,7 +212,7 @@ func evaluateInDatabase(ctx context.Context, event *mqueue.PlatformEvent, invent
return nil, nil, nil
}

thirdParty, err := analyzeRepos(tx, system)
thirdParty, err := analyzeRepos(system)
if err != nil {
return nil, nil, errors.Wrap(err, "Repo analysis failed")
}
Expand All @@ -227,12 +222,13 @@ func evaluateInDatabase(ctx context.Context, event *mqueue.PlatformEvent, invent
if err != nil {
return nil, nil, errors.Wrap(err, "unable to get updates data")
}

if updatesData == nil {
utils.LogWarn("inventoryID", inventoryID, "No vmaas updates")
return nil, nil, nil
}

vmaasData, err := evaluateWithVmaas(tx, updatesData, system, event)
vmaasData, err := evaluateWithVmaas(updatesData, system, event)
if err != nil {
return nil, nil, errors.Wrap(err, "evaluation with vmaas failed")
}
Expand Down Expand Up @@ -283,32 +279,25 @@ func tryGetYumUpdates(system *models.SystemPlatform) (*vmaas.UpdatesV3Response,
return &resp, nil
}

func evaluateWithVmaas(tx *gorm.DB, updatesData *vmaas.UpdatesV3Response,
func evaluateWithVmaas(updatesData *vmaas.UpdatesV3Response,
system *models.SystemPlatform, event *mqueue.PlatformEvent) (*vmaas.UpdatesV3Response, error) {
if enableBaselineEval {
if !system.SatelliteManaged || (system.SatelliteManaged && !enableSatelliteFunctionality) {
err := limitVmaasToBaseline(tx, system, updatesData)
err := limitVmaasToBaseline(system, updatesData)
if err != nil {
return nil, errors.Wrap(err, "Failed to evaluate baseline")
}
}
}

err := evaluateAndStore(tx, system, updatesData, event)
err := evaluateAndStore(system, updatesData, event)
if err != nil {
return nil, errors.Wrap(err, "Unable to evaluate and store results")
}

err = commitWithObserve(tx)
if err != nil {
evaluationCnt.WithLabelValues("error-database-commit").Inc()
return nil, errors.New("database commit failed")
}
return updatesData, nil
}

func getUpdatesData(ctx context.Context, system *models.SystemPlatform) (
*vmaas.UpdatesV3Response, error) {
func getUpdatesData(ctx context.Context, system *models.SystemPlatform) (*vmaas.UpdatesV3Response, error) {
var yumUpdates *vmaas.UpdatesV3Response
var yumErr error
if enableYumUpdatesEval {
Expand Down Expand Up @@ -421,9 +410,9 @@ func tryGetVmaasRequest(system *models.SystemPlatform) (*vmaas.UpdatesV3Request,
return &updatesReq, nil
}

func tryGetSystem(tx *gorm.DB, accountID int, inventoryID string,
func tryGetSystem(accountID int, inventoryID string,
requested *types.Rfc3339Timestamp) (*models.SystemPlatform, error) {
system, err := loadSystemData(tx, accountID, inventoryID)
system, err := loadSystemData(accountID, inventoryID)
if err != nil {
evaluationCnt.WithLabelValues("error-db-read-inventory-data").Inc()
return nil, errors.Wrap(err, "error loading system from DB")
Expand Down Expand Up @@ -459,39 +448,73 @@ func commitWithObserve(tx *gorm.DB) error {
return nil
}

func evaluateAndStore(tx *gorm.DB, system *models.SystemPlatform,
// EvaluateAndStore first loads advisories and packages (including change evaluation)
// and then executes all deletions, updates, and insertions in a single transaction.
func evaluateAndStore(system *models.SystemPlatform,
vmaasData *vmaas.UpdatesV3Response, event *mqueue.PlatformEvent) error {
newSystemAdvisories, err := analyzeAdvisories(tx, system, vmaasData)
advisoriesByName, err := lazySaveAndLoadAdvisories(system, vmaasData)
if err != nil {
return errors.Wrap(err, "Advisory analysis failed")
return errors.Wrap(err, "Advisory loading failed")
}

installed, installable, applicable, err := analyzePackages(tx, system, vmaasData)
pkgByName, installed, installable, applicable, err := lazySaveAndLoadPackages(system, vmaasData)
if err != nil {
return errors.Wrap(err, "Package analysis failed")
return errors.Wrap(err, "Package loading failed")
}

err = updateSystemPlatform(tx, system, newSystemAdvisories, installed, installable, applicable)
tx := database.DB.WithContext(base.Context).Begin()
// Don't allow requested TX to hang around locking the rows
defer tx.Rollback()

systemAdvisoriesNew, err := storeAdvisoryData(tx, system, advisoriesByName)
if err != nil {
evaluationCnt.WithLabelValues("error-store-advisories").Inc()
return errors.Wrap(err, "Unable to store advisory data")
}

err = updateSystemPackages(tx, system, pkgByName)
if err != nil {
evaluationCnt.WithLabelValues("error-system-pkgs").Inc()
return errors.Wrap(err, "Unable to update system packages")
}

err = updateSystemPlatform(tx, system, systemAdvisoriesNew, installed, installable, applicable)
if err != nil {
evaluationCnt.WithLabelValues("error-update-system").Inc()
return errors.Wrap(err, "Unable to update system")
}

// Send instant notification with new advisories
if enableInstantNotifications {
err = publishNewAdvisoriesNotification(tx, system, event, system.RhAccountID, newSystemAdvisories)
err = publishNewAdvisoriesNotification(tx, system, event, system.RhAccountID, systemAdvisoriesNew)
if err != nil {
evaluationCnt.WithLabelValues("error-advisory-notification").Inc()
utils.LogError("orgID", event.GetOrgID(), "inventoryID", system.GetInventoryID(), "err", err.Error(),
"publishing new advisories notification failed")
}
}

err = commitWithObserve(tx)
if err != nil {
evaluationCnt.WithLabelValues("error-database-commit").Inc()
return errors.New("database commit failed")
}

return nil
}

func analyzeRepos(tx *gorm.DB, system *models.SystemPlatform) (
thirdParty bool, err error) {
func incrementAdvisoryTypeCounts(advisory models.AdvisoryMetadata, enhCount, bugCount, secCount *int) {
switch advisory.AdvisoryTypeID {
case enhancement:
*enhCount++
case bugfix:
*bugCount++
case security:
*secCount++
}
}

func analyzeRepos(system *models.SystemPlatform) (thirdParty bool, err error) {
if !enableRepoAnalysis {
utils.LogInfo("repo analysis disabled, skipping")
return false, nil
Expand All @@ -500,7 +523,7 @@ func analyzeRepos(tx *gorm.DB, system *models.SystemPlatform) (
// if system has associated at least one third party repo
// it's marked as third party system
var thirdPartyCount int64
err = tx.Table("system_repo sr").
err = database.DB.Table("system_repo sr").
Joins("join repo r on r.id = sr.repo_id").
Where("sr.rh_account_id = ?", system.RhAccountID).
Where("sr.system_id = ?", system.ID).
Expand All @@ -517,7 +540,7 @@ func analyzeRepos(tx *gorm.DB, system *models.SystemPlatform) (

// nolint: funlen
func updateSystemPlatform(tx *gorm.DB, system *models.SystemPlatform,
new SystemAdvisoryMap, installed, installable, applicable int) error {
advisories SystemAdvisoryMap, installed, installable, applicable int) error {
tStart := time.Now()
defer utils.ObserveSecondsSince(tStart, evaluationPartDuration.WithLabelValues("system-update"))
defer utils.ObserveSecondsSince(*system.LastUpload, uploadEvaluationDelay)
Expand All @@ -529,7 +552,7 @@ func updateSystemPlatform(tx *gorm.DB, system *models.SystemPlatform,
data["last_evaluation"] = time.Now()

if enableAdvisoryAnalysis {
if new == nil {
if advisories == nil {
return errors.New("Invalid args")
}
installableCount := 0
Expand All @@ -540,26 +563,12 @@ func updateSystemPlatform(tx *gorm.DB, system *models.SystemPlatform,
applicableEnhCount := 0
applicableBugCount := 0
applicableSecCount := 0
for _, sa := range new {
for _, sa := range advisories {
if sa.StatusID == INSTALLABLE {
switch sa.Advisory.AdvisoryTypeID {
case 1:
installableEnhCount++
case 2:
installableBugCount++
case 3:
installableSecCount++
}
incrementAdvisoryTypeCounts(sa.Advisory, &installableEnhCount, &installableBugCount, &installableSecCount)
installableCount++
}
switch sa.Advisory.AdvisoryTypeID {
case 1:
applicableEnhCount++
case 2:
applicableBugCount++
case 3:
applicableSecCount++
}
incrementAdvisoryTypeCounts(sa.Advisory, &applicableEnhCount, &applicableBugCount, &applicableSecCount)
applicableCount++
}

Expand Down Expand Up @@ -624,15 +633,12 @@ func callVMaas(ctx context.Context, request *vmaas.UpdatesV3Request) (*vmaas.Upd
return vmaasDataPtr.(*vmaas.UpdatesV3Response), nil
}

func loadSystemData(tx *gorm.DB, accountID int, inventoryID string) (*models.SystemPlatform, error) {
func loadSystemData(accountID int, inventoryID string) (*models.SystemPlatform, error) {
tStart := time.Now()
defer utils.ObserveSecondsSince(tStart, evaluationPartDuration.WithLabelValues("data-loading"))

var system models.SystemPlatform
err := tx.Clauses(clause.Locking{
Strength: "UPDATE",
Table: clause.Table{Name: clause.CurrentTable},
}).Where("rh_account_id = ?", accountID).
err := database.DB.Where("rh_account_id = ?", accountID).
Where("inventory_id = ?::uuid", inventoryID).
Find(&system).Error
return &system, err
Expand Down
Loading

0 comments on commit 692814f

Please sign in to comment.