Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RHINENG-8141: improve evaluation performance #1461

Merged
merged 15 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions base/database/baseline.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,20 @@ import (
"app/base/utils"
"encoding/json"
"time"

"gorm.io/gorm"
)

type BaselineConfig struct {
// Filter applicable advisories (updates) by the latest publish time.
ToTime time.Time `json:"to_time" example:"2022-12-31T12:00:00-04:00"`
}

func GetBaselineConfig(tx *gorm.DB, system *models.SystemPlatform) *BaselineConfig {
func GetBaselineConfig(system *models.SystemPlatform) *BaselineConfig {
if system.BaselineID == nil {
return nil
}

var jsonb [][]byte
err := tx.Table("baseline").
err := DB.Table("baseline").
Where("id = ? and rh_account_id = ?", system.BaselineID, system.RhAccountID).
Pluck("config", &jsonb).Error
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions base/database/baseline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@ func TestBaselineConfig(t *testing.T) {

// system without baseline
system := models.SystemPlatform{ID: 8, RhAccountID: 1, BaselineID: nil}
baselineConfig := GetBaselineConfig(DB, &system)
baselineConfig := GetBaselineConfig(&system)
assert.Nil(t, baselineConfig)

// system with existing baseline
system = models.SystemPlatform{ID: int64(1), RhAccountID: 1, BaselineID: utils.PtrInt64(1)}
baselineConfig = GetBaselineConfig(DB, &system)
baselineConfig = GetBaselineConfig(&system)
assert.Equal(t, "2010-09-22 00:00:00+00", baselineConfig.ToTime.Format("2006-01-02 15:04:05-07"))

baselineID := CreateBaselineWithConfig(t, "", nil, nil, nil)
// baseline with empty config
system = models.SystemPlatform{ID: 1, RhAccountID: 1, BaselineID: &baselineID}
baselineConfig = GetBaselineConfig(DB, &system)
baselineConfig = GetBaselineConfig(&system)
assert.Nil(t, baselineConfig)
DeleteBaseline(t, baselineID)
}
8 changes: 4 additions & 4 deletions base/database/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ func SystemAdvisoriesByInventoryID(tx *gorm.DB, accountID int, groups map[string
return (joinsT)(joins).apply(tx)
}

func SystemAdvisoriesBySystemID(tx *gorm.DB, accountID int, systemID int64) *gorm.DB {
query := systemAdvisoriesQuery(tx, accountID).Where("sp.id = ?", systemID)
func SystemAdvisoriesBySystemID(accountID int, systemID int64) *gorm.DB {
query := systemAdvisoriesQuery(accountID).Where("sp.id = ?", systemID)
return query
}

Expand All @@ -79,8 +79,8 @@ func AdvisoryMetadata(tx *gorm.DB) *gorm.DB {
return JoinAdvisoryType(tx)
}

func systemAdvisoriesQuery(tx *gorm.DB, accountID int) *gorm.DB {
query := tx.Table("system_advisories sa").Select("sa.*").
func systemAdvisoriesQuery(accountID int) *gorm.DB {
query := DB.Table("system_advisories sa").Select("sa.*").
Joins("join system_platform sp ON sa.rh_account_id = sp.rh_account_id AND sa.system_id = sp.id").
Where("sa.rh_account_id = ? AND sp.rh_account_id = ?", accountID, accountID)
return query
Expand Down
2 changes: 1 addition & 1 deletion deploy/clowdapp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ parameters:
- {name: PKG_REFRESH_SCHEDULE, value: '5 11-20/2 * * *'} # Cronjob schedule definition
- {name: PKG_REFRESH_SUSPEND, value: 'false'} # Disable cronjob execution
- {name: ADVISORY_REFRESH_SCHEDULE, value: '*/15 * * * *'} # Cronjob schedule definition
- {name: ADVISORY_REFRESH_SUSPEND, value: 'false'} # Disable cronjob execution
- {name: ADVISORY_REFRESH_SUSPEND, value: 'true'} # Disable cronjob execution

# Database admin
- {name: IMAGE_TAG_DATABASE_ADMIN, value: v3.6.99}
Expand Down
117 changes: 61 additions & 56 deletions evaluator/evaluate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/jinzhu/copier"
"github.com/pkg/errors"
"gorm.io/gorm"
"gorm.io/gorm/clause"

log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -166,7 +165,7 @@ func Evaluate(ctx context.Context, event *mqueue.PlatformEvent, inventoryID, eva
return nil
}

// Runs Evaluate method in Goroutines
// RunEvaluate runs Evaluate method in Goroutines
func runEvaluate(
ctx context.Context,
event mqueue.PlatformEvent, // makes a copy to avoid races
Expand Down Expand Up @@ -204,11 +203,7 @@ func runEvaluate(

func evaluateInDatabase(ctx context.Context, event *mqueue.PlatformEvent, inventoryID string) (
*models.SystemPlatform, *vmaas.UpdatesV3Response, error) {
tx := database.DB.WithContext(base.Context).Begin()
// Don't allow requested TX to hang around locking the rows
defer tx.Rollback()

system, err := tryGetSystem(tx, event.AccountID, inventoryID, event.Timestamp)
system, err := tryGetSystem(event.AccountID, inventoryID, event.Timestamp)
if err != nil {
return nil, nil, errors.Wrap(err, "unable to get system")
}
Expand All @@ -217,7 +212,7 @@ func evaluateInDatabase(ctx context.Context, event *mqueue.PlatformEvent, invent
return nil, nil, nil
}

thirdParty, err := analyzeRepos(tx, system)
thirdParty, err := analyzeRepos(system)
if err != nil {
return nil, nil, errors.Wrap(err, "Repo analysis failed")
}
Expand All @@ -232,7 +227,7 @@ func evaluateInDatabase(ctx context.Context, event *mqueue.PlatformEvent, invent
return nil, nil, nil
}

vmaasData, err := evaluateWithVmaas(tx, updatesData, system, event)
vmaasData, err := evaluateWithVmaas(updatesData, system, event)
if err != nil {
return nil, nil, errors.Wrap(err, "evaluation with vmaas failed")
}
Expand Down Expand Up @@ -283,32 +278,25 @@ func tryGetYumUpdates(system *models.SystemPlatform) (*vmaas.UpdatesV3Response,
return &resp, nil
}

func evaluateWithVmaas(tx *gorm.DB, updatesData *vmaas.UpdatesV3Response,
func evaluateWithVmaas(updatesData *vmaas.UpdatesV3Response,
system *models.SystemPlatform, event *mqueue.PlatformEvent) (*vmaas.UpdatesV3Response, error) {
if enableBaselineEval {
if !system.SatelliteManaged || (system.SatelliteManaged && !enableSatelliteFunctionality) {
err := limitVmaasToBaseline(tx, system, updatesData)
err := limitVmaasToBaseline(system, updatesData)
if err != nil {
return nil, errors.Wrap(err, "Failed to evaluate baseline")
}
}
}

err := evaluateAndStore(tx, system, updatesData, event)
err := evaluateAndStore(system, updatesData, event)
if err != nil {
return nil, errors.Wrap(err, "Unable to evaluate and store results")
}

err = commitWithObserve(tx)
if err != nil {
evaluationCnt.WithLabelValues("error-database-commit").Inc()
return nil, errors.New("database commit failed")
}
return updatesData, nil
}

func getUpdatesData(ctx context.Context, system *models.SystemPlatform) (
*vmaas.UpdatesV3Response, error) {
func getUpdatesData(ctx context.Context, system *models.SystemPlatform) (*vmaas.UpdatesV3Response, error) {
var yumUpdates *vmaas.UpdatesV3Response
var yumErr error
if enableYumUpdatesEval {
Expand Down Expand Up @@ -421,9 +409,9 @@ func tryGetVmaasRequest(system *models.SystemPlatform) (*vmaas.UpdatesV3Request,
return &updatesReq, nil
}

func tryGetSystem(tx *gorm.DB, accountID int, inventoryID string,
func tryGetSystem(accountID int, inventoryID string,
requested *types.Rfc3339Timestamp) (*models.SystemPlatform, error) {
system, err := loadSystemData(tx, accountID, inventoryID)
system, err := loadSystemData(accountID, inventoryID)
if err != nil {
evaluationCnt.WithLabelValues("error-db-read-inventory-data").Inc()
return nil, errors.Wrap(err, "error loading system from DB")
Expand Down Expand Up @@ -459,39 +447,62 @@ func commitWithObserve(tx *gorm.DB) error {
return nil
}

func evaluateAndStore(tx *gorm.DB, system *models.SystemPlatform,
// EvaluateAndStore first loads advisories and packages (including change evaluation)
// and then executes all deletions, updates, and insertions in a single transaction.
func evaluateAndStore(system *models.SystemPlatform,
vmaasData *vmaas.UpdatesV3Response, event *mqueue.PlatformEvent) error {
newSystemAdvisories, err := analyzeAdvisories(tx, system, vmaasData)
advisoriesByName, err := lazySaveAndLoadAdvisories(system, vmaasData)
if err != nil {
return errors.Wrap(err, "Advisory loading failed")
}

pkgByName, installed, installable, applicable, err := lazySaveAndLoadPackages(system, vmaasData)
if err != nil {
return errors.Wrap(err, "Package loading failed")
}

tx := database.DB.WithContext(base.Context).Begin()
// Don't allow requested TX to hang around locking the rows
defer tx.Rollback()

systemAdvisoriesNew, err := storeAdvisoryData(tx, system, advisoriesByName)
if err != nil {
return errors.Wrap(err, "Advisory analysis failed")
evaluationCnt.WithLabelValues("error-store-advisories").Inc()
return errors.Wrap(err, "Unable to store advisory data")
}

installed, installable, applicable, err := analyzePackages(tx, system, vmaasData)
err = updateSystemPackages(tx, system, pkgByName)
if err != nil {
return errors.Wrap(err, "Package analysis failed")
evaluationCnt.WithLabelValues("error-system-pkgs").Inc()
return errors.Wrap(err, "Unable to update system packages")
}

err = updateSystemPlatform(tx, system, newSystemAdvisories, installed, installable, applicable)
err = updateSystemPlatform(tx, system, systemAdvisoriesNew, installed, installable, applicable)
if err != nil {
evaluationCnt.WithLabelValues("error-update-system").Inc()
return errors.Wrap(err, "Unable to update system")
}

// Send instant notification with new advisories
if enableInstantNotifications {
err = publishNewAdvisoriesNotification(tx, system, event, system.RhAccountID, newSystemAdvisories)
err = publishNewAdvisoriesNotification(tx, system, event, system.RhAccountID, systemAdvisoriesNew)
if err != nil {
evaluationCnt.WithLabelValues("error-advisory-notification").Inc()
utils.LogError("orgID", event.GetOrgID(), "inventoryID", system.GetInventoryID(), "err", err.Error(),
"publishing new advisories notification failed")
}
}

err = commitWithObserve(tx)
if err != nil {
evaluationCnt.WithLabelValues("error-database-commit").Inc()
return errors.New("database commit failed")
}

return nil
}

func analyzeRepos(tx *gorm.DB, system *models.SystemPlatform) (
thirdParty bool, err error) {
func analyzeRepos(system *models.SystemPlatform) (thirdParty bool, err error) {
if !enableRepoAnalysis {
utils.LogInfo("repo analysis disabled, skipping")
return false, nil
Expand All @@ -500,7 +511,7 @@ func analyzeRepos(tx *gorm.DB, system *models.SystemPlatform) (
// if system has associated at least one third party repo
// it's marked as third party system
var thirdPartyCount int64
err = tx.Table("system_repo sr").
err = database.DB.Table("system_repo sr").
Joins("join repo r on r.id = sr.repo_id").
Where("sr.rh_account_id = ?", system.RhAccountID).
Where("sr.system_id = ?", system.ID).
Expand All @@ -515,9 +526,20 @@ func analyzeRepos(tx *gorm.DB, system *models.SystemPlatform) (
return thirdParty, nil
}

func incrementAdvisoryTypeCounts(advisory models.AdvisoryMetadata, enhCount, bugCount, secCount *int) {
switch advisory.AdvisoryTypeID {
case enhancement:
*enhCount++
case bugfix:
*bugCount++
case security:
*secCount++
}
}

// nolint: funlen
func updateSystemPlatform(tx *gorm.DB, system *models.SystemPlatform,
new SystemAdvisoryMap, installed, installable, applicable int) error {
advisories SystemAdvisoryMap, installed, installable, applicable int) error {
tStart := time.Now()
defer utils.ObserveSecondsSince(tStart, evaluationPartDuration.WithLabelValues("system-update"))
defer utils.ObserveSecondsSince(*system.LastUpload, uploadEvaluationDelay)
Expand All @@ -529,7 +551,7 @@ func updateSystemPlatform(tx *gorm.DB, system *models.SystemPlatform,
data["last_evaluation"] = time.Now()

if enableAdvisoryAnalysis {
if new == nil {
if advisories == nil {
return errors.New("Invalid args")
}
installableCount := 0
Expand All @@ -540,26 +562,12 @@ func updateSystemPlatform(tx *gorm.DB, system *models.SystemPlatform,
applicableEnhCount := 0
applicableBugCount := 0
applicableSecCount := 0
for _, sa := range new {
for _, sa := range advisories {
if sa.StatusID == INSTALLABLE {
switch sa.Advisory.AdvisoryTypeID {
case 1:
installableEnhCount++
case 2:
installableBugCount++
case 3:
installableSecCount++
}
incrementAdvisoryTypeCounts(sa.Advisory, &installableEnhCount, &installableBugCount, &installableSecCount)
installableCount++
}
switch sa.Advisory.AdvisoryTypeID {
case 1:
applicableEnhCount++
case 2:
applicableBugCount++
case 3:
applicableSecCount++
}
incrementAdvisoryTypeCounts(sa.Advisory, &applicableEnhCount, &applicableBugCount, &applicableSecCount)
applicableCount++
}

Expand Down Expand Up @@ -624,15 +632,12 @@ func callVMaas(ctx context.Context, request *vmaas.UpdatesV3Request) (*vmaas.Upd
return vmaasDataPtr.(*vmaas.UpdatesV3Response), nil
}

func loadSystemData(tx *gorm.DB, accountID int, inventoryID string) (*models.SystemPlatform, error) {
func loadSystemData(accountID int, inventoryID string) (*models.SystemPlatform, error) {
tStart := time.Now()
defer utils.ObserveSecondsSince(tStart, evaluationPartDuration.WithLabelValues("data-loading"))

var system models.SystemPlatform
err := tx.Clauses(clause.Locking{
Strength: "UPDATE",
Table: clause.Table{Name: clause.CurrentTable},
}).Where("rh_account_id = ?", accountID).
err := database.DB.Where("rh_account_id = ?", accountID).
Where("inventory_id = ?::uuid", inventoryID).
Find(&system).Error
return &system, err
Expand Down
Loading
Loading