From d6a23b8162155aec8fe93150dac653f0506b532b Mon Sep 17 00:00:00 2001 From: Eugene R Date: Thu, 16 Nov 2023 11:02:40 +0200 Subject: [PATCH] APPS-990 Add backup routine configuration (#52) --- .golangci.yml | 1 + cmd/backup/config/config.yml | 20 +- cmd/backup/config/minio_config.yml | 37 ++-- docs/swagger.json | 207 +++++++++++++++--- docs/swagger.yaml | 138 ++++++++++-- internal/server/config_handlers.go | 106 +++++++++ internal/server/server.go | 18 ++ pkg/model/config.go | 126 ++++++++--- pkg/service/backup_handler.go | 40 ++-- pkg/service/configuration_manager_local.go | 1 + pkg/service/configuration_manager_s3.go | 3 +- pkg/service/configuration_service_cluster.go | 8 +- .../configuration_service_cluster_test.go | 11 +- pkg/service/configuration_service_policy.go | 25 +-- .../configuration_service_policy_test.go | 27 +-- pkg/service/configuration_service_routine.go | 58 +++++ .../configuration_service_routine_test.go | 90 ++++++++ pkg/service/configuration_service_storage.go | 24 +- .../configuration_service_storage_test.go | 15 +- pkg/service/scheduler.go | 5 +- pkg/shared/backup.go | 20 +- pkg/shared/backup_mock.go | 4 +- pkg/shared/types.go | 9 +- 23 files changed, 780 insertions(+), 213 deletions(-) create mode 100644 pkg/service/configuration_service_routine.go create mode 100644 pkg/service/configuration_service_routine_test.go diff --git a/.golangci.yml b/.golangci.yml index a728f8fe..22738b0d 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -37,6 +37,7 @@ issues: - unparam - prealloc - funlen + - goconst - path: pkg/shared linters: - typecheck diff --git a/cmd/backup/config/config.yml b/cmd/backup/config/config.yml index 5c0c393b..e05b4467 100644 --- a/cmd/backup/config/config.yml +++ b/cmd/backup/config/config.yml @@ -4,8 +4,8 @@ aerospike-clusters: name: "cluster1" host: "localhost" port: 3000 - user: "tester" - password: "psw" + # user: "tester" + # password: "psw" storage: local1: @@ -14,16 +14,20 @@ storage: path: "./testout" backup-policies: - policy-daily: - name: "policy-daily" + policy1: + name: "policy1" + type: 1 + parallel: 1 + remove-files: false + +backup-routines: + routine1: + name: "routine1" interval: 30000 incr-interval: 5000 - type: 1 + backup-policy: "policy1" source-cluster: "cluster1" storage: "local1" namespace: "test" set-list: - set1 - parallel: 1 - remove-files: false - \ No newline at end of file diff --git a/cmd/backup/config/minio_config.yml b/cmd/backup/config/minio_config.yml index 88917b85..f3817133 100644 --- a/cmd/backup/config/minio_config.yml +++ b/cmd/backup/config/minio_config.yml @@ -1,27 +1,34 @@ --- -aerospike-cluster: - - name: "cluster1" +aerospike-clusters: + cluster1: + name: "cluster1" host: "localhost" port: 3000 # user: "tester" # password: "psw" -backup-storage: - - name: "s3" +storage: + s3: + name: "s3" type: 1 - s3_endpoint_override: "http://localhost:9000" - s3_region: "eu-central-1" - s3_profile: "minio" + s3-endpoint_override: "http://localhost:9000" + s3-region: "eu-central-1" + s3-profile: "minio" path: "s3://as-backup-bucket/test-backup" -backup-policy: - - name: "policy-daily" - interval: 30000 - incr_interval: 5000 +backup-policies: + policy1: + name: "policy1" type: 1 - source_cluster: "cluster1" + parallel: 1 + remove-files: false + +backup-routines: + routine1: + name: "routine1" + interval: 30000 + incr-interval: 5000 + backup-policy: "policy1" + source-cluster: "cluster1" storage: "s3" namespace: "test" - parallel: 1 - remove_files: false - \ No newline at end of file diff --git a/docs/swagger.json b/docs/swagger.json index 2afdfb90..03b9be17 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -406,6 +406,126 @@ } } }, + "/config/routine": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "Configuration" + ], + "summary": "Reads all routines from the configuration.", + "operationId": "readRoutines", + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "object", + "additionalProperties": { + "$ref": "#/definitions/model.BackupRoutine" + } + } + }, + "400": { + "description": "Bad Request", + "schema": { + "type": "string" + } + } + } + }, + "put": { + "consumes": [ + "application/json" + ], + "tags": [ + "Configuration" + ], + "summary": "Updates an existing routine in the configuration.", + "operationId": "updateRoutine", + "parameters": [ + { + "description": "backup routine", + "name": "storage", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/model.BackupRoutine" + } + } + ], + "responses": { + "200": { + "description": "OK" + }, + "400": { + "description": "Bad Request", + "schema": { + "type": "string" + } + } + } + }, + "post": { + "consumes": [ + "application/json" + ], + "tags": [ + "Configuration" + ], + "summary": "Adds a backup routine to the config.", + "operationId": "addRoutine", + "parameters": [ + { + "description": "backup routine", + "name": "storage", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/model.BackupRoutine" + } + } + ], + "responses": { + "201": { + "description": "Created" + }, + "400": { + "description": "Bad Request", + "schema": { + "type": "string" + } + } + } + }, + "delete": { + "tags": [ + "Configuration" + ], + "summary": "Deletes a backup routine from the configuration by name.", + "operationId": "deleteRoutine", + "parameters": [ + { + "type": "string", + "description": "Routine Name", + "name": "name", + "in": "query", + "required": true + } + ], + "responses": { + "204": { + "description": "No Content" + }, + "400": { + "description": "Bad Request", + "schema": { + "type": "string" + } + } + } + } + }, "/config/storage": { "get": { "produces": [ @@ -692,30 +812,15 @@ "model.BackupPolicy": { "type": "object", "properties": { - "after-digest": { - "type": "string" - }, "bandwidth": { "type": "integer" }, - "bin-list": { - "type": "array", - "items": { - "type": "string" - } - }, "file-limit": { "type": "integer" }, "filter-exp": { "type": "string" }, - "incr-interval": { - "type": "integer" - }, - "interval": { - "type": "integer" - }, "max-records": { "type": "integer" }, @@ -725,9 +830,6 @@ "name": { "type": "string" }, - "namespace": { - "type": "string" - }, "no-bins": { "type": "boolean" }, @@ -740,18 +842,9 @@ "no-udfs": { "type": "boolean" }, - "node-list": { - "type": "array", - "items": { - "$ref": "#/definitions/model.Node" - } - }, "parallel": { "type": "integer" }, - "partition-list": { - "type": "string" - }, "records-per-second": { "type": "integer" }, @@ -764,26 +857,64 @@ "retry-delay": { "type": "integer" }, - "set-list": { + "socket-timeout": { + "type": "integer" + }, + "total-timeout": { + "type": "integer" + }, + "type": { + "$ref": "#/definitions/model.BackupType" + } + } + }, + "model.BackupRoutine": { + "type": "object", + "properties": { + "after-digest": { + "type": "string" + }, + "backup-policy": { + "type": "string" + }, + "bin-list": { "type": "array", "items": { "type": "string" } }, - "socket-timeout": { + "incr-interval": { "type": "integer" }, - "source-cluster": { + "interval": { + "type": "integer" + }, + "name": { "type": "string" }, - "storage": { + "namespace": { "type": "string" }, - "total-timeout": { - "type": "integer" + "node-list": { + "type": "array", + "items": { + "$ref": "#/definitions/model.Node" + } }, - "type": { - "$ref": "#/definitions/model.BackupType" + "partition-list": { + "type": "string" + }, + "set-list": { + "type": "array", + "items": { + "type": "string" + } + }, + "source-cluster": { + "type": "string" + }, + "storage": { + "type": "string" } } }, @@ -813,6 +944,12 @@ "$ref": "#/definitions/model.BackupPolicy" } }, + "backup-routines": { + "type": "object", + "additionalProperties": { + "$ref": "#/definitions/model.BackupRoutine" + } + }, "service": { "$ref": "#/definitions/model.HTTPServerConfig" }, diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 0d68b2d9..f91f4b61 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -29,30 +29,18 @@ definitions: type: object model.BackupPolicy: properties: - after-digest: - type: string bandwidth: type: integer - bin-list: - items: - type: string - type: array file-limit: type: integer filter-exp: type: string - incr-interval: - type: integer - interval: - type: integer max-records: type: integer max-retries: type: integer name: type: string - namespace: - type: string no-bins: type: boolean no-indexes: @@ -61,14 +49,8 @@ definitions: type: boolean no-udfs: type: boolean - node-list: - items: - $ref: '#/definitions/model.Node' - type: array parallel: type: integer - partition-list: - type: string records-per-second: type: integer remove-artifacts: @@ -77,20 +59,45 @@ definitions: type: boolean retry-delay: type: integer - set-list: + socket-timeout: + type: integer + total-timeout: + type: integer + type: + $ref: '#/definitions/model.BackupType' + type: object + model.BackupRoutine: + properties: + after-digest: + type: string + backup-policy: + type: string + bin-list: items: type: string type: array - socket-timeout: + incr-interval: + type: integer + interval: type: integer + name: + type: string + namespace: + type: string + node-list: + items: + $ref: '#/definitions/model.Node' + type: array + partition-list: + type: string + set-list: + items: + type: string + type: array source-cluster: type: string storage: type: string - total-timeout: - type: integer - type: - $ref: '#/definitions/model.BackupType' type: object model.BackupType: enum: @@ -110,6 +117,10 @@ definitions: additionalProperties: $ref: '#/definitions/model.BackupPolicy' type: object + backup-routines: + additionalProperties: + $ref: '#/definitions/model.BackupRoutine' + type: object service: $ref: '#/definitions/model.HTTPServerConfig' storage: @@ -506,6 +517,85 @@ paths: summary: Updates an existing policy in the configuration. tags: - Configuration + /config/routine: + delete: + operationId: deleteRoutine + parameters: + - description: Routine Name + in: query + name: name + required: true + type: string + responses: + "204": + description: No Content + "400": + description: Bad Request + schema: + type: string + summary: Deletes a backup routine from the configuration by name. + tags: + - Configuration + get: + operationId: readRoutines + produces: + - application/json + responses: + "200": + description: OK + schema: + additionalProperties: + $ref: '#/definitions/model.BackupRoutine' + type: object + "400": + description: Bad Request + schema: + type: string + summary: Reads all routines from the configuration. + tags: + - Configuration + post: + consumes: + - application/json + operationId: addRoutine + parameters: + - description: backup routine + in: body + name: storage + required: true + schema: + $ref: '#/definitions/model.BackupRoutine' + responses: + "201": + description: Created + "400": + description: Bad Request + schema: + type: string + summary: Adds a backup routine to the config. + tags: + - Configuration + put: + consumes: + - application/json + operationId: updateRoutine + parameters: + - description: backup routine + in: body + name: storage + required: true + schema: + $ref: '#/definitions/model.BackupRoutine' + responses: + "200": + description: OK + "400": + description: Bad Request + schema: + type: string + summary: Updates an existing routine in the configuration. + tags: + - Configuration /config/storage: delete: operationId: deleteStorage diff --git a/internal/server/config_handlers.go b/internal/server/config_handlers.go index 2f3f1a5a..58dbafd1 100644 --- a/internal/server/config_handlers.go +++ b/internal/server/config_handlers.go @@ -372,3 +372,109 @@ func (ws *HTTPServer) deletePolicy(w http.ResponseWriter, r *http.Request) { } w.WriteHeader(http.StatusNoContent) } + +// addRoutine +// @Summary Adds a backup routine to the config. +// @ID addRoutine +// @Tags Configuration +// @Router /config/routine [post] +// @Accept json +// @Param storage body model.BackupRoutine true "backup routine" +// @Success 201 +// @Failure 400 {string} string +func (ws *HTTPServer) addRoutine(w http.ResponseWriter, r *http.Request) { + var newRoutine model.BackupRoutine + err := json.NewDecoder(r.Body).Decode(&newRoutine) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + err = service.AddRoutine(ws.config, &newRoutine) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + err = ConfigurationManager.WriteConfiguration(ws.config) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + w.WriteHeader(http.StatusCreated) +} + +// readRoutines reads all backup routines from the configuration. +// @Summary Reads all routines from the configuration. +// @ID readRoutines +// @Tags Configuration +// @Router /config/routine [get] +// @Produce json +// @Success 200 {object} map[string]model.BackupRoutine +// @Failure 400 {string} string +func (ws *HTTPServer) readRoutines(w http.ResponseWriter) { + jsonResponse, err := json.Marshal(ws.config.BackupRoutines) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write(jsonResponse) +} + +// updateRoutine updates an existing backup routine in the configuration. +// @Summary Updates an existing routine in the configuration. +// @ID updateRoutine +// @Tags Configuration +// @Router /config/routine [put] +// @Accept json +// @Param storage body model.BackupRoutine true "backup routine" +// @Success 200 +// @Failure 400 {string} string +func (ws *HTTPServer) updateRoutine(w http.ResponseWriter, r *http.Request) { + var updatedRoutine model.BackupRoutine + err := json.NewDecoder(r.Body).Decode(&updatedRoutine) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + err = service.UpdateRoutine(ws.config, &updatedRoutine) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + err = ConfigurationManager.WriteConfiguration(ws.config) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + w.WriteHeader(http.StatusOK) +} + +// deleteRoutine +// @Summary Deletes a backup routine from the configuration by name. +// @ID deleteRoutine +// @Tags Configuration +// @Router /config/routine [delete] +// @Param name query string true "Routine Name" +// @Success 204 +// @Failure 400 {string} string +func (ws *HTTPServer) deleteRoutine(w http.ResponseWriter, r *http.Request) { + routineName := r.URL.Query().Get("name") + if routineName == "" { + http.Error(w, "Routine name is required", http.StatusBadRequest) + return + } + + err := service.DeleteRoutine(ws.config, &routineName) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + err = ConfigurationManager.WriteConfiguration(ws.config) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + w.WriteHeader(http.StatusNoContent) +} diff --git a/internal/server/server.go b/internal/server/server.go index 343b1c4e..c559afdb 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -155,6 +155,9 @@ func (ws *HTTPServer) Start() { // policy config route mux.HandleFunc("/config/policy", ws.configPolicyActionHandler) + // routine config route + mux.HandleFunc("/config/routine", ws.configRoutineActionHandler) + // health route mux.HandleFunc("/health", healthActionHandler) @@ -248,3 +251,18 @@ func (ws *HTTPServer) configPolicyActionHandler(w http.ResponseWriter, r *http.R http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) } } + +func (ws *HTTPServer) configRoutineActionHandler(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodPost: + ws.addRoutine(w, r) + case http.MethodGet: + ws.readRoutines(w) + case http.MethodPut: + ws.updateRoutine(w, r) + case http.MethodDelete: + ws.deleteRoutine(w, r) + default: + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + } +} diff --git a/pkg/model/config.go b/pkg/model/config.go index 9f4cf625..c2a20fad 100644 --- a/pkg/model/config.go +++ b/pkg/model/config.go @@ -2,6 +2,8 @@ package model import ( "encoding/json" + "errors" + "fmt" "log/slog" "os" "sync" @@ -10,11 +12,14 @@ import ( ) // Config represents the service configuration file. +// +//nolint:lll type Config struct { HTTPServer *HTTPServerConfig `yaml:"service,omitempty" json:"service,omitempty"` AerospikeClusters map[string]*AerospikeCluster `yaml:"aerospike-clusters,omitempty" json:"aerospike-clusters,omitempty"` Storage map[string]*Storage `yaml:"storage,omitempty" json:"storage,omitempty"` BackupPolicies map[string]*BackupPolicy `yaml:"backup-policies,omitempty" json:"backup-policies,omitempty"` + BackupRoutines map[string]*BackupRoutine `yaml:"backup-routines,omitempty" json:"backup-routines,omitempty"` } // NewConfigWithDefaultValues returns a new Config with default values. @@ -32,6 +37,21 @@ func NewConfigWithDefaultValues() *Config { return config } +// Validate validates the configuration. +func (c *Config) Validate() error { + for _, routine := range c.BackupRoutines { + if err := routine.Validate(); err != nil { + return err + } + } + for _, storage := range c.Storage { + if err := storage.Validate(); err != nil { + return err + } + } + return nil +} + // String satisfies the fmt.Stringer interface. func (c Config) String() string { cfg, err := json.Marshal(c) @@ -100,36 +120,86 @@ type Storage struct { S3LogLevel *string `yaml:"s3-log-level,omitempty" json:"s3-log-level,omitempty"` } +// Validate validates the storage configuration. +func (s *Storage) Validate() error { + if s.Name == nil || *s.Name == "" { + return errors.New("storage name is required") + } + if s.Type == nil { + return errors.New("storage type is required") + } + if s.Path == nil { + return errors.New("storage path is required") + } + return nil +} + // BackupPolicy represents a scheduled backup policy. type BackupPolicy struct { - Name *string `yaml:"name,omitempty" json:"name,omitempty"` - IntervalMillis *int64 `yaml:"interval,omitempty" json:"interval,omitempty"` - IncrIntervalMillis *int64 `yaml:"incr-interval,omitempty" json:"incr-interval,omitempty"` - BackupType *BackupType `yaml:"type,omitempty" json:"type,omitempty"` - SourceCluster *string `yaml:"source-cluster,omitempty" json:"source-cluster,omitempty"` - Storage *string `yaml:"storage,omitempty" json:"storage,omitempty"` - Namespace *string `yaml:"namespace,omitempty" json:"namespace,omitempty"` - Parallel *int32 `yaml:"parallel,omitempty" json:"parallel,omitempty"` - SetList []string `yaml:"set-list,omitempty" json:"set-list,omitempty"` - BinList []string `yaml:"bin-list,omitempty" json:"bin-list,omitempty"` - NodeList []Node `yaml:"node-list,omitempty" json:"node-list,omitempty"` - SocketTimeout *uint32 `yaml:"socket-timeout,omitempty" json:"socket-timeout,omitempty"` - TotalTimeout *uint32 `yaml:"total-timeout,omitempty" json:"total-timeout,omitempty"` - MaxRetries *uint32 `yaml:"max-retries,omitempty" json:"max-retries,omitempty"` - RetryDelay *uint32 `yaml:"retry-delay,omitempty" json:"retry-delay,omitempty"` - RemoveFiles *bool `yaml:"remove-files,omitempty" json:"remove-files,omitempty"` - RemoveArtifacts *bool `yaml:"remove-artifacts,omitempty" json:"remove-artifacts,omitempty"` - NoBins *bool `yaml:"no-bins,omitempty" json:"no-bins,omitempty"` - NoRecords *bool `yaml:"no-records,omitempty" json:"no-records,omitempty"` - NoIndexes *bool `yaml:"no-indexes,omitempty" json:"no-indexes,omitempty"` - NoUdfs *bool `yaml:"no-udfs,omitempty" json:"no-udfs,omitempty"` - Bandwidth *uint64 `yaml:"bandwidth,omitempty" json:"bandwidth,omitempty"` - MaxRecords *uint64 `yaml:"max-records,omitempty" json:"max-records,omitempty"` - RecordsPerSecond *uint32 `yaml:"records-per-second,omitempty" json:"records-per-second,omitempty"` - FileLimit *uint64 `yaml:"file-limit,omitempty" json:"file-limit,omitempty"` - PartitionList *string `yaml:"partition-list,omitempty" json:"partition-list,omitempty"` - AfterDigest *string `yaml:"after-digest,omitempty" json:"after-digest,omitempty"` - FilterExp *string `yaml:"filter-exp,omitempty" json:"filter-exp,omitempty"` + Name *string `yaml:"name,omitempty" json:"name,omitempty"` + BackupType *BackupType `yaml:"type,omitempty" json:"type,omitempty"` + Parallel *int32 `yaml:"parallel,omitempty" json:"parallel,omitempty"` + SocketTimeout *uint32 `yaml:"socket-timeout,omitempty" json:"socket-timeout,omitempty"` + TotalTimeout *uint32 `yaml:"total-timeout,omitempty" json:"total-timeout,omitempty"` + MaxRetries *uint32 `yaml:"max-retries,omitempty" json:"max-retries,omitempty"` + RetryDelay *uint32 `yaml:"retry-delay,omitempty" json:"retry-delay,omitempty"` + RemoveFiles *bool `yaml:"remove-files,omitempty" json:"remove-files,omitempty"` + RemoveArtifacts *bool `yaml:"remove-artifacts,omitempty" json:"remove-artifacts,omitempty"` + NoBins *bool `yaml:"no-bins,omitempty" json:"no-bins,omitempty"` + NoRecords *bool `yaml:"no-records,omitempty" json:"no-records,omitempty"` + NoIndexes *bool `yaml:"no-indexes,omitempty" json:"no-indexes,omitempty"` + NoUdfs *bool `yaml:"no-udfs,omitempty" json:"no-udfs,omitempty"` + Bandwidth *uint64 `yaml:"bandwidth,omitempty" json:"bandwidth,omitempty"` + MaxRecords *uint64 `yaml:"max-records,omitempty" json:"max-records,omitempty"` + RecordsPerSecond *uint32 `yaml:"records-per-second,omitempty" json:"records-per-second,omitempty"` + FileLimit *uint64 `yaml:"file-limit,omitempty" json:"file-limit,omitempty"` + FilterExp *string `yaml:"filter-exp,omitempty" json:"filter-exp,omitempty"` +} + +// BackupRoutine represents a scheduled backup operation routine. +type BackupRoutine struct { + Name string `yaml:"name,omitempty" json:"name,omitempty"` + BackupPolicy string `yaml:"backup-policy,omitempty" json:"backup-policy,omitempty"` + SourceCluster string `yaml:"source-cluster,omitempty" json:"source-cluster,omitempty"` + Storage string `yaml:"storage,omitempty" json:"storage,omitempty"` + + IntervalMillis *int64 `yaml:"interval,omitempty" json:"interval,omitempty"` + IncrIntervalMillis *int64 `yaml:"incr-interval,omitempty" json:"incr-interval,omitempty"` + + Namespace *string `yaml:"namespace,omitempty" json:"namespace,omitempty"` + SetList []string `yaml:"set-list,omitempty" json:"set-list,omitempty"` + BinList []string `yaml:"bin-list,omitempty" json:"bin-list,omitempty"` + NodeList []Node `yaml:"node-list,omitempty" json:"node-list,omitempty"` + + PartitionList *string `yaml:"partition-list,omitempty" json:"partition-list,omitempty"` + AfterDigest *string `yaml:"after-digest,omitempty" json:"after-digest,omitempty"` +} + +// Validate validates the backup routine configuration. +func (r *BackupRoutine) Validate() error { + if r.Name == "" { + return routineValidationError("name") + } + if r.BackupPolicy == "" { + return routineValidationError("backup-policy") + } + if r.SourceCluster == "" { + return routineValidationError("source-cluster") + } + if r.Storage == "" { + return routineValidationError("storage") + } + if r.Namespace == nil { + return routineValidationError("namespace") + } + if r.IntervalMillis == nil && r.IncrIntervalMillis == nil { + return errors.New("interval or incr-interval must be specified for backup routine") + } + return nil +} + +func routineValidationError(field string) error { + return fmt.Errorf("%s specification for backup routine is required", field) } // Clone clones the backup policy struct. diff --git a/pkg/service/backup_handler.go b/pkg/service/backup_handler.go index 3e15585f..05782361 100644 --- a/pkg/service/backup_handler.go +++ b/pkg/service/backup_handler.go @@ -22,6 +22,7 @@ type BackupScheduler interface { type BackupHandler struct { backend BackupBackend backupPolicy *model.BackupPolicy + backupRoutine *model.BackupRoutine cluster *model.AerospikeCluster storage *model.Storage state *model.BackupState @@ -33,14 +34,18 @@ var _ BackupScheduler = (*BackupHandler)(nil) var BackupScheduleTick = 1000 * time.Millisecond // NewBackupHandler returns a new BackupHandler instance. -func NewBackupHandler(config *model.Config, backupPolicy *model.BackupPolicy) (*BackupHandler, error) { - cluster, found := config.AerospikeClusters[*backupPolicy.SourceCluster] +func NewBackupHandler(config *model.Config, backupRoutine *model.BackupRoutine) (*BackupHandler, error) { + cluster, found := config.AerospikeClusters[backupRoutine.SourceCluster] if !found { - return nil, fmt.Errorf("cluster not found for %s", *backupPolicy.SourceCluster) + return nil, fmt.Errorf("cluster not found for %s", backupRoutine.SourceCluster) } - storage, found := config.Storage[*backupPolicy.Storage] + storage, found := config.Storage[backupRoutine.Storage] if !found { - return nil, fmt.Errorf("storage not found for %s", *backupPolicy.Storage) + return nil, fmt.Errorf("storage not found for %s", backupRoutine.Storage) + } + backupPolicy, found := config.BackupPolicies[backupRoutine.BackupPolicy] + if !found { + return nil, fmt.Errorf("backupPolicy not found for %s", backupRoutine.BackupPolicy) } var backupBackend BackupBackend @@ -54,11 +59,12 @@ func NewBackupHandler(config *model.Config, backupPolicy *model.BackupPolicy) (* } return &BackupHandler{ - backend: backupBackend, - backupPolicy: backupPolicy, - cluster: cluster, - storage: storage, - state: backupBackend.readState(), + backend: backupBackend, + backupRoutine: backupRoutine, + backupPolicy: backupPolicy, + cluster: cluster, + storage: storage, + state: backupBackend.readState(), }, nil } @@ -67,7 +73,7 @@ func (h *BackupHandler) Schedule(ctx context.Context) { slog.Info("Scheduling full backup", "name", *h.backupPolicy.Name) h.scheduleBackupPeriodically(ctx, h.runFullBackup) - if h.backupPolicy.IncrIntervalMillis != nil && *h.backupPolicy.IncrIntervalMillis > 0 { + if h.backupRoutine.IncrIntervalMillis != nil && *h.backupRoutine.IncrIntervalMillis > 0 { slog.Info("Scheduling incremental backup", "name", *h.backupPolicy.Name) h.scheduleBackupPeriodically(ctx, h.runIncrementalBackup) } @@ -76,7 +82,8 @@ func (h *BackupHandler) Schedule(ctx context.Context) { // scheduleBackupPeriodically runs the backup periodically based on the provided interval. func (h *BackupHandler) scheduleBackupPeriodically( ctx context.Context, - backupFunc func(time.Time)) { + backupFunc func(time.Time), +) { go func() { ticker := time.NewTicker(BackupScheduleTick) defer ticker.Stop() @@ -113,7 +120,8 @@ func (h *BackupHandler) runFullBackup(now time.Time) { } backupRunFunc := func() { started := time.Now() - if !backupService.BackupRun(h.backupPolicy, h.cluster, h.storage, shared.BackupOptions{}) { + if !backupService.BackupRun(h.backupRoutine, h.backupPolicy, h.cluster, + h.storage, shared.BackupOptions{}) { backupFailureCounter.Inc() } else { elapsed := time.Since(started) @@ -159,7 +167,7 @@ func (h *BackupHandler) runIncrementalBackup(now time.Time) { lastIncrRunEpoch := state.LastIncrRun.UnixNano() opts.ModAfter = &lastIncrRunEpoch started := time.Now() - if !backupService.BackupRun(h.backupPolicy, h.cluster, h.storage, opts) { + if !backupService.BackupRun(h.backupRoutine, h.backupPolicy, h.cluster, h.storage, opts) { incrBackupFailureCounter.Inc() } else { elapsed := time.Since(started) @@ -178,11 +186,11 @@ func (h *BackupHandler) runIncrementalBackup(now time.Time) { } func (h *BackupHandler) isFullEligible(n time.Time, t time.Time) bool { - return n.UnixMilli()-t.UnixMilli() >= *h.backupPolicy.IntervalMillis + return n.UnixMilli()-t.UnixMilli() >= *h.backupRoutine.IntervalMillis } func (h *BackupHandler) isIncrementalEligible(n time.Time, t time.Time) bool { - return n.UnixMilli()-t.UnixMilli() >= *h.backupPolicy.IncrIntervalMillis + return n.UnixMilli()-t.UnixMilli() >= *h.backupRoutine.IncrIntervalMillis } func (h *BackupHandler) updateBackupState() { diff --git a/pkg/service/configuration_manager_local.go b/pkg/service/configuration_manager_local.go index 31b2fe4f..d52dcfaf 100644 --- a/pkg/service/configuration_manager_local.go +++ b/pkg/service/configuration_manager_local.go @@ -44,6 +44,7 @@ func (cm *FileConfigurationManager) ReadConfiguration() (*model.Config, error) { return nil, fmt.Errorf("in file %q: %w", filePath, err) } + err = config.Validate() return config, err } diff --git a/pkg/service/configuration_manager_s3.go b/pkg/service/configuration_manager_s3.go index 4d18562a..693fb638 100644 --- a/pkg/service/configuration_manager_s3.go +++ b/pkg/service/configuration_manager_s3.go @@ -22,7 +22,8 @@ func NewS3ConfigurationManager(configStorage *model.Storage) ConfigurationManage func (s *S3ConfigurationManager) ReadConfiguration() (*model.Config, error) { config := model.NewConfigWithDefaultValues() s.readFile(s.Path, config) - return config, nil + err := config.Validate() + return config, err } // WriteConfiguration writes the configuration to S3. diff --git a/pkg/service/configuration_service_cluster.go b/pkg/service/configuration_service_cluster.go index 954c654f..0cfcf77f 100644 --- a/pkg/service/configuration_service_cluster.go +++ b/pkg/service/configuration_service_cluster.go @@ -33,18 +33,18 @@ func UpdateCluster(config *model.Config, updatedCluster *model.AerospikeCluster) } // DeleteCluster -// deletes an AerospikeCluster from the configuration if it is not used in any policy. +// deletes an AerospikeCluster from the configuration if it is not used in any backup routine. func DeleteCluster(config *model.Config, clusterToDeleteName *string) error { _, found := config.AerospikeClusters[*clusterToDeleteName] if !found { return fmt.Errorf("cluster %s not found", *clusterToDeleteName) } - policy, found := util.Find(config.BackupPolicies, func(policy *model.BackupPolicy) bool { - return *policy.SourceCluster == *clusterToDeleteName + routine, found := util.Find(config.BackupRoutines, func(policy *model.BackupRoutine) bool { + return policy.SourceCluster == *clusterToDeleteName }) if found { - return fmt.Errorf("cannot delete cluster as it is used in a policy %s", *policy.Name) + return fmt.Errorf("cannot delete cluster as it is used in a routine %s", routine.Name) } delete(config.AerospikeClusters, *clusterToDeleteName) diff --git a/pkg/service/configuration_service_cluster_test.go b/pkg/service/configuration_service_cluster_test.go index f9841172..76e0ae89 100644 --- a/pkg/service/configuration_service_cluster_test.go +++ b/pkg/service/configuration_service_cluster_test.go @@ -7,7 +7,7 @@ import ( "github.com/aws/smithy-go/ptr" ) -func TestAddCluster(t *testing.T) { +func TestCluster_Add(t *testing.T) { name := "cluster1" config := &model.Config{ AerospikeClusters: map[string]*model.AerospikeCluster{name: {Name: &name}}, @@ -25,7 +25,7 @@ func TestAddCluster(t *testing.T) { } } -func TestUpdateCluster(t *testing.T) { +func TestCluster_Update(t *testing.T) { name := "cluster1" config := &model.Config{ AerospikeClusters: map[string]*model.AerospikeCluster{name: {Name: &name}}, @@ -47,12 +47,15 @@ func TestUpdateCluster(t *testing.T) { } } -func TestDeleteCluster(t *testing.T) { +func TestCluster_Delete(t *testing.T) { name := "cluster1" name2 := "cluster2" + policy := "policy" + routine := "routine" config := &model.Config{ AerospikeClusters: map[string]*model.AerospikeCluster{name: {Name: &name}, name2: {Name: &name2}}, - BackupPolicies: map[string]*model.BackupPolicy{name: {Name: ptr.String("policy1"), SourceCluster: &name}}, + BackupPolicies: map[string]*model.BackupPolicy{policy: {Name: &policy}}, + BackupRoutines: map[string]*model.BackupRoutine{routine: {Name: routine, SourceCluster: name}}, } err := DeleteCluster(config, &name) if err == nil { diff --git a/pkg/service/configuration_service_policy.go b/pkg/service/configuration_service_policy.go index 00d86ba7..b9b6f145 100644 --- a/pkg/service/configuration_service_policy.go +++ b/pkg/service/configuration_service_policy.go @@ -1,32 +1,17 @@ package service import ( - "errors" "fmt" "github.com/aerospike/backup/pkg/model" ) // AddPolicy -// adds a new BackupPolicy to the configuration if a cluster with the same name doesn't already exist. +// adds a new BackupPolicy to the configuration if a policy with the same name doesn't already exist. func AddPolicy(config *model.Config, newPolicy *model.BackupPolicy) error { - if newPolicy.Storage == nil { - return errors.New("storage is nil") - } - if newPolicy.SourceCluster == nil { - return errors.New("cluster is nil") - } - _, found := config.Storage[*newPolicy.Storage] - if !found { - return fmt.Errorf("storage %s not found", *newPolicy.Storage) - } - _, found = config.AerospikeClusters[*newPolicy.SourceCluster] - if !found { - return fmt.Errorf("cluster %s not found", *newPolicy.SourceCluster) - } - _, found = config.BackupPolicies[*newPolicy.Name] + _, found := config.BackupPolicies[*newPolicy.Name] if found { - return fmt.Errorf("aerospike policy with the same name %s already exists", *newPolicy.Name) + return fmt.Errorf("backup policy with the same name %s already exists", *newPolicy.Name) } config.BackupPolicies[*newPolicy.Name] = newPolicy @@ -38,7 +23,7 @@ func AddPolicy(config *model.Config, newPolicy *model.BackupPolicy) error { func UpdatePolicy(config *model.Config, updatedPolicy *model.BackupPolicy) error { _, found := config.BackupPolicies[*updatedPolicy.Name] if !found { - return fmt.Errorf("policy %s not found", *updatedPolicy.Name) + return fmt.Errorf("backup policy %s not found", *updatedPolicy.Name) } config.BackupPolicies[*updatedPolicy.Name] = updatedPolicy @@ -50,7 +35,7 @@ func UpdatePolicy(config *model.Config, updatedPolicy *model.BackupPolicy) error func DeletePolicy(config *model.Config, policyToDeleteName *string) error { _, found := config.BackupPolicies[*policyToDeleteName] if !found { - return fmt.Errorf("policy %s not found", *policyToDeleteName) + return fmt.Errorf("backup policy %s not found", *policyToDeleteName) } delete(config.BackupPolicies, *policyToDeleteName) diff --git a/pkg/service/configuration_service_policy_test.go b/pkg/service/configuration_service_policy_test.go index a65810a7..6775e4a2 100644 --- a/pkg/service/configuration_service_policy_test.go +++ b/pkg/service/configuration_service_policy_test.go @@ -3,12 +3,11 @@ package service import ( "testing" - "github.com/aws/smithy-go/ptr" - "github.com/aerospike/backup/pkg/model" + "github.com/aws/smithy-go/ptr" ) -func TestAddPolicyOK(t *testing.T) { +func TestPolicy_AddOK(t *testing.T) { cluster := "cluster" storage := "storage" config := &model.Config{ @@ -17,33 +16,27 @@ func TestAddPolicyOK(t *testing.T) { BackupPolicies: map[string]*model.BackupPolicy{}, } - pass := model.BackupPolicy{Name: ptr.String("newName"), Storage: &storage, SourceCluster: &cluster} + pass := model.BackupPolicy{Name: ptr.String("newName")} err := AddPolicy(config, &pass) if err != nil { t.Errorf("Expected nil error, got %v", err) } } -func TestAddPolicyErrors(t *testing.T) { +func TestPolicy_AddErrors(t *testing.T) { policy := "policy" cluster := "cluster" storage := "storage" - wrong := "-" fails := []struct { name string policy model.BackupPolicy }{ - {name: "empty", policy: model.BackupPolicy{}}, - {name: "no storage", policy: model.BackupPolicy{SourceCluster: &cluster}}, - {name: "no cluster", policy: model.BackupPolicy{Storage: &storage}}, - {name: "wrong storage", policy: model.BackupPolicy{Storage: &wrong, SourceCluster: &cluster}}, - {name: "wrong cluster", policy: model.BackupPolicy{Storage: &storage, SourceCluster: &wrong}}, {name: "existing policy", policy: model.BackupPolicy{Name: &policy}}, } config := &model.Config{ BackupPolicies: map[string]*model.BackupPolicy{policy: {Name: &policy}}, - AerospikeClusters: map[string]*model.AerospikeCluster{policy: {Name: &cluster}}, + AerospikeClusters: map[string]*model.AerospikeCluster{cluster: {Name: &cluster}}, Storage: map[string]*model.Storage{storage: {Name: &storage}}, } @@ -55,7 +48,7 @@ func TestAddPolicyErrors(t *testing.T) { } } -func TestUpdatePolicy(t *testing.T) { +func TestPolicy_Update(t *testing.T) { name := "policy1" config := &model.Config{ BackupPolicies: map[string]*model.BackupPolicy{name: {Name: &name}}, @@ -77,11 +70,12 @@ func TestUpdatePolicy(t *testing.T) { } if *config.BackupPolicies[name].Name != *updatedPolicy.Name { - t.Errorf("UpdatePolicy failed, expected policy name to be updated, got %v", *config.BackupPolicies[name].Name) + t.Errorf("UpdatePolicy failed, expected policy name to be updated, got %v", + *config.BackupPolicies[name].Name) } } -func TestDeletePolicy(t *testing.T) { +func TestPolicy_Delete(t *testing.T) { name := "policy1" config := &model.Config{ BackupPolicies: map[string]*model.BackupPolicy{name: {Name: &name}}, @@ -98,6 +92,7 @@ func TestDeletePolicy(t *testing.T) { } if len(config.BackupPolicies) != 0 { - t.Errorf("DeletePolicy failed, expected policy to be deleted, got %d", len(config.BackupPolicies)) + t.Errorf("DeletePolicy failed, expected policy to be deleted, got %d", + len(config.BackupPolicies)) } } diff --git a/pkg/service/configuration_service_routine.go b/pkg/service/configuration_service_routine.go new file mode 100644 index 00000000..2b00f947 --- /dev/null +++ b/pkg/service/configuration_service_routine.go @@ -0,0 +1,58 @@ +package service + +import ( + "errors" + "fmt" + + "github.com/aerospike/backup/pkg/model" +) + +// AddRoutine +// adds a new BackupRoutine to the configuration if a routine with the same name doesn't already exist. +func AddRoutine(config *model.Config, newRoutine *model.BackupRoutine) error { + if newRoutine.Storage == "" { + return errors.New("storage is empty") + } + if newRoutine.SourceCluster == "" { + return errors.New("cluster is empty") + } + _, found := config.Storage[newRoutine.Storage] + if !found { + return fmt.Errorf("storage %s not found", newRoutine.Storage) + } + _, found = config.AerospikeClusters[newRoutine.SourceCluster] + if !found { + return fmt.Errorf("cluster %s not found", newRoutine.SourceCluster) + } + _, found = config.BackupRoutines[newRoutine.Name] + if found { + return fmt.Errorf("aerospike routine with the same name %s already exists", newRoutine.Name) + } + + config.BackupRoutines[newRoutine.Name] = newRoutine + return nil +} + +// UpdateRoutine +// updates an existing BackupRoutine in the configuration. +func UpdateRoutine(config *model.Config, updatedRoutine *model.BackupRoutine) error { + _, found := config.BackupRoutines[updatedRoutine.Name] + if !found { + return fmt.Errorf("backup routine %s not found", updatedRoutine.Name) + } + + config.BackupRoutines[updatedRoutine.Name] = updatedRoutine + return nil +} + +// DeleteRoutine +// deletes a BackupRoutine from the configuration. +func DeleteRoutine(config *model.Config, routineToDeleteName *string) error { + _, found := config.BackupRoutines[*routineToDeleteName] + if !found { + return fmt.Errorf("backup routine %s not found", *routineToDeleteName) + } + + delete(config.BackupRoutines, *routineToDeleteName) + return nil +} diff --git a/pkg/service/configuration_service_routine_test.go b/pkg/service/configuration_service_routine_test.go new file mode 100644 index 00000000..4dfb8178 --- /dev/null +++ b/pkg/service/configuration_service_routine_test.go @@ -0,0 +1,90 @@ +package service + +import ( + "testing" + + "github.com/aerospike/backup/pkg/model" + "github.com/aws/smithy-go/ptr" +) + +func TestRoutine_AddErrors(t *testing.T) { + routine := "routine" + policy := "policy" + cluster := "cluster" + storage := "storage" + wrong := "-" + fails := []struct { + name string + routine model.BackupRoutine + }{ + {name: "empty", routine: model.BackupRoutine{}}, + {name: "no storage", routine: model.BackupRoutine{SourceCluster: cluster}}, + {name: "no cluster", routine: model.BackupRoutine{Storage: storage}}, + {name: "wrong storage", routine: model.BackupRoutine{Storage: wrong, SourceCluster: cluster}}, + {name: "wrong cluster", routine: model.BackupRoutine{Storage: storage, SourceCluster: wrong}}, + {name: "existing policy", routine: model.BackupRoutine{Name: policy}}, + } + + config := &model.Config{ + BackupRoutines: map[string]*model.BackupRoutine{routine: {Name: routine}}, + BackupPolicies: map[string]*model.BackupPolicy{policy: {Name: &policy}}, + AerospikeClusters: map[string]*model.AerospikeCluster{cluster: {Name: &cluster}}, + Storage: map[string]*model.Storage{storage: {Name: &storage}}, + } + + for _, testRoutine := range fails { + err := AddRoutine(config, &testRoutine.routine) + if err == nil { + t.Errorf("Expected an error on %s", testRoutine.name) + } + } +} + +func TestRoutine_Update(t *testing.T) { + name := "routine1" + config := &model.Config{ + BackupRoutines: map[string]*model.BackupRoutine{name: {Name: name}}, + } + + updatedRoutine := &model.BackupRoutine{ + Name: "routine2", + } + + err := UpdateRoutine(config, updatedRoutine) + if err == nil { + t.Errorf("UpdateRoutine failed, expected routine not found error") + } + + updatedRoutine.Name = name + err = UpdateRoutine(config, updatedRoutine) + if err != nil { + t.Errorf("UpdateRoutine failed, expected nil error, got %v", err) + } + + if config.BackupRoutines[name].Name != updatedRoutine.Name { + t.Errorf("UpdateRoutine failed, expected routine name to be updated, got %v", + config.BackupRoutines[name].Name) + } +} + +func TestRoutine_Delete(t *testing.T) { + name := "routine1" + config := &model.Config{ + BackupRoutines: map[string]*model.BackupRoutine{name: {Name: name}}, + } + + err := DeleteRoutine(config, ptr.String("routine2")) + if err == nil { + t.Errorf("DeleteRoutine failed, expected nil error, got %v", err) + } + + err = DeleteRoutine(config, &name) + if err != nil { + t.Errorf("DeleteRoutine failed, expected nil error, got %v", err) + } + + if len(config.BackupRoutines) != 0 { + t.Errorf("DeleteRoutine failed, expected routine to be deleted, got %d", + len(config.BackupRoutines)) + } +} diff --git a/pkg/service/configuration_service_storage.go b/pkg/service/configuration_service_storage.go index c1316b8f..523d8e2f 100644 --- a/pkg/service/configuration_service_storage.go +++ b/pkg/service/configuration_service_storage.go @@ -2,7 +2,6 @@ package service import ( - "errors" "fmt" "github.com/aerospike/backup/pkg/model" @@ -16,7 +15,7 @@ func AddStorage(config *model.Config, newStorage *model.Storage) error { if found { return fmt.Errorf("storage %s already exists", *newStorage.Name) } - if err := validate(newStorage); err != nil { + if err := newStorage.Validate(); err != nil { return err } @@ -31,7 +30,7 @@ func UpdateStorage(config *model.Config, updatedStorage *model.Storage) error { if !found { return fmt.Errorf("storage %s not found", *updatedStorage.Name) } - if err := validate(updatedStorage); err != nil { + if err := updatedStorage.Validate(); err != nil { return err } @@ -46,26 +45,13 @@ func DeleteStorage(config *model.Config, storageToDeleteName *string) error { if !found { return fmt.Errorf("storage %s not found", *storageToDeleteName) } - policy, found := util.Find(config.BackupPolicies, func(policy *model.BackupPolicy) bool { - return *policy.Storage == *storageToDeleteName + routine, found := util.Find(config.BackupRoutines, func(routine *model.BackupRoutine) bool { + return routine.Storage == *storageToDeleteName }) if found { - return fmt.Errorf("cannot delete storage as it is used in a policy %s", *policy.Name) + return fmt.Errorf("cannot delete storage as it is used in a routine %s", routine.Name) } delete(config.Storage, *storageToDeleteName) return nil } - -func validate(b *model.Storage) error { - if b.Name == nil || *b.Name == "" { - return errors.New("storage name is required") - } - if b.Type == nil { - return errors.New("storage type is required") - } - if b.Path == nil { - return errors.New("storage path is required") - } - return nil -} diff --git a/pkg/service/configuration_service_storage_test.go b/pkg/service/configuration_service_storage_test.go index 74766f92..3f70351f 100644 --- a/pkg/service/configuration_service_storage_test.go +++ b/pkg/service/configuration_service_storage_test.go @@ -3,13 +3,12 @@ package service import ( "testing" - "github.com/aws/smithy-go/ptr" - "github.com/aerospike/backup/pkg/model" "github.com/aerospike/backup/pkg/util" + "github.com/aws/smithy-go/ptr" ) -func TestAddStorage(t *testing.T) { +func TestStorage_Add(t *testing.T) { config := &model.Config{ Storage: map[string]*model.Storage{}, } @@ -33,7 +32,7 @@ func TestAddStorage(t *testing.T) { } } -func TestUpdateStorage(t *testing.T) { +func TestStorage_Update(t *testing.T) { storage := "storage" config := &model.Config{ Storage: map[string]*model.Storage{storage: {Name: &storage}}, @@ -62,13 +61,15 @@ func TestUpdateStorage(t *testing.T) { } } -func TestDeleteStorage(t *testing.T) { - policy := "policy" +func TestStorage_Delete(t *testing.T) { storage := "storage" storage2 := "storage2" + policy := "policy" + routine := "routine" config := &model.Config{ - BackupPolicies: map[string]*model.BackupPolicy{policy: {Name: &policy, Storage: &storage}}, + BackupPolicies: map[string]*model.BackupPolicy{policy: {Name: &policy}}, Storage: map[string]*model.Storage{storage: {Name: &storage}, storage2: {Name: &storage2}}, + BackupRoutines: map[string]*model.BackupRoutine{routine: {Name: routine, Storage: storage}}, } // Deleting a storage that is being used by a policy should result in an error diff --git a/pkg/service/scheduler.go b/pkg/service/scheduler.go index d03e2ba6..7b243060 100644 --- a/pkg/service/scheduler.go +++ b/pkg/service/scheduler.go @@ -2,6 +2,7 @@ package service import ( "context" + "github.com/aerospike/backup/internal/util" "github.com/aerospike/backup/pkg/model" "github.com/aerospike/backup/pkg/shared" @@ -25,8 +26,8 @@ func ScheduleHandlers(ctx context.Context, handlers []BackupScheduler) { // the given configuration. func BuildBackupHandlers(config *model.Config) []BackupScheduler { schedulers := make([]BackupScheduler, 0, len(config.BackupPolicies)) - for _, backupPolicy := range config.BackupPolicies { - handler, err := NewBackupHandler(config, backupPolicy) + for _, backupRoutine := range config.BackupRoutines { + handler, err := NewBackupHandler(config, backupRoutine) util.Check(err) schedulers = append(schedulers, handler) } diff --git a/pkg/shared/backup.go b/pkg/shared/backup.go index a71ce89e..7c9a9df2 100644 --- a/pkg/shared/backup.go +++ b/pkg/shared/backup.go @@ -43,8 +43,8 @@ func NewBackup() *BackupShared { // BackupRun calls the backup_run function from the asbackup shared library. // //nolint:funlen -func (b *BackupShared) BackupRun(backupPolicy *model.BackupPolicy, cluster *model.AerospikeCluster, - storage *model.Storage, opts BackupOptions) bool { +func (b *BackupShared) BackupRun(backupRoutine *model.BackupRoutine, backupPolicy *model.BackupPolicy, + cluster *model.AerospikeCluster, storage *model.Storage, opts BackupOptions) bool { // lock to restrict parallel execution (shared library limitation) b.Lock() defer b.Unlock() @@ -66,12 +66,12 @@ func (b *BackupShared) BackupRun(backupPolicy *model.BackupPolicy, cluster *mode setCString(&backupConfig.password, cluster.GetPassword()) setCString(&backupConfig.auth_mode, cluster.AuthMode) - parseSetList(&backupConfig.set_list, &backupPolicy.SetList) - if backupPolicy.BinList != nil { - setCString(&backupConfig.bin_list, ptr.String(strings.Join(backupPolicy.BinList, ","))) + parseSetList(&backupConfig.set_list, &backupRoutine.SetList) + if backupRoutine.BinList != nil { + setCString(&backupConfig.bin_list, ptr.String(strings.Join(backupRoutine.BinList, ","))) } - if backupPolicy.NodeList != nil { - setCString(&backupConfig.node_list, printNodes(backupPolicy.NodeList)) + if backupRoutine.NodeList != nil { + setCString(&backupConfig.node_list, printNodes(backupRoutine.NodeList)) } setCUint(&backupConfig.socket_timeout, backupPolicy.SocketTimeout) setCUint(&backupConfig.total_timeout, backupPolicy.TotalTimeout) @@ -79,7 +79,7 @@ func (b *BackupShared) BackupRun(backupPolicy *model.BackupPolicy, cluster *mode setCUint(&backupConfig.retry_delay, backupPolicy.RetryDelay) // namespace list configuration - nsCharArray := C.CString(*backupPolicy.Namespace) + nsCharArray := C.CString(*backupRoutine.Namespace) C.strcpy((*C.char)(unsafe.Pointer(&backupConfig.ns)), nsCharArray) setCInt(&backupConfig.parallel, backupPolicy.Parallel) @@ -94,8 +94,8 @@ func (b *BackupShared) BackupRun(backupPolicy *model.BackupPolicy, cluster *mode setCUlong(&backupConfig.max_records, backupPolicy.MaxRecords) setCUint(&backupConfig.records_per_second, backupPolicy.RecordsPerSecond) setCUlong(&backupConfig.file_limit, backupPolicy.FileLimit) - setCString(&backupConfig.partition_list, backupPolicy.PartitionList) - setCString(&backupConfig.after_digest, backupPolicy.AfterDigest) + setCString(&backupConfig.partition_list, backupRoutine.PartitionList) + setCString(&backupConfig.after_digest, backupRoutine.AfterDigest) setCString(&backupConfig.filter_exp, backupPolicy.FilterExp) // S3 configuration diff --git a/pkg/shared/backup_mock.go b/pkg/shared/backup_mock.go index 5ef4d179..631861d7 100644 --- a/pkg/shared/backup_mock.go +++ b/pkg/shared/backup_mock.go @@ -21,8 +21,8 @@ func NewBackup() *BackupShared { } // BackupRun mocks the interface method. -func (b *BackupShared) BackupRun(backupPolicy *model.BackupPolicy, cluster *model.AerospikeCluster, - storage *model.Storage, opts BackupOptions) bool { +func (b *BackupShared) BackupRun(backupRoutine *model.BackupRoutine, backupPolicy *model.BackupPolicy, + cluster *model.AerospikeCluster, storage *model.Storage, opts BackupOptions) bool { slog.Info("BackupRun mock call") return true } diff --git a/pkg/shared/types.go b/pkg/shared/types.go index d7af75ce..86bdfe0b 100644 --- a/pkg/shared/types.go +++ b/pkg/shared/types.go @@ -8,8 +8,13 @@ type BackupOptions struct { } type Backup interface { - BackupRun(backupPolicy *model.BackupPolicy, cluster *model.AerospikeCluster, - storage *model.Storage, opts BackupOptions) bool + BackupRun( + backupRoutine *model.BackupRoutine, + backupPolicy *model.BackupPolicy, + cluster *model.AerospikeCluster, + storage *model.Storage, + opts BackupOptions, + ) bool } type Restore interface {