From b5a0ccc43af1437af4a59aeb9e5c49cbdb41f3de Mon Sep 17 00:00:00 2001 From: Patrik Segedy Date: Mon, 27 Nov 2023 15:42:42 +0100 Subject: [PATCH] POC: insert to system_package2 if applicable or installable update exists --- .../migration/migrate_system_package_data.go | 55 ++++++++++++------- 1 file changed, 36 insertions(+), 19 deletions(-) diff --git a/tasks/migration/migrate_system_package_data.go b/tasks/migration/migrate_system_package_data.go index b681a252c..cefe40796 100644 --- a/tasks/migration/migrate_system_package_data.go +++ b/tasks/migration/migrate_system_package_data.go @@ -12,6 +12,7 @@ import ( "strings" "sync" + "github.com/pkg/errors" "gorm.io/gorm" ) @@ -72,47 +73,63 @@ func processPartition(part string, i int) { var wg sync.WaitGroup tx := database.Db.Begin() - defer tx.Commit() + defer tx.Rollback() accSys := getAccSys(part, i) // process at most 4 systems at once guard := make(chan struct{}, 4) - for _, as := range accSys { + for sysIdx, as := range accSys { guard <- struct{}{} wg.Add(1) - go func(as AccSys, i int, part string) { + go func(as AccSys, i int, part string, sysIdx int) { defer func() { <-guard wg.Done() }() updates := getUpdates(as, part, i) + toInsert := make([]models.SystemPackage, 0, 1000) for _, u := range updates { updateData := getUpdateData(u, as, part, i) latestApplicable, latestInstallable := getEvraApplicability(updateData) applicableID, installableID := getPackageIDs(u, i, latestApplicable, latestInstallable) - if applicableID != 0 && installableID != 0 { + if applicableID != 0 || installableID != 0 { // insert ids to system_package2 - err := tasks.WithTx(func(db *gorm.DB) error { - return db.Table("system_package2"). - Where("installable_id IS NULL AND applicable_id IS NULL"). - Save(models.SystemPackage{ - RhAccountID: as.RhAccountID, - SystemID: as.SystemID, - PackageID: u.PackageID, - NameID: u.NameID, - InstallableID: &installableID, - ApplicableID: &applicableID, - }).Error - }) - if err != nil { - utils.LogWarn("#", i, "Failed to update system_package2") + sp := models.SystemPackage{ + RhAccountID: as.RhAccountID, + SystemID: as.SystemID, + PackageID: u.PackageID, + NameID: u.NameID, } + if installableID != 0 { + sp.InstallableID = &installableID + } + if applicableID != 0 { + sp.ApplicableID = &applicableID + } + toInsert = append(toInsert, sp) } } - }(as, i, part) + if len(toInsert) > 0 { + err := database.UnnestInsert(tx, + "INSERT INTO system_package2 (rh_account_id, system_id, package_id, name_id, installable_id, applicable_id)"+ + " (select * from unnest($1::int[], $2::bigint[], $3::bigint[], $4::bigint[], $5::bigint[], $6::bigint[]))"+ + " ON CONFLICT DO NOTHING", toInsert) + if err != nil { + utils.LogWarn("#", i, "err", err.Error(), + "account", as.RhAccountID, "system", as.SystemID, + "Failed to insert to system_package2") + } + utils.LogDebug("#", i, "#system", sysIdx, "partition", part, "len(toInsert)", len(toInsert), + "account", as.RhAccountID, "system", as.SystemID, "Migrated system", + ) + } + }(as, i, part, sysIdx) } wg.Wait() + if err := errors.Wrap(tx.Commit().Error, "Commit"); err != nil { + utils.LogError("#", i, "partition", part, "err", err, "Failed to migrate partition") + } utils.LogInfo("#", i, "partition", part, "Partition migrated") }