From 33fb2a84f0e27d5adb7c69d5fc3a35f8dc4f12d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Dugovi=C4=8D?= Date: Wed, 4 Sep 2024 11:59:46 +0200 Subject: [PATCH] RHINENG-10032: implement Repack func --- tasks/repack/repack.go | 58 +++++++++++++++++++++++++++++++++++------- 1 file changed, 49 insertions(+), 9 deletions(-) diff --git a/tasks/repack/repack.go b/tasks/repack/repack.go index 8ca648b35..ce3cacf07 100644 --- a/tasks/repack/repack.go +++ b/tasks/repack/repack.go @@ -5,10 +5,13 @@ import ( "app/base/utils" "app/tasks" "fmt" + "os" + "os/exec" ) var pgRepackArgs = []string{ - "pg_repack", "--no-superuser-check", + "--no-superuser-check", + "--no-password", "-d", utils.CoreCfg.DBName, "-h", utils.CoreCfg.DBHost, "-p", fmt.Sprintf("%d", utils.CoreCfg.DBPort), @@ -19,22 +22,59 @@ func configure() { core.ConfigureApp() } -func Repack(table string) error { - // TODO +// GetCmd returns command that calls gp_repack with table. Table must be a partitioned table. +// Args are appended to the necessary pgRepackArgs. Stdout and stderr of the subprocess are redirected to host. +func getCmd(table string, args ...string) *exec.Cmd { + fullArgs := pgRepackArgs + fullArgs = append(fullArgs, "-I", table) + fullArgs = append(fullArgs, args...) + cmd := exec.Command("pg_repack", fullArgs...) + cmd.Env = append(os.Environ(), fmt.Sprintf("PGPASSWORD=%s", utils.CoreCfg.DBPassword)) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + return cmd +} + +// Repack runs pg_repack reindex with table. If columns are provided, cluster by these columns is executed as well. +// Table must be a partitioned table. +func Repack(table string, columns string) error { + reindexCmd := getCmd(table, "-x") + err := reindexCmd.Run() + if err != nil { + return err + } + + if columns == "" || len(columns) == 0 { + utils.LogWarn("no columns provided, skipping repack clustering") + return nil + } + clusterCmd := getCmd(table, "-o", columns) + err = clusterCmd.Run() + if err != nil { + return err + } + return nil } +// RunRepack wraps Repack call for a job. func RunRepack() { tasks.HandleContextCancel(tasks.WaitAndExit) utils.LogInfo("Starting repack job") configure() - var table_name string = "system_package2" // FIXME: use env variable maybe + TABLES := map[string]string{ + "system_package2": "rh_account_id,system_id", + "system_platform": "rh_account_id,id,inventory_id", + "system_advisories": "rh_account_id,system_id", + } - err := Repack(table_name) - if err != nil { - utils.LogError("err", err.Error(), fmt.Sprintf("Repack table %s", table_name)) - return + for table, columns := range TABLES { + err := Repack(table, columns) + if err != nil { + utils.LogError("err", err.Error(), fmt.Sprintf("Failed to repack table %s", table)) + continue + } + utils.LogInfo(fmt.Sprintf("Successfully repacked table %s", table)) } - utils.LogInfo(fmt.Sprintf("Repacked table %s", table_name)) }