Skip to content

Commit

Permalink
POC: migration job per accounts
Browse files Browse the repository at this point in the history
  • Loading branch information
psegedy committed Nov 28, 2023
1 parent 3ff49e3 commit 6d0c652
Showing 1 changed file with 79 additions and 66 deletions.
145 changes: 79 additions & 66 deletions tasks/migration/migrate_system_package_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ type AccSys struct {
}

type SystemPackageRecord struct {
SystemID int64
PackageUpdate
}

type PackageUpdate struct {
NameID int64
PackageID int64
UpdateDataJSON json.RawMessage `gorm:"column:update_data"`
Expand Down Expand Up @@ -85,105 +90,113 @@ func processPartition(part string, i int) {
tx := tasks.CancelableDB().Begin()
defer tx.Rollback()

accSys := getAccSys(part, i)
accs := getAccounts(i)
utils.LogInfo("#", i, "len(account)", len(accs), "partition", part, "Migrating accounts")
// process at most `maxGoroutines` systems at once
guard := make(chan struct{}, maxGoroutines)

for sysIdx, as := range accSys {
guard <- struct{}{}
wg.Add(1)
go func(as AccSys, i int, part string, sysIdx int) {
defer func() {
<-guard
wg.Done()
}()
updates := getUpdates(as, part, i)
toInsert := make([]models.SystemPackage, 0, 1000)
for _, u := range updates {
updateData := getUpdateData(u, as, part, i)
latestApplicable, latestInstallable := getEvraApplicability(updateData)
applicableID, installableID := getPackageIDs(u, i, latestApplicable, latestInstallable)
if applicableID != 0 || installableID != 0 {
// insert ids to system_package2
sp := models.SystemPackage{
RhAccountID: as.RhAccountID,
SystemID: as.SystemID,
PackageID: u.PackageID,
NameID: u.NameID,
}
if installableID != 0 {
sp.InstallableID = &installableID
}
if applicableID != 0 {
sp.ApplicableID = &applicableID
for _, acc := range accs {
systemUpdates := getUpdates(acc, part, i)
utils.LogInfo("#", i, "len(systems)", len(systemUpdates), "partition", part, "account", acc, "Migrating account")
for system_id, updates := range systemUpdates {
guard <- struct{}{}
wg.Add(1)
go func(acc int, system_id int64, updates []PackageUpdate, i int, part string) {
defer func() {
<-guard
wg.Done()
}()
toInsert := make([]models.SystemPackage, 0, 1000)
for _, u := range updates {
updateData := getUpdateData(u, acc, system_id, part, i)
latestApplicable, latestInstallable := getEvraApplicability(updateData)
applicableID, installableID := getPackageIDs(u, i, latestApplicable, latestInstallable)
if applicableID != 0 || installableID != 0 {
// insert ids to system_package2
sp := models.SystemPackage{
RhAccountID: acc,
SystemID: system_id,
PackageID: u.PackageID,
NameID: u.NameID,
}
if installableID != 0 {
sp.InstallableID = &installableID
}
if applicableID != 0 {
sp.ApplicableID = &applicableID
}
toInsert = append(toInsert, sp)
}
toInsert = append(toInsert, sp)
}
}
if len(toInsert) > 0 {
err := database.UnnestInsert(tx,
"INSERT INTO system_package2 (rh_account_id, system_id, package_id, name_id, installable_id, applicable_id)"+
" (select * from unnest($1::int[], $2::bigint[], $3::bigint[], $4::bigint[], $5::bigint[], $6::bigint[]))"+
" ON CONFLICT DO NOTHING", toInsert)
if err != nil {
utils.LogWarn("#", i, "err", err.Error(),
"account", as.RhAccountID, "system", as.SystemID,
"Failed to insert to system_package2")
if len(toInsert) > 0 {
err := database.UnnestInsert(tx,
"INSERT INTO system_package2 (rh_account_id, system_id, package_id, name_id, installable_id, applicable_id)"+
" (select * from unnest($1::int[], $2::bigint[], $3::bigint[], $4::bigint[], $5::bigint[], $6::bigint[]))"+
" ON CONFLICT DO NOTHING", toInsert)
if err != nil {
utils.LogWarn("#", i, "err", err.Error(), "account", acc, "system", system_id,
"Failed to insert to system_package2")
}
}
utils.LogDebug("#", i, "#system", sysIdx, "partition", part, "len(toInsert)", len(toInsert),
"account", as.RhAccountID, "system", as.SystemID, "Migrated system",
)
}
}(as, i, part, sysIdx)
}(acc, system_id, updates, i, part)
}
}

wg.Wait()
if err := errors.Wrap(tx.Commit().Error, "Commit"); err != nil {
utils.LogError("#", i, "partition", part, "err", err, "Failed to migrate partition")
}
utils.LogInfo("#", i, "partition", part, "Partition migrated")
}

func getAccSys(part string, i int) []AccSys {
// get systems from system_package partition
accSys := make([]AccSys, 0)
// get account ids in partition from system_platform table
func getAccounts(i int) []int {
accs := make([]int, 0)
err := tasks.WithReadReplicaTx(func(db *gorm.DB) error {
return db.Table(part).
Distinct("rh_account_id, system_id").
Order("rh_account_id").
Order("system_id").
Find(&accSys).Error
return db.Table(fmt.Sprintf("system_platform_%d", i)).
Distinct("rh_account_id").
Find(&accs).Error
})
if err != nil {
utils.LogWarn("#", i, "partition", part, "Failed to load data from partition")
return accSys
utils.LogWarn("#", i, "Failed to load accounts from partition")
return accs
}

utils.LogInfo("#", i, "partition", part, "count", len(accSys), "Migrating systems")
return accSys
utils.LogInfo("#", i, "count", len(accs), "Migrating accounts")
return accs
}

func getUpdates(as AccSys, part string, i int) []SystemPackageRecord {
func getUpdates(account int, part string, i int) map[int64][]PackageUpdate {
var updates []SystemPackageRecord

// get update_data from system_package for given system
err := tasks.WithReadReplicaTx(func(db *gorm.DB) error {
return db.Table(part).
Select("name_id, package_id, update_data").
Where("rh_account_id = ?", as.RhAccountID).
Where("system_id = ?", as.SystemID).
Select("name_id, package_id, system_id, update_data").
Where("rh_account_id = ?", account).
Order("system_id").
Find(&updates).Error
})
if err != nil {
utils.LogWarn("#", i, "partition", part, "rh_account_id", as.RhAccountID, "system_id", as.SystemID,
utils.LogWarn("#", i, "partition", part, "rh_account_id", account,
"err", err.Error(), "Couldn't get update_data")
}
return updates

nSystemsEst := len(updates) / 1000 //estimation, len(updates) / 1000 packages per system
res := make(map[int64][]PackageUpdate, nSystemsEst)
for _, u := range updates {
if _, has := res[u.SystemID]; !has {
res[u.SystemID] = []PackageUpdate{}
}
res[u.SystemID] = append(res[u.SystemID], u.PackageUpdate)
}

return res
}

func getUpdateData(u SystemPackageRecord, as AccSys, part string, i int) []UpdateData {
func getUpdateData(u PackageUpdate, acc int, sys int64, part string, i int) []UpdateData {
var updateData []UpdateData
if err := json.Unmarshal(u.UpdateDataJSON, &updateData); err != nil {
utils.LogWarn("#", i, "partition", part, "rh_account_id", as.RhAccountID, "system_id", as.SystemID,
utils.LogWarn("#", i, "partition", part, "rh_account_id", acc, "system_id", sys,
"update_data", string(u.UpdateDataJSON),
"err", err.Error(), "Couldn't unmarshal update_data")
}
Expand Down Expand Up @@ -217,7 +230,7 @@ func getEvraApplicability(udpateData []UpdateData) (string, string) {
}

// nolint: funlen
func getPackageIDs(u SystemPackageRecord, i int, latestApplicable, latestInstallable string) (int64, int64) {
func getPackageIDs(u PackageUpdate, i int, latestApplicable, latestInstallable string) (int64, int64) {
// get package_id for latest installable and applicable packages
if len(latestApplicable) == 0 && len(latestInstallable) == 0 {
return 0, 0
Expand Down

0 comments on commit 6d0c652

Please sign in to comment.