diff --git a/api/backup_storage.go b/api/backup_storage.go index fd78cccd..76838f87 100644 --- a/api/backup_storage.go +++ b/api/backup_storage.go @@ -162,10 +162,10 @@ func (e *EverestServer) DeleteBackupStorage(ctx echo.Context, backupStorageName if err != nil { return errors.Wrap(err, "Could not list Kubernetes clusters") } - + e.waitGroup.Add(1) go configs.DeleteConfigFromK8sClusters( //nolint:contextcheck context.Background(), ks, bs, - e.initKubeClient, kubernetes.IsBackupStorageConfigInUse, e.l, + e.initKubeClient, kubernetes.IsBackupStorageConfigInUse, e.l, e.waitGroup, ) return nil @@ -176,17 +176,17 @@ func (e *EverestServer) DeleteBackupStorage(ctx echo.Context, backupStorageName }) } - go func() { - if _, err := e.secretsStorage.DeleteSecret(c, bs.AccessKeyID); err != nil { - e.l.Error(errors.Wrap(err, "could not delete access key from secrets storage")) - } - }() + if _, err := e.secretsStorage.DeleteSecret(c, bs.AccessKeyID); err != nil { + err = errors.Wrap(err, "could not delete access key from secrets storage") + e.l.Error(err) + return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString(err.Error())}) + } - go func() { - if _, err := e.secretsStorage.DeleteSecret(c, bs.SecretKeyID); err != nil { - e.l.Error(errors.Wrap(err, "could not delete secret key from secrets storage")) - } - }() + if _, err := e.secretsStorage.DeleteSecret(c, bs.SecretKeyID); err != nil { + err = errors.Wrap(err, "could not delete secret key from secrets storage") + e.l.Error(err) + return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString(err.Error())}) + } return ctx.NoContent(http.StatusNoContent) } @@ -274,10 +274,10 @@ func (e *EverestServer) performBackupStorageUpdate( if err != nil { return errors.Wrap(err, "Could not list Kubernetes clusters") } - + e.waitGroup.Add(1) go configs.UpdateConfigInAllK8sClusters( //nolint:contextcheck context.Background(), ks, bs, - e.secretsStorage.GetSecret, e.initKubeClient, e.l, + e.secretsStorage.GetSecret, e.initKubeClient, e.l, e.waitGroup, ) return nil diff --git a/api/database_cluster.go b/api/database_cluster.go index 4787f218..4798e76d 100644 --- a/api/database_cluster.go +++ b/api/database_cluster.go @@ -108,9 +108,11 @@ func (e *EverestServer) DeleteDatabaseCluster(ctx echo.Context, kubernetesID str } names := kubernetes.BackupStorageNamesFromDBCluster(db) + e.waitGroup.Add(1) go e.deleteK8SBackupStorages(context.Background(), kubeClient, names) if db.Spec.Monitoring != nil && db.Spec.Monitoring.MonitoringConfigName != "" { + e.waitGroup.Add(1) go e.deleteK8SMonitoringConfig(context.Background(), kubeClient, db.Spec.Monitoring.MonitoringConfigName) } @@ -181,8 +183,9 @@ func (e *EverestServer) UpdateDatabaseCluster(ctx echo.Context, kubernetesID str if ctx.Response().Status >= http.StatusMultipleChoices { return nil } - + e.waitGroup.Add(1) go e.deleteBackupStoragesOnUpdate(context.Background(), kubeClient, oldDB, newBackupNames) + e.waitGroup.Add(1) go e.deleteMonitoringInstanceOnUpdate(context.Background(), kubeClient, oldDB, newMonitoringName) return nil @@ -266,6 +269,7 @@ func (e *EverestServer) rollbackCreatedBackupStorages(ctx context.Context, kubeC func (e *EverestServer) deleteK8SMonitoringConfig( ctx context.Context, kubeClient *kubernetes.Kubernetes, name string, ) { + defer e.waitGroup.Done() i, err := e.storage.GetMonitoringInstance(name) if err != nil { e.l.Error(errors.Wrap(err, "could get monitoring instance")) @@ -284,6 +288,7 @@ func (e *EverestServer) deleteK8SMonitoringConfig( func (e *EverestServer) deleteK8SBackupStorages( ctx context.Context, kubeClient *kubernetes.Kubernetes, names map[string]struct{}, ) { + defer e.waitGroup.Done() for name := range names { bs, err := e.storage.GetBackupStorage(ctx, nil, name) if err != nil { @@ -351,6 +356,7 @@ func (e *EverestServer) deleteBackupStoragesOnUpdate( oldDB *everestv1alpha1.DatabaseCluster, newNames map[string]struct{}, ) { + defer e.waitGroup.Done() oldNames := withBackupStorageNamesFromDBCluster(make(map[string]struct{}), *oldDB) toDelete := uniqueKeys(newNames, oldNames) for name := range toDelete { @@ -393,6 +399,7 @@ func (e *EverestServer) deleteMonitoringInstanceOnUpdate( oldDB *everestv1alpha1.DatabaseCluster, newName string, ) { + defer e.waitGroup.Done() oldName := "" if oldDB.Spec.Monitoring != nil { oldName = oldDB.Spec.Monitoring.MonitoringConfigName diff --git a/api/database_cluster_backup.go b/api/database_cluster_backup.go index edad7351..01efde6d 100644 --- a/api/database_cluster_backup.go +++ b/api/database_cluster_backup.go @@ -113,6 +113,7 @@ func (e *EverestServer) DeleteDatabaseClusterBackup(ctx echo.Context, kubernetes bsNames := map[string]struct{}{ backup.Spec.BackupStorageName: {}, } + e.waitGroup.Add(1) go e.deleteK8SBackupStorages(context.Background(), kubeClient, bsNames) } diff --git a/api/database_cluster_restore.go b/api/database_cluster_restore.go index 82e98ece..5736e364 100644 --- a/api/database_cluster_restore.go +++ b/api/database_cluster_restore.go @@ -114,6 +114,7 @@ func (e *EverestServer) DeleteDatabaseClusterRestore(ctx echo.Context, kubernete bsNames := map[string]struct{}{ restore.Spec.DataSource.BackupSource.BackupStorageName: {}, } + e.waitGroup.Add(1) go e.deleteK8SBackupStorages(context.Background(), kubeClient, bsNames) } @@ -184,7 +185,7 @@ func (e *EverestServer) UpdateDatabaseClusterRestore(ctx echo.Context, kubernete toDeleteNames := map[string]struct{}{ oldRestore.Spec.DataSource.BackupSource.BackupStorageName: {}, } - + e.waitGroup.Add(1) go e.deleteK8SBackupStorages(context.Background(), kubeClient, toDeleteNames) return nil } diff --git a/api/everest.go b/api/everest.go index 5c994dcf..872b5263 100644 --- a/api/everest.go +++ b/api/everest.go @@ -49,6 +49,7 @@ type EverestServer struct { l *zap.SugaredLogger storage storage secretsStorage secretsStorage + waitGroup *sync.WaitGroup echo *echo.Echo } @@ -60,9 +61,10 @@ type List struct { // NewEverestServer creates and configures everest API. func NewEverestServer(c *config.EverestConfig, l *zap.SugaredLogger) (*EverestServer, error) { e := &EverestServer{ - config: c, - l: l, - echo: echo.New(), + config: c, + l: l, + echo: echo.New(), + waitGroup: &sync.WaitGroup{}, } if err := e.initHTTPServer(); err != nil { return e, err @@ -159,11 +161,11 @@ func (e *EverestServer) Shutdown(ctx context.Context) error { } e.l.Info("Shutting down Everest") - wg := sync.WaitGroup{} + e.waitGroup.Wait() - wg.Add(1) + e.waitGroup.Add(1) go func() { - defer wg.Done() + defer e.waitGroup.Done() e.l.Info("Shutting down database storage") if err := e.storage.Close(); err != nil { e.l.Error(errors.Wrap(err, "could not shut down database storage")) @@ -172,9 +174,9 @@ func (e *EverestServer) Shutdown(ctx context.Context) error { } }() - wg.Add(1) + e.waitGroup.Add(1) go func() { - defer wg.Done() + defer e.waitGroup.Done() e.l.Info("Shutting down secrets storage") if err := e.secretsStorage.Close(); err != nil { e.l.Error(errors.Wrap(err, "could not shut down secret storage")) @@ -185,7 +187,7 @@ func (e *EverestServer) Shutdown(ctx context.Context) error { done := make(chan struct{}, 1) go func() { - wg.Wait() + e.waitGroup.Wait() close(done) }() diff --git a/api/kubernetes.go b/api/kubernetes.go index 260b35c7..36ed8ccc 100644 --- a/api/kubernetes.go +++ b/api/kubernetes.go @@ -257,24 +257,23 @@ func (e *EverestServer) disableK8sClusterMonitoring(ctx echo.Context, kubeClient }) } - go func() { - ctx := context.Background() - for _, s := range kubeClient.SecretNamesFromVMAgent(vmAgent) { - mcs, err := kubeClient.GetMonitoringConfigsBySecretName(ctx, s) - if err != nil { - e.l.Error(errors.Wrapf(err, "could not list monitoring configs by secret name %s", s)) - continue - } + for _, s := range kubeClient.SecretNamesFromVMAgent(vmAgent) { + mcs, err := kubeClient.GetMonitoringConfigsBySecretName(ctx.Request().Context(), s) + if err != nil { + err = errors.Wrapf(err, "could not list monitoring configs by secret name %s", s) + e.l.Error(err) + return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString(err.Error())}) + } - for _, mc := range mcs { - err = kubeClient.DeleteMonitoringConfig(ctx, mc.Name, mc.Spec.CredentialsSecretName) - if err != nil && !errors.Is(err, kubernetes.ErrMonitoringConfigInUse) { - e.l.Error(errors.Wrapf(err, "could not delete monitoring config %s from Kubernetes", mc.Name)) - continue - } + for _, mc := range mcs { + err = kubeClient.DeleteMonitoringConfig(ctx.Request().Context(), mc.Name, mc.Spec.CredentialsSecretName) + if err != nil && !errors.Is(err, kubernetes.ErrMonitoringConfigInUse) { + err = errors.Wrapf(err, "could not delete monitoring config %s from Kubernetes", mc.Name) + e.l.Error(err) + return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString(err.Error())}) } } - }() + } return ctx.NoContent(http.StatusOK) } diff --git a/api/monitoring_instance.go b/api/monitoring_instance.go index aab180fc..04dec2b7 100644 --- a/api/monitoring_instance.go +++ b/api/monitoring_instance.go @@ -70,14 +70,13 @@ func (e *EverestServer) CreateMonitoringInstance(ctx echo.Context) error { APIKeySecretID: apiKeyID, }) if err != nil { - go func() { - _, err := e.secretsStorage.DeleteSecret(ctx.Request().Context(), apiKeyID) - if err != nil { - e.l.Warnf("Could not delete secret %s from secret storage due to error: %s", apiKeyID, err) - } - }() - e.l.Error(err) + + _, err := e.secretsStorage.DeleteSecret(ctx.Request().Context(), apiKeyID) + if err != nil { + e.l.Warnf("Could not delete secret %s from secret storage due to error: %s", apiKeyID, err) + } + return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString("Could not save monitoring instance")}) } @@ -174,9 +173,10 @@ func (e *EverestServer) DeleteMonitoringInstance(ctx echo.Context, name string) return errors.Wrap(err, "Could not list Kubernetes clusters") } + e.waitGroup.Add(1) go configs.DeleteConfigFromK8sClusters( context.Background(), ks, i, - e.initKubeClient, kubernetes.IsMonitoringConfigInUse, e.l, + e.initKubeClient, kubernetes.IsMonitoringConfigInUse, e.l, e.waitGroup, ) return nil @@ -187,12 +187,12 @@ func (e *EverestServer) DeleteMonitoringInstance(ctx echo.Context, name string) }) } - go func() { - _, err := e.secretsStorage.DeleteSecret(context.Background(), i.APIKeySecretID) - if err != nil { - e.l.Warn(errors.Wrapf(err, "could not delete monitoring instance API key secret %s", i.APIKeySecretID)) - } - }() + _, err = e.secretsStorage.DeleteSecret(context.Background(), i.APIKeySecretID) + if err != nil { + err = errors.Wrapf(err, "could not delete monitoring instance API key secret %s", i.APIKeySecretID) + e.l.Warn(err) + return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString(err.Error())}) + } return ctx.NoContent(http.StatusNoContent) } @@ -241,7 +241,9 @@ func (e *EverestServer) performMonitoringInstanceUpdate( APIKeySecretID: apiKeyID, }) if err != nil { + e.waitGroup.Add(1) go func() { + defer e.waitGroup.Done() _, err := e.secretsStorage.DeleteSecret(ctx.Request().Context(), *apiKeyID) if err != nil { e.l.Warnf("Could not delete secret %s from secret storage due to error: %s", apiKeyID, err) @@ -262,10 +264,10 @@ func (e *EverestServer) performMonitoringInstanceUpdate( if err != nil { return errors.Wrap(err, "Could not list Kubernetes clusters") } - + e.waitGroup.Add(1) go configs.UpdateConfigInAllK8sClusters( context.Background(), ks, monitoringInstance, - e.secretsStorage.GetSecret, e.initKubeClient, e.l, + e.secretsStorage.GetSecret, e.initKubeClient, e.l, e.waitGroup, ) return nil @@ -277,12 +279,12 @@ func (e *EverestServer) performMonitoringInstanceUpdate( } if apiKeyID != nil { - go func() { - _, err := e.secretsStorage.DeleteSecret(context.Background(), previousAPIKeyID) - if err != nil { - e.l.Warn(errors.Wrapf(err, "could not delete monitoring instance api key secret %s", previousAPIKeyID)) - } - }() + _, err := e.secretsStorage.DeleteSecret(context.Background(), previousAPIKeyID) + if err != nil { + err = errors.Wrapf(err, "could not delete monitoring instance api key secret %s", previousAPIKeyID) + e.l.Warn(err) + return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString(err.Error())}) + } } return ctx.JSON(http.StatusOK, e.monitoringInstanceToAPIJson(monitoringInstance)) diff --git a/pkg/configs/configs.go b/pkg/configs/configs.go index 37e2250d..9805335e 100644 --- a/pkg/configs/configs.go +++ b/pkg/configs/configs.go @@ -43,8 +43,9 @@ func DeleteConfigFromK8sClusters( initKubeClient initKubeClientFn, isInUse isInUseFn, l *zap.SugaredLogger, + wg *sync.WaitGroup, ) { - wg := &sync.WaitGroup{} + defer wg.Done() // Delete configs in all k8s clusters for _, k := range kubernetesClusters { @@ -75,10 +76,9 @@ func DeleteConfigFromK8sClusters( // UpdateConfigInAllK8sClusters updates config resources in all the provided Kubernetes clusters. func UpdateConfigInAllK8sClusters( ctx context.Context, kubernetesClusters []model.KubernetesCluster, cfg kubernetes.ConfigK8sResourcer, - getSecret getSecretFn, initKubeClient initKubeClientFn, l *zap.SugaredLogger, + getSecret getSecretFn, initKubeClient initKubeClientFn, l *zap.SugaredLogger, wg *sync.WaitGroup, ) { - wg := &sync.WaitGroup{} - + defer wg.Done() // Update configs in all k8s clusters for _, k := range kubernetesClusters { k := k