diff --git a/cmd/backup/main.go b/cmd/backup/main.go index f5c72e72..3b14c7a7 100644 --- a/cmd/backup/main.go +++ b/cmd/backup/main.go @@ -72,12 +72,13 @@ func run() int { slog.Info("Aerospike Backup Service", "commit", commit, "buildTime", buildTime) // schedule all configured backups backends := service.NewBackupBackends(config) - scheduler, err := service.ScheduleBackup(ctx, config, backends) + handlers := service.MakeHandlers(config, backends) + scheduler, err := service.ScheduleBackup(ctx, config, handlers) if err != nil { return err } // run HTTP server - err = runHTTPServer(ctx, backends, config, scheduler) + err = runHTTPServer(ctx, backends, config, scheduler, handlers) // shutdown shared resources shared.Shutdown() // stop the scheduler @@ -119,9 +120,13 @@ func readConfiguration() (*model.Config, error) { return config, nil } -func runHTTPServer(ctx context.Context, backends service.BackendsHolder, - config *model.Config, scheduler quartz.Scheduler) error { - httpServer := server.NewHTTPServer(backends, config, scheduler) +func runHTTPServer(ctx context.Context, + backends service.BackendsHolder, + config *model.Config, + scheduler quartz.Scheduler, + handlerHolder service.BackupHandlerHolder, +) error { + httpServer := server.NewHTTPServer(backends, config, scheduler, handlerHolder) go func() { httpServer.Start() }() diff --git a/config/config.yml b/config/config.yml index 65726e63..33d41bcc 100644 --- a/config/config.yml +++ b/config/config.yml @@ -44,7 +44,7 @@ backup-policies: remove-files: KeepAll bandwidth: 120000 sealed: true - encryptedCompressedPolicy: + encryptedCompressedPolicy128: parallel: 8 remove-files: RemoveAll sealed: true @@ -54,6 +54,16 @@ backup-policies: compression: level: 20 mode: ZSTD + encryptedCompressedPolicy256: + parallel: 8 + remove-files: RemoveAll + sealed: true + encryption: + key-file: encryptionKey + mode: AES256 + compression: + level: 20 + mode: ZSTD notSealed: parallel: 8 remove-files: KeepAll @@ -194,12 +204,18 @@ backup-routines: storage: local namespaces: ["source-ns13"] backup-policy: keepFilesPolicySlow - fullBackupEncrypedCompressed: - interval-cron: "@yearly" - source-cluster: absDefaultCluster - storage: local - namespaces: ["source-ns18"] - backup-policy: encryptedCompressedPolicy + fullBackupEncrypedCompressed128: + interval-cron: "@yearly" + source-cluster: absDefaultCluster + storage: local + namespaces: ["source-ns18"] + backup-policy: encryptedCompressedPolicy128 + fullBackupEncrypedCompressed256: + interval-cron: "@yearly" + source-cluster: absDefaultCluster + storage: local + namespaces: ["source-ns21"] + backup-policy: encryptedCompressedPolicy256 noIndexesUdfsRecords: interval-cron: "@yearly" source-cluster: absDefaultCluster diff --git a/docs/docs.go b/docs/docs.go index 1f837677..56e0c1f7 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -96,6 +96,41 @@ const docTemplate = `{ } } }, + "/v1/backups/currentBackup/{name}": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "Backup" + ], + "summary": "Get current backup statistics.", + "operationId": "getCurrentBackup", + "parameters": [ + { + "type": "string", + "description": "Backup routine name", + "name": "name", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "Current backup statistics", + "schema": { + "$ref": "#/definitions/model.CurrentBackups" + } + }, + "404": { + "description": "Not Found", + "schema": { + "type": "string" + } + } + } + } + }, "/v1/backups/full": { "get": { "produces": [ @@ -1741,6 +1776,57 @@ const docTemplate = `{ } } }, + "model.CurrentBackup": { + "type": "object", + "properties": { + "done-records": { + "description": "DoneRecords: the number of records that have been successfully backed up.", + "type": "integer", + "example": 50 + }, + "estimated-end-time": { + "description": "EstimatedEndTime: the estimated time when the backup operation will be completed.\nA nil value indicates that the estimation is not available yet.", + "type": "string", + "example": "2006-01-02T15:04:05Z07:00" + }, + "percentage-done": { + "description": "PercentageDone: the progress of the backup operation as a percentage.", + "type": "integer", + "example": 50 + }, + "start-time": { + "description": "StartTime: the time when the backup operation started.", + "type": "string", + "example": "2006-01-02T15:04:05Z07:00" + }, + "total-records": { + "description": "TotalRecords: the total number of records to be backed up.", + "type": "integer", + "example": 100 + } + } + }, + "model.CurrentBackups": { + "type": "object", + "properties": { + "full": { + "description": "Full represents the state of a full backup. Nil if no full backup is running.", + "allOf": [ + { + "$ref": "#/definitions/model.CurrentBackup" + } + ] + }, + "incremental": { + "description": "Incremental represents the state of an incremental backup. Nil if no incremental backup is running.", + "allOf": [ + { + "$ref": "#/definitions/model.CurrentBackup" + } + ] + } + } + }, "model.EncryptionPolicy": { "description": "EncryptionPolicy contains backup encryption information.", "type": "object", @@ -2290,7 +2376,7 @@ const docTemplate = `{ "example": "eu-central-1" }, "type": { - "description": "The type of the storage provider (0 - Local, 1 - AWS S3).", + "description": "The type of the storage provider", "enum": [ "local", "aws-s3" diff --git a/docs/openapi.json b/docs/openapi.json index b31a5c83..6d58b542 100644 --- a/docs/openapi.json +++ b/docs/openapi.json @@ -90,6 +90,44 @@ "tags" : [ "System" ] } }, + "/v1/backups/currentBackup/{name}" : { + "get" : { + "operationId" : "getCurrentBackup", + "parameters" : [ { + "description" : "Backup routine name", + "in" : "path", + "name" : "name", + "required" : true, + "schema" : { + "type" : "string" + } + } ], + "responses" : { + "200" : { + "content" : { + "application/json" : { + "schema" : { + "$ref" : "#/components/schemas/model.CurrentBackups" + } + } + }, + "description" : "Current backup statistics" + }, + "404" : { + "content" : { + "application/json" : { + "schema" : { + "type" : "string" + } + } + }, + "description" : "Not Found" + } + }, + "summary" : "Get current backup statistics.", + "tags" : [ "Backup" ] + } + }, "/v1/backups/full" : { "get" : { "operationId" : "getFullBackups", @@ -1842,6 +1880,55 @@ }, "type" : "object" }, + "model.CurrentBackup" : { + "properties" : { + "done-records" : { + "description" : "DoneRecords: the number of records that have been successfully backed up.", + "example" : 50, + "type" : "integer" + }, + "estimated-end-time" : { + "description" : "EstimatedEndTime: the estimated time when the backup operation will be completed.\nA nil value indicates that the estimation is not available yet.", + "example" : "2006-01-02T15:04:05Z07:00", + "type" : "string" + }, + "percentage-done" : { + "description" : "PercentageDone: the progress of the backup operation as a percentage.", + "example" : 50, + "type" : "integer" + }, + "start-time" : { + "description" : "StartTime: the time when the backup operation started.", + "example" : "2006-01-02T15:04:05Z07:00", + "type" : "string" + }, + "total-records" : { + "description" : "TotalRecords: the total number of records to be backed up.", + "example" : 100, + "type" : "integer" + } + }, + "type" : "object" + }, + "model.CurrentBackups" : { + "properties" : { + "full" : { + "allOf" : [ { + "$ref" : "#/components/schemas/model.CurrentBackup" + } ], + "description" : "Full represents the state of a full backup. Nil if no full backup is running.", + "type" : "object" + }, + "incremental" : { + "allOf" : [ { + "$ref" : "#/components/schemas/model.CurrentBackup" + } ], + "description" : "Incremental represents the state of an incremental backup. Nil if no incremental backup is running.", + "type" : "object" + } + }, + "type" : "object" + }, "model.EncryptionPolicy" : { "description" : "EncryptionPolicy contains backup encryption information.", "properties" : { @@ -2318,7 +2405,7 @@ "allOf" : [ { "$ref" : "#/components/schemas/model.StorageType" } ], - "description" : "The type of the storage provider (0 - Local, 1 - AWS S3).", + "description" : "The type of the storage provider", "type" : "object" } }, diff --git a/docs/openapi.yaml b/docs/openapi.yaml index 9609a21d..1a9d7a92 100644 --- a/docs/openapi.yaml +++ b/docs/openapi.yaml @@ -67,6 +67,32 @@ paths: summary: Readiness endpoint. tags: - System + /v1/backups/currentBackup/{name}: + get: + operationId: getCurrentBackup + parameters: + - description: Backup routine name + in: path + name: name + required: true + schema: + type: string + responses: + "200": + content: + application/json: + schema: + $ref: '#/components/schemas/model.CurrentBackups' + description: Current backup statistics + "404": + content: + application/json: + schema: + type: string + description: Not Found + summary: Get current backup statistics. + tags: + - Backup /v1/backups/full: get: operationId: getFullBackups @@ -1477,6 +1503,51 @@ components: example: testUser type: string type: object + model.CurrentBackup: + properties: + done-records: + description: "DoneRecords: the number of records that have been successfully\ + \ backed up." + example: 50 + type: integer + estimated-end-time: + description: |- + EstimatedEndTime: the estimated time when the backup operation will be completed. + A nil value indicates that the estimation is not available yet. + example: 2006-01-02T15:04:05Z07:00 + type: string + percentage-done: + description: "PercentageDone: the progress of the backup operation as a\ + \ percentage." + example: 50 + type: integer + start-time: + description: "StartTime: the time when the backup operation started." + example: 2006-01-02T15:04:05Z07:00 + type: string + total-records: + description: "TotalRecords: the total number of records to be backed up." + example: 100 + type: integer + type: object + model.CurrentBackups: + example: + incremental: "{}" + full: "{}" + properties: + full: + allOf: + - $ref: '#/components/schemas/model.CurrentBackup' + description: Full represents the state of a full backup. Nil if no full + backup is running. + type: object + incremental: + allOf: + - $ref: '#/components/schemas/model.CurrentBackup' + description: Incremental represents the state of an incremental backup. + Nil if no incremental backup is running. + type: object + type: object model.EncryptionPolicy: description: EncryptionPolicy contains backup encryption information. properties: @@ -1951,7 +2022,7 @@ components: type: allOf: - $ref: '#/components/schemas/model.StorageType' - description: "The type of the storage provider (0 - Local, 1 - AWS S3)." + description: The type of the storage provider type: object required: - path diff --git a/go.mod b/go.mod index b39b9c3a..f0d1b10e 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.22 require ( github.com/aerospike/aerospike-client-go/v7 v7.6.0 github.com/aerospike/aerospike-management-lib v1.4.0 - github.com/aerospike/backup-go v0.0.0-20240723084926-1ebd1eeb6271 + github.com/aerospike/backup-go v0.0.0-20240728070104-6c6b315f52d8 github.com/aws/aws-sdk-go-v2 v1.30.3 github.com/aws/aws-sdk-go-v2/config v1.27.27 github.com/aws/aws-sdk-go-v2/service/s3 v1.58.2 diff --git a/go.sum b/go.sum index 3bfb9353..22f940f3 100644 --- a/go.sum +++ b/go.sum @@ -210,6 +210,12 @@ github.com/aerospike/backup-go v0.0.0-20240723065039-ba1dcdae8a73 h1:Yi9rYp369Ol github.com/aerospike/backup-go v0.0.0-20240723065039-ba1dcdae8a73/go.mod h1:WfFo03sY4w5qu3IALjqNQaPZVe+0p7GqYPG+qifkm+8= github.com/aerospike/backup-go v0.0.0-20240723084926-1ebd1eeb6271 h1:wLmVSBoSUJGc2+tWn4hLuI/MLTz8OaVXEUq+1opnQSM= github.com/aerospike/backup-go v0.0.0-20240723084926-1ebd1eeb6271/go.mod h1:WfFo03sY4w5qu3IALjqNQaPZVe+0p7GqYPG+qifkm+8= +github.com/aerospike/backup-go v0.0.0-20240725064337-443a87106769 h1:ogbW0yY0dAqTW+JI+Yk4KGDLHOdEUQTcRz7kS5BCsnY= +github.com/aerospike/backup-go v0.0.0-20240725064337-443a87106769/go.mod h1:WfFo03sY4w5qu3IALjqNQaPZVe+0p7GqYPG+qifkm+8= +github.com/aerospike/backup-go v0.0.0-20240725064655-47f776116683 h1:hSzvcVnw7oMPeQQ7lIjxJRiCVcQGD0WBNULID9wpJ9c= +github.com/aerospike/backup-go v0.0.0-20240725064655-47f776116683/go.mod h1:WfFo03sY4w5qu3IALjqNQaPZVe+0p7GqYPG+qifkm+8= +github.com/aerospike/backup-go v0.0.0-20240728070104-6c6b315f52d8 h1:novGz78Gq6EYvz9E5b3NO7/DMBEP9r7ejso/aOqImeY= +github.com/aerospike/backup-go v0.0.0-20240728070104-6c6b315f52d8/go.mod h1:WfFo03sY4w5qu3IALjqNQaPZVe+0p7GqYPG+qifkm+8= github.com/aws/aws-sdk-go-v2 v1.26.1 h1:5554eUqIYVWpU0YmeeYZ0wU64H2VLBs8TlhRB2L+EkA= github.com/aws/aws-sdk-go-v2 v1.26.1/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM= github.com/aws/aws-sdk-go-v2 v1.27.0 h1:7bZWKoXhzI+mMR/HjdMx8ZCC5+6fY0lS5tr0bbgiLlo= diff --git a/internal/server/backup_handlers.go b/internal/server/backup_handlers.go index 6594bf35..4d9402aa 100644 --- a/internal/server/backup_handlers.go +++ b/internal/server/backup_handlers.go @@ -187,7 +187,7 @@ func (ws *HTTPServer) scheduleFullBackup(w http.ResponseWriter, r *http.Request) } fullBackupJobDetail := service.NewAdHocFullBackupJobForRoutine(routineName) if fullBackupJobDetail == nil { - http.Error(w, "unknown routine name", http.StatusNotFound) + http.Error(w, "unknown routine name "+routineName, http.StatusNotFound) return } trigger := quartz.NewRunOnceTrigger(time.Duration(delayMillis) * time.Millisecond) @@ -198,3 +198,36 @@ func (ws *HTTPServer) scheduleFullBackup(w http.ResponseWriter, r *http.Request) } w.WriteHeader(http.StatusAccepted) } + +// @Summary Get current backup statistics. +// @ID getCurrentBackup +// @Tags Backup +// @Produce json +// @Param name path string true "Backup routine name" +// @Router /v1/backups/currentBackup/{name} [get] +// @Success 200 {object} model.CurrentBackups "Current backup statistics" +// @Failure 404 {string} string +func (ws *HTTPServer) getCurrentBackupInfo(w http.ResponseWriter, r *http.Request) { + routineName := r.PathValue("name") + if routineName == "" { + http.Error(w, "routine name required", http.StatusBadRequest) + return + } + + handler, found := ws.handlerHolder[routineName] + if !found { + http.Error(w, "unknown routine name "+routineName, http.StatusNotFound) + return + } + + stat := handler.GetCurrentStat() + response, err := json.Marshal(stat) + if err != nil { + http.Error(w, "failed to marshal statistics", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write(response) +} diff --git a/internal/server/server.go b/internal/server/server.go index 30c07c41..83c83432 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -84,11 +84,16 @@ type HTTPServer struct { scheduler quartz.Scheduler restoreService service.RestoreService backupBackends service.BackendsHolder + handlerHolder service.BackupHandlerHolder } // NewHTTPServer returns a new instance of HTTPServer. -func NewHTTPServer(backends service.BackendsHolder, config *model.Config, - scheduler quartz.Scheduler) *HTTPServer { +func NewHTTPServer( + backends service.BackendsHolder, + config *model.Config, + scheduler quartz.Scheduler, + handlerHolder service.BackupHandlerHolder, +) *HTTPServer { serverConfig := config.ServiceConfig.HTTPServer addr := fmt.Sprintf("%s:%d", serverConfig.GetAddressOrDefault(), serverConfig.GetPortOrDefault()) @@ -106,6 +111,7 @@ func NewHTTPServer(backends service.BackendsHolder, config *model.Config, scheduler: scheduler, restoreService: service.NewRestoreMemory(backends, config, shared.NewRestoreGo()), backupBackends: backends, + handlerHolder: handlerHolder, } } @@ -199,6 +205,9 @@ func (ws *HTTPServer) Start() { // Schedules a full backup operation mux.HandleFunc(ws.api("/backups/schedule/{name}"), ws.scheduleFullBackup) + // Get information on currently running backups + mux.HandleFunc(ws.api("/backups/currentBackup/{name}"), ws.getCurrentBackupInfo) + ws.server.Handler = ws.rateLimiterMiddleware(mux) err := ws.server.ListenAndServe() if err != nil && strings.Contains(err.Error(), "Server closed") { diff --git a/pkg/model/current_backup.go b/pkg/model/current_backup.go new file mode 100644 index 00000000..e8ca38ad --- /dev/null +++ b/pkg/model/current_backup.go @@ -0,0 +1,74 @@ +package model + +import ( + "time" + + "github.com/aerospike/backup-go" +) + +// CurrentBackups represent the current state of backups (full and incremental) +type CurrentBackups struct { + // Full represents the state of a full backup. Nil if no full backup is running. + Full *CurrentBackup `json:"full,omitempty"` + // Incremental represents the state of an incremental backup. Nil if no incremental backup is running. + Incremental *CurrentBackup `json:"incremental,omitempty"` +} + +type CurrentBackup struct { + // TotalRecords: the total number of records to be backed up. + TotalRecords uint64 `json:"total-records,omitempty" example:"100"` + // DoneRecords: the number of records that have been successfully backed up. + DoneRecords uint64 `json:"done-records,omitempty" example:"50"` + // StartTime: the time when the backup operation started. + StartTime time.Time `json:"start-time,omitempty" example:"2006-01-02T15:04:05Z07:00"` + // PercentageDone: the progress of the backup operation as a percentage. + PercentageDone int `json:"percentage-done,omitempty" example:"50"` + // EstimatedEndTime: the estimated time when the backup operation will be completed. + // A nil value indicates that the estimation is not available yet. + EstimatedEndTime *time.Time `json:"estimated-end-time,omitempty" example:"2006-01-02T15:04:05Z07:00"` +} + +func NewCurrentBackup(handlers map[string]*backup.BackupHandler) *CurrentBackup { + if len(handlers) == 0 { + return nil + } + + var total, done uint64 + for _, handler := range handlers { + done += handler.GetStats().GetReadRecords() + total += handler.GetStats().TotalRecords + } + if total == 0 { + return nil + } + percent := float64(done) / float64(total) + + startTime := GetAnyHandler(handlers).GetStats().StartTime + + return &CurrentBackup{ + TotalRecords: total, + DoneRecords: done, + StartTime: startTime, + PercentageDone: int(percent * 100), + EstimatedEndTime: calculateEstimatedEndTime(startTime, percent), + } +} + +func GetAnyHandler(m map[string]*backup.BackupHandler) *backup.BackupHandler { + for _, value := range m { + return value + } + + return nil +} + +func calculateEstimatedEndTime(startTime time.Time, percentDone float64) *time.Time { + if percentDone < 0.01 { // too early to calculate estimation, or zero done yet. + return nil + } + + elapsed := time.Since(startTime) + totalTime := time.Duration(float64(elapsed) / percentDone) + result := startTime.Add(totalTime) + return &result +} diff --git a/pkg/model/restore_policy.go b/pkg/model/restore_policy.go index 3281c2d4..d6e26449 100644 --- a/pkg/model/restore_policy.go +++ b/pkg/model/restore_policy.go @@ -2,8 +2,6 @@ package model import ( "fmt" - - "github.com/aerospike/backup-go/models" ) // RestorePolicy represents a policy for the restore operation. @@ -31,7 +29,7 @@ type RestorePolicy struct { BatchSize *int32 `json:"batch-size,omitempty" example:"128"` // Namespace details for the restore operation. // By default, the data is restored to the namespace from which it was taken. - Namespace *models.RestoreNamespace `json:"namespace,omitempty"` + Namespace *RestoreNamespace `json:"namespace,omitempty"` // The sets to restore (optional, an empty list implies restoring all sets). SetList []string `json:"set-list,omitempty" example:"set1,set2"` // The bins to restore (optional, an empty list implies restoring all bins). diff --git a/pkg/service/backup_handler.go b/pkg/service/backup_handler.go index a770a4e3..72bcfb34 100644 --- a/pkg/service/backup_handler.go +++ b/pkg/service/backup_handler.go @@ -1,11 +1,15 @@ package service import ( + "context" "fmt" "log/slog" "strconv" "time" + "github.com/aerospike/aerospike-client-go/v7" + "github.com/aerospike/backup-go" + "github.com/aerospike/backup-go/models" "github.com/aerospike/backup/pkg/model" "github.com/aerospike/backup/pkg/shared" "github.com/aerospike/backup/pkg/util" @@ -24,8 +28,15 @@ type BackupHandler struct { secretAgent *model.SecretAgent state *model.BackupState retry *RetryService + + // backup handlers by namespace + fullBackupHandlers map[string]*backup.BackupHandler + incrBackupHandlers map[string]*backup.BackupHandler } +// BackupHandlerHolder stores backupHandlers by routine name +type BackupHandlerHolder map[string]*BackupHandler + var backupService shared.Backup = shared.NewBackupGo() // newBackupHandler returns a new BackupHandler instance. @@ -49,44 +60,59 @@ func newBackupHandler(config *model.Config, routineName string, backupBackend *B } return &BackupHandler{ - backend: backupBackend, - backupRoutine: backupRoutine, - backupFullPolicy: backupPolicy, - backupIncrPolicy: backupPolicy.CopySMDDisabled(), // incremental backups should not contain metadata - routineName: routineName, - namespaces: namespaces, - cluster: cluster, - storage: storage, - secretAgent: secretAgent, - state: backupBackend.readState(), - retry: NewRetryService(routineName), + backend: backupBackend, + backupRoutine: backupRoutine, + backupFullPolicy: backupPolicy, + backupIncrPolicy: backupPolicy.CopySMDDisabled(), // incremental backups should not contain metadata + routineName: routineName, + namespaces: namespaces, + cluster: cluster, + storage: storage, + secretAgent: secretAgent, + state: backupBackend.readState(), + retry: NewRetryService(routineName), + fullBackupHandlers: make(map[string]*backup.BackupHandler), + incrBackupHandlers: make(map[string]*backup.BackupHandler), }, nil } -func (h *BackupHandler) runFullBackup(now time.Time) { +func (h *BackupHandler) runFullBackup(ctx context.Context, now time.Time) { h.retry.retry( - func() error { return h.runFullBackupInternal(now) }, + func() error { return h.runFullBackupInternal(ctx, now) }, time.Duration(h.backupFullPolicy.GetRetryDelayOrDefault())*time.Millisecond, h.backupFullPolicy.GetMaxRetriesOrDefault(), ) } -func (h *BackupHandler) runFullBackupInternal(now time.Time) error { +func (h *BackupHandler) runFullBackupInternal(ctx context.Context, now time.Time) error { + var err error if !h.backend.FullBackupInProgress().CompareAndSwap(false, true) { slog.Info("Full backup is currently in progress, skipping full backup", "name", h.routineName) return nil } slog.Debug("Acquire fullBackupInProgress lock", "name", h.routineName) + + client, aerr := aerospike.NewClientWithPolicyAndHost(h.cluster.ASClientPolicy(), h.cluster.ASClientHosts()...) + if aerr != nil { + return fmt.Errorf("failed to connect to aerospike cluster, %w", aerr) + } + // release the lock defer func() { h.backend.FullBackupInProgress().Store(false) slog.Debug("Release fullBackupInProgress lock", "name", h.routineName) + client.Close() + clear(h.fullBackupHandlers) }() - for _, namespace := range h.namespaces { - err := h.fullBackupForNamespace(now, namespace) - if err != nil { - return err - } + + err = h.startFullBackupForAllNamespaces(now, client) + if err != nil { + return err + } + + err = h.waitForFullBackups(ctx, now) + if err != nil { + return err } // increment backupCounter metric @@ -101,6 +127,48 @@ func (h *BackupHandler) runFullBackupInternal(now time.Time) error { return nil } +func (h *BackupHandler) startFullBackupForAllNamespaces(upperBound time.Time, client *aerospike.Client) error { + clear(h.fullBackupHandlers) + + options := shared.BackupOptions{} + if h.backupFullPolicy.IsSealed() { + options.ModBefore = &upperBound + } + + for _, namespace := range h.namespaces { + backupFolder := getFullPath(h.backend.fullBackupsPath, h.backupFullPolicy, namespace, upperBound) + backupPath := h.backend.wrapWithPrefix(backupFolder) + handler, err := backupService.BackupRun(h.backupRoutine, h.backupFullPolicy, client, + h.storage, h.secretAgent, options, &namespace, backupPath) + if err != nil { + backupFailureCounter.Inc() + return fmt.Errorf("could not start backup of namespace %s, routine %s: %w", namespace, h.routineName, err) + } + + h.fullBackupHandlers[namespace] = handler + } + + return nil +} + +func (h *BackupHandler) waitForFullBackups(ctx context.Context, backupTimestamp time.Time) error { + startTime := time.Now() // startTime is only used to measure backup time + for namespace, handler := range h.fullBackupHandlers { + err := handler.Wait(ctx) + if err != nil { + backupFailureCounter.Inc() + return fmt.Errorf("error during backup namespace %s, routine %s: %w", namespace, h.routineName, err) + } + + backupFolder := getFullPath(h.backend.fullBackupsPath, h.backupFullPolicy, namespace, backupTimestamp) + if err := h.writeBackupMetadata(handler.GetStats(), backupTimestamp, namespace, backupFolder); err != nil { + return err + } + } + backupDurationGauge.Set(float64(time.Since(startTime).Milliseconds())) + return nil +} + func (h *BackupHandler) writeClusterConfiguration(now time.Time) { infos, err := getClusterConfiguration(h.cluster) if err != nil || len(infos) == 0 { @@ -119,36 +187,27 @@ func (h *BackupHandler) writeClusterConfiguration(now time.Time) { } } -func (h *BackupHandler) fullBackupForNamespace(upperBound time.Time, namespace string) error { - backupFolder := getFullPath(h.backend.fullBackupsPath, h.backupFullPolicy, namespace, upperBound) - h.backend.CreateFolder(backupFolder) - - options := shared.BackupOptions{} - if h.backupFullPolicy.IsSealed() { - options.ModBefore = &upperBound +func (h *BackupHandler) writeBackupMetadata(stats *models.BackupStats, + created time.Time, + namespace string, + backupFolder string) error { + metadata := model.BackupMetadata{ + From: time.Time{}, + Created: created, + Namespace: namespace, + RecordCount: stats.GetReadRecords(), + FileCount: stats.GetFileCount(), + ByteCount: stats.GetBytesWritten(), + SecondaryIndexCount: uint64(stats.GetSIndexes()), + UDFCount: uint64(stats.GetUDFs()), } - slog.Debug("Starting full backup", "up to", upperBound, "name", h.routineName) - - started := time.Now() - backupPath := h.backend.wrapWithPrefix(backupFolder) - stats, err := backupService.BackupRun(h.backupRoutine, h.backupFullPolicy, h.cluster, - h.storage, h.secretAgent, options, &namespace, backupPath) - elapsed := time.Since(started) - backupDurationGauge.Set(float64(elapsed.Milliseconds())) - slog.Debug("Completed full backup", "name", h.routineName) - - if err != nil { - backupFailureCounter.Inc() - return fmt.Errorf("error during backup namespace %s, routine %s: %w", namespace, h.routineName, err) - } - - metadata := stats.ToMetadata(time.Time{}, upperBound, namespace) if err := h.backend.writeBackupMetadata(backupFolder, metadata); err != nil { slog.Error("Could not write backup metadata", "name", h.routineName, "folder", backupFolder, "err", err) return err } + return nil } @@ -162,7 +221,7 @@ func (h *BackupHandler) cleanIncrementalBackups() { } } -func (h *BackupHandler) runIncrementalBackup(now time.Time) { +func (h *BackupHandler) runIncrementalBackup(ctx context.Context, now time.Time) { if h.state.LastFullRunIsEmpty() { slog.Debug("Skip incremental backup until initial full backup is done", "name", h.routineName) @@ -173,10 +232,24 @@ func (h *BackupHandler) runIncrementalBackup(now time.Time) { "name", h.routineName) return } - for _, namespace := range h.namespaces { - h.runIncrBackupForNamespace(now, namespace) + if len(h.incrBackupHandlers) > 0 { + slog.Debug("Incremental backup is currently in progress, skipping incremental backup", + "name", h.routineName) + return } + client, aerr := aerospike.NewClientWithPolicyAndHost(h.cluster.ASClientPolicy(), h.cluster.ASClientHosts()...) + if aerr != nil { + slog.Error("failed to connect to aerospike cluster", "err", aerr) + } + defer func() { + client.Close() + clear(h.incrBackupHandlers) + }() + + h.startIncrementalBackupForAllNamespaces(client, now) + + h.waitForIncrementalBackups(ctx, now) // increment incrBackupCounter metric incrBackupCounter.Inc() @@ -184,10 +257,7 @@ func (h *BackupHandler) runIncrementalBackup(now time.Time) { h.updateIncrementalBackupState(now) } -func (h *BackupHandler) runIncrBackupForNamespace(upperBound time.Time, namespace string) { - backupFolder := getIncrementalPath(h.backend.incrementalBackupsPath, namespace, upperBound) - h.backend.CreateFolder(backupFolder) - +func (h *BackupHandler) startIncrementalBackupForAllNamespaces(client *aerospike.Client, upperBound time.Time) { fromEpoch := h.state.LastRunEpoch() options := shared.BackupOptions{ ModAfter: util.Ptr(time.Unix(0, fromEpoch)), @@ -196,38 +266,42 @@ func (h *BackupHandler) runIncrBackupForNamespace(upperBound time.Time, namespac options.ModBefore = &upperBound } - slog.Debug("Starting incremental backup", "name", h.routineName) - started := time.Now() - backupPath := h.backend.wrapWithPrefix(backupFolder) - stats, err := backupService.BackupRun( - h.backupRoutine, h.backupIncrPolicy, h.cluster, h.storage, h.secretAgent, options, &namespace, backupPath) - elapsed := time.Since(started) - - incrBackupDurationGauge.Set(float64(elapsed.Milliseconds())) - slog.Debug("Completed incremental backup", "name", h.routineName) - - if err != nil { - slog.Warn("Failed incremental backup", "name", h.routineName, "err", err) - incrBackupFailureCounter.Inc() - return - } - // delete if the backup file is empty - if h.isBackupEmpty(stats) { - h.deleteEmptyBackup(backupFolder, h.routineName) - } else { - metadata := stats.ToMetadata(time.Unix(0, fromEpoch), upperBound, namespace) - if err := h.backend.writeBackupMetadata(backupFolder, metadata); err != nil { - slog.Error("Could not write backup metadata", "name", h.routineName, - "folder", backupFolder, "err", err) + clear(h.incrBackupHandlers) + for _, namespace := range h.namespaces { + backupFolder := getIncrementalPath(h.backend.incrementalBackupsPath, namespace, upperBound) + backupPath := h.backend.wrapWithPrefix(backupFolder) + handler, err := backupService.BackupRun( + h.backupRoutine, h.backupIncrPolicy, client, h.storage, h.secretAgent, options, &namespace, backupPath) + if err != nil { + incrBackupFailureCounter.Inc() + slog.Warn("could not start backup", "namespace", namespace, "routine", h.routineName, "err", err) } + h.incrBackupHandlers[namespace] = handler } } -func (h *BackupHandler) isBackupEmpty(stats *shared.BackupStat) bool { - if stats == nil { - return true +func (h *BackupHandler) waitForIncrementalBackups(ctx context.Context, backupTimestamp time.Time) { + startTime := time.Now() // startTime is only used to measure backup time + for namespace, handler := range h.incrBackupHandlers { + err := handler.Wait(ctx) + if err != nil { + slog.Warn("Failed incremental backup", "name", h.routineName, "err", err) + incrBackupFailureCounter.Inc() + } + + backupFolder := getIncrementalPath(h.backend.incrementalBackupsPath, namespace, backupTimestamp) + // delete if the backup file is empty + if handler.GetStats().IsEmpty() { + h.deleteEmptyBackup(backupFolder, h.routineName) + } else { + if err := h.writeBackupMetadata(handler.GetStats(), backupTimestamp, namespace, backupFolder); err != nil { + slog.Error("Could not write backup metadata", "name", h.routineName, + "folder", backupFolder, "err", err) + } + } } - return stats.IsEmpty() + + incrBackupDurationGauge.Set(float64(time.Since(startTime).Milliseconds())) } func (h *BackupHandler) deleteEmptyBackup(path string, routineName string) { @@ -278,3 +352,10 @@ func getConfigurationPath(fullBackupsPath string, backupPolicy *model.BackupPoli func timeSuffix(now time.Time) string { return strconv.FormatInt(now.UnixMilli(), 10) } + +func (h *BackupHandler) GetCurrentStat() *model.CurrentBackups { + return &model.CurrentBackups{ + Full: model.NewCurrentBackup(h.fullBackupHandlers), + Incremental: model.NewCurrentBackup(h.incrBackupHandlers), + } +} diff --git a/pkg/service/backup_job.go b/pkg/service/backup_job.go index 8cb386cf..45af3609 100644 --- a/pkg/service/backup_job.go +++ b/pkg/service/backup_job.go @@ -20,14 +20,14 @@ type backupJob struct { var _ quartz.Job = (*backupJob)(nil) // Execute is called by a Scheduler when the Trigger associated with this job fires. -func (j *backupJob) Execute(_ context.Context) error { +func (j *backupJob) Execute(ctx context.Context) error { if j.isRunning.CompareAndSwap(false, true) { defer j.isRunning.Store(false) switch j.jobType { case quartzGroupBackupFull: - j.handler.runFullBackup(time.Now()) + j.handler.runFullBackup(ctx, time.Now()) case quartzGroupBackupIncremental: - j.handler.runIncrementalBackup(time.Now()) + j.handler.runIncrementalBackup(ctx, time.Now()) default: slog.Error("Unsupported backup type", "type", j.jobType, diff --git a/pkg/service/backup_scheduler.go b/pkg/service/backup_scheduler.go index 856fe62b..8455a63e 100644 --- a/pkg/service/backup_scheduler.go +++ b/pkg/service/backup_scheduler.go @@ -51,40 +51,49 @@ func ApplyNewConfig(scheduler quartz.Scheduler, config *model.Config, backends B backends.SetData(BuildBackupBackends(config)) - return scheduleRoutines(scheduler, config, backends) + return scheduleRoutines(scheduler, config, MakeHandlers(config, backends)) } // ScheduleBackup creates a new quartz.Scheduler, schedules all the configured backup jobs, // starts and returns the scheduler. -func ScheduleBackup(ctx context.Context, config *model.Config, backends BackendsHolder, +func ScheduleBackup(ctx context.Context, config *model.Config, handlers BackupHandlerHolder, ) (quartz.Scheduler, error) { scheduler := quartz.NewStdScheduler() scheduler.Start(ctx) - err := scheduleRoutines(scheduler, config, backends) + err := scheduleRoutines(scheduler, config, handlers) if err != nil { return nil, err } return scheduler, nil } -func scheduleRoutines(scheduler quartz.Scheduler, config *model.Config, backends BackendsHolder) error { - for routineName, routine := range config.BackupRoutines { +func MakeHandlers(config *model.Config, backends BackendsHolder) BackupHandlerHolder { + handlers := make(BackupHandlerHolder) + for routineName := range config.BackupRoutines { backend, _ := backends.Get(routineName) handler, err := newBackupHandler(config, routineName, backend) if err != nil { slog.Error("failed to create backup handler", "routine", routineName, "err", err) continue } + handlers[routineName] = handler + } + return handlers +} + +func scheduleRoutines(scheduler quartz.Scheduler, config *model.Config, handlers BackupHandlerHolder) error { + for routineName, routine := range config.BackupRoutines { + handler := handlers[routineName] // schedule full backup job for the routine - if err := scheduleFullBackup(scheduler, handler, routine, routineName); err != nil { + if err := scheduleFullBackup(scheduler, handler, routine.IntervalCron, routineName); err != nil { return err } if routine.IncrIntervalCron != "" { // schedule incremental backup job for the routine - if err := scheduleIncrementalBackup(scheduler, handler, routine, routineName); err != nil { + if err := scheduleIncrementalBackup(scheduler, handler, routine.IncrIntervalCron, routineName); err != nil { return err } } @@ -93,8 +102,8 @@ func scheduleRoutines(scheduler quartz.Scheduler, config *model.Config, backends } func scheduleFullBackup(scheduler quartz.Scheduler, handler *BackupHandler, - routine *model.BackupRoutine, routineName string) error { - fullCronTrigger, err := quartz.NewCronTrigger(routine.IntervalCron) + interval string, routineName string) error { + fullCronTrigger, err := quartz.NewCronTrigger(interval) if err != nil { return err } @@ -121,8 +130,8 @@ func scheduleFullBackup(scheduler quartz.Scheduler, handler *BackupHandler, } func scheduleIncrementalBackup(scheduler quartz.Scheduler, handler *BackupHandler, - routine *model.BackupRoutine, routineName string) error { - incrCronTrigger, err := quartz.NewCronTrigger(routine.IncrIntervalCron) + interval string, routineName string) error { + incrCronTrigger, err := quartz.NewCronTrigger(interval) if err != nil { return err } diff --git a/pkg/shared/backup_go.go b/pkg/shared/backup_go.go index e02721de..1a6db9e8 100644 --- a/pkg/shared/backup_go.go +++ b/pkg/shared/backup_go.go @@ -29,15 +29,8 @@ func NewBackupGo() *BackupGo { // //nolint:funlen,gocritic func (b *BackupGo) BackupRun(backupRoutine *model.BackupRoutine, backupPolicy *model.BackupPolicy, - cluster *model.AerospikeCluster, storage *model.Storage, _ *model.SecretAgent, - opts BackupOptions, namespace *string, path *string) (*BackupStat, error) { - - var err error - client, err := a.NewClientWithPolicyAndHost(cluster.ASClientPolicy(), cluster.ASClientHosts()...) - if err != nil { - return nil, fmt.Errorf("failed to connect to aerospike cluster, %w", err) - } - defer client.Close() + client *a.Client, storage *model.Storage, _ *model.SecretAgent, + opts BackupOptions, namespace *string, path *string) (*backup.BackupHandler, error) { backupClient, err := backup.NewClient(client, "1", slog.Default()) if err != nil { @@ -111,18 +104,7 @@ func (b *BackupGo) BackupRun(backupRoutine *model.BackupRoutine, backupPolicy *m return nil, fmt.Errorf("failed to start backup, %w", err) } - err = handler.Wait(ctx) - if err != nil { - return nil, fmt.Errorf("error during backup, %w", err) - } - - return &BackupStat{ - RecordCount: handler.GetStats().GetReadRecords(), - IndexCount: uint64(handler.GetStats().GetSIndexes()), - UDFCount: uint64(handler.GetStats().GetUDFs()), - ByteCount: handler.GetStats().GetBytesWritten(), - FileCount: handler.GetStats().GetFileCount(), - }, nil + return handler, nil } func getWriter(ctx context.Context, path *string, storage *model.Storage) (backup.WriteFactory, error) { diff --git a/pkg/shared/backup_mock.go b/pkg/shared/backup_mock.go deleted file mode 100644 index 41c46943..00000000 --- a/pkg/shared/backup_mock.go +++ /dev/null @@ -1,29 +0,0 @@ -//go:build ci - -package shared - -import ( - "log/slog" - - "github.com/aerospike/backup/pkg/model" -) - -// BackupShared mocks the Backup interface. -// Used in CI workflows to skip building the C shared libraries. -type BackupShared struct { -} - -var _ Backup = (*BackupShared)(nil) - -// NewBackup returns a new BackupShared instance. -func NewBackup() *BackupShared { - return &BackupShared{} -} - -// BackupRun mocks the interface method. -func (b *BackupShared) BackupRun(_ *model.BackupRoutine, _ *model.BackupPolicy, - _ *model.AerospikeCluster, _ *model.Storage, _ *model.SecretAgent, - _ BackupOptions, _ *string, _ *string) (*BackupStat, error) { - slog.Info("BackupRun mock call") - return &BackupStat{}, nil -} diff --git a/pkg/shared/restore_go.go b/pkg/shared/restore_go.go index 11b2dc10..8c0736e2 100644 --- a/pkg/shared/restore_go.go +++ b/pkg/shared/restore_go.go @@ -74,7 +74,12 @@ func (r *RestoreGo) RestoreRun(restoreRequest *model.RestoreRequestInternal) (*m config.NoUDFs = true } - config.Namespace = restoreRequest.Policy.Namespace + if restoreRequest.Policy.Namespace != nil { + config.Namespace = &models.RestoreNamespace{ + Source: restoreRequest.Policy.Namespace.Source, + Destination: restoreRequest.Policy.Namespace.Destination, + } + } if restoreRequest.Policy.Parallel != nil { config.Parallel = int(*restoreRequest.Policy.Parallel) @@ -150,7 +155,8 @@ func recordExistsAction(replace, unique *bool) a.RecordExistsAction { } } -func getReader(ctx context.Context, path *string, storage *model.Storage, decoder encoding.DecoderFactory) (backup.StreamingReader, error) { +func getReader(ctx context.Context, path *string, storage *model.Storage, decoder encoding.DecoderFactory, +) (backup.StreamingReader, error) { switch storage.Type { case model.Local: return local.NewDirectoryStreamingReader(*path, decoder) diff --git a/pkg/shared/types.go b/pkg/shared/types.go index 14f02d13..9d028298 100644 --- a/pkg/shared/types.go +++ b/pkg/shared/types.go @@ -3,6 +3,8 @@ package shared import ( "time" + "github.com/aerospike/aerospike-client-go/v7" + "github.com/aerospike/backup-go" "github.com/aerospike/backup/pkg/model" ) @@ -12,51 +14,20 @@ type BackupOptions struct { ModAfter *time.Time } -// BackupStat represents partial backup result statistics returned from asbackup library. -type BackupStat struct { - RecordCount uint64 - ByteCount uint64 - FileCount uint64 - IndexCount uint64 - UDFCount uint64 -} - -// IsEmpty indicates whether the backup operation represented by the -// BackupStat completed with an empty data set. -func (stats *BackupStat) IsEmpty() bool { - return stats.RecordCount == 0 && - stats.UDFCount == 0 && - stats.IndexCount == 0 -} - // Backup represents a backup service. type Backup interface { - BackupRun( - backupRoutine *model.BackupRoutine, + BackupRun(backupRoutine *model.BackupRoutine, backupPolicy *model.BackupPolicy, - cluster *model.AerospikeCluster, + client *aerospike.Client, storage *model.Storage, secretAgent *model.SecretAgent, opts BackupOptions, namespace *string, path *string, - ) (*BackupStat, error) + ) (*backup.BackupHandler, error) } // Restore represents a restore service. type Restore interface { RestoreRun(restoreRequest *model.RestoreRequestInternal) (*model.RestoreResult, error) } - -func (stats *BackupStat) ToMetadata(from, created time.Time, namespace string) model.BackupMetadata { - return model.BackupMetadata{ - Created: created, - From: from, - Namespace: namespace, - RecordCount: stats.RecordCount, - FileCount: stats.FileCount, - ByteCount: stats.ByteCount, - SecondaryIndexCount: stats.IndexCount, - UDFCount: stats.UDFCount, - } -}