Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC: add job to migrate data from system_package #1316

Merged
merged 2 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions deploy/clowdapp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,37 @@ objects:
- {name: DELETE_UNUSED_DATA_LIMIT, value: '${DELETE_UNUSED_DATA_LIMIT}'}
- {name: ENABLE_UNUSED_DATA_DELETE, value: '${ENABLE_UNUSED_DATA_DELETE}'}

# One-off job: migrates installable/applicable advisory data from the
# system_package table into system_package2 (runs `job system_package_data_migration`).
- name: migrate-system-package-data
  activeDeadlineSeconds: ${{JOBS_TIMEOUT}}
  # NOTE: boolean substitution uses the ${{...}} form like JOBS_TIMEOUT above;
  # the original had unbalanced braces (${...}}) which breaks template processing
  disabled: ${{MIGRATE_SYSTEM_PACKAGE_DISABLED}}
  concurrencyPolicy: Forbid
  completions: 1
  podSpec:
    image: ${IMAGE}:${IMAGE_TAG_JOBS}
    initContainers:
      # wait until the database schema has been migrated before running the job
      - name: check-for-db
        image: ${IMAGE}:${IMAGE_TAG_DATABASE_ADMIN}
        command:
          - ./database_admin/check-upgraded.sh
        env:
          - {name: SCHEMA_MIGRATION, value: '${SCHEMA_MIGRATION}'}
    command:
      - ./scripts/entrypoint.sh
      - job
      - system_package_data_migration
    env:
      - {name: LOG_LEVEL, value: '${LOG_LEVEL_JOBS}'}
      - {name: GIN_MODE, value: '${GIN_MODE}'}
      - {name: DB_DEBUG, value: '${DB_DEBUG_JOBS}'}
      - {name: DB_USER, value: vmaas_sync}
      - {name: DB_PASSWD, valueFrom: {secretKeyRef: {name: patchman-engine-database-passwords,
          key: vmaas-sync-database-password}}}
      # sizes of the in-memory package caches loaded at job start
      - {name: PACKAGE_CACHE_SIZE, value: '${PACKAGE_CACHE_SIZE}'}
      - {name: PACKAGE_NAME_CACHE_SIZE, value: '${PACKAGE_NAME_CACHE_SIZE}'}
    resources:
      limits: {cpu: '${RES_LIMIT_CPU_MIGRATE_JOB}', memory: '${RES_LIMIT_MEM_MIGRATE_JOB}'}
      requests: {cpu: '${RES_REQUEST_CPU_MIGRATE_JOB}', memory: '${RES_REQUEST_MEM_MIGRATE_JOB}'}

database:
name: patchman
version: 12
Expand Down Expand Up @@ -693,6 +724,12 @@ parameters:
- {name: ADVISORY_REFRESH_SUSPEND, value: 'false'} # Disable cronjob execution
- {name: ENABLE_REFRESH_ADVISORY_CACHES, value: 'true'} # Enable periodic refresh of account advisory caches
- {name: SKIP_N_ACCOUNTS_REFRESH, value: '0'} # Skip advisory cache refresh for N first accounts in case the previous refresh didn't finish
# Migrate system package data
- {name: MIGRATE_SYSTEM_PACKAGE_DISABLED, value: 'true'}
- {name: RES_REQUEST_CPU_MIGRATE_JOB, value: '1'}
- {name: RES_LIMIT_CPU_MIGRATE_JOB, value: '2'}
- {name: RES_REQUEST_MEM_MIGRATE_JOB, value: '512Mi'}
- {name: RES_LIMIT_MEM_MIGRATE_JOB, value: '1024Mi'}

# Database admin
- {name: IMAGE_TAG_DATABASE_ADMIN, value: v3.5.9}
Expand Down
18 changes: 18 additions & 0 deletions docker-compose.prod.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,24 @@ services:
security_opt:
- label=disable

# One-off migration container: runs the system_package_data_migration job
# (see tasks/migration) against the compose database.
migrate_system_package2:
  container_name: migrate_system_package2
  image: patchman-engine-app
  env_file:
    - ./conf/common.env
    - ./conf/admin_api.env
  command: ./dev/scripts/docker-compose-entrypoint.sh job system_package_data_migration
  depends_on:
    - db
    - patchimg
    - db_feed
  volumes:
    - ./dev:/go/src/app/dev
    - ./dev/database/secrets:/opt/postgresql
    - ./dev/kafka/secrets:/opt/kafka
  security_opt:
    - label=disable

kafka:
container_name: kafka
build:
Expand Down
17 changes: 17 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,23 @@ services:
security_opt:
- label=disable

# One-off migration container (dev variant, mounts the whole source tree):
# runs the system_package_data_migration job (see tasks/migration).
migrate_system_package2:
  container_name: migrate_system_package2
  image: patchman-engine-app
  env_file:
    - ./conf/common.env
    - ./conf/gorun.env
    - ./conf/admin_api.env
  command: ./dev/scripts/docker-compose-entrypoint.sh job system_package_data_migration
  depends_on:
    - db
    - patchimg
    - db_feed
  volumes:
    - ./:/go/src/app
  security_opt:
    - label=disable

kafka:
container_name: kafka
build:
Expand Down
3 changes: 3 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"app/platform"
"app/tasks/caches"
"app/tasks/cleaning"
"app/tasks/migration"
"app/tasks/system_culling"
"app/tasks/vmaas_sync"
"app/turnpike"
Expand Down Expand Up @@ -70,5 +71,7 @@ func runJob(name string) {
cleaning.RunDeleteUnusedData()
case "packages_cache_refresh":
caches.RunPackageRefresh()
case "system_package_data_migration":
migration.RunSystemPackageDataMigration()
}
}
2 changes: 1 addition & 1 deletion scripts/check-dockercomposes.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ sed \
-e "s|INSTALL_TOOLS=yes|INSTALL_TOOLS=no|" \
-e "s|target: buildimg|target: runtimeimg|" \
-e "/ - \.\/conf\/gorun.env/ d" \
-e "/ \(db_admin\|db_feed\|manager\|listener\|evaluator_recalc\|evaluator_upload\|vmaas_sync\|admin\):/,/^$/ {
-e "/ \(db_admin\|db_feed\|manager\|listener\|evaluator_recalc\|evaluator_upload\|vmaas_sync\|admin\|migrate_system_package2\):/,/^$/ {
s/- \.\/:\/go\/src\/app/- \.\/dev:\/go\/src\/app\/dev\n\
- .\/dev\/database\/secrets:\/opt\/postgresql\n\
- \.\/dev\/kafka\/secrets:\/opt\/kafka/
Expand Down
242 changes: 242 additions & 0 deletions tasks/migration/migrate_system_package_data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
package migration

import (
"app/base/core"
"app/base/models"
"app/base/utils"
"app/evaluator"
"app/tasks"
"encoding/json"
"fmt"
"sync"

"gorm.io/gorm"
)

// memoryPackageCache holds the evaluator's package metadata cache, used for
// package-name and NEVRA lookups during migration. It is populated once in
// RunSystemPackageDataMigration and then read concurrently by worker goroutines.
var memoryPackageCache *evaluator.PackageCache

// RunSystemPackageDataMigration is the entry point of the
// system_package_data_migration job. It configures the application, loads the
// evaluator's package cache into memory and then migrates advisory data from
// system_package into system_package2.
func RunSystemPackageDataMigration() {
	tasks.HandleContextCancel(tasks.WaitAndExit)
	core.ConfigureApp()

	// cache sizes are tunable via the same env variables the evaluator uses
	pkgCacheSize := utils.GetIntEnvOrDefault("PACKAGE_CACHE_SIZE", 1000000)
	nameCacheSize := utils.GetIntEnvOrDefault("PACKAGE_NAME_CACHE_SIZE", 60000)

	memoryPackageCache = evaluator.NewPackageCache(true, true, pkgCacheSize, nameCacheSize)
	memoryPackageCache.Load()

	utils.LogInfo("Migrating installable/applicable advisories from system_package to system_package2")
	MigrateSystemPackageData()
}

// AccSys identifies a single system within an account
// (one distinct pair from a system_package partition).
type AccSys struct {
	RhAccountID int
	SystemID    int64
}

// SystemPackageRecord is one row read from a system_package partition.
type SystemPackageRecord struct {
	NameID    int64
	PackageID int64
	// raw JSON array of UpdateData entries, decoded by getUpdateData
	UpdateDataJSON json.RawMessage `gorm:"column:update_data"`
}

// UpdateData is a single entry of the update_data JSON column.
type UpdateData struct {
	Evra   string `json:"evra" gorm:"-"`
	Status string `json:"status" gorm:"-"` // "Installable" or "Applicable" (see getEvraApplicability)
}

// Package is a minimal projection of the package table (id and evra only).
type Package struct {
	ID   int64
	Evra string
}

// MigrateSystemPackageData copies the latest applicable/installable advisory
// data from every system_package partition into system_package2.
//
// Partitions are processed sequentially; within a partition up to four
// systems are migrated concurrently.
func MigrateSystemPackageData() {
	var partitions []string

	// enumerate all system_package partitions, e.g. system_package_0
	err := tasks.WithReadReplicaTx(func(db *gorm.DB) error {
		return db.Table("pg_tables").
			Where("tablename ~ '^system_package_[0-9]+$'").
			Pluck("tablename", &partitions).Error
	})
	if err != nil {
		utils.LogError("err", err.Error(), "Couldn't get partitions for system_package")
		return
	}

	for i, part := range partitions {
		utils.LogInfo("#", i, "partition", part, "Migrating partition")
		accSys := getAccSys(part, i)

		// process at most 4 systems at once
		guard := make(chan struct{}, 4)
		var wg sync.WaitGroup

		for _, as := range accSys {
			guard <- struct{}{}
			wg.Add(1)
			go func(as AccSys, i int, part string) {
				defer func() {
					<-guard
					wg.Done()
				}()
				updates := getUpdates(as, part, i)
				for _, u := range updates {
					updateData := getUpdateData(u, as, part, i)
					latestApplicable, latestInstallable := getEvraApplicability(updateData)
					applicableID, installableID := getPackageIDs(u, i, latestApplicable, latestInstallable)
					if applicableID != 0 && installableID != 0 {
						// insert ids to system_package2
						// NOTE: err must be a goroutine-local variable; the original
						// assigned to the enclosing function's `err`, a data race
						// between the concurrent workers
						err := tasks.WithTx(func(db *gorm.DB) error {
							return db.Table("system_package2").
								Where("installable_id IS NULL AND applicable_id IS NULL").
								Save(models.SystemPackage{
									RhAccountID:   as.RhAccountID,
									SystemID:      as.SystemID,
									PackageID:     u.PackageID,
									NameID:        u.NameID,
									InstallableID: &installableID,
									ApplicableID:  &applicableID,
								}).Error
						})
						if err != nil {
							utils.LogWarn("#", i, "err", err.Error(), "Failed to update system_package2")
						}
					}
				}
			}(as, i, part)
		}
		wg.Wait()
		utils.LogInfo("#", i, "partition", part, "Partition migrated")
	}
}

// getAccSys returns the distinct (rh_account_id, system_id) pairs present in
// the given system_package partition, ordered by account then system.
// On a query failure it logs a warning and returns the (empty) slice.
func getAccSys(part string, i int) []AccSys {
	accSys := make([]AccSys, 0)
	if err := tasks.WithReadReplicaTx(func(db *gorm.DB) error {
		return db.Table(part).
			Distinct("rh_account_id, system_id").
			Order("rh_account_id").
			Order("system_id").
			Find(&accSys).Error
	}); err != nil {
		utils.LogWarn("#", i, "partition", part, "Failed to load data from partition")
		return accSys
	}

	utils.LogInfo("#", i, "partition", part, "count", len(accSys), "Migrating systems")
	return accSys
}

// getUpdates reads the name_id, package_id and update_data columns for one
// system from the given system_package partition. A query failure is logged
// and the rows fetched so far (possibly none) are returned.
func getUpdates(as AccSys, part string, i int) []SystemPackageRecord {
	var rows []SystemPackageRecord

	if err := tasks.WithReadReplicaTx(func(db *gorm.DB) error {
		return db.Table(part).
			Select("name_id, package_id, update_data").
			Where("rh_account_id = ?", as.RhAccountID).
			Where("system_id = ?", as.SystemID).
			Find(&rows).Error
	}); err != nil {
		utils.LogWarn("#", i, "partition", part, "rh_account_id", as.RhAccountID, "system_id", as.SystemID,
			"err", err.Error(), "Couldn't get update_data")
	}
	return rows
}

// getUpdateData decodes the update_data JSON blob of a single system_package
// row into a slice of UpdateData entries. A decode failure is logged and the
// partially-decoded (usually nil) slice is returned as-is.
func getUpdateData(u SystemPackageRecord, as AccSys, part string, i int) []UpdateData {
	var entries []UpdateData
	err := json.Unmarshal(u.UpdateDataJSON, &entries)
	if err != nil {
		utils.LogWarn("#", i, "partition", part, "rh_account_id", as.RhAccountID, "system_id", as.SystemID,
			"update_data", string(u.UpdateDataJSON),
			"err", err.Error(), "Couldn't unmarshal update_data")
	}
	return entries
}

// getEvraApplicability scans the update entries from newest (last) to oldest
// and returns the latest applicable EVRA and the latest installable EVRA.
// An "Installable" entry also counts as applicable; the scan stops as soon as
// both values are found. Either result may be "" if no matching entry exists.
//
// (Fixes the misspelled parameter name `udpateData` from the original.)
func getEvraApplicability(updateData []UpdateData) (string, string) {
	var latestInstallable, latestApplicable string
	for i := len(updateData) - 1; i >= 0; i-- {
		if len(latestInstallable) > 0 && len(latestApplicable) > 0 {
			break
		}
		evra := updateData[i].Evra
		switch updateData[i].Status {
		case "Installable":
			if len(latestInstallable) == 0 {
				latestInstallable = evra
			}
			if len(latestApplicable) == 0 {
				latestApplicable = evra
			}
		case "Applicable":
			if len(latestApplicable) == 0 {
				latestApplicable = evra
			}
		}
	}

	return latestApplicable, latestInstallable
}

func getPackageIDs(u SystemPackageRecord, i int, latestApplicable, latestInstallable string) (int64, int64) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the fact that it will query same EVRAs over and over - can't we read them all into map (memory) at the start of the job?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can try to re-use the same cache as we are using in evaluator, sure

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I;m not sure how much memory we will need so I set the limit to 1024Mi

// get package_id for latest installable and applicable packages
if len(latestApplicable) == 0 && len(latestInstallable) == 0 {
return 0, 0
}

var applicableID, installableID int64

name, ok := memoryPackageCache.GetNameByID(u.NameID)
if ok {
var applicable, installable *evaluator.PackageCacheMetadata
// assume both evras will be found in cache
applicableInCache := true
installableInCache := true

if len(latestApplicable) > 0 {
nevraApplicable := fmt.Sprintf("%s-%s", name, latestApplicable)
applicable, applicableInCache = memoryPackageCache.GetByNevra(nevraApplicable)
if applicableInCache {
applicableID = applicable.ID
}
}

if len(latestInstallable) > 0 {
nevraInstallable := fmt.Sprintf("%s-%s", name, latestInstallable)
installable, installableInCache = memoryPackageCache.GetByNevra(nevraInstallable)
if installableInCache {
installableID = installable.ID
}
}

if applicableInCache && installableInCache {
// return ids only if both evras are found in cache
return applicableID, installableID
}
}

var packages []Package
err := tasks.WithReadReplicaTx(func(db *gorm.DB) error {
return db.Table("package").
Select("id, evra").
Where("evra IN (?)", []string{latestApplicable, latestInstallable}).
Where("name_id = ?", u.NameID).
Find(&packages).Error
})
if err != nil {
utils.LogWarn("#", i, "Failed to load packages")
}

for _, p := range packages {
if p.Evra == latestApplicable {
applicableID = p.ID
}
if p.Evra == latestInstallable {
installableID = p.ID
}
}

return applicableID, installableID
}
Loading