diff --git a/deploy/clowdapp.yaml b/deploy/clowdapp.yaml
index 1cb5f257f..608fa4a05 100644
--- a/deploy/clowdapp.yaml
+++ b/deploy/clowdapp.yaml
@@ -479,6 +479,32 @@ objects:
         - {name: DELETE_UNUSED_DATA_LIMIT, value: '${DELETE_UNUSED_DATA_LIMIT}'}
         - {name: ENABLE_UNUSED_DATA_DELETE, value: '${ENABLE_UNUSED_DATA_DELETE}'}
 
+    - name: migrate-system-package-data
+      activeDeadlineSeconds: ${{JOBS_TIMEOUT}}
+      disabled: ${{MIGRATE_SYSTEM_PACKAGE_DISABLED}}
+      concurrencyPolicy: Forbid
+      completions: 1
+      podSpec:
+        image: ${IMAGE}:${IMAGE_TAG_JOBS}
+        initContainers:
+          - name: check-for-db
+            image: ${IMAGE}:${IMAGE_TAG_DATABASE_ADMIN}
+            command:
+              - ./database_admin/check-upgraded.sh
+            env:
+            - {name: SCHEMA_MIGRATION, value: '${SCHEMA_MIGRATION}'}
+        command:
+          - ./scripts/entrypoint.sh
+          - job
+          - system_package_data_migration
+        env:
+        - {name: LOG_LEVEL, value: '${LOG_LEVEL_JOBS}'}
+        - {name: GIN_MODE, value: '${GIN_MODE}'}
+        - {name: DB_DEBUG, value: '${DB_DEBUG_JOBS}'}
+        - {name: DB_USER, value: vmaas_sync}
+        - {name: DB_PASSWD, valueFrom: {secretKeyRef: {name: patchman-engine-database-passwords,
+                                                       key: vmaas-sync-database-password}}}
+
     database:
       name: patchman
       version: 12
@@ -693,6 +719,8 @@ parameters:
 - {name: ADVISORY_REFRESH_SUSPEND, value: 'false'} # Disable cronjob execution
 - {name: ENABLE_REFRESH_ADVISORY_CACHES, value: 'true'} # Enable periodic refresh of account advisory caches
 - {name: SKIP_N_ACCOUNTS_REFRESH, value: '0'} # Skip advisory cache refresh for N first accounts in case the previous refresh didn't finish
+# Migrate system package data
+- {name: MIGRATE_SYSTEM_PACKAGE_DISABLED, value: 'true'}
 
 # Database admin
 - {name: IMAGE_TAG_DATABASE_ADMIN, value: v3.5.9}
diff --git a/docker-compose.prod.yml b/docker-compose.prod.yml
index b2c7a23e7..f1463996c 100644
--- a/docker-compose.prod.yml
+++ b/docker-compose.prod.yml
@@ -57,6 +57,24 @@ services:
     security_opt:
       - label=disable
 
+  migrate_system_package2:
+    container_name: migrate_system_package2
+    image: patchman-engine-app
+    env_file:
+      - ./conf/common.env
+      - ./conf/admin_api.env
+    command: ./dev/scripts/docker-compose-entrypoint.sh job system_package_data_migration
+    depends_on:
+      - db
+      - patchimg
+      - db_feed
+    volumes:
+      - ./dev:/go/src/app/dev
+      - ./dev/database/secrets:/opt/postgresql
+      - ./dev/kafka/secrets:/opt/kafka
+    security_opt:
+      - label=disable
+
   kafka:
     container_name: kafka
     build:
diff --git a/docker-compose.yml b/docker-compose.yml
index 355267df1..0e1b6a70e 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -55,6 +55,23 @@ services:
     security_opt:
       - label=disable
 
+  migrate_system_package2:
+    container_name: migrate_system_package2
+    image: patchman-engine-app
+    env_file:
+      - ./conf/common.env
+      - ./conf/gorun.env
+      - ./conf/admin_api.env
+    command: ./dev/scripts/docker-compose-entrypoint.sh job system_package_data_migration
+    depends_on:
+      - db
+      - patchimg
+      - db_feed
+    volumes:
+      - ./:/go/src/app
+    security_opt:
+      - label=disable
+
   kafka:
     container_name: kafka
     build:
diff --git a/main.go b/main.go
index 1a09b6bd6..e285c944e 100644
--- a/main.go
+++ b/main.go
@@ -10,6 +10,7 @@ import (
 	"app/platform"
 	"app/tasks/caches"
 	"app/tasks/cleaning"
+	"app/tasks/migration"
 	"app/tasks/system_culling"
 	"app/tasks/vmaas_sync"
 	"app/turnpike"
@@ -70,5 +71,7 @@ func runJob(name string) {
 		cleaning.RunDeleteUnusedData()
 	case "packages_cache_refresh":
 		caches.RunPackageRefresh()
+	case "system_package_data_migration":
+		migration.RunSystemPackageDataMigration()
 	}
 }
diff --git a/tasks/migration/migrate_system_package_data.go b/tasks/migration/migrate_system_package_data.go
new file mode 100644
index 000000000..b029a3543
--- /dev/null
+++ b/tasks/migration/migrate_system_package_data.go
@@ -0,0 +1,242 @@
+package migration
+
+import (
+	"app/base/core"
+	"app/base/models"
+	"app/base/utils"
+	"app/tasks"
+	"encoding/json"
+	"sync"
+
+	"gorm.io/gorm"
+)
+
+// RunSystemPackageDataMigration is the entry point of the system_package_data_migration job.
+func RunSystemPackageDataMigration() {
+	tasks.HandleContextCancel(tasks.WaitAndExit)
+	core.ConfigureApp()
+	utils.LogInfo("Migrating installable/applicable advisories from system_package to system_package2")
+	MigrateSystemPackageData()
+}
+
+// AccSys identifies a single system by its account id and system id.
+type AccSys struct {
+	RhAccountID int
+	SystemID    int64
+}
+
+// SystemPackageRecord is one system_package row together with its raw update_data JSON.
+type SystemPackageRecord struct {
+	NameID         int64
+	PackageID      int64
+	UpdateDataJSON json.RawMessage `gorm:"column:update_data"`
+}
+
+// UpdateData is one entry of the update_data JSON array.
+type UpdateData struct {
+	Evra   string `json:"evra" gorm:"-"`
+	Status string `json:"status" gorm:"-"`
+}
+
+// Package holds the package table columns needed to map an evra to its id.
+type Package struct {
+	ID   int64
+	Evra string
+}
+
+// MigrateSystemPackageData goes through all system_package_<N> partitions and saves ids of the
+// latest applicable and installable package of every row to system_package2.
+// At most 4 systems are migrated concurrently, partitions are migrated one by one.
+func MigrateSystemPackageData() {
+	var wg sync.WaitGroup
+	var partitions []string
+
+	err := tasks.WithReadReplicaTx(func(db *gorm.DB) error {
+		return db.Table("pg_tables").
+			Where("tablename ~ '^system_package_[0-9]+$'").
+			Pluck("tablename", &partitions).Error
+	})
+	if err != nil {
+		utils.LogError("err", err.Error(), "Couldn't get partitions for system_package")
+		return
+	}
+
+	for i, part := range partitions {
+		utils.LogInfo("#", i, "partition", part, "Migrating partition")
+		accSys := getAccSys(part, i)
+
+		// process at most 4 systems at once
+		guard := make(chan struct{}, 4)
+
+		for _, as := range accSys {
+			guard <- struct{}{}
+			wg.Add(1)
+			go func(as AccSys, i int, part string) {
+				defer func() {
+					<-guard
+					wg.Done()
+				}()
+				migrateSystem(as, part, i)
+			}(as, i, part)
+		}
+		wg.Wait()
+		utils.LogInfo("#", i, "partition", part, "Partition migrated")
+	}
+}
+
+// migrateSystem saves applicable/installable package ids of all packages of one system
+// to system_package2. Failures are logged and do not stop the migration.
+func migrateSystem(as AccSys, part string, i int) {
+	for _, u := range getUpdates(as, part, i) {
+		updateData := getUpdateData(u, as, part, i)
+		latestApplicable, latestInstallable := getEvraApplicability(updateData)
+		applicableID, installableID := getPackageIDs(u, i, latestApplicable, latestInstallable)
+		if applicableID == 0 && installableID == 0 {
+			continue
+		}
+
+		sysPkg := models.SystemPackage{
+			RhAccountID: as.RhAccountID,
+			SystemID:    as.SystemID,
+			PackageID:   u.PackageID,
+			NameID:      u.NameID,
+		}
+		// an update can be applicable without being installable, the missing id stays NULL
+		if installableID != 0 {
+			sysPkg.InstallableID = &installableID
+		}
+		if applicableID != 0 {
+			sysPkg.ApplicableID = &applicableID
+		}
+
+		// insert ids to system_package2; err is local so concurrent goroutines never share it
+		err := tasks.WithTx(func(db *gorm.DB) error {
+			return db.Table("system_package2").
+				Where("installable_id IS NULL AND applicable_id IS NULL").
+				Save(sysPkg).Error
+		})
+		if err != nil {
+			utils.LogWarn("#", i, "partition", part, "rh_account_id", as.RhAccountID, "system_id", as.SystemID,
+				"err", err.Error(), "Failed to update system_package2")
+		}
+	}
+}
+
+// getAccSys returns all distinct (rh_account_id, system_id) pairs stored in the given partition.
+func getAccSys(part string, i int) []AccSys {
+	// get systems from system_package partition
+	accSys := make([]AccSys, 0)
+	err := tasks.WithReadReplicaTx(func(db *gorm.DB) error {
+		return db.Table(part).
+			Distinct("rh_account_id, system_id").
+			Order("rh_account_id").
+			Order("system_id").
+			Find(&accSys).Error
+	})
+	if err != nil {
+		utils.LogWarn("#", i, "partition", part, "err", err.Error(), "Failed to load data from partition")
+		return accSys
+	}
+
+	utils.LogInfo("#", i, "partition", part, "count", len(accSys), "Migrating systems")
+	return accSys
+}
+
+// getUpdates loads name_id, package_id and update_data of all packages of the given system.
+func getUpdates(as AccSys, part string, i int) []SystemPackageRecord {
+	var updates []SystemPackageRecord
+
+	// get update_data from system_package for given system
+	err := tasks.WithReadReplicaTx(func(db *gorm.DB) error {
+		return db.Table(part).
+			Select("name_id, package_id, update_data").
+			Where("rh_account_id = ?", as.RhAccountID).
+			Where("system_id = ?", as.SystemID).
+			Find(&updates).Error
+	})
+	if err != nil {
+		utils.LogWarn("#", i, "partition", part, "rh_account_id", as.RhAccountID, "system_id", as.SystemID,
+			"err", err.Error(), "Couldn't get update_data")
+	}
+	return updates
+}
+
+// getUpdateData parses the update_data JSON of a system_package row.
+// It returns nil when there is nothing to parse or when the JSON is invalid.
+func getUpdateData(u SystemPackageRecord, as AccSys, part string, i int) []UpdateData {
+	if len(u.UpdateDataJSON) == 0 {
+		// empty update_data (e.g. NULL column), nothing to migrate and nothing to warn about
+		return nil
+	}
+
+	var updateData []UpdateData
+	if err := json.Unmarshal(u.UpdateDataJSON, &updateData); err != nil {
+		utils.LogWarn("#", i, "partition", part, "rh_account_id", as.RhAccountID, "system_id", as.SystemID,
+			"update_data", string(u.UpdateDataJSON),
+			"err", err.Error(), "Couldn't unmarshal update_data")
+		return nil
+	}
+	return updateData
+}
+
+// getEvraApplicability returns evra of the latest applicable and the latest installable update.
+// The list is scanned from its end; an "Installable" entry counts as applicable as well.
+func getEvraApplicability(updateData []UpdateData) (string, string) {
+	// get latest applicable and installable evra
+	var latestInstallable, latestApplicable string
+	for i := len(updateData) - 1; i >= 0; i-- {
+		if len(latestInstallable) > 0 && len(latestApplicable) > 0 {
+			break
+		}
+		evra := updateData[i].Evra
+		switch updateData[i].Status {
+		case "Installable":
+			if len(latestInstallable) == 0 {
+				latestInstallable = evra
+			}
+			if len(latestApplicable) == 0 {
+				latestApplicable = evra
+			}
+		case "Applicable":
+			if len(latestApplicable) == 0 {
+				latestApplicable = evra
+			}
+		}
+	}
+
+	return latestApplicable, latestInstallable
+}
+
+// getPackageIDs looks up package ids of the given evras for the package name of u.
+// It returns (applicableID, installableID); 0 means the package was not found or not loaded.
+func getPackageIDs(u SystemPackageRecord, i int, latestApplicable, latestInstallable string) (int64, int64) {
+	// get package_id for latest installable and applicable packages
+	if len(latestApplicable) == 0 && len(latestInstallable) == 0 {
+		return 0, 0
+	}
+
+	var packages []Package
+	err := tasks.WithReadReplicaTx(func(db *gorm.DB) error {
+		return db.Table("package").
+			Select("id, evra").
+			Where("evra IN (?)", []string{latestApplicable, latestInstallable}).
+			Where("name_id = ?", u.NameID).
+			Find(&packages).Error
+	})
+	if err != nil {
+		utils.LogWarn("#", i, "name_id", u.NameID, "err", err.Error(), "Failed to load packages")
+		return 0, 0
+	}
+
+	var applicableID, installableID int64
+	for _, p := range packages {
+		if p.Evra == latestApplicable {
+			applicableID = p.ID
+		}
+		if p.Evra == latestInstallable {
+			installableID = p.ID
+		}
+	}
+
+	return applicableID, installableID
+}