Skip to content

Commit

Permalink
POC: insert to system_package2 if applicable or installable update ex…
Browse files Browse the repository at this point in the history
…ists
  • Loading branch information
psegedy committed Nov 27, 2023
1 parent 2b7cd4e commit dba8000
Showing 1 changed file with 60 additions and 51 deletions.
111 changes: 60 additions & 51 deletions tasks/migration/migrate_system_package_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (
"strings"
"sync"

"github.com/pkg/errors"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

var memoryPackageCache *evaluator.PackageCache
Expand Down Expand Up @@ -52,29 +54,30 @@ type Package struct {
func MigrateSystemPackageData() {
var partitions []string

err := tasks.WithReadReplicaTx(func(db *gorm.DB) error {
return db.Table("pg_tables").
Where("tablename ~ '^system_package_[0-9]+$'").
Pluck("tablename", &partitions).Error
})
readDB := tasks.CancelableReadReplicaDB()
writeDB := tasks.CancelableDB()

err := readDB.Table("pg_tables").
Where("tablename ~ '^system_package_[0-9]+$'").
Pluck("tablename", &partitions).Error
if err != nil {
utils.LogError("err", err.Error(), "Couldn't get partitions for system_package")
return
}

for i, part := range partitions {
processPartition(part, i)
processPartition(readDB, writeDB, part, i)
}
}

func processPartition(part string, i int) {
func processPartition(readDB *gorm.DB, writeDB *gorm.DB, part string, i int) {
utils.LogInfo("#", i, "partition", part, "Migrating partition")

var wg sync.WaitGroup
tx := database.Db.Begin()
defer tx.Commit()
tx := writeDB.Begin()
defer tx.Rollback()

accSys := getAccSys(part, i)
accSys := getAccSys(readDB, part, i)
// process at most 4 systems at once
guard := make(chan struct{}, 4)

Expand All @@ -86,46 +89,56 @@ func processPartition(part string, i int) {
<-guard
wg.Done()
}()
updates := getUpdates(as, part, i)
updates := getUpdates(readDB, as, part, i)
toInsert := make([]models.SystemPackage, 0, 1000)
for _, u := range updates {
updateData := getUpdateData(u, as, part, i)
latestApplicable, latestInstallable := getEvraApplicability(updateData)
applicableID, installableID := getPackageIDs(u, i, latestApplicable, latestInstallable)
if applicableID != 0 && installableID != 0 {
applicableID, installableID := getPackageIDs(readDB, u, i, latestApplicable, latestInstallable)
if applicableID != 0 || installableID != 0 {
// insert ids to system_package2
err := tasks.WithTx(func(db *gorm.DB) error {
return db.Table("system_package2").
Where("installable_id IS NULL AND applicable_id IS NULL").
Save(models.SystemPackage{
RhAccountID: as.RhAccountID,
SystemID: as.SystemID,
PackageID: u.PackageID,
NameID: u.NameID,
InstallableID: &installableID,
ApplicableID: &applicableID,
}).Error
})
if err != nil {
utils.LogWarn("#", i, "Failed to update system_package2")
sp := models.SystemPackage{
RhAccountID: as.RhAccountID,
SystemID: as.SystemID,
PackageID: u.PackageID,
NameID: u.NameID,
}
if installableID != 0 {
sp.InstallableID = &installableID
}
if applicableID != 0 {
sp.ApplicableID = &applicableID
}
toInsert = append(toInsert, sp)
}
}
if len(toInsert) > 0 {
tx = tx.Clauses(clause.OnConflict{DoNothing: true})
err := database.BulkInsert(tx, toInsert)
if err != nil {
utils.LogWarn("#", i, "err", err.Error(),
"account", as.RhAccountID, "system", as.SystemID,
"Failed to insert to system_package2")
}
}
}(as, i, part)
}
wg.Wait()
if err := errors.Wrap(tx.Commit().Error, "Commit"); err != nil {
utils.LogError("#", i, "partition", part, "err", err, "Failed to migrate partition")
return
}
utils.LogInfo("#", i, "partition", part, "Partition migrated")
}

func getAccSys(part string, i int) []AccSys {
func getAccSys(readDB *gorm.DB, part string, i int) []AccSys {
// get systems from system_package partition
accSys := make([]AccSys, 0)
err := tasks.WithReadReplicaTx(func(db *gorm.DB) error {
return db.Table(part).
Distinct("rh_account_id, system_id").
Order("rh_account_id").
Order("system_id").
Find(&accSys).Error
})
err := readDB.Table(part).
Distinct("rh_account_id, system_id").
Order("rh_account_id").
Order("system_id").
Find(&accSys).Error
if err != nil {
utils.LogWarn("#", i, "partition", part, "Failed to load data from partition")
return accSys
Expand All @@ -135,17 +148,15 @@ func getAccSys(part string, i int) []AccSys {
return accSys
}

func getUpdates(as AccSys, part string, i int) []SystemPackageRecord {
func getUpdates(readDB *gorm.DB, as AccSys, part string, i int) []SystemPackageRecord {
var updates []SystemPackageRecord

// get update_data from system_package for given system
err := tasks.WithReadReplicaTx(func(db *gorm.DB) error {
return db.Table(part).
Select("name_id, package_id, update_data").
Where("rh_account_id = ?", as.RhAccountID).
Where("system_id = ?", as.SystemID).
Find(&updates).Error
})
err := readDB.Table(part).
Select("name_id, package_id, update_data").
Where("rh_account_id = ?", as.RhAccountID).
Where("system_id = ?", as.SystemID).
Find(&updates).Error
if err != nil {
utils.LogWarn("#", i, "partition", part, "rh_account_id", as.RhAccountID, "system_id", as.SystemID,
"err", err.Error(), "Couldn't get update_data")
Expand Down Expand Up @@ -190,7 +201,7 @@ func getEvraApplicability(udpateData []UpdateData) (string, string) {
}

// nolint: funlen
func getPackageIDs(u SystemPackageRecord, i int, latestApplicable, latestInstallable string) (int64, int64) {
func getPackageIDs(readDB *gorm.DB, u SystemPackageRecord, i int, latestApplicable, latestInstallable string) (int64, int64) {
// get package_id for latest installable and applicable packages
if len(latestApplicable) == 0 && len(latestInstallable) == 0 {
return 0, 0
Expand Down Expand Up @@ -234,13 +245,11 @@ func getPackageIDs(u SystemPackageRecord, i int, latestApplicable, latestInstall
}

var packages []Package
err := tasks.WithReadReplicaTx(func(db *gorm.DB) error {
return db.Table("package").
Select("id, evra").
Where("evra IN (?)", []string{latestApplicable, latestInstallable}).
Where("name_id = ?", u.NameID).
Find(&packages).Error
})
err := readDB.Table("package").
Select("id, evra").
Where("evra IN (?)", []string{latestApplicable, latestInstallable}).
Where("name_id = ?", u.NameID).
Find(&packages).Error
if err != nil {
utils.LogWarn("#", i, "Failed to load packages")
}
Expand Down

0 comments on commit dba8000

Please sign in to comment.