Commit eedb4b8

Merge branch 'wal-g:master' into kb-dev

ldming authored Mar 21, 2024
2 parents b36b726 + b77e637
Showing 5 changed files with 97 additions and 24 deletions.
5 changes: 4 additions & 1 deletion cmd/gp/check_ao_length.go
@@ -2,8 +2,10 @@ package gp

 import (
 	"github.com/spf13/cobra"
+	"github.com/spf13/viper"
 	"github.com/wal-g/tracelog"
 	"github.com/wal-g/wal-g/internal"
+	conf "github.com/wal-g/wal-g/internal/config"
 	"github.com/wal-g/wal-g/internal/databases/greenplum"
 )

@@ -24,7 +26,8 @@ var checkAOTableLengthMasterCmd = &cobra.Command{
 }
 
 func init() {
-	checkAOTableLengthMasterCmd.PersistentFlags().StringVarP(&logsDir, "logs", "l", "/var/log/greenplum", `directory to store logs`)
+	checkAOTableLengthMasterCmd.PersistentFlags().StringVarP(&logsDir, "logs", "l", viper.GetString(conf.GPLogsDirectory),
+		"directory to store logs")
 	checkAOTableLengthMasterCmd.PersistentFlags().BoolVar(&runBackupCheck, "check-backup", false,
 		"if the flag is set, checks backup`s length")
 	checkAOTableLengthMasterCmd.PersistentFlags().StringVarP(&name, "backup-name", "n", internal.LatestString,
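For context on this change: the hard-coded "/var/log/greenplum" default gives way to a value resolved through viper, so the logs directory can be driven by wal-g's config file or environment. A minimal, self-contained sketch of the same cobra/viper pattern; the `gp_logs_dir` key is a stand-in, not wal-g's actual config name:

package main

import (
	"fmt"

	"github.com/spf13/cobra"
	"github.com/spf13/viper"
)

var logsDir string

func main() {
	// Hypothetical key; in wal-g the real default comes from conf.GPLogsDirectory.
	viper.SetDefault("gp_logs_dir", "/var/log/greenplum")

	cmd := &cobra.Command{
		Use: "demo",
		Run: func(cmd *cobra.Command, args []string) {
			fmt.Println("logs dir:", logsDir)
		},
	}
	// The default is read from viper at registration time, so a config file
	// or environment variable loaded into viper beforehand overrides the
	// hard-coded fallback without touching the flag definition.
	cmd.PersistentFlags().StringVarP(&logsDir, "logs", "l",
		viper.GetString("gp_logs_dir"), "directory to store logs")
	_ = cmd.Execute()
}

Note that whatever loads the config into viper has to run before the flag is registered, or the override never takes effect.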
21 changes: 11 additions & 10 deletions internal/databases/greenplum/ao_check_length_handler.go
@@ -13,15 +13,13 @@ import (
 )
 
 type AOLengthCheckHandler struct {
-	logsDir     string
 	checkBackup bool
 	backupName  string
 }
 
 func NewAOLengthCheckHandler(logsDir string, checkBackup bool, backupName string) (*AOLengthCheckHandler, error) {
 	initGpLog(logsDir)
 	return &AOLengthCheckHandler{
-		logsDir:     logsDir,
 		checkBackup: checkBackup,
 		backupName:  backupName,
 	}, nil
@@ -44,15 +42,18 @@ func (checker *AOLengthCheckHandler) CheckAOTableLength() {
 		tracelog.ErrorLogger.FatalfOnError("could not get cluster info %v", err)
 	}
 
-	segmentsBaccups, err := getSegmentBackupNames(checker.backupName)
-	if err != nil {
-		tracelog.ErrorLogger.FatalfOnError("could not get segment`s backups %v", err)
+	segmentsBackups := make(map[int]string)
+	if checker.checkBackup {
+		segmentsBackups, err = getSegmentBackupNames(checker.backupName)
+		if err != nil {
+			tracelog.ErrorLogger.FatalfOnError("could not get segment`s backups %v", err)
+		}
 	}
 
 	remoteOutput := globalCluster.GenerateAndExecuteCommand("Run ao/aocs length check",
 		cluster.ON_SEGMENTS,
 		func(contentID int) string {
-			return checker.buildCheckAOLengthCmd(contentID, segmentsBaccups[contentID], globalCluster)
+			return checker.buildCheckAOLengthCmd(contentID, segmentsBackups, globalCluster)
 		})
 
 	for _, command := range remoteOutput.Commands {
@@ -68,16 +69,16 @@ func (checker *AOLengthCheckHandler) CheckAOTableLength() {
 	}
 }
 
-func (checker *AOLengthCheckHandler) buildCheckAOLengthCmd(contentID int, backupName string, globalCluster *cluster.Cluster) string {
+func (checker *AOLengthCheckHandler) buildCheckAOLengthCmd(contentID int, backupNames map[int]string,
+	globalCluster *cluster.Cluster) string {
 	segment := globalCluster.ByContent[contentID][0]
 
 	runCheckArgs := []string{
 		fmt.Sprintf("--port=%d", segment.Port),
 		fmt.Sprintf("--segnum=%d", segment.ContentID),
 	}
 
 	if checker.checkBackup {
-		runCheckArgs = append(runCheckArgs, "--check-backup", fmt.Sprintf("--backup-name=%s", backupName))
+		runCheckArgs = append(runCheckArgs, "--check-backup", fmt.Sprintf("--backup-name=%s", backupNames[contentID]))
 	}
 
 	runCheckArgsLine := strings.Join(runCheckArgs, " ")
@@ -89,7 +90,7 @@ func (checker *AOLengthCheckHandler) buildCheckAOLengthCmd(contentID int, backup
fmt.Sprintf("--config=%s", conf.CfgFile),
// method
"check-ao-aocs-length-segment",
// actual arguments to be passed to the backup-push command
// actual arguments to be passed to the check-ao command
runCheckArgsLine,
// forward stdout and stderr to the log file
"&>>", formatSegmentLogPath(contentID),
Expand Down
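The handler now resolves segment backups only when --check-backup is set, and passes the whole contentID-to-name map down so each segment's command picks out its own entry. A simplified sketch of that lookup pattern; the helper name, ports, and backup names here are illustrative, not wal-g's exact wiring:

package main

import (
	"fmt"
	"strings"
)

// buildCheckCmd mimics the reworked signature: it takes the full
// contentID -> backup-name map and indexes it per segment, so callers no
// longer pre-resolve a name for segments that will not use one.
func buildCheckCmd(contentID, port int, backupNames map[int]string, checkBackup bool) string {
	args := []string{
		fmt.Sprintf("--port=%d", port),
		fmt.Sprintf("--segnum=%d", contentID),
	}
	if checkBackup {
		// Indexing a nil map or a missing key yields "", which matches the
		// "backups were never resolved" case when checkBackup is false.
		args = append(args, "--check-backup",
			fmt.Sprintf("--backup-name=%s", backupNames[contentID]))
	}
	return "wal-g check-ao-aocs-length-segment " + strings.Join(args, " ")
}

func main() {
	backups := map[int]string{0: "backup_20240321T000000Z"}
	fmt.Println(buildCheckCmd(0, 6000, backups, true))
	fmt.Println(buildCheckCmd(1, 6001, nil, false)) // reading a nil map is safe in Go
}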
6 changes: 6 additions & 0 deletions internal/databases/greenplum/ao_storage_uploader.go
@@ -76,6 +76,12 @@ func (u *AoStorageUploader) addFile(cfi *internal.ComposeFileInfo, aoMeta AoRelF
 		return u.regularAoUpload(cfi, aoMeta, location)
 	}
 
+	if !u.isIncremental && remoteFile.IsIncremented {
+		tracelog.DebugLogger.Printf("%s: backup isIncremental: %t, remote file isIncremented: %t, will perform a regular upload",
+			cfi.Header.Name, u.isIncremental, remoteFile.IsIncremented)
+		return u.regularAoUpload(cfi, aoMeta, location)
+	}
+
 	if aoMeta.modCount != remoteFile.ModCount {
 		if !u.isIncremental || aoMeta.modCount == 0 {
 			tracelog.DebugLogger.Printf("%s: isIncremental: %t, modCount: %d, will perform a regular upload",
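The new guard covers a full (non-incremental) backup that finds the remote copy of a file stored as a delta: reusing it would chain this backup to a base it does not reference, so the file is re-uploaded whole. A stripped-down sketch of the decision order, assuming simplified stand-in types; the real addFile weighs more signals (EOF, deduplication age, skipped files):

package main

import "fmt"

// fileState is a hypothetical stand-in for wal-g's AO file metadata.
type fileState struct {
	IsIncremented bool
	ModCount      int
}

// decideUpload mirrors the guard added above plus the pre-existing modCount
// comparison, in that order.
func decideUpload(backupIsIncremental bool, local, remote fileState) string {
	if !backupIsIncremental && remote.IsIncremented {
		// A full backup cannot reference a remote delta file.
		return "regular upload"
	}
	if local.ModCount != remote.ModCount {
		if !backupIsIncremental || local.ModCount == 0 {
			return "regular upload"
		}
		return "incremental upload"
	}
	return "skip (deduplicated)"
}

func main() {
	// Full backup after a delta backup: the remote file is incremented,
	// so it is re-uploaded in full even though modCount matches.
	fmt.Println(decideUpload(false,
		fileState{ModCount: 4},
		fileState{IsIncremented: true, ModCount: 4}))
}

This is exactly the scenario the new TestIncrementalAoUpload_FullAfterDelta test below exercises.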
58 changes: 50 additions & 8 deletions internal/databases/greenplum/ao_storage_uploader_test.go
@@ -102,7 +102,7 @@ func TestRegularAoUpload(t *testing.T) {
 			ModCount: 4,
 		},
 	}
-	runSingleTest(t, baseFiles, bundleFiles, testFiles, expectedResults, deduplicationAgeLimit)
+	runSingleTest(t, baseFiles, bundleFiles, testFiles, expectedResults, deduplicationAgeLimit, true)
 }
 
 func TestAoUpload_MaxAge(t *testing.T) {
@@ -176,7 +176,7 @@ func TestAoUpload_MaxAge(t *testing.T) {
 			ModCount: 5,
 		},
 	}
-	runSingleTest(t, baseFiles, bundleFiles, testFiles, expectedResults, deduplicationAgeLimit)
+	runSingleTest(t, baseFiles, bundleFiles, testFiles, expectedResults, deduplicationAgeLimit, true)
 }
 
 func TestIncrementalAoUpload(t *testing.T) {
@@ -256,7 +256,7 @@ func TestIncrementalAoUpload(t *testing.T) {
 			ModCount: 5,
 		},
 	}
-	runSingleTest(t, baseFiles, bundleFiles, testFiles, expectedResults, deduplicationAgeLimit)
+	runSingleTest(t, baseFiles, bundleFiles, testFiles, expectedResults, deduplicationAgeLimit, true)
 }
 
 func TestIncrementalAoUpload_EqualEof_DifferentModCount(t *testing.T) {
@@ -298,7 +298,7 @@ func TestIncrementalAoUpload_EqualEof_DifferentModCount(t *testing.T) {
 			ModCount: 5,
 		},
 	}
-	runSingleTest(t, baseFiles, bundleFiles, testFiles, expectedResults, deduplicationAgeLimit)
+	runSingleTest(t, baseFiles, bundleFiles, testFiles, expectedResults, deduplicationAgeLimit, true)
 }
 
 func TestIncrementalAoUpload_DifferentEof_EqualModCount(t *testing.T) {
@@ -340,7 +340,49 @@ func TestIncrementalAoUpload_DifferentEof_EqualModCount(t *testing.T) {
 			ModCount: 4,
 		},
 	}
-	runSingleTest(t, baseFiles, bundleFiles, testFiles, expectedResults, deduplicationAgeLimit)
+	runSingleTest(t, baseFiles, bundleFiles, testFiles, expectedResults, deduplicationAgeLimit, true)
 }
+
+func TestIncrementalAoUpload_FullAfterDelta(t *testing.T) {
+	baseFiles := greenplum.BackupAOFiles{
+		"1663.1": {
+			StoragePath:     "1009_13_md5summock_1663_1_4_test_D_aoseg",
+			IsSkipped:       false,
+			IsIncremented:   true,
+			MTime:           time.Now(),
+			StorageType:     greenplum.ColumnOriented,
+			EOF:             70,
+			ModCount:        4,
+			Compressor:      "",
+			FileMode:        420,
+			InitialUploadTS: time.Now(),
+		},
+	}
+	bundleFiles := &internal.RegularBundleFiles{}
+	testFiles := map[string]TestFileInfo{
+		"1663.1": {
+			AoRelFileMetadata: greenplum.NewAoRelFileMetadata("md5summock", greenplum.ColumnOriented, 70, 4),
+			BlockLocation: walparser.BlockLocation{
+				RelationFileNode: walparser.RelFileNode{
+					SpcNode: 1009,
+					DBNode:  13,
+					RelNode: 1663,
+				},
+				BlockNo: 1,
+			},
+		},
+	}
+	expectedResults := map[string]ExpectedResult{
+		"1663.1": {
+			StoragePath:   "1009_13_md5summock_1663_1_4_test_aoseg",
+			IsSkipped:     false,
+			IsIncremented: false,
+			StorageType:   greenplum.ColumnOriented,
+			EOF:           70,
+			ModCount:      4,
+		},
+	}
+	runSingleTest(t, baseFiles, bundleFiles, testFiles, expectedResults, deduplicationAgeLimit, false)
+}
 
 func TestAoUpload_SkippedFile(t *testing.T) {
@@ -382,7 +424,7 @@ func TestAoUpload_SkippedFile(t *testing.T) {
 			ModCount: 4,
 		},
 	}
-	runSingleTest(t, baseFiles, bundleFiles, testFiles, expectedResults, deduplicationAgeLimit)
+	runSingleTest(t, baseFiles, bundleFiles, testFiles, expectedResults, deduplicationAgeLimit, true)
 }
 
 func TestAoUpload_NotExistFile(t *testing.T) {
@@ -429,8 +471,8 @@ func TestAoUpload_NotExistFile(t *testing.T) {

 func runSingleTest(t *testing.T, baseFiles greenplum.BackupAOFiles,
 	bundleFiles *internal.RegularBundleFiles, testFiles map[string]TestFileInfo, expectedResults map[string]ExpectedResult,
-	deduplicationAgeLimit time.Duration) {
-	uploader := newAoStorageUploader(baseFiles, bundleFiles, true, deduplicationAgeLimit)
+	deduplicationAgeLimit time.Duration, isAploaderIncremental bool) {
+	uploader := newAoStorageUploader(baseFiles, bundleFiles, isAploaderIncremental, deduplicationAgeLimit)
 	testDir, testFiles := generateData("data", testFiles, t)
 	defer os.RemoveAll(testDir)
 
31 changes: 26 additions & 5 deletions internal/databases/postgres/connect.go
@@ -41,13 +41,35 @@ func Connect(configOptions ...func(config *pgx.ConnConfig) error) (*pgx.Conn, er
 		}
 	}
 
-	var archiveMode string
+	err = checkArchiveCommand(conn)
+	if err != nil {
+		return nil, err
+	}
+
+	return conn, nil
+}
+
+func checkArchiveCommand(conn *pgx.Conn) error {
+	// TODO: Move this logic to queryRunner
+
+	var standby bool
+
+	err := conn.QueryRow("select pg_is_in_recovery()").Scan(&standby)
+	if err != nil {
+		return errors.Wrap(err, "Connect: postgres standby test failed")
+	}
+
+	if standby {
+		// archive_mode may be configured on primary
+		return nil
+	}
+
+	var archiveMode string
+
 	err = conn.QueryRow("show archive_mode").Scan(&archiveMode)
 
 	if err != nil {
-		return nil, errors.Wrap(err, "Connect: postgres archive_mode test failed")
+		return errors.Wrap(err, "Connect: postgres archive_mode test failed")
 	}
 
 	if archiveMode != "on" && archiveMode != "always" {
@@ -60,7 +82,7 @@ func Connect(configOptions ...func(config *pgx.ConnConfig) error) (*pgx.Conn, er
err = conn.QueryRow("show archive_command").Scan(&archiveCommand)

if err != nil {
return nil, errors.Wrap(err, "Connect: postgres archive_mode test failed")
return errors.Wrap(err, "Connect: postgres archive_mode test failed")
}

if len(archiveCommand) == 0 || archiveCommand == "(disabled)" {
@@ -69,8 +91,7 @@ func Connect(configOptions ...func(config *pgx.ConnConfig) error) (*pgx.Conn, er
" Please consider configuring WAL archiving.")
}
}

return conn, nil
return nil
}

// nolint:gocritic
Expand Down
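For a sense of the refactored flow: Connect now delegates to checkArchiveCommand, which returns early on a standby (where archive settings may only be configured on the primary) before inspecting archive_mode and archive_command. A rough standalone sketch of the same checks against database/sql; the lib/pq driver and the exact warning wording are assumptions here, and wal-g itself issues these queries over pgx as shown above:

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // assumed driver for this sketch; wal-g uses pgx
)

// checkArchiveCommand, sketched: skip archive checks on a standby, otherwise
// require archive_mode to be enabled and archive_command to look usable.
func checkArchiveCommand(db *sql.DB) error {
	var standby bool
	if err := db.QueryRow("select pg_is_in_recovery()").Scan(&standby); err != nil {
		return fmt.Errorf("standby test failed: %w", err)
	}
	if standby {
		return nil // archive_mode may be configured on the primary only
	}

	var archiveMode string
	if err := db.QueryRow("show archive_mode").Scan(&archiveMode); err != nil {
		return fmt.Errorf("archive_mode test failed: %w", err)
	}
	if archiveMode != "on" && archiveMode != "always" {
		log.Println("WARNING: archive_mode is not enabled; please consider configuring WAL archiving")
		return nil
	}

	var archiveCommand string
	if err := db.QueryRow("show archive_command").Scan(&archiveCommand); err != nil {
		return fmt.Errorf("archive_command test failed: %w", err)
	}
	if archiveCommand == "" || archiveCommand == "(disabled)" {
		log.Println("WARNING: archive_command is not set; please consider configuring WAL archiving")
	}
	return nil
}

func main() {
	db, err := sql.Open("postgres", "dbname=postgres sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	if err := checkArchiveCommand(db); err != nil {
		log.Fatal(err)
	}
}

Extracting the checks into their own function also fixes the return types: the helper returns a bare error, so the wrapped "return nil, errors.Wrap(...)" lines in the diff become "return errors.Wrap(...)".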
