Skip to content

Commit

Permalink
POC: insert to system_package2 if applicable or installable update ex…
Browse files Browse the repository at this point in the history
…ists
  • Loading branch information
psegedy committed Nov 27, 2023
1 parent 2b7cd4e commit b5a0ccc
Showing 1 changed file with 36 additions and 19 deletions.
55 changes: 36 additions & 19 deletions tasks/migration/migrate_system_package_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"strings"
"sync"

"github.com/pkg/errors"
"gorm.io/gorm"
)

Expand Down Expand Up @@ -72,47 +73,63 @@ func processPartition(part string, i int) {

var wg sync.WaitGroup
tx := database.Db.Begin()
defer tx.Commit()
defer tx.Rollback()

accSys := getAccSys(part, i)
// process at most 4 systems at once
guard := make(chan struct{}, 4)

for _, as := range accSys {
for sysIdx, as := range accSys {
guard <- struct{}{}
wg.Add(1)
go func(as AccSys, i int, part string) {
go func(as AccSys, i int, part string, sysIdx int) {
defer func() {
<-guard
wg.Done()
}()
updates := getUpdates(as, part, i)
toInsert := make([]models.SystemPackage, 0, 1000)
for _, u := range updates {
updateData := getUpdateData(u, as, part, i)
latestApplicable, latestInstallable := getEvraApplicability(updateData)
applicableID, installableID := getPackageIDs(u, i, latestApplicable, latestInstallable)
if applicableID != 0 && installableID != 0 {
if applicableID != 0 || installableID != 0 {
// insert ids to system_package2
err := tasks.WithTx(func(db *gorm.DB) error {
return db.Table("system_package2").
Where("installable_id IS NULL AND applicable_id IS NULL").
Save(models.SystemPackage{
RhAccountID: as.RhAccountID,
SystemID: as.SystemID,
PackageID: u.PackageID,
NameID: u.NameID,
InstallableID: &installableID,
ApplicableID: &applicableID,
}).Error
})
if err != nil {
utils.LogWarn("#", i, "Failed to update system_package2")
sp := models.SystemPackage{
RhAccountID: as.RhAccountID,
SystemID: as.SystemID,
PackageID: u.PackageID,
NameID: u.NameID,
}
if installableID != 0 {
sp.InstallableID = &installableID
}
if applicableID != 0 {
sp.ApplicableID = &applicableID
}
toInsert = append(toInsert, sp)
}
}
}(as, i, part)
if len(toInsert) > 0 {
err := database.UnnestInsert(tx,
"INSERT INTO system_package2 (rh_account_id, system_id, package_id, name_id, installable_id, applicable_id)"+
" (select * from unnest($1::int[], $2::bigint[], $3::bigint[], $4::bigint[], $5::bigint[], $6::bigint[]))"+
" ON CONFLICT DO NOTHING", toInsert)
if err != nil {
utils.LogWarn("#", i, "err", err.Error(),
"account", as.RhAccountID, "system", as.SystemID,
"Failed to insert to system_package2")
}
utils.LogDebug("#", i, "#system", sysIdx, "partition", part, "len(toInsert)", len(toInsert),
"account", as.RhAccountID, "system", as.SystemID, "Migrated system",
)
}
}(as, i, part, sysIdx)
}
wg.Wait()
if err := errors.Wrap(tx.Commit().Error, "Commit"); err != nil {
utils.LogError("#", i, "partition", part, "err", err, "Failed to migrate partition")
}
utils.LogInfo("#", i, "partition", part, "Partition migrated")
}

Expand Down

0 comments on commit b5a0ccc

Please sign in to comment.