diff --git a/docs/docs.go b/docs/docs.go index e58a499b..dd38de4c 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -1219,6 +1219,50 @@ const docTemplate = `{ } } }, + "/v1/restore/cancel/{jobID}": { + "post": { + "tags": [ + "Restore" + ], + "summary": "Cancel a running restore operation.", + "operationId": "cancelRestore", + "parameters": [ + { + "type": "integer", + "description": "Restore job ID", + "name": "jobID", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "Restore job canceled successfully", + "schema": { + "type": "string" + } + }, + "400": { + "description": "Invalid job ID", + "schema": { + "type": "string" + } + }, + "404": { + "description": "Job not found", + "schema": { + "type": "string" + } + }, + "500": { + "description": "Internal server error", + "schema": { + "type": "string" + } + } + } + } + }, "/v1/restore/full": { "post": { "consumes": [ @@ -1244,7 +1288,7 @@ const docTemplate = `{ "202": { "description": "Restore operation job id", "schema": { - "type": "int64" + "type": "integer" } }, "400": { @@ -1287,7 +1331,7 @@ const docTemplate = `{ "202": { "description": "Restore operation job id", "schema": { - "type": "int64" + "type": "integer" } }, "400": { @@ -1318,7 +1362,6 @@ const docTemplate = `{ "parameters": [ { "type": "integer", - "format": "int64", "description": "Job ID to retrieve the status", "name": "jobId", "in": "path", @@ -1367,7 +1410,7 @@ const docTemplate = `{ "202": { "description": "Restore operation job id", "schema": { - "type": "int64" + "type": "integer" } }, "400": { @@ -2088,12 +2131,14 @@ const docTemplate = `{ "enum": [ "Running", "Done", - "Failed" + "Failed", + "Cancelled" ], "x-enum-varnames": [ "JobStatusRunning", "JobStatusDone", - "JobStatusFailed" + "JobStatusFailed", + "JobStatusCancelled" ] }, "dto.LocalStorage": { @@ -2240,16 +2285,7 @@ const docTemplate = `{ "example": 4 }, "status": { - "enum": [ - "Running", - "Done", - "Failed" - ], - "allOf": [ - { - "$ref": "#/definitions/dto.JobStatus" - } - ] + "$ref": "#/definitions/dto.JobStatus" }, "total-bytes": { "type": "integer", diff --git a/docs/openapi.json b/docs/openapi.json index 018bc105..e241f253 100644 --- a/docs/openapi.json +++ b/docs/openapi.json @@ -1366,6 +1366,64 @@ "x-codegen-request-body-name" : "storage" } }, + "/v1/restore/cancel/{jobID}" : { + "post" : { + "operationId" : "cancelRestore", + "parameters" : [ { + "description" : "Restore job ID", + "in" : "path", + "name" : "jobID", + "required" : true, + "schema" : { + "type" : "integer" + } + } ], + "responses" : { + "200" : { + "content" : { + "*/*" : { + "schema" : { + "type" : "string" + } + } + }, + "description" : "Restore job canceled successfully" + }, + "400" : { + "content" : { + "*/*" : { + "schema" : { + "type" : "string" + } + } + }, + "description" : "Invalid job ID" + }, + "404" : { + "content" : { + "*/*" : { + "schema" : { + "type" : "string" + } + } + }, + "description" : "Job not found" + }, + "500" : { + "content" : { + "*/*" : { + "schema" : { + "type" : "string" + } + } + }, + "description" : "Internal server error" + } + }, + "summary" : "Cancel a running restore operation.", + "tags" : [ "Restore" ] + } + }, "/v1/restore/full" : { "post" : { "operationId" : "restoreFull", @@ -1385,7 +1443,6 @@ "content" : { "*/*" : { "schema" : { - "format" : "int64", "type" : "integer" } } @@ -1437,7 +1494,6 @@ "content" : { "*/*" : { "schema" : { - "format" : "int64", "type" : "integer" } } @@ -1479,7 +1535,6 @@ "name" : "jobId", "required" : true, "schema" : { - "format" : "int64", "type" : "integer" } } ], @@ -1529,7 +1584,6 @@ "content" : { "*/*" : { "schema" : { - "format" : "int64", "type" : "integer" } } @@ -2215,9 +2269,9 @@ "type" : "object" }, "dto.JobStatus" : { - "enum" : [ "Running", "Done", "Failed" ], + "enum" : [ "Running", "Done", "Failed", "Cancelled" ], "type" : "string", - "x-enum-varnames" : [ "JobStatusRunning", "JobStatusDone", "JobStatusFailed" ] + "x-enum-varnames" : [ "JobStatusRunning", "JobStatusDone", "JobStatusFailed", "JobStatusCancelled" ] }, "dto.LocalStorage" : { "properties" : { @@ -2339,10 +2393,7 @@ "type" : "integer" }, "status" : { - "allOf" : [ { - "$ref" : "#/components/schemas/dto.JobStatus" - } ], - "type" : "object" + "$ref" : "#/components/schemas/dto.JobStatus" }, "total-bytes" : { "example" : 2000, diff --git a/docs/openapi.yaml b/docs/openapi.yaml index dbfa10ec..e6306edc 100644 --- a/docs/openapi.yaml +++ b/docs/openapi.yaml @@ -944,6 +944,44 @@ paths: tags: - Configuration x-codegen-request-body-name: storage + /v1/restore/cancel/{jobID}: + post: + operationId: cancelRestore + parameters: + - description: Restore job ID + in: path + name: jobID + required: true + schema: + type: integer + responses: + "200": + content: + '*/*': + schema: + type: string + description: Restore job canceled successfully + "400": + content: + '*/*': + schema: + type: string + description: Invalid job ID + "404": + content: + '*/*': + schema: + type: string + description: Job not found + "500": + content: + '*/*': + schema: + type: string + description: Internal server error + summary: Cancel a running restore operation. + tags: + - Restore /v1/restore/full: post: operationId: restoreFull @@ -959,7 +997,6 @@ paths: content: '*/*': schema: - format: int64 type: integer description: Restore operation job id "400": @@ -993,7 +1030,6 @@ paths: content: '*/*': schema: - format: int64 type: integer description: Restore operation job id "400": @@ -1021,7 +1057,6 @@ paths: name: jobId required: true schema: - format: int64 type: integer responses: "200": @@ -1055,7 +1090,6 @@ paths: content: '*/*': schema: - format: int64 type: integer description: Restore operation job id "400": @@ -1796,11 +1830,13 @@ components: - Running - Done - Failed + - Cancelled type: string x-enum-varnames: - JobStatusRunning - JobStatusDone - JobStatusFailed + - JobStatusCancelled dto.LocalStorage: properties: path: @@ -1889,7 +1925,7 @@ components: fresher-records: 5 skipped-records: 4 index-count: 3 - status: "{}" + status: Running properties: current-job: $ref: '#/components/schemas/dto.RunningJob' @@ -1928,9 +1964,7 @@ components: format: int64 type: integer status: - allOf: - - $ref: '#/components/schemas/dto.JobStatus' - type: object + $ref: '#/components/schemas/dto.JobStatus' total-bytes: example: 2000 format: int64 diff --git a/internal/server/handlers/handlers_test.go b/internal/server/handlers/handlers_test.go index 1b842e7f..a753b018 100644 --- a/internal/server/handlers/handlers_test.go +++ b/internal/server/handlers/handlers_test.go @@ -121,6 +121,14 @@ func (mock restoreManagerMock) RetrieveConfiguration(routine string, _ time.Time return []byte(fmt.Sprintf(`{ "dir": "%s" }`, testDir)), nil } +func (mock restoreManagerMock) CancelRestore(jobID model.RestoreJobID) error { + if jobID != model.RestoreJobID(testJobID) { + return errTest + } + + return nil +} + type backupListReaderMock struct{} func (mock backupListReaderMock) FullBackupList(_ context.Context, timebounds model.TimeBounds, diff --git a/internal/server/handlers/restore.go b/internal/server/handlers/restore.go index eca7ebdc..4f8bdc43 100644 --- a/internal/server/handlers/restore.go +++ b/internal/server/handlers/restore.go @@ -3,6 +3,7 @@ package handlers import ( "encoding/json" + "errors" "fmt" "log/slog" "net/http" @@ -11,6 +12,7 @@ import ( "github.com/aerospike/aerospike-backup-service/v2/pkg/dto" "github.com/aerospike/aerospike-backup-service/v2/pkg/model" + "github.com/aerospike/aerospike-backup-service/v2/pkg/service" "github.com/gorilla/mux" ) @@ -21,7 +23,7 @@ import ( // @Router /v1/restore/full [post] // @Accept json // @Param request body dto.RestoreRequest true "Restore request details" -// @Success 202 {int64} int64 "Restore operation job id" +// @Success 202 {integer} int "Restore operation job id" // @Failure 400 {string} string // @Failure 405 {string} string func (s *Service) RestoreFullHandler(w http.ResponseWriter, r *http.Request) { @@ -74,7 +76,7 @@ func (s *Service) RestoreFullHandler(w http.ResponseWriter, r *http.Request) { // @Router /v1/restore/incremental [post] // @Accept json // @Param request body dto.RestoreRequest true "Restore request details" -// @Success 202 {int64} int64 "Restore operation job id" +// @Success 202 {integer} int "Restore operation job id" // @Failure 400 {string} string // @Failure 405 {string} string func (s *Service) RestoreIncrementalHandler(w http.ResponseWriter, r *http.Request) { @@ -129,7 +131,7 @@ func (s *Service) RestoreIncrementalHandler(w http.ResponseWriter, r *http.Reque // @Router /v1/restore/timestamp [post] // @Accept json // @Param request body dto.RestoreTimestampRequest true "Restore request details" -// @Success 202 {int64} int64 "Restore operation job id" +// @Success 202 {integer} int "Restore operation job id" // @Failure 400 {string} string // @Failure 405 {string} string func (s *Service) RestoreByTimeHandler(w http.ResponseWriter, r *http.Request) { @@ -183,37 +185,33 @@ func (s *Service) RestoreByTimeHandler(w http.ResponseWriter, r *http.Request) { // @ID restoreStatus // @Tags Restore // @Produce json -// @Param jobId path int true "Job ID to retrieve the status" format(int64) +// @Param jobId path int true "Job ID to retrieve the status" // @Router /v1/restore/status/{jobId} [get] // @Success 200 {object} dto.RestoreJobStatus "Restore job status details" // @Failure 400 {string} string func (s *Service) RestoreStatusHandler(w http.ResponseWriter, r *http.Request) { hLogger := s.logger.With(slog.String("handler", "RestoreStatusHandler")) - jobIDParam := mux.Vars(r)["jobId"] - - if jobIDParam == "" { - hLogger.Error("job id required") - http.Error(w, "jobId required", http.StatusBadRequest) - return - } - jobID, err := strconv.Atoi(jobIDParam) + jobID, err := extractJobID(r) if err != nil { - hLogger.Error("failed to parse job id", - slog.String("jobIDParam", jobIDParam), - slog.Any("error", err)) - http.Error(w, "invalid job id", http.StatusBadRequest) + hLogger.Error("failed to extract job id", slog.Any("error", err)) + http.Error(w, "failed to extract job id", http.StatusBadRequest) return } w.Header().Set("Content-Type", "application/json") - status, err := s.restoreManager.JobStatus(model.RestoreJobID(jobID)) + status, err := s.restoreManager.JobStatus(jobID) if err != nil { - hLogger.Error("failed to get job status", - slog.Int("jobID", jobID), - slog.Any("error", err), - ) - http.Error(w, err.Error(), http.StatusNotFound) + var jobErr *service.ErrJobNotFound + if errors.As(err, &jobErr) { + hLogger.Error("job not found", slog.Any("jobID", jobErr.JobID)) + http.Error(w, fmt.Sprintf("Job with ID %d not found", jobErr.JobID), http.StatusNotFound) + } else { + hLogger.Error("failed to get job status", slog.Any("error", err)) + http.Error(w, "Failed to get job status", http.StatusInternalServerError) + } + return } + jsonResponse, err := dto.Serialize(dto.NewResultFromModel(status), dto.JSON) w.WriteHeader(http.StatusOK) if err != nil { @@ -232,6 +230,18 @@ func (s *Service) RestoreStatusHandler(w http.ResponseWriter, r *http.Request) { } } +func extractJobID(r *http.Request) (model.RestoreJobID, error) { + jobIDParam := mux.Vars(r)["jobId"] + if jobIDParam == "" { + return 0, fmt.Errorf("jobId required") + } + jobID, err := strconv.Atoi(jobIDParam) + if err != nil { + return 0, fmt.Errorf("invalid jobId %q", jobIDParam) + } + return model.RestoreJobID(jobID), nil +} + // RetrieveConfig // @Summary Retrieve Aerospike cluster configuration backup // @ID retrieveConfiguration @@ -288,3 +298,43 @@ func (s *Service) RetrieveConfig(w http.ResponseWriter, r *http.Request) { ) } } + +// CancelRestoreHandler +// @Summary Cancel a running restore operation. +// @ID cancelRestore +// @Tags Restore +// @Router /v1/restore/cancel/{jobID} [post] +// @Param jobID path int true "Restore job ID" +// @Success 200 {string} string "Restore job canceled successfully" +// @Failure 400 {string} string "Invalid job ID" +// @Failure 404 {string} string "Job not found" +// @Failure 500 {string} string "Internal server error" +func (s *Service) CancelRestoreHandler(w http.ResponseWriter, r *http.Request) { + hLogger := s.logger.With(slog.String("handler", "CancelRestoreHandler")) + + jobID, err := extractJobID(r) + if err != nil { + hLogger.Error("failed to extract job id", slog.Any("error", err)) + http.Error(w, "failed to extract job id", http.StatusBadRequest) + return + } + + err = s.restoreManager.CancelRestore(jobID) + if err != nil { + var jobErr *service.ErrJobNotFound + if errors.As(err, &jobErr) { + hLogger.Error("job not found", slog.Any("jobID", jobErr.JobID)) + http.Error(w, fmt.Sprintf("Job with ID %d not found", jobErr.JobID), http.StatusNotFound) + } else { + hLogger.Error("failed to cancel restore", slog.Any("error", err)) + http.Error(w, "Failed to cancel restore: "+err.Error(), http.StatusInternalServerError) + } + return + } + + // Return a success response + hLogger.Info("Restore job canceled successfully", slog.Any("jobID", jobID)) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = fmt.Fprint(w, "Restore job canceled successfully") +} diff --git a/internal/server/router.go b/internal/server/router.go index 412f6c26..01498564 100644 --- a/internal/server/router.go +++ b/internal/server/router.go @@ -76,6 +76,9 @@ func NewRouter(apiPath, sysPath string, h *handlers.Service, middlewares ...mux. // Restore job status endpoint apiRouter.HandleFunc("/restore/status/{jobId}", h.RestoreStatusHandler).Methods(http.MethodGet) + // Cancel restore job + apiRouter.HandleFunc("/restore/cancel/{jobId}", h.CancelRestoreHandler).Methods(http.MethodPost) + // Return backed up Aerospike configuration apiRouter.HandleFunc("/retrieve/configuration/{name}/{timestamp}", h.RetrieveConfig).Methods(http.MethodGet) diff --git a/pkg/dto/restore_result.go b/pkg/dto/restore_result.go index 9cb788bd..c3c3a681 100644 --- a/pkg/dto/restore_result.go +++ b/pkg/dto/restore_result.go @@ -5,9 +5,10 @@ import "github.com/aerospike/aerospike-backup-service/v2/pkg/model" type JobStatus string const ( - JobStatusRunning JobStatus = "Running" - JobStatusDone JobStatus = "Done" - JobStatusFailed JobStatus = "Failed" + JobStatusRunning JobStatus = "Running" + JobStatusDone JobStatus = "Done" + JobStatusFailed JobStatus = "Failed" + JobStatusCancelled JobStatus = "Cancelled" ) // RestoreJobStatus represents a restore job status. @@ -15,7 +16,7 @@ const ( type RestoreJobStatus struct { RestoreStats CurrentRestore *RunningJob `yaml:"current-restore,omitempty" json:"current-job,omitempty"` - Status JobStatus `yaml:"status,omitempty" json:"status,omitempty" enums:"Running,Done,Failed"` + Status JobStatus `yaml:"status,omitempty" json:"status,omitempty"` Error string `yaml:"error,omitempty" json:"error,omitempty"` } diff --git a/pkg/model/restore_result.go b/pkg/model/restore_result.go index 85307c34..22dd5826 100644 --- a/pkg/model/restore_result.go +++ b/pkg/model/restore_result.go @@ -3,9 +3,10 @@ package model type JobStatus string const ( - JobStatusRunning JobStatus = "Running" - JobStatusDone JobStatus = "Done" - JobStatusFailed JobStatus = "Failed" + JobStatusRunning JobStatus = "Running" + JobStatusDone JobStatus = "Done" + JobStatusFailed JobStatus = "Failed" + JobStatusCancelled JobStatus = "Cancelled" ) // RestoreJobStatus represents a restore job status. diff --git a/pkg/service/jobs_holder.go b/pkg/service/jobs_holder.go index 62959650..357d1afd 100644 --- a/pkg/service/jobs_holder.go +++ b/pkg/service/jobs_holder.go @@ -1,7 +1,8 @@ package service import ( - "fmt" + "context" + "errors" "math/rand" "sync" "time" @@ -10,7 +11,7 @@ import ( ) type jobInfo struct { - handlers []RestoreHandler + handlers []*RestoreHandlerWithCancel // Each handler restores one namespace. status model.JobStatus err error totalRecords uint64 @@ -45,8 +46,8 @@ func (h *RestoreJobsHolder) newJob(label string) model.RestoreJobID { return id } -// addHandler should be called for each backup (full or incremental) handler. -func (h *RestoreJobsHolder) addHandler(id model.RestoreJobID, handler RestoreHandler) { +// addJob should be called for each backup (full or incremental) handler. +func (h *RestoreJobsHolder) addJob(id model.RestoreJobID, handler *RestoreHandlerWithCancel) { h.Lock() defer h.Unlock() if job, exists := h.jobs[id]; exists { @@ -64,28 +65,37 @@ func (h *RestoreJobsHolder) addTotalRecords(id model.RestoreJobID, t uint64) { } } -func (h *RestoreJobsHolder) setDone(id model.RestoreJobID) { +func (h *RestoreJobsHolder) finishJob(id model.RestoreJobID, err error) { h.Lock() defer h.Unlock() if job, exists := h.jobs[id]; exists { - job.status = model.JobStatusDone + if err == nil { + job.status = model.JobStatusDone + return + } + if errors.Is(err, context.Canceled) { + job.status = model.JobStatusCancelled + return + } + job.status = model.JobStatusFailed + job.err = err } } -func (h *RestoreJobsHolder) setFailed(id model.RestoreJobID, err error) { +func (h *RestoreJobsHolder) getStatus(id model.RestoreJobID) (*model.RestoreJobStatus, error) { h.Lock() defer h.Unlock() if job, exists := h.jobs[id]; exists { - job.status = model.JobStatusFailed - job.err = err + return RestoreJobStatus(job), nil } + return nil, NewErrJobNotFound(id) } -func (h *RestoreJobsHolder) getStatus(id model.RestoreJobID) (*model.RestoreJobStatus, error) { +func (h *RestoreJobsHolder) getJob(id model.RestoreJobID) (*jobInfo, error) { h.Lock() defer h.Unlock() if job, exists := h.jobs[id]; exists { - return RestoreJobStatus(job), nil + return job, nil } - return nil, fmt.Errorf("job with ID %d not found", id) + return nil, NewErrJobNotFound(id) } diff --git a/pkg/service/restore_configuration_test.go b/pkg/service/restore_configuration_test.go new file mode 100644 index 00000000..1fb28fa5 --- /dev/null +++ b/pkg/service/restore_configuration_test.go @@ -0,0 +1,89 @@ +package service + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func Test_RetrieveConfiguration(t *testing.T) { + tests := []struct { + name string + routine string + timestamp time.Time + wantErr bool + }{ + { + name: "normal", + routine: "routine", + timestamp: time.UnixMilli(10), + wantErr: false, + }, + { + name: "wrong time", + routine: "routine", + timestamp: time.UnixMilli(1), + wantErr: true, + }, + { + name: "wrong routine", + routine: "routine_fail_read", + timestamp: time.UnixMilli(10), + wantErr: true, + }, + { + name: "routine not found", + routine: "routine not found", + timestamp: time.UnixMilli(10), + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + res, err := restoreService.RetrieveConfiguration(tt.routine, tt.timestamp) + assert.Equal(t, tt.wantErr, err != nil, "Unexpected error presence, got: %v", err) + + if !tt.wantErr { + assert.NotNil(t, res, "Expected non-nil result, got nil.") + } else { + assert.Nil(t, res, "Expected nil result as an error was expected.") + } + }) + } +} + +func Test_CalculateConfigurationBackupPath(t *testing.T) { + tests := []struct { + name string + path string + want string + wantErr bool + }{ + { + name: "NormalPath", + path: "backup/12345/data/ns1", + want: "backup/12345/configuration", + wantErr: false, + }, + { + name: "InvalidPath", + path: "://", + want: "", + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := calculateConfigurationBackupPath(tt.path) + if (err != nil) != tt.wantErr { + t.Errorf("calculateConfigurationBackupPath() error = %v, wantErr %v", err, tt.wantErr) + return + } + if result != tt.want { + t.Errorf("calculateConfigurationBackupPath() got = %v, want %v", result, tt.want) + } + }) + } +} diff --git a/pkg/service/restore_data.go b/pkg/service/restore_data.go index bd0efa69..4c10b829 100644 --- a/pkg/service/restore_data.go +++ b/pkg/service/restore_data.go @@ -12,12 +12,23 @@ import ( "github.com/aerospike/aerospike-backup-service/v2/pkg/service/aerospike" "github.com/aerospike/aerospike-backup-service/v2/pkg/service/storage" "github.com/aerospike/backup-go" - "github.com/prometheus/client_golang/prometheus" ) var errBackendNotFound = errors.New("backend not found") var errBackupNotFound = errors.New("backup not found") +type ErrJobNotFound struct { + JobID model.RestoreJobID +} + +func (e *ErrJobNotFound) Error() string { + return fmt.Sprintf("restore job with ID %d not found", e.JobID) +} + +func NewErrJobNotFound(id model.RestoreJobID) *ErrJobNotFound { + return &ErrJobNotFound{id} +} + // dataRestorer implements the RestoreManager interface. // Stores job information locally within a map. type dataRestorer struct { @@ -54,20 +65,20 @@ func NewRestoreManager(backends BackendsHolder, } func (r *dataRestorer) Restore(request *model.RestoreRequest) (model.RestoreJobID, error) { - jobID := r.restoreJobs.newJob(request.BackupDataPath) ctx := context.TODO() totalRecords, err := recordsInBackup(ctx, request) if err != nil { slog.Info("Could not read backup metadata", slog.Any("err", err)) } + jobID := r.restoreJobs.newJob(request.BackupDataPath) go func() { client, err := r.clientManager.GetClient(request.DestinationCluster) if err != nil { slog.Error("Failed to restore by path", slog.Any("cluster", request.DestinationCluster.ClusterLabel), slog.Any("err", err)) - r.restoreJobs.setFailed(jobID, err) + r.restoreJobs.finishJob(jobID, err) return } defer r.clientManager.Close(client) @@ -78,20 +89,19 @@ func (r *dataRestorer) Restore(request *model.RestoreRequest) (model.RestoreJobI handler, err := r.restoreService.Run(ctx, client, request) if err != nil { - r.restoreJobs.setFailed(jobID, fmt.Errorf("failed to start restore operation: %w", err)) + r.restoreJobs.finishJob(jobID, fmt.Errorf("failed to start restore operation: %w", err)) return } r.restoreJobs.addTotalRecords(jobID, totalRecords) - r.restoreJobs.addHandler(jobID, handler) + ctx, cancel := context.WithCancel(ctx) + r.restoreJobs.addJob(jobID, &RestoreHandlerWithCancel{ + RestoreHandler: handler, + cancel: cancel, + }) // Wait for the restore operation to complete err = handler.Wait(ctx) - if err != nil { - r.restoreJobs.setFailed(jobID, fmt.Errorf("failed restore operation: %w", err)) - return - } - - r.restoreJobs.setDone(jobID) + r.restoreJobs.finishJob(jobID, err) }() return jobID, nil @@ -108,7 +118,7 @@ func (r *dataRestorer) validateDestinationNamespace(request *model.RestoreReques slog.Error("Failed to restore by path", slog.Any("cluster label", request.DestinationCluster.ClusterLabel), slog.Any("err", err)) - r.restoreJobs.setFailed(jobID, err) + r.restoreJobs.finishJob(jobID, err) return true } } @@ -145,20 +155,19 @@ func (r *dataRestorer) restoreByTimeSync( slog.Error("Failed to restore by timestamp", slog.Any("cluster", request.DestinationCluster.ClusterLabel), slog.Any("err", err)) - r.restoreJobs.setFailed(jobID, err) + r.restoreJobs.finishJob(jobID, err) return } defer r.clientManager.Close(client) var wg sync.WaitGroup - - multiError := prometheus.MultiError{} + var multiError error for _, nsBackup := range fullBackups { wg.Add(1) go func(nsBackup model.BackupDetails) { defer wg.Done() if err := r.restoreNamespace(ctx, client, backend, request, jobID, nsBackup); err != nil { - multiError.Append( + multiError = errors.Join(multiError, fmt.Errorf("failed to restore routine %s, namespace %s by timestamp: %w", request.RoutineName, nsBackup.Namespace, err)) } @@ -167,13 +176,7 @@ func (r *dataRestorer) restoreByTimeSync( wg.Wait() - err = multiError.MaybeUnwrap() - if err != nil { - r.restoreJobs.setFailed(jobID, err) - return - } - - r.restoreJobs.setDone(jobID) + r.restoreJobs.finishJob(jobID, multiError) } func (r *dataRestorer) restoreNamespace( @@ -208,7 +211,11 @@ func (r *dataRestorer) restoreNamespace( } r.restoreJobs.addTotalRecords(jobID, b.RecordCount) - r.restoreJobs.addHandler(jobID, handler) + ctx, cancel := context.WithCancel(ctx) + r.restoreJobs.addJob(jobID, &RestoreHandlerWithCancel{ + RestoreHandler: handler, + cancel: cancel, + }) err = handler.Wait(ctx) if err != nil { @@ -261,3 +268,17 @@ func recordsInBackup(ctx context.Context, request *model.RestoreRequest) (uint64 } return metadata.RecordCount, nil } + +// CancelRestore cancels an ongoing restore. +func (r *dataRestorer) CancelRestore(jobID model.RestoreJobID) error { + job, err := r.restoreJobs.getJob(jobID) + if err != nil { + return err + } + slog.Debug("Canceling restore job", slog.Any("job ID", jobID)) + for _, h := range job.handlers { + h.Cancel() + } + + return nil +} diff --git a/pkg/service/restore_data_test.go b/pkg/service/restore_data_test.go index 4430007b..961267fe 100644 --- a/pkg/service/restore_data_test.go +++ b/pkg/service/restore_data_test.go @@ -3,28 +3,17 @@ package service import ( "context" "errors" - "os" + "sync" "testing" "time" "github.com/aerospike/aerospike-backup-service/v2/pkg/model" "github.com/aerospike/backup-go" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -var restoreService = makeTestRestoreService() -var validBackupPath = "./testout/backup/data" - -func makeTestFolders() { - _ = os.MkdirAll(validBackupPath, os.ModePerm) - create, _ := os.Create(validBackupPath + "/backup.asb") - _ = create.Close() -} - -func cleanTestFolder() { - _ = os.RemoveAll("./testout") -} +var restoreService = makeTestRestoreService(nil) +var validBackupPath = "testout/backup/data" type BackendHolderMock struct{} @@ -51,7 +40,7 @@ func (b *BackendHolderMock) GetAllReaders() map[string]BackupListReader { func (b *BackendHolderMock) Init(_ *model.Config) { } -func makeTestRestoreService() *dataRestorer { +func makeTestRestoreService(wg *sync.WaitGroup) *dataRestorer { storage := &model.LocalStorage{} config := model.NewConfig() _ = config.AddStorage("s", storage) @@ -69,7 +58,7 @@ func makeTestRestoreService() *dataRestorer { backends: &BackendHolderMock{}, }, restoreJobs: NewRestoreJobsHolder(), - restoreService: NewRestoreMock(), + restoreService: NewRestoreMock(wg), backends: &BackendHolderMock{}, config: config, clientManager: &MockClientManager{}, @@ -173,15 +162,9 @@ func (*BackendFailMock) IncrementalBackupList(_ context.Context, _ model.TimeBou } func TestRestoreOK(t *testing.T) { - makeTestFolders() - t.Cleanup(func() { - cleanTestFolder() - }) restoreRequest := &model.RestoreRequest{ DestinationCluster: model.NewLocalAerospikeCluster(), - Policy: &model.RestorePolicy{ - SetList: []string{"set1"}, - }, + Policy: &model.RestorePolicy{}, SourceStorage: &model.LocalStorage{ Path: validBackupPath, }, @@ -191,14 +174,10 @@ func TestRestoreOK(t *testing.T) { require.NoError(t, err) jobStatus, err := restoreService.JobStatus(jobID) require.NoError(t, err) - if jobStatus.Status != model.JobStatusRunning { - t.Errorf("Expected jobStatus to be %s, but was %s", model.JobStatusDone, jobStatus.Status) - } - time.Sleep(1 * time.Second) - jobStatus, _ = restoreService.JobStatus(jobID) - if jobStatus.Status != model.JobStatusDone { - t.Errorf("Expected jobStatus to be %s, but was %s", model.JobStatusDone, jobStatus.Status) - } + require.Equal(t, model.JobStatusRunning, jobStatus.Status) + + _, err = waitForJobStatus(t, jobID, model.JobStatusDone) + require.NoError(t, err) } func TestLatestFullBackupBeforeTime(t *testing.T) { @@ -212,15 +191,9 @@ func TestLatestFullBackupBeforeTime(t *testing.T) { toTime := time.UnixMilli(25) result := latestBackupBeforeTime(backupList, &toTime) - if result == nil { - t.Error("Expected a non-nil result, but got nil") - } - if len(result) != 2 { - t.Errorf("Expected 2 backups") - } - if result[0] != backupList[1] { - t.Errorf("Expected the latest backup, but got %+v", result) - } + require.NotNil(t, result) + require.Equal(t, 2, len(result)) + require.Equal(t, result[0], backupList[1]) } func TestLatestFullBackupEqualTime(t *testing.T) { @@ -233,15 +206,9 @@ func TestLatestFullBackupEqualTime(t *testing.T) { toTime := time.UnixMilli(20) result := latestBackupBeforeTime(backupList, &toTime) - if result == nil { - t.Error("Expected a non-nil result, but got nil") - } - if len(result) != 1 { - t.Errorf("Expected 1 backup") - } - if result[0] != backupList[1] { - t.Errorf("Expected the latest backup, but got %+v", result) - } + require.NotNil(t, result) + require.Equal(t, 1, len(result)) + require.Equal(t, result[0], backupList[1]) } func TestLatestFullBackupBeforeTime_NotFound(t *testing.T) { @@ -254,39 +221,28 @@ func TestLatestFullBackupBeforeTime_NotFound(t *testing.T) { toTime := time.UnixMilli(5) result := latestBackupBeforeTime(backupList, &toTime) - if result != nil { - t.Errorf("Expected a non result, but got %+v", result) - } + require.Nil(t, result) } func Test_RestoreTimestamp(t *testing.T) { request := model.RestoreTimestampRequest{ DestinationCluster: model.NewLocalAerospikeCluster(), - Policy: &model.RestorePolicy{ - SetList: []string{"set1"}, - }, - Time: time.UnixMilli(100), - RoutineName: "routine", + Policy: &model.RestorePolicy{}, + Time: time.UnixMilli(100), + RoutineName: "routine", } jobID, err := restoreService.RestoreByTime(&request) - if err != nil { - t.Errorf("expected nil, got %s", err.Error()) - } + require.NoError(t, err, "RestoreByTime should not return an error") - time.Sleep(1 * time.Second) - jobStatus, _ := restoreService.JobStatus(jobID) - require.Equal(t, model.JobStatusDone, jobStatus.Status) - if jobStatus.ReadRecords != 3 { - t.Errorf("Expected 3 (one full and 2 incremental backups), got %d", jobStatus.ReadRecords) - } + jobStatus, err := waitForJobStatus(t, jobID, model.JobStatusDone) + require.NoError(t, err) + require.Equal(t, 3, int(jobStatus.ReadRecords), "Expected 3 (one full and 2 incremental backups)") } func Test_WrongStatus(t *testing.T) { - wrongJobStatus, err := restoreService.JobStatus(1111) - if err == nil { - t.Errorf("Expected not found, but got %v", wrongJobStatus) - } + _, err := restoreService.JobStatus(1111) + require.Error(t, err) } func Test_RestoreByTimeFailNoBackend(t *testing.T) { @@ -295,9 +251,7 @@ func Test_RestoreByTimeFailNoBackend(t *testing.T) { } _, err := restoreService.RestoreByTime(request) - if err == nil || !errors.Is(err, errBackendNotFound) { - t.Errorf("Expected error %v, but got %v", errBackendNotFound, err) - } + require.ErrorIs(t, err, errBackendNotFound) } func Test_RestoreByTimeFailNoTimestamp(t *testing.T) { @@ -306,9 +260,7 @@ func Test_RestoreByTimeFailNoTimestamp(t *testing.T) { } _, err := restoreService.RestoreByTime(request) - if err == nil || !errors.Is(err, errBackupNotFound) { - t.Errorf("Expected error %v, but got %v", errBackupNotFound, err) - } + require.ErrorIs(t, err, errBackupNotFound) } func Test_RestoreByTimeFailNoBackup(t *testing.T) { @@ -318,9 +270,7 @@ func Test_RestoreByTimeFailNoBackup(t *testing.T) { } _, err := restoreService.RestoreByTime(request) - if err == nil || !errors.Is(err, errBackupNotFound) { - t.Errorf("Expected error %v, but got %v", errBackupNotFound, err) - } + require.ErrorIs(t, err, errBackupNotFound) } func Test_restoreTimestampFail(t *testing.T) { @@ -334,87 +284,6 @@ func Test_restoreTimestampFail(t *testing.T) { require.Error(t, err) } -func Test_RetrieveConfiguration(t *testing.T) { - tests := []struct { - name string - routine string - timestamp time.Time - wantErr bool - }{ - { - name: "normal", - routine: "routine", - timestamp: time.UnixMilli(10), - wantErr: false, - }, - { - name: "wrong time", - routine: "routine", - timestamp: time.UnixMilli(1), - wantErr: true, - }, - { - name: "wrong routine", - routine: "routine_fail_read", - timestamp: time.UnixMilli(10), - wantErr: true, - }, - { - name: "routine not found", - routine: "routine not found", - timestamp: time.UnixMilli(10), - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - res, err := restoreService.RetrieveConfiguration(tt.routine, tt.timestamp) - assert.Equal(t, tt.wantErr, err != nil, "Unexpected error presence, got: %v", err) - - if !tt.wantErr { - assert.NotNil(t, res, "Expected non-nil result, got nil.") - } else { - assert.Nil(t, res, "Expected nil result as an error was expected.") - } - }) - } -} - -func Test_CalculateConfigurationBackupPath(t *testing.T) { - tests := []struct { - name string - path string - want string - wantErr bool - }{ - { - name: "NormalPath", - path: "backup/12345/data/ns1", - want: "backup/12345/configuration", - wantErr: false, - }, - { - name: "InvalidPath", - path: "://", - want: "", - wantErr: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - result, err := calculateConfigurationBackupPath(tt.path) - if (err != nil) != tt.wantErr { - t.Errorf("calculateConfigurationBackupPath() error = %v, wantErr %v", err, tt.wantErr) - return - } - if result != tt.want { - t.Errorf("calculateConfigurationBackupPath() got = %v, want %v", result, tt.want) - } - }) - } -} - // MockClientManager is a mock implementation of ClientManager for testing type MockClientManager struct { } @@ -433,3 +302,53 @@ func (m *MockClientManager) CreateClient(cluster *model.AerospikeCluster) (*back return &backup.Client{}, nil } + +func TestRestoreCancel(t *testing.T) { + wg := &sync.WaitGroup{} + restoreService = makeTestRestoreService(wg) + restoreRequest := &model.RestoreRequest{ + DestinationCluster: model.NewLocalAerospikeCluster(), + Policy: &model.RestorePolicy{}, + SourceStorage: &model.LocalStorage{ + Path: validBackupPath, + }, + BackupDataPath: "namespace", + } + wg.Add(1) + NewRestoreMock(nil).restoreWaitWg = wg + jobID, err := restoreService.Restore(restoreRequest) + require.NoError(t, err) + + wg.Wait() // wait until restore starts + err = restoreService.CancelRestore(jobID) + require.NoError(t, err) + _, err = waitForJobStatus(t, jobID, model.JobStatusCancelled) + require.NoError(t, err) +} + +func waitForJobStatus( + t *testing.T, jobID model.RestoreJobID, expected model.JobStatus, +) (*model.RestoreJobStatus, error) { + t.Helper() + return wait(func() (*model.RestoreJobStatus, bool) { + status, err := restoreService.JobStatus(jobID) + require.NoError(t, err) + require.NotNil(t, status) + return status, status.Status == expected + }) +} + +func wait[T any](f func() (T, bool)) (T, error) { + deadline := time.Now().Add(1 * time.Second) + for time.Now().Before(deadline) { + val, success := f() + if success { + return val, nil + } + + time.Sleep(20 * time.Millisecond) + } + + var result T + return result, errors.New("timeout reached") +} diff --git a/pkg/service/restore.go b/pkg/service/restore_manager.go similarity index 89% rename from pkg/service/restore.go rename to pkg/service/restore_manager.go index ffc14be5..41957549 100644 --- a/pkg/service/restore.go +++ b/pkg/service/restore_manager.go @@ -20,4 +20,7 @@ type RestoreManager interface { // RetrieveConfiguration return backed up Aerospike configuration. RetrieveConfiguration(routine string, toTime time.Time) ([]byte, error) + + // CancelRestore cancels an ongoing restore. + CancelRestore(jobID model.RestoreJobID) error } diff --git a/pkg/service/restore_mock.go b/pkg/service/restore_mock.go index ecbc9b52..340543eb 100644 --- a/pkg/service/restore_mock.go +++ b/pkg/service/restore_mock.go @@ -3,6 +3,8 @@ package service import ( "context" "log/slog" + "sync" + "time" "github.com/aerospike/aerospike-backup-service/v2/pkg/model" "github.com/aerospike/backup-go" @@ -12,15 +14,19 @@ import ( // RestoreMock mocks the Restore interface. // Used in CI workflows to skip building the C shared libraries. type RestoreMock struct { + restoreWaitWg *sync.WaitGroup } // NewRestoreMock returns a new RestoreMock instance. -func NewRestoreMock() *RestoreMock { - return &RestoreMock{} +func NewRestoreMock(wg *sync.WaitGroup) *RestoreMock { + return &RestoreMock{ + restoreWaitWg: wg, + } } // MockRestoreHandler is a mock implementation of the RestoreHandler interface. type MockRestoreHandler struct { + restoreWaitWg *sync.WaitGroup } func (m *MockRestoreHandler) GetStats() *models.RestoreStats { @@ -29,13 +35,25 @@ func (m *MockRestoreHandler) GetStats() *models.RestoreStats { return &stats } -func (m *MockRestoreHandler) Wait(_ context.Context) error { - return nil +func (m *MockRestoreHandler) Wait(ctx context.Context) error { + if m.restoreWaitWg != nil { + m.restoreWaitWg.Done() + } + + select { + case <-time.After(100 * time.Millisecond): + // Simulate work completion after 100ms + return nil + case <-ctx.Done(): + return ctx.Err() + } } // Run mocks the interface method. func (r *RestoreMock) Run(_ context.Context, _ *backup.Client, _ *model.RestoreRequest) (RestoreHandler, error) { slog.Info("RestoreRun mock call") - return &MockRestoreHandler{}, nil + return &MockRestoreHandler{ + restoreWaitWg: r.restoreWaitWg, + }, nil } diff --git a/pkg/service/types.go b/pkg/service/types.go index 1c7fe60e..f9e58e62 100644 --- a/pkg/service/types.go +++ b/pkg/service/types.go @@ -17,6 +17,22 @@ type RestoreHandler interface { Wait(context.Context) error } +// RestoreHandlerWithCancel is a wrapper around a RestoreHandler that adds a cancel function. +type RestoreHandlerWithCancel struct { + RestoreHandler + cancel func() +} + +// Cancel cancels the restore job. +func (rj *RestoreHandlerWithCancel) Cancel() { + rj.cancel() +} + +// GetStats returns the statistics of the restore job. +func (rj *RestoreHandlerWithCancel) GetStats() *models.RestoreStats { + return rj.RestoreHandler.GetStats() +} + // Restore represents a restore service. type Restore interface { Run(