diff --git a/tasks/migration/migrate_system_package_data.go b/tasks/migration/migrate_system_package_data.go index b681a252c..26940ad6c 100644 --- a/tasks/migration/migrate_system_package_data.go +++ b/tasks/migration/migrate_system_package_data.go @@ -12,6 +12,7 @@ import ( "strings" "sync" + "github.com/pkg/errors" "gorm.io/gorm" ) @@ -52,29 +53,30 @@ type Package struct { func MigrateSystemPackageData() { var partitions []string - err := tasks.WithReadReplicaTx(func(db *gorm.DB) error { - return db.Table("pg_tables"). - Where("tablename ~ '^system_package_[0-9]+$'"). - Pluck("tablename", &partitions).Error - }) + readDB := tasks.CancelableReadReplicaDB() + writeDB := tasks.CancelableDB() + + err := readDB.Table("pg_tables"). + Where("tablename ~ '^system_package_[0-9]+$'"). + Pluck("tablename", &partitions).Error if err != nil { utils.LogError("err", err.Error(), "Couldn't get partitions for system_package") return } for i, part := range partitions { - processPartition(part, i) + processPartition(readDB, writeDB, part, i) } } -func processPartition(part string, i int) { +func processPartition(readDB *gorm.DB, writeDB *gorm.DB, part string, i int) { utils.LogInfo("#", i, "partition", part, "Migrating partition") var wg sync.WaitGroup - tx := database.Db.Begin() - defer tx.Commit() + tx := writeDB.Begin() + defer tx.Rollback() - accSys := getAccSys(part, i) + accSys := getAccSys(readDB, part, i) // process at most 4 systems at once guard := make(chan struct{}, 4) @@ -86,46 +88,58 @@ func processPartition(part string, i int) { <-guard wg.Done() }() - updates := getUpdates(as, part, i) + updates := getUpdates(readDB, as, part, i) + toInsert := make([]models.SystemPackage, 0, 1000) for _, u := range updates { updateData := getUpdateData(u, as, part, i) latestApplicable, latestInstallable := getEvraApplicability(updateData) - applicableID, installableID := getPackageIDs(u, i, latestApplicable, latestInstallable) - if applicableID != 0 && installableID != 0 { + applicableID, installableID := getPackageIDs(readDB, u, i, latestApplicable, latestInstallable) + if applicableID != 0 || installableID != 0 { // insert ids to system_package2 - err := tasks.WithTx(func(db *gorm.DB) error { - return db.Table("system_package2"). - Where("installable_id IS NULL AND applicable_id IS NULL"). - Save(models.SystemPackage{ - RhAccountID: as.RhAccountID, - SystemID: as.SystemID, - PackageID: u.PackageID, - NameID: u.NameID, - InstallableID: &installableID, - ApplicableID: &applicableID, - }).Error - }) - if err != nil { - utils.LogWarn("#", i, "Failed to update system_package2") + sp := models.SystemPackage{ + RhAccountID: as.RhAccountID, + SystemID: as.SystemID, + PackageID: u.PackageID, + NameID: u.NameID, + } + if installableID != 0 { + sp.InstallableID = &installableID } + if applicableID != 0 { + sp.ApplicableID = &applicableID + } + toInsert = append(toInsert, sp) + } + } + if len(toInsert) > 0 { + err := database.UnnestInsert(tx, + "INSERT INTO system_package2 (rh_account_id, system_id, package_id, name_id, installable_id, applicable_id)"+ + " (select * from unnest($1::int[], $2::bigint[], $3::bigint[], $4::bigint[], $5::bigint[], $6::bigint[]))"+ + " ON CONFLICT DO NOTHING", toInsert) + if err != nil { + utils.LogWarn("#", i, "err", err.Error(), + "account", as.RhAccountID, "system", as.SystemID, + "Failed to insert to system_package2") } } }(as, i, part) } wg.Wait() + if err := errors.Wrap(tx.Commit().Error, "Commit"); err != nil { + utils.LogError("#", i, "partition", part, "err", err, "Failed to migrate partition") + return + } utils.LogInfo("#", i, "partition", part, "Partition migrated") } -func getAccSys(part string, i int) []AccSys { +func getAccSys(readDB *gorm.DB, part string, i int) []AccSys { // get systems from system_package partition accSys := make([]AccSys, 0) - err := tasks.WithReadReplicaTx(func(db *gorm.DB) error { - return db.Table(part). - Distinct("rh_account_id, system_id"). - Order("rh_account_id"). - Order("system_id"). - Find(&accSys).Error - }) + err := readDB.Table(part). + Distinct("rh_account_id, system_id"). + Order("rh_account_id"). + Order("system_id"). + Find(&accSys).Error if err != nil { utils.LogWarn("#", i, "partition", part, "Failed to load data from partition") return accSys @@ -135,17 +149,15 @@ func getAccSys(part string, i int) []AccSys { return accSys } -func getUpdates(as AccSys, part string, i int) []SystemPackageRecord { +func getUpdates(readDB *gorm.DB, as AccSys, part string, i int) []SystemPackageRecord { var updates []SystemPackageRecord // get update_data from system_package for given system - err := tasks.WithReadReplicaTx(func(db *gorm.DB) error { - return db.Table(part). - Select("name_id, package_id, update_data"). - Where("rh_account_id = ?", as.RhAccountID). - Where("system_id = ?", as.SystemID). - Find(&updates).Error - }) + err := readDB.Table(part). + Select("name_id, package_id, update_data"). + Where("rh_account_id = ?", as.RhAccountID). + Where("system_id = ?", as.SystemID). + Find(&updates).Error if err != nil { utils.LogWarn("#", i, "partition", part, "rh_account_id", as.RhAccountID, "system_id", as.SystemID, "err", err.Error(), "Couldn't get update_data") @@ -190,7 +202,7 @@ func getEvraApplicability(udpateData []UpdateData) (string, string) { } // nolint: funlen -func getPackageIDs(u SystemPackageRecord, i int, latestApplicable, latestInstallable string) (int64, int64) { +func getPackageIDs(readDB *gorm.DB, u SystemPackageRecord, i int, latestApplicable, latestInstallable string) (int64, int64) { // get package_id for latest installable and applicable packages if len(latestApplicable) == 0 && len(latestInstallable) == 0 { return 0, 0 @@ -234,13 +246,11 @@ func getPackageIDs(u SystemPackageRecord, i int, latestApplicable, latestInstall } var packages []Package - err := tasks.WithReadReplicaTx(func(db *gorm.DB) error { - return db.Table("package"). - Select("id, evra"). - Where("evra IN (?)", []string{latestApplicable, latestInstallable}). - Where("name_id = ?", u.NameID). - Find(&packages).Error - }) + err := readDB.Table("package"). + Select("id, evra"). + Where("evra IN (?)", []string{latestApplicable, latestInstallable}). + Where("name_id = ?", u.NameID). + Find(&packages).Error if err != nil { utils.LogWarn("#", i, "Failed to load packages") }