diff --git a/docs/docs.go b/docs/docs.go index dd38de4c..f7b2cdcc 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -96,6 +96,41 @@ const docTemplate = `{ } } }, + "/v1/backups/cancel/{name}": { + "post": { + "tags": [ + "Backup" + ], + "summary": "Cancel current backup.", + "operationId": "cancelCurrentBackup", + "parameters": [ + { + "type": "string", + "description": "Backup routine name", + "name": "name", + "in": "path", + "required": true + } + ], + "responses": { + "202": { + "description": "Accepted" + }, + "404": { + "description": "Not Found", + "schema": { + "type": "string" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "type": "string" + } + } + } + } + }, "/v1/backups/currentBackup/{name}": { "get": { "produces": [ @@ -1229,6 +1264,7 @@ const docTemplate = `{ "parameters": [ { "type": "integer", + "format": "int64", "description": "Restore job ID", "name": "jobID", "in": "path", @@ -1288,7 +1324,7 @@ const docTemplate = `{ "202": { "description": "Restore operation job id", "schema": { - "type": "integer" + "type": "int64" } }, "400": { @@ -1331,7 +1367,7 @@ const docTemplate = `{ "202": { "description": "Restore operation job id", "schema": { - "type": "integer" + "type": "int64" } }, "400": { @@ -1362,6 +1398,7 @@ const docTemplate = `{ "parameters": [ { "type": "integer", + "format": "int64", "description": "Job ID to retrieve the status", "name": "jobId", "in": "path", @@ -1410,7 +1447,7 @@ const docTemplate = `{ "202": { "description": "Restore operation job id", "schema": { - "type": "integer" + "type": "int64" } }, "400": { diff --git a/docs/openapi.json b/docs/openapi.json index e241f253..9d6b1646 100644 --- a/docs/openapi.json +++ b/docs/openapi.json @@ -90,6 +90,48 @@ "tags" : [ "System" ] } }, + "/v1/backups/cancel/{name}" : { + "post" : { + "operationId" : "cancelCurrentBackup", + "parameters" : [ { + "description" : "Backup routine name", + "in" : "path", + "name" : "name", + "required" : true, + "schema" : { + "type" : "string" + } + } ], + "responses" : { + "202" : { + "content" : { }, + "description" : "Accepted" + }, + "404" : { + "content" : { + "*/*" : { + "schema" : { + "type" : "string" + } + } + }, + "description" : "Not Found" + }, + "500" : { + "content" : { + "*/*" : { + "schema" : { + "type" : "string" + } + } + }, + "description" : "Internal Server Error" + } + }, + "summary" : "Cancel current backup.", + "tags" : [ "Backup" ] + } + }, "/v1/backups/currentBackup/{name}" : { "get" : { "operationId" : "getCurrentBackup", @@ -1375,6 +1417,7 @@ "name" : "jobID", "required" : true, "schema" : { + "format" : "int64", "type" : "integer" } } ], @@ -1443,6 +1486,7 @@ "content" : { "*/*" : { "schema" : { + "format" : "int64", "type" : "integer" } } @@ -1494,6 +1538,7 @@ "content" : { "*/*" : { "schema" : { + "format" : "int64", "type" : "integer" } } @@ -1535,6 +1580,7 @@ "name" : "jobId", "required" : true, "schema" : { + "format" : "int64", "type" : "integer" } } ], @@ -1584,6 +1630,7 @@ "content" : { "*/*" : { "schema" : { + "format" : "int64", "type" : "integer" } } diff --git a/docs/openapi.yaml b/docs/openapi.yaml index e6306edc..c1617b04 100644 --- a/docs/openapi.yaml +++ b/docs/openapi.yaml @@ -67,6 +67,35 @@ paths: summary: Readiness endpoint. tags: - System + /v1/backups/cancel/{name}: + post: + operationId: cancelCurrentBackup + parameters: + - description: Backup routine name + in: path + name: name + required: true + schema: + type: string + responses: + "202": + content: {} + description: Accepted + "404": + content: + '*/*': + schema: + type: string + description: Not Found + "500": + content: + '*/*': + schema: + type: string + description: Internal Server Error + summary: Cancel current backup. + tags: + - Backup /v1/backups/currentBackup/{name}: get: operationId: getCurrentBackup @@ -953,6 +982,7 @@ paths: name: jobID required: true schema: + format: int64 type: integer responses: "200": @@ -997,6 +1027,7 @@ paths: content: '*/*': schema: + format: int64 type: integer description: Restore operation job id "400": @@ -1030,6 +1061,7 @@ paths: content: '*/*': schema: + format: int64 type: integer description: Restore operation job id "400": @@ -1057,6 +1089,7 @@ paths: name: jobId required: true schema: + format: int64 type: integer responses: "200": @@ -1090,6 +1123,7 @@ paths: content: '*/*': schema: + format: int64 type: integer description: Restore operation job id "400": diff --git a/internal/server/handlers/backup.go b/internal/server/handlers/backup.go index b63be1fa..bb228988 100644 --- a/internal/server/handlers/backup.go +++ b/internal/server/handlers/backup.go @@ -331,3 +331,37 @@ func (s *Service) GetCurrentBackupInfo(w http.ResponseWriter, r *http.Request) { ) } } + +// CancelCurrentBackup +// @Summary Cancel current backup. +// @ID cancelCurrentBackup +// @Tags Backup +// @Param name path string true "Backup routine name" +// @Router /v1/backups/cancel/{name} [post] +// @Success 202 +// @Failure 404 {string} string +// @Failure 500 {string} string +func (s *Service) CancelCurrentBackup(w http.ResponseWriter, r *http.Request) { + hLogger := s.logger.With(slog.String("handler", "CancelCurrentBackup")) + + routineName := mux.Vars(r)["name"] + if routineName == "" { + hLogger.Error("routine name required") + http.Error(w, "routine name required", http.StatusBadRequest) + return + } + + handler, found := s.handlerHolder[routineName] + if !found { + hLogger.Error("unknown routine name", + slog.String("name", routineName), + ) + http.Error(w, "unknown routine name "+routineName, http.StatusNotFound) + return + } + + handler.Cancel() + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusAccepted) +} diff --git a/internal/server/handlers/backup_test.go b/internal/server/handlers/backup_test.go index cfd83285..ea4ece23 100644 --- a/internal/server/handlers/backup_test.go +++ b/internal/server/handlers/backup_test.go @@ -234,6 +234,7 @@ func TestService_ScheduleFullBackup(t *testing.T) { } } +//nolint:dupl func TestService_GetCurrentBackupInfo(t *testing.T) { t.Parallel() h := newServiceMock(t) @@ -273,3 +274,41 @@ func TestService_GetCurrentBackupInfo(t *testing.T) { End() } } + +//nolint:dupl +func TestService_CancelRunningBackup(t *testing.T) { + t.Parallel() + h := newServiceMock(t) + router := mux.NewRouter() + router.HandleFunc( + "/backups/cancel/{name}", + h.CancelCurrentBackup, + ).Methods(http.MethodPost) + + const name = testRoutineName + + testCases := []struct { + method string + statusCode int + name string + }{ + {http.MethodPost, http.StatusNotFound, name}, + {http.MethodPost, http.StatusNotFound, ""}, + {http.MethodGet, http.StatusMethodNotAllowed, name}, + {http.MethodConnect, http.StatusMethodNotAllowed, name}, + {http.MethodDelete, http.StatusMethodNotAllowed, name}, + {http.MethodPatch, http.StatusMethodNotAllowed, name}, + {http.MethodPut, http.StatusMethodNotAllowed, name}, + {http.MethodTrace, http.StatusMethodNotAllowed, name}, + } + + for _, tt := range testCases { + apitest.New(). + Handler(router). + Method(tt.method). + URL(fmt.Sprintf("/backups/cancel/%s", tt.name)). + Expect(t). + Status(tt.statusCode). + End() + } +} diff --git a/internal/server/handlers/restore.go b/internal/server/handlers/restore.go index 4f8bdc43..1727ec84 100644 --- a/internal/server/handlers/restore.go +++ b/internal/server/handlers/restore.go @@ -23,7 +23,7 @@ import ( // @Router /v1/restore/full [post] // @Accept json // @Param request body dto.RestoreRequest true "Restore request details" -// @Success 202 {integer} int "Restore operation job id" +// @Success 202 {int64} int64 "Restore operation job id" // @Failure 400 {string} string // @Failure 405 {string} string func (s *Service) RestoreFullHandler(w http.ResponseWriter, r *http.Request) { @@ -76,7 +76,7 @@ func (s *Service) RestoreFullHandler(w http.ResponseWriter, r *http.Request) { // @Router /v1/restore/incremental [post] // @Accept json // @Param request body dto.RestoreRequest true "Restore request details" -// @Success 202 {integer} int "Restore operation job id" +// @Success 202 {int64} int64 "Restore operation job id" // @Failure 400 {string} string // @Failure 405 {string} string func (s *Service) RestoreIncrementalHandler(w http.ResponseWriter, r *http.Request) { @@ -131,7 +131,7 @@ func (s *Service) RestoreIncrementalHandler(w http.ResponseWriter, r *http.Reque // @Router /v1/restore/timestamp [post] // @Accept json // @Param request body dto.RestoreTimestampRequest true "Restore request details" -// @Success 202 {integer} int "Restore operation job id" +// @Success 202 {int64} int64 "Restore operation job id" // @Failure 400 {string} string // @Failure 405 {string} string func (s *Service) RestoreByTimeHandler(w http.ResponseWriter, r *http.Request) { @@ -185,7 +185,7 @@ func (s *Service) RestoreByTimeHandler(w http.ResponseWriter, r *http.Request) { // @ID restoreStatus // @Tags Restore // @Produce json -// @Param jobId path int true "Job ID to retrieve the status" +// @Param jobId path int true "Job ID to retrieve the status" format(int64) // @Router /v1/restore/status/{jobId} [get] // @Success 200 {object} dto.RestoreJobStatus "Restore job status details" // @Failure 400 {string} string @@ -304,7 +304,7 @@ func (s *Service) RetrieveConfig(w http.ResponseWriter, r *http.Request) { // @ID cancelRestore // @Tags Restore // @Router /v1/restore/cancel/{jobID} [post] -// @Param jobID path int true "Restore job ID" +// @Param jobID path int true "Restore job ID" format(int64) // @Success 200 {string} string "Restore job canceled successfully" // @Failure 400 {string} string "Invalid job ID" // @Failure 404 {string} string "Job not found" diff --git a/internal/server/handlers/restore_test.go b/internal/server/handlers/restore_test.go index 27bcd9c1..1f919eab 100644 --- a/internal/server/handlers/restore_test.go +++ b/internal/server/handlers/restore_test.go @@ -247,3 +247,40 @@ func TestService_RetrieveConfig(t *testing.T) { End() } } + +func TestService_CancelRestore(t *testing.T) { + t.Parallel() + h := newServiceMock(t) + router := mux.NewRouter() + router.HandleFunc( + "/v1/restore/cancel/{jobId}", + h.CancelRestoreHandler, + ).Methods(http.MethodPost) + + const jobID = "1" + + testCases := []struct { + method string + statusCode int + jobID string + }{ + {http.MethodPost, http.StatusOK, jobID}, + {http.MethodPost, http.StatusBadRequest, "a"}, + {http.MethodGet, http.StatusMethodNotAllowed, jobID}, + {http.MethodConnect, http.StatusMethodNotAllowed, jobID}, + {http.MethodDelete, http.StatusMethodNotAllowed, jobID}, + {http.MethodPatch, http.StatusMethodNotAllowed, jobID}, + {http.MethodPut, http.StatusMethodNotAllowed, jobID}, + {http.MethodTrace, http.StatusMethodNotAllowed, jobID}, + } + + for _, tt := range testCases { + apitest.New(). + Handler(router). + Method(tt.method). + URL(fmt.Sprintf("/v1/restore/cancel/%s", tt.jobID)). + Expect(t). + Status(tt.statusCode). + End() + } +} diff --git a/internal/server/router.go b/internal/server/router.go index 01498564..91d17ed3 100644 --- a/internal/server/router.go +++ b/internal/server/router.go @@ -94,6 +94,9 @@ func NewRouter(apiPath, sysPath string, h *handlers.Service, middlewares ...mux. // Get information on currently running backups apiRouter.HandleFunc("/backups/currentBackup/{name}", h.GetCurrentBackupInfo).Methods(http.MethodGet) + // Cancel running backup + apiRouter.HandleFunc("/backups/cancel/{name}", h.CancelCurrentBackup).Methods(http.MethodPost) + return r } diff --git a/pkg/model/restore_request.go b/pkg/model/restore_request.go index 8605e64d..8ba23237 100644 --- a/pkg/model/restore_request.go +++ b/pkg/model/restore_request.go @@ -6,7 +6,7 @@ import ( ) // RestoreJobID represents the restore operation job id. -type RestoreJobID int +type RestoreJobID int64 // RestoreRequest represents a restore operation request. // @Description RestoreRequest represents a restore operation request. diff --git a/pkg/service/backup_routine_handler.go b/pkg/service/backup_routine_handler.go index d9d1ceb5..7014d478 100644 --- a/pkg/service/backup_routine_handler.go +++ b/pkg/service/backup_routine_handler.go @@ -33,8 +33,8 @@ type BackupRoutineHandler struct { clusterConfigWriter ClusterConfigWriter // backup handlers by namespace - fullBackupHandlers map[string]BackupHandler - incrBackupHandlers map[string]BackupHandler + fullBackupHandlers map[string]CancelableBackupHandler + incrBackupHandlers map[string]CancelableBackupHandler } // Backup represents a backup service. @@ -56,11 +56,16 @@ type Backup interface { type BackupHandler interface { // GetStats returns the statistics of the backup job. GetStats() *models.BackupStats - // Wait waits for the backup job to complete and returns an error if the - // job failed. + // Wait waits for the backup job to complete and returns an error if the job failed. Wait(context.Context) error } +type CancelableBackupHandler interface { + BackupHandler + // Cancel cancels the backup operation. + Cancel() +} + // backupMetadataManager handles backup metadata. type backupMetadataManager interface { // writeBackupMetadata writes backup metadata to storage after successful backup. @@ -105,8 +110,8 @@ func newBackupRoutineHandler( retry: newRetryExecutor( backupPolicy.GetRetryPolicyOrDefault(), logger), - fullBackupHandlers: make(map[string]BackupHandler), - incrBackupHandlers: make(map[string]BackupHandler), + fullBackupHandlers: make(map[string]CancelableBackupHandler), + incrBackupHandlers: make(map[string]CancelableBackupHandler), clientManager: clientManager, clusterConfigWriter: NewClusterConfigWriter( backupStorage, @@ -188,7 +193,7 @@ func (h *BackupRoutineHandler) prepareCluster(retry executor) (*backup.Client, [ func (h *BackupRoutineHandler) startNamespaceBackup( ctx context.Context, namespace string, now time.Time, client *backup.Client, -) BackupHandler { +) CancelableBackupHandler { backupFolder := getFullPath(h.routineName, h.backupFullPolicy, namespace, now) timebounds := h.createTimebounds(true, now) @@ -327,7 +332,7 @@ func (h *BackupRoutineHandler) runIncrementalBackupInternal(ctx context.Context, func (h *BackupRoutineHandler) startIncrementalNamespaceBackup( ctx context.Context, namespace string, now time.Time, client *backup.Client, -) BackupHandler { +) CancelableBackupHandler { backupFolder := getIncrementalPathForNamespace(h.routineName, namespace, now) timebounds := h.createTimebounds(false, now) @@ -371,3 +376,13 @@ func (h *BackupRoutineHandler) GetCurrentStat() *model.CurrentBackups { Incremental: currentBackupStatus(h.incrBackupHandlers), } } + +func (h *BackupRoutineHandler) Cancel() { + h.logger.Info("Canceling backup") + for _, handler := range h.fullBackupHandlers { + handler.Cancel() + } + for _, handler := range h.incrBackupHandlers { + handler.Cancel() + } +} diff --git a/pkg/service/backup_routine_handler_test.go b/pkg/service/backup_routine_handler_test.go index b88c6f5b..73690cff 100644 --- a/pkg/service/backup_routine_handler_test.go +++ b/pkg/service/backup_routine_handler_test.go @@ -64,6 +64,8 @@ func (m *mockBackupHandler) Wait(ctx context.Context) error { return args.Error(0) } +func (m *mockBackupHandler) Cancel() {} + type mockMetadataWriter struct { mock.Mock } @@ -104,8 +106,8 @@ func setupTestHandler( SourceCluster: &model.AerospikeCluster{}, }, backupFullPolicy: &model.BackupPolicy{}, - fullBackupHandlers: make(map[string]BackupHandler), - incrBackupHandlers: make(map[string]BackupHandler), + fullBackupHandlers: make(map[string]CancelableBackupHandler), + incrBackupHandlers: make(map[string]CancelableBackupHandler), lastRun: lastBackupRun{}, storage: &model.LocalStorage{Path: "/tmp"}, logger: slog.Default(), diff --git a/pkg/service/estimates.go b/pkg/service/estimates.go index 8d151362..eb430d47 100644 --- a/pkg/service/estimates.go +++ b/pkg/service/estimates.go @@ -6,7 +6,7 @@ import ( "github.com/aerospike/aerospike-backup-service/v2/pkg/model" ) -func currentBackupStatus(handlers map[string]BackupHandler) *model.RunningJob { +func currentBackupStatus(handlers map[string]CancelableBackupHandler) *model.RunningJob { activeHandlers := 0 var ( diff --git a/pkg/service/jobs_holder.go b/pkg/service/jobs_holder.go index 357d1afd..757ba2b5 100644 --- a/pkg/service/jobs_holder.go +++ b/pkg/service/jobs_holder.go @@ -34,7 +34,7 @@ func NewRestoreJobsHolder() *RestoreJobsHolder { // newJob creates a new restore job and return its id. func (h *RestoreJobsHolder) newJob(label string) model.RestoreJobID { // #nosec G404 - id := model.RestoreJobID(rand.Int()) + id := model.RestoreJobID(rand.Int63()) h.Lock() defer h.Unlock() diff --git a/pkg/service/retry_service.go b/pkg/service/retry_service.go index 7840436a..e07293f3 100644 --- a/pkg/service/retry_service.go +++ b/pkg/service/retry_service.go @@ -49,7 +49,7 @@ func (r *retryExecutor) run(label string, f func() error) error { for attempt := uint(1); attempt <= totalAttempts; attempt++ { lastErr = f() if lastErr == nil || errors.Is(lastErr, context.Canceled) { - return nil // success + return lastErr // success } if attempt < r.policy.MaxRetries { // Log and wait only if there are attempts left diff --git a/pkg/service/retryable_backup_handler.go b/pkg/service/retryable_backup_handler.go index 17114423..64d4b10f 100644 --- a/pkg/service/retryable_backup_handler.go +++ b/pkg/service/retryable_backup_handler.go @@ -8,9 +8,12 @@ import ( "github.com/aerospike/backup-go/models" ) +// retryableBackupHandler is a wrapper around BackupHandler that adds +// retry logic and cancellation support type retryableBackupHandler struct { sync.RWMutex handler BackupHandler + cancel context.CancelFunc errCh chan error } @@ -22,9 +25,11 @@ func startBackup( start func(ctx context.Context) (BackupHandler, error), onFail func(ctx context.Context), onSuccess func(ctx context.Context, stats *models.BackupStats) error, -) BackupHandler { +) CancelableBackupHandler { + ctxWithCancel, cancel := context.WithCancel(ctx) h := &retryableBackupHandler{ - errCh: make(chan error, 1), + errCh: make(chan error, 1), + cancel: cancel, } // Helper to retry onSuccess only @@ -49,7 +54,7 @@ func startBackup( h.setHandler(handler) - if err = handler.Wait(ctx); err != nil { + if err = handler.Wait(ctxWithCancel); err != nil { onFail(ctx) h.setHandler(nil) return fmt.Errorf("backup failed: %w", err) @@ -90,3 +95,11 @@ func (h *retryableBackupHandler) GetStats() *models.BackupStats { return nil } + +func (h *retryableBackupHandler) Cancel() { + h.Lock() + defer h.Unlock() + if h.cancel != nil { + h.cancel() + } +} diff --git a/pkg/service/retryable_backup_handler_test.go b/pkg/service/retryable_backup_handler_test.go index 67dd8422..7799f0a1 100644 --- a/pkg/service/retryable_backup_handler_test.go +++ b/pkg/service/retryable_backup_handler_test.go @@ -3,6 +3,7 @@ package service import ( "context" "errors" + "fmt" "log/slog" "sync" "testing" @@ -11,6 +12,7 @@ import ( "github.com/aerospike/backup-go/models" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" ) var retry = newRetryExecutor(models.RetryPolicy{ @@ -195,3 +197,56 @@ func TestStartRetryableBackup_StartFails(t *testing.T) { assert.Equal(t, 0, failureCount) assert.Equal(t, 0, successCount) } + +type mockCancelBackupHandler struct { +} + +func (m *mockCancelBackupHandler) GetStats() *models.BackupStats { + panic("process should be cancelled before GetStats is called") +} + +func (m *mockCancelBackupHandler) Wait(ctx context.Context) error { + select { + case <-time.After(1 * time.Second): + return fmt.Errorf("cancel was not called") + case <-ctx.Done(): + return ctx.Err() + } +} + +func TestStartRetryableBackup_Cancel(t *testing.T) { + ctx := context.Background() + mockHandler := &mockCancelBackupHandler{} + + successCount := 0 + failureCount := 0 + + start := func(_ context.Context) (BackupHandler, error) { + return mockHandler, nil + } + + onFail := func(_ context.Context) { + failureCount++ + } + + onSuccess := func(_ context.Context, _ *models.BackupStats) error { + successCount++ + return nil + } + + handler := startBackup(ctx, retry, start, onFail, onSuccess) + var err error + var wg sync.WaitGroup + wg.Add(1) + go func() { + err = handler.Wait(ctx) + wg.Done() + }() + + handler.Cancel() + wg.Wait() + + require.ErrorIs(t, err, context.Canceled) + require.Equal(t, 1, failureCount) + require.Equal(t, 0, successCount) +}