From 6d0c6525b17ed0a0890904179125702245cae835 Mon Sep 17 00:00:00 2001 From: Patrik Segedy Date: Tue, 28 Nov 2023 14:32:19 +0100 Subject: [PATCH] POC: migration job per accounts --- .../migration/migrate_system_package_data.go | 145 ++++++++++-------- 1 file changed, 79 insertions(+), 66 deletions(-) diff --git a/tasks/migration/migrate_system_package_data.go b/tasks/migration/migrate_system_package_data.go index fa3c3fe9c..0570b4596 100644 --- a/tasks/migration/migrate_system_package_data.go +++ b/tasks/migration/migrate_system_package_data.go @@ -45,6 +45,11 @@ type AccSys struct { } type SystemPackageRecord struct { + SystemID int64 + PackageUpdate +} + +type PackageUpdate struct { NameID int64 PackageID int64 UpdateDataJSON json.RawMessage `gorm:"column:update_data"` @@ -85,57 +90,58 @@ func processPartition(part string, i int) { tx := tasks.CancelableDB().Begin() defer tx.Rollback() - accSys := getAccSys(part, i) + accs := getAccounts(i) + utils.LogInfo("#", i, "len(account)", len(accs), "partition", part, "Migrating accounts") // process at most `maxGoroutines` systems at once guard := make(chan struct{}, maxGoroutines) - for sysIdx, as := range accSys { - guard <- struct{}{} - wg.Add(1) - go func(as AccSys, i int, part string, sysIdx int) { - defer func() { - <-guard - wg.Done() - }() - updates := getUpdates(as, part, i) - toInsert := make([]models.SystemPackage, 0, 1000) - for _, u := range updates { - updateData := getUpdateData(u, as, part, i) - latestApplicable, latestInstallable := getEvraApplicability(updateData) - applicableID, installableID := getPackageIDs(u, i, latestApplicable, latestInstallable) - if applicableID != 0 || installableID != 0 { - // insert ids to system_package2 - sp := models.SystemPackage{ - RhAccountID: as.RhAccountID, - SystemID: as.SystemID, - PackageID: u.PackageID, - NameID: u.NameID, - } - if installableID != 0 { - sp.InstallableID = &installableID - } - if applicableID != 0 { - sp.ApplicableID = &applicableID + for _, acc := range accs { + systemUpdates := getUpdates(acc, part, i) + utils.LogInfo("#", i, "len(systems)", len(systemUpdates), "partition", part, "account", acc, "Migrating account") + for system_id, updates := range systemUpdates { + guard <- struct{}{} + wg.Add(1) + go func(acc int, system_id int64, updates []PackageUpdate, i int, part string) { + defer func() { + <-guard + wg.Done() + }() + toInsert := make([]models.SystemPackage, 0, 1000) + for _, u := range updates { + updateData := getUpdateData(u, acc, system_id, part, i) + latestApplicable, latestInstallable := getEvraApplicability(updateData) + applicableID, installableID := getPackageIDs(u, i, latestApplicable, latestInstallable) + if applicableID != 0 || installableID != 0 { + // insert ids to system_package2 + sp := models.SystemPackage{ + RhAccountID: acc, + SystemID: system_id, + PackageID: u.PackageID, + NameID: u.NameID, + } + if installableID != 0 { + sp.InstallableID = &installableID + } + if applicableID != 0 { + sp.ApplicableID = &applicableID + } + toInsert = append(toInsert, sp) } - toInsert = append(toInsert, sp) } - } - if len(toInsert) > 0 { - err := database.UnnestInsert(tx, - "INSERT INTO system_package2 (rh_account_id, system_id, package_id, name_id, installable_id, applicable_id)"+ - " (select * from unnest($1::int[], $2::bigint[], $3::bigint[], $4::bigint[], $5::bigint[], $6::bigint[]))"+ - " ON CONFLICT DO NOTHING", toInsert) - if err != nil { - utils.LogWarn("#", i, "err", err.Error(), - "account", as.RhAccountID, "system", as.SystemID, - "Failed to insert to system_package2") + if len(toInsert) > 0 { + err := database.UnnestInsert(tx, + "INSERT INTO system_package2 (rh_account_id, system_id, package_id, name_id, installable_id, applicable_id)"+ + " (select * from unnest($1::int[], $2::bigint[], $3::bigint[], $4::bigint[], $5::bigint[], $6::bigint[]))"+ + " ON CONFLICT DO NOTHING", toInsert) + if err != nil { + utils.LogWarn("#", i, "err", err.Error(), "account", acc, "system", system_id, + "Failed to insert to system_package2") + } } - utils.LogDebug("#", i, "#system", sysIdx, "partition", part, "len(toInsert)", len(toInsert), - "account", as.RhAccountID, "system", as.SystemID, "Migrated system", - ) - } - }(as, i, part, sysIdx) + }(acc, system_id, updates, i, part) + } } + wg.Wait() if err := errors.Wrap(tx.Commit().Error, "Commit"); err != nil { utils.LogError("#", i, "partition", part, "err", err, "Failed to migrate partition") @@ -143,47 +149,54 @@ func processPartition(part string, i int) { utils.LogInfo("#", i, "partition", part, "Partition migrated") } -func getAccSys(part string, i int) []AccSys { - // get systems from system_package partition - accSys := make([]AccSys, 0) +// get account ids in partition from system_platform table +func getAccounts(i int) []int { + accs := make([]int, 0) err := tasks.WithReadReplicaTx(func(db *gorm.DB) error { - return db.Table(part). - Distinct("rh_account_id, system_id"). - Order("rh_account_id"). - Order("system_id"). - Find(&accSys).Error + return db.Table(fmt.Sprintf("system_platform_%d", i)). + Distinct("rh_account_id"). + Find(&accs).Error }) if err != nil { - utils.LogWarn("#", i, "partition", part, "Failed to load data from partition") - return accSys + utils.LogWarn("#", i, "Failed to load accounts from partition") + return accs } - utils.LogInfo("#", i, "partition", part, "count", len(accSys), "Migrating systems") - return accSys + utils.LogInfo("#", i, "count", len(accs), "Migrating accounts") + return accs } -func getUpdates(as AccSys, part string, i int) []SystemPackageRecord { +func getUpdates(account int, part string, i int) map[int64][]PackageUpdate { var updates []SystemPackageRecord - // get update_data from system_package for given system err := tasks.WithReadReplicaTx(func(db *gorm.DB) error { return db.Table(part). - Select("name_id, package_id, update_data"). - Where("rh_account_id = ?", as.RhAccountID). - Where("system_id = ?", as.SystemID). + Select("name_id, package_id, system_id, update_data"). + Where("rh_account_id = ?", account). + Order("system_id"). Find(&updates).Error }) if err != nil { - utils.LogWarn("#", i, "partition", part, "rh_account_id", as.RhAccountID, "system_id", as.SystemID, + utils.LogWarn("#", i, "partition", part, "rh_account_id", account, "err", err.Error(), "Couldn't get update_data") } - return updates + + nSystemsEst := len(updates) / 1000 //estimation, len(updates) / 1000 packages per system + res := make(map[int64][]PackageUpdate, nSystemsEst) + for _, u := range updates { + if _, has := res[u.SystemID]; !has { + res[u.SystemID] = []PackageUpdate{} + } + res[u.SystemID] = append(res[u.SystemID], u.PackageUpdate) + } + + return res } -func getUpdateData(u SystemPackageRecord, as AccSys, part string, i int) []UpdateData { +func getUpdateData(u PackageUpdate, acc int, sys int64, part string, i int) []UpdateData { var updateData []UpdateData if err := json.Unmarshal(u.UpdateDataJSON, &updateData); err != nil { - utils.LogWarn("#", i, "partition", part, "rh_account_id", as.RhAccountID, "system_id", as.SystemID, + utils.LogWarn("#", i, "partition", part, "rh_account_id", acc, "system_id", sys, "update_data", string(u.UpdateDataJSON), "err", err.Error(), "Couldn't unmarshal update_data") } @@ -217,7 +230,7 @@ func getEvraApplicability(udpateData []UpdateData) (string, string) { } // nolint: funlen -func getPackageIDs(u SystemPackageRecord, i int, latestApplicable, latestInstallable string) (int64, int64) { +func getPackageIDs(u PackageUpdate, i int, latestApplicable, latestInstallable string) (int64, int64) { // get package_id for latest installable and applicable packages if len(latestApplicable) == 0 && len(latestInstallable) == 0 { return 0, 0