Skip to content

Commit

Permalink
POC: add job to migrate data from system_package
Browse files Browse the repository at this point in the history
  • Loading branch information
psegedy committed Oct 2, 2023
1 parent a8ed84e commit a99ac87
Show file tree
Hide file tree
Showing 5 changed files with 270 additions and 0 deletions.
28 changes: 28 additions & 0 deletions deploy/clowdapp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,32 @@ objects:
- {name: DELETE_UNUSED_DATA_LIMIT, value: '${DELETE_UNUSED_DATA_LIMIT}'}
- {name: ENABLE_UNUSED_DATA_DELETE, value: '${ENABLE_UNUSED_DATA_DELETE}'}

- name: migrate-system-package-data
activeDeadlineSeconds: ${{JOBS_TIMEOUT}}
disabled: ${MIGRATE_SYSTEM_PACKAGE_DISABLED}}
concurrencyPolicy: Forbid
completions: 1
podSpec:
image: ${IMAGE}:${IMAGE_TAG_JOBS}
initContainers:
- name: check-for-db
image: ${IMAGE}:${IMAGE_TAG_DATABASE_ADMIN}
command:
- ./database_admin/check-upgraded.sh
env:
- {name: SCHEMA_MIGRATION, value: '${SCHEMA_MIGRATION}'}
command:
- ./scripts/entrypoint.sh
- job
- system_package_data_migration
env:
- {name: LOG_LEVEL, value: '${LOG_LEVEL_JOBS}'}
- {name: GIN_MODE, value: '${GIN_MODE}'}
- {name: DB_DEBUG, value: '${DB_DEBUG_JOBS}'}
- {name: DB_USER, value: vmaas_sync}
- {name: DB_PASSWD, valueFrom: {secretKeyRef: {name: patchman-engine-database-passwords,
key: vmaas-sync-database-password}}}

database:
name: patchman
version: 12
Expand Down Expand Up @@ -693,6 +719,8 @@ parameters:
- {name: ADVISORY_REFRESH_SUSPEND, value: 'false'} # Disable cronjob execution
- {name: ENABLE_REFRESH_ADVISORY_CACHES, value: 'true'} # Enable periodic refresh of account advisory caches
- {name: SKIP_N_ACCOUNTS_REFRESH, value: '0'} # Skip advisory cache refresh for N first accounts in case the previous refresh didn't finish
# Migrate system package data
- {name: MIGRATE_SYSTEM_PACKAGE_DISABLED, value: 'true'}

# Database admin
- {name: IMAGE_TAG_DATABASE_ADMIN, value: v3.5.9}
Expand Down
18 changes: 18 additions & 0 deletions docker-compose.prod.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,24 @@ services:
security_opt:
- label=disable

migrate_system_package2:
container_name: migrate_system_package2
image: patchman-engine-app
env_file:
- ./conf/common.env
- ./conf/admin_api.env
command: ./dev/scripts/docker-compose-entrypoint.sh job system_package_data_migration
depends_on:
- db
- patchimg
- db_feed
volumes:
- ./dev:/go/src/app/dev
- ./dev/database/secrets:/opt/postgresql
- ./dev/kafka/secrets:/opt/kafka
security_opt:
- label=disable

kafka:
container_name: kafka
build:
Expand Down
17 changes: 17 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,23 @@ services:
security_opt:
- label=disable

migrate_system_package2:
container_name: migrate_system_package2
image: patchman-engine-app
env_file:
- ./conf/common.env
- ./conf/gorun.env
- ./conf/admin_api.env
command: ./dev/scripts/docker-compose-entrypoint.sh job system_package_data_migration
depends_on:
- db
- patchimg
- db_feed
volumes:
- ./:/go/src/app
security_opt:
- label=disable

kafka:
container_name: kafka
build:
Expand Down
3 changes: 3 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"app/platform"
"app/tasks/caches"
"app/tasks/cleaning"
"app/tasks/migration"
"app/tasks/system_culling"
"app/tasks/vmaas_sync"
"app/turnpike"
Expand Down Expand Up @@ -70,5 +71,7 @@ func runJob(name string) {
cleaning.RunDeleteUnusedData()
case "packages_cache_refresh":
caches.RunPackageRefresh()
case "system_package_data_migration":
migration.RunSystemPackageDataMigration()
}
}
204 changes: 204 additions & 0 deletions tasks/migration/migrate_system_package_data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package migration

import (
"app/base/core"
"app/base/models"
"app/base/utils"
"app/tasks"
"encoding/json"
"sync"

"gorm.io/gorm"
)

func RunSystemPackageDataMigration() {
tasks.HandleContextCancel(tasks.WaitAndExit)
core.ConfigureApp()
utils.LogInfo("Migrating installable/applicable advisories from system_package to system_package2")
MigrateSystemPackageData()
}

type AccSys struct {
RhAccountID int
SystemID int64
}

type SystemPackageRecord struct {
NameID int64
PackageID int64
UpdateDataJSON json.RawMessage `gorm:"column:update_data"`
}

type UpdateData struct {
Evra string `json:"evra" gorm:"-"`
Status string `json:"status" gorm:"-"`
}

type Package struct {
ID int64
Evra string
}

func MigrateSystemPackageData() {
var wg sync.WaitGroup
var partitions []string

err := tasks.WithReadReplicaTx(func(db *gorm.DB) error {
return db.Table("pg_tables").
Where("tablename ~ '^system_package_[0-9]+$'").
Pluck("tablename", &partitions).Error
})
if err != nil {
utils.LogError("err", err.Error(), "Couldn't get partitions for system_package")
return
}

for i, part := range partitions {
utils.LogInfo("#", i, "partition", part, "Migrating partition")
accSys := getAccSys(part, i)

// process at most 4 systems at once
guard := make(chan struct{}, 4)

for _, as := range accSys {
guard <- struct{}{}
wg.Add(1)
go func(as AccSys, i int, part string) {
defer func() {
<-guard
wg.Done()
}()
updates := getUpdates(as, part, i)
for _, u := range updates {
updateData := getUpdateData(u, as, part, i)
latestApplicable, latestInstallable := getEvraApplicability(updateData)
applicableID, installableID := getPackageIDs(u, i, latestApplicable, latestInstallable)
if applicableID != 0 && installableID != 0 {
// insert ids to system_package2
err = tasks.WithTx(func(db *gorm.DB) error {
return db.Table("system_package2").
Where("installable_id IS NULL AND applicable_id IS NULL").
Save(models.SystemPackage{
RhAccountID: as.RhAccountID,
SystemID: as.SystemID,
PackageID: u.PackageID,
NameID: u.NameID,
InstallableID: &installableID,
ApplicableID: &applicableID,
}).Error
})
if err != nil {
utils.LogWarn("#", i, "Failed to update system_package2")
}
}
}
}(as, i, part)
}
wg.Wait()
utils.LogInfo("#", i, "partition", part, "Partition migrated")
}
}

func getAccSys(part string, i int) []AccSys {
// get systems from system_package partition
accSys := make([]AccSys, 0)
err := tasks.WithReadReplicaTx(func(db *gorm.DB) error {
return db.Table(part).
Distinct("rh_account_id, system_id").
Order("rh_account_id").
Order("system_id").
Find(&accSys).Error
})
if err != nil {
utils.LogWarn("#", i, "partition", part, "Failed to load data from partition")
return accSys
}

utils.LogInfo("#", i, "partition", part, "count", len(accSys), "Migrating systems")
return accSys
}

func getUpdates(as AccSys, part string, i int) []SystemPackageRecord {
var updates []SystemPackageRecord

// get update_data from system_package for given system
err := tasks.WithReadReplicaTx(func(db *gorm.DB) error {
return db.Table(part).
Select("name_id, package_id, update_data").
Where("rh_account_id = ?", as.RhAccountID).
Where("system_id = ?", as.SystemID).
Find(&updates).Error
})
if err != nil {
utils.LogWarn("#", i, "partition", part, "rh_account_id", as.RhAccountID, "system_id", as.SystemID,
"err", err.Error(), "Couldn't get update_data")
}
return updates
}

func getUpdateData(u SystemPackageRecord, as AccSys, part string, i int) []UpdateData {
var updateData []UpdateData
if err := json.Unmarshal(u.UpdateDataJSON, &updateData); err != nil {
utils.LogWarn("#", i, "partition", part, "rh_account_id", as.RhAccountID, "system_id", as.SystemID,
"update_data", string(u.UpdateDataJSON),
"err", err.Error(), "Couldn't unmarshal update_data")
}
return updateData
}

func getEvraApplicability(udpateData []UpdateData) (string, string) {
// get latest applicable and installable evra
var latestInstallable, latestApplicable string
for i := len(udpateData) - 1; i >= 0; i-- {
if len(latestInstallable) > 0 && len(latestApplicable) > 0 {
break
}
evra := udpateData[i].Evra
switch udpateData[i].Status {
case "Installable":
if len(latestInstallable) == 0 {
latestInstallable = evra
}
if len(latestApplicable) == 0 {
latestApplicable = evra
}
case "Applicable":
if len(latestApplicable) == 0 {
latestApplicable = evra
}
}
}

return latestApplicable, latestInstallable
}

func getPackageIDs(u SystemPackageRecord, i int, latestApplicable, latestInstallable string) (int64, int64) {
// get package_id for latest installable and applicable packages
if len(latestApplicable) == 0 && len(latestInstallable) == 0 {
return 0, 0
}

var packages []Package
err := tasks.WithReadReplicaTx(func(db *gorm.DB) error {
return db.Table("package").
Select("id, evra").
Where("evra IN (?)", []string{latestApplicable, latestInstallable}).
Where("name_id = ?", u.NameID).
Find(&packages).Error
})
if err != nil {
utils.LogWarn("#", i, "Failed to load packages")
}

var applicableID, installableID int64
for _, p := range packages {
if p.Evra == latestApplicable {
applicableID = p.ID
}
if p.Evra == latestInstallable {
installableID = p.ID
}
}

return applicableID, installableID
}

0 comments on commit a99ac87

Please sign in to comment.