Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

APPS-911 Store config on s3 #22

Merged
merged 16 commits into from
Oct 22, 2023
6 changes: 4 additions & 2 deletions cmd/backup/config/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ aerospike-cluster:
- name: "cluster1"
host: "localhost"
port: 3000
user: "tester"
password: "psw"

backup-storage:
- name: "local1"
Expand All @@ -11,7 +13,7 @@ backup-storage:

backup-policy:
- name: "policy-daily"
interval: 30000
interval: 10000
incr_interval: 5000
type: 1
source_cluster: "cluster1"
Expand All @@ -20,5 +22,5 @@ backup-policy:
set_list:
- set1
parallel: 1
remove_files: true
remove_files: false

Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ aerospike-cluster:
- name: "cluster1"
host: "localhost"
port: 3000
user: "tester"
password: "psw"

backup-storage:
- name: "s3"
Expand All @@ -21,5 +23,5 @@ backup-policy:
storage: "s3"
namespace: "test"
parallel: 1
remove_files: true
remove_files: false

4 changes: 4 additions & 0 deletions cmd/backup/config/remote_config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
type: 1
s3_region: "eu-central-1"
s3_profile: "default"
path: "s3://as-backup-bucket/s3_config.yml"
5 changes: 5 additions & 0 deletions cmd/backup/config/remote_minio_config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: 1
s3_endpoint_override: "http://localhost:9000"
s3_region: "eu-central-1"
s3_profile: "minio"
path: "s3://as-backup-bucket/minio_config.yml"
27 changes: 24 additions & 3 deletions cmd/backup/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"errors"
"fmt"
"os"
"os/signal"
Expand All @@ -18,28 +19,48 @@ import (
)

// run parses the CLI parameters and executes backup.

func run() int {
var (
host, configFile, logLevel string
port int
host, configFile, remoteConfig, logLevel string
port int
)

validateFlags := func(cmd *cobra.Command, args []string) error {
if len(configFile) == 0 && len(remoteConfig) == 0 {
return errors.New("one of --config or --remote is required")
}
if len(configFile) > 0 && len(remoteConfig) > 0 {
return errors.New("only one of --config or --remote is allowed")
}
return nil
}
rootCmd := &cobra.Command{
Use: "Use the following properties for service configuration",
Short: "Aerospike Backup Service",
Version: util.Version,
PreRunE: validateFlags,
}

rootCmd.Flags().StringVar(&host, "host", "0.0.0.0", "service host")
rootCmd.Flags().IntVar(&port, "port", 8080, "service port")
rootCmd.Flags().StringVarP(&configFile, "config", "c", "", "configuration file path")
rootCmd.Flags().StringVarP(&remoteConfig, "remote", "r", "", "remote configuration")
rootCmd.Flags().StringVarP(&logLevel, "log", "l", "DEBUG", "log level")

rootCmd.RunE = func(cmd *cobra.Command, args []string) error {
// set default logger
slog.SetDefault(slog.New(util.LogHandler(logLevel)))
// read configuration file
server.ConfigurationManager = service.NewConfigurationManager(configFile)
if configFile != "" {
server.ConfigurationManager = service.NewConfigurationManager(configFile)
} else if remoteConfig != "" {
configurationStorage, err := service.ReadConfigStorage(remoteConfig)
if err != nil {
return err
}
server.ConfigurationManager = service.NewS3ConfigurationManager(configurationStorage)
}
config, err := readConfiguration()
if err != nil {
return err
Expand Down
7 changes: 7 additions & 0 deletions pkg/model/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package model

const (
StateFileName = "state.yaml"
IncrementalBackupDirectory = "incremental"
FullBackupDirectory = "backup"
)
6 changes: 3 additions & 3 deletions pkg/service/backup_backend_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ var _ BackupBackend = (*BackupBackendLocal)(nil)
// NewBackupBackendLocal returns a new BackupBackendLocal instance.
func NewBackupBackendLocal(path, backupPolicyName string) *BackupBackendLocal {
prepareDirectory(path)
incrDirectoryPath := path + "/" + incremenalBackupDirectory
incrDirectoryPath := path + "/" + model.IncrementalBackupDirectory
prepareDirectory(incrDirectoryPath)
return &BackupBackendLocal{
path: path,
stateFilePath: path + "/" + stateFileName,
stateFilePath: path + "/" + model.StateFileName,
backupPolicyName: backupPolicyName,
}
}
Expand Down Expand Up @@ -84,7 +84,7 @@ func (local *BackupBackendLocal) FullBackupList() ([]model.BackupDetails, error)
}

func (local *BackupBackendLocal) IncrementalBackupList() ([]model.BackupDetails, error) {
entries, err := os.ReadDir(local.path + "/" + incremenalBackupDirectory)
entries, err := os.ReadDir(local.path + "/" + model.IncrementalBackupDirectory)
if err != nil {
return nil, err
}
Expand Down
169 changes: 40 additions & 129 deletions pkg/service/backup_backend_s3.go
Original file line number Diff line number Diff line change
@@ -1,172 +1,83 @@
package service

import (
"bytes"
"context"
"encoding/json"
"io"
"log/slog"
"net/url"
"os"

"github.com/aerospike/backup/pkg/model"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/smithy-go/ptr"
)

// BackupBackendS3 implements the BackupBackend interface by
// saving state to AWS S3.
type BackupBackendS3 struct {
ctx context.Context
client *s3.Client
bucket string
path string
stateFilePath string
backupPolicyName string
*S3Context
stateFilePath string
backupPolicy *model.BackupPolicy
}

var _ BackupBackend = (*BackupBackendS3)(nil)

// NewBackupBackendS3 returns a new BackupBackendS3 instance.
func NewBackupBackendS3(storage *model.BackupStorage, backupPolicyName string) *BackupBackendS3 {
// Load the SDK's configuration from environment and shared config, and
// create the client with this.
ctx := context.TODO()
cfg, err := config.LoadDefaultConfig(ctx, config.WithSharedConfigProfile(*storage.S3Profile))
if err != nil {
slog.Error("Failed to load S3 SDK configuration", "err", err)
os.Exit(1)
}
cfg.Region = *storage.S3Region

client := s3.NewFromConfig(cfg, func(o *s3.Options) {
o.BaseEndpoint = aws.String(*storage.S3EndpointOverride)
o.UsePathStyle = true
})

parsed, err := url.Parse(*storage.Path)
if err != nil {
slog.Error("Failed to parse S3 storage path", "err", err)
os.Exit(1)
}

func NewBackupBackendS3(storage *model.BackupStorage, backupPolicy *model.BackupPolicy) *BackupBackendS3 {
s3Context := NewS3Context(storage)
return &BackupBackendS3{
ctx: ctx,
client: client,
bucket: parsed.Host,
path: parsed.Path,
stateFilePath: parsed.Path + "/" + stateFileName,
backupPolicyName: backupPolicyName,
S3Context: s3Context,
stateFilePath: s3Context.Path + "/" + model.StateFileName,
backupPolicy: backupPolicy,
}
}

func (s *BackupBackendS3) readState() *model.BackupState {
result, err := s.client.GetObject(s.ctx, &s3.GetObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(s.stateFilePath),
})
state := model.NewBackupState()
if err != nil {
slog.Warn("Failed to read state file for backup", "path", s.stateFilePath)
return state
}
defer result.Body.Close()
bytes, err := io.ReadAll(result.Body)
if err != nil {
slog.Warn("Couldn't read object body of backup state file",
"path", s.stateFilePath, "err", err)
}
if err = json.Unmarshal(bytes, state); err != nil {
slog.Warn("Failed unmarshal state file for backup",
"path", s.stateFilePath, "err", err)
}
s.readFile(s.stateFilePath, state)
return state
}

func (s *BackupBackendS3) writeState(state *model.BackupState) error {
backupState, err := json.Marshal(state)
if err != nil {
return err
}
reader := bytes.NewReader(backupState)
_, err = s.client.PutObject(s.ctx, &s3.PutObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(s.stateFilePath),
Body: reader,
})
if err != nil {
slog.Warn("Couldn't upload state file", "path", s.stateFilePath,
"bucket", s.bucket, "err", err)
}

return err
return s.writeFile(s.stateFilePath, state)
}

func (s *BackupBackendS3) FullBackupList() ([]model.BackupDetails, error) {
result, err := s.client.ListObjectsV2(s.ctx, &s3.ListObjectsV2Input{
Bucket: aws.String(s.bucket),
Prefix: aws.String(s.path + "/"),
Delimiter: aws.String("/"),
})
var contents []model.BackupDetails
if err != nil {
slog.Warn("Couldn't list backups in bucket", "path", s.path, "err", err)
} else {
for _, prefix := range result.CommonPrefixes {
details := model.BackupDetails{
Key: prefix.Prefix,
}
contents = append(contents, details)
backupFolder := s.Path + "/" + model.FullBackupDirectory + "/"
if s.backupPolicy.RemoveFiles != nil && *s.backupPolicy.RemoveFiles {
files, _ := s.listFiles(backupFolder)
if len(files) > 0 {
return []model.BackupDetails{{
Key: ptr.String(backupFolder),
}}, nil
}
return []model.BackupDetails{}, nil
}
return contents, err
}

func (s *BackupBackendS3) IncrementalBackupList() ([]model.BackupDetails, error) {
result, err := s.client.ListObjectsV2(s.ctx, &s3.ListObjectsV2Input{
Bucket: aws.String(s.bucket),
Prefix: aws.String(s.path + "/" + incremenalBackupDirectory + "/"),
Delimiter: aws.String(""),
})
var contents []model.BackupDetails
list, err := s.listFolders(backupFolder)
if err != nil {
slog.Warn("Couldn't list incremental backups", "path", s.path, "err", err)
} else {
for _, object := range result.Contents {
details := model.BackupDetails{
Key: object.Key,
LastModified: object.LastModified,
Size: &object.Size,
}
contents = append(contents, details)
return nil, err
}
contents := make([]model.BackupDetails, len(list))
for i, object := range list {
details := model.BackupDetails{
Key: object.Prefix,
}
contents[i] = details
}
return contents, err
}

func (s *BackupBackendS3) CleanDir(name string) {
path := s.path + "/" + name
result, err := s.client.ListObjectsV2(s.ctx, &s3.ListObjectsV2Input{
Bucket: aws.String(s.bucket),
Prefix: aws.String(path),
Delimiter: aws.String(""),
})
func (s *BackupBackendS3) IncrementalBackupList() ([]model.BackupDetails, error) {
list, err := s.listFiles(s.Path + "/" + model.IncrementalBackupDirectory)
if err != nil {
slog.Warn("Couldn't list files in directory", "path", path, "err", err)
} else {
for _, file := range result.Contents {
_, err := s.client.DeleteObject(s.ctx, &s3.DeleteObjectInput{
Bucket: aws.String(s.bucket),
Key: file.Key,
})
if err != nil {
slog.Debug("Couldn't delete file", "path", *file.Key, "err", err)
}
return nil, err
}
contents := make([]model.BackupDetails, len(list))
for i, object := range list {
details := model.BackupDetails{
Key: object.Key,
LastModified: object.LastModified,
Size: &object.Size,
}
contents[i] = details
}
return contents, err
}

func (s *BackupBackendS3) BackupPolicyName() string {
return s.backupPolicyName
return *s.backupPolicy.Name
}
9 changes: 2 additions & 7 deletions pkg/service/backup_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,6 @@ import (
"github.com/aerospike/backup/pkg/util"
)

const (
stateFileName = "state.json"
incremenalBackupDirectory = "incremental"
)

// BackupScheduler knows how to schedule a backup.
type BackupScheduler interface {
Schedule(ctx context.Context)
Expand Down Expand Up @@ -48,7 +43,7 @@ func NewBackupHandler(config *model.Config, backupPolicy *model.BackupPolicy) (*
case model.Local:
backupBackend = NewBackupBackendLocal(*storage.Path, *backupPolicy.Name)
case model.S3:
backupBackend = NewBackupBackendS3(storage, *backupPolicy.Name)
backupBackend = NewBackupBackendS3(storage, backupPolicy)
default:
return nil, fmt.Errorf("unsupported storage type: %d", *storage.Type)
}
Expand Down Expand Up @@ -90,7 +85,7 @@ loop:
// update the state
h.updateBackupState(now, state)
// clean incremental backups
h.backend.CleanDir(incremenalBackupDirectory)
h.backend.CleanDir(model.IncrementalBackupDirectory)
} else {
slog.Debug("The full backup is not due to run yet", "name", *h.backupPolicy.Name)
}
Expand Down
Loading
Loading