Skip to content
This repository has been archived by the owner on Mar 4, 2024. It is now read-only.

Commit

Permalink
EVEREST-435 Ensure Backend does not run uncontrollable goroutines (#158)
Browse files Browse the repository at this point in the history
Co-authored-by: Diogo Recharte <[email protected]>
  • Loading branch information
gen1us2k and recharte authored Sep 15, 2023
1 parent 3861bc5 commit 2195205
Show file tree
Hide file tree
Showing 10 changed files with 165 additions and 120 deletions.
5 changes: 2 additions & 3 deletions api-tests/tests/database-cluster.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ const testPrefix = `${(Math.random() + 1).toString(36).substring(10)}`

let kubernetesId
let recommendedVersion
const monitoringConfigName1 = `${testPrefix}-1`
const monitoringConfigName2 = `${testPrefix}-2`
const monitoringConfigName1 = `a${testPrefix}-1`
const monitoringConfigName2 = `b${testPrefix}-2`

test.setTimeout(360 * 1000)

Expand Down Expand Up @@ -62,7 +62,6 @@ test.beforeAll(async ({ request }) => {

test.afterAll(async ({ request }) => {
let res = await request.delete(`/v1/monitoring-instances/${monitoringConfigName1}`)

expect(res.ok()).toBeTruthy()

res = await request.delete(`/v1/monitoring-instances/${monitoringConfigName2}`)
Expand Down
92 changes: 53 additions & 39 deletions api/backup_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pkg/errors"

"github.com/percona/percona-everest-backend/model"
"github.com/percona/percona-everest-backend/pkg/configs"
"github.com/percona/percona-everest-backend/pkg/kubernetes"
)

Expand Down Expand Up @@ -138,7 +137,7 @@ func (e *EverestServer) CreateBackupStorage(ctx echo.Context) error { //nolint:f
}

// DeleteBackupStorage deletes the specified backup storage.
func (e *EverestServer) DeleteBackupStorage(ctx echo.Context, backupStorageName string) error {
func (e *EverestServer) DeleteBackupStorage(ctx echo.Context, backupStorageName string) error { //nolint:cyclop
c := ctx.Request().Context()
bs, err := e.storage.GetBackupStorage(c, nil, backupStorageName)
if err != nil {
Expand All @@ -151,22 +150,42 @@ func (e *EverestServer) DeleteBackupStorage(ctx echo.Context, backupStorageName
})
}

ks, err := e.storage.ListKubernetesClusters(c)
if err != nil {
e.l.Error(errors.Wrap(err, "Could not list Kubernetes clusters"))
return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString("Could not list Kubernetes clusters")})
}
if len(ks) == 0 {
return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString("No registered kubernetes clusters available")})
}
// FIXME: Revisit it once multi k8s support will be enabled
_, kubeClient, _, err := e.initKubeClient(ctx.Request().Context(), ks[0].ID)
if err != nil {
e.l.Error(errors.Wrap(err, "could not init kube client for config"))
return nil
}

err = kubeClient.DeleteConfig(ctx.Request().Context(), bs, func(ctx context.Context, name string) (bool, error) {
return kubernetes.IsBackupStorageConfigInUse(ctx, name, kubeClient)
})
if err != nil && !errors.Is(err, kubernetes.ErrConfigInUse) {
e.l.Error(errors.Wrap(err, "could not delete config"))
return nil
}

err = e.storage.Transaction(func(tx *gorm.DB) error {
err := e.storage.DeleteBackupStorage(c, backupStorageName, tx)
if err != nil {
e.l.Error(err)
return errors.New("Could not delete backup storage")
}

ks, err := e.storage.ListKubernetesClusters(c)
if err != nil {
return errors.Wrap(err, "Could not list Kubernetes clusters")
if _, err := e.secretsStorage.DeleteSecret(c, bs.AccessKeyID); err != nil {
return errors.Wrap(err, "could not delete access key from secrets storage")
}

go configs.DeleteConfigFromK8sClusters( //nolint:contextcheck
context.Background(), ks, bs,
e.initKubeClient, kubernetes.IsBackupStorageConfigInUse, e.l,
)
if _, err := e.secretsStorage.DeleteSecret(c, bs.SecretKeyID); err != nil {
return errors.Wrap(err, "could not delete secret key from secrets storage")
}

return nil
})
Expand All @@ -176,18 +195,6 @@ func (e *EverestServer) DeleteBackupStorage(ctx echo.Context, backupStorageName
})
}

go func() {
if _, err := e.secretsStorage.DeleteSecret(c, bs.AccessKeyID); err != nil {
e.l.Error(errors.Wrap(err, "could not delete access key from secrets storage"))
}
}()

go func() {
if _, err := e.secretsStorage.DeleteSecret(c, bs.SecretKeyID); err != nil {
e.l.Error(errors.Wrap(err, "could not delete secret key from secrets storage"))
}
}()

return ctx.NoContent(http.StatusNoContent)
}

Expand Down Expand Up @@ -256,37 +263,44 @@ func (e *EverestServer) performBackupStorageUpdate(
c := ctx.Request().Context()

httpStatusCode := http.StatusInternalServerError
var bs *model.BackupStorage
err := e.storage.Transaction(func(tx *gorm.DB) error {
var err error
httpStatusCode, err = e.updateBackupStorage(c, tx, backupStorageName, params, newAccessKeyID, newSecretKeyID)
if err != nil {
return err
}

bs, err = e.storage.GetBackupStorage(c, tx, backupStorageName)
if err != nil {
e.l.Error(err)
return errors.New("Could not find updated backup storage")
}

ks, err := e.storage.ListKubernetesClusters(c)
if err != nil {
return errors.Wrap(err, "Could not list Kubernetes clusters")
}

go configs.UpdateConfigInAllK8sClusters( //nolint:contextcheck
context.Background(), ks, bs,
e.secretsStorage.GetSecret, e.initKubeClient, e.l,
)

return nil
})
if err != nil {
return ctx.JSON(httpStatusCode, Error{
Message: pointer.ToString(err.Error()),
})
}
bs, err := e.storage.GetBackupStorage(c, nil, backupStorageName)
if err != nil {
e.l.Error(err)
return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString("Could not find updated backup storage")})
}
ks, err := e.storage.ListKubernetesClusters(c)
if err != nil {
e.l.Error(err)
return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString("Could not list Kubernetes clusters")})
}
if len(ks) == 0 {
return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString("No registered kubernetes clusters available")})
}
// FIXME: Revisit it once multi k8s support will be enabled
_, kubeClient, _, err := e.initKubeClient(ctx.Request().Context(), ks[0].ID)
if err != nil {
e.l.Error(errors.Wrap(err, "could not init kube client to update config"))
return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString("Could not init kubernetes client to update config")})
}

if err := kubeClient.UpdateConfig(ctx.Request().Context(), bs, e.secretsStorage.GetSecret); err != nil {
e.l.Error(errors.Wrap(err, "could not update config"))
return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString("Could not update config on the kubernetes cluster")})
}

e.deleteOldSecretsAfterUpdate(c, params, s)

Expand Down
9 changes: 8 additions & 1 deletion api/database_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,11 @@ func (e *EverestServer) DeleteDatabaseCluster(ctx echo.Context, kubernetesID str
}

names := kubernetes.BackupStorageNamesFromDBCluster(db)
e.waitGroup.Add(1)
go e.deleteK8SBackupStorages(context.Background(), kubeClient, names)

if db.Spec.Monitoring != nil && db.Spec.Monitoring.MonitoringConfigName != "" {
e.waitGroup.Add(1)
go e.deleteK8SMonitoringConfig(context.Background(), kubeClient, db.Spec.Monitoring.MonitoringConfigName)
}

Expand Down Expand Up @@ -181,8 +183,9 @@ func (e *EverestServer) UpdateDatabaseCluster(ctx echo.Context, kubernetesID str
if ctx.Response().Status >= http.StatusMultipleChoices {
return nil
}

e.waitGroup.Add(1)
go e.deleteBackupStoragesOnUpdate(context.Background(), kubeClient, oldDB, newBackupNames)
e.waitGroup.Add(1)
go e.deleteMonitoringInstanceOnUpdate(context.Background(), kubeClient, oldDB, newMonitoringName)

return nil
Expand Down Expand Up @@ -266,6 +269,7 @@ func (e *EverestServer) rollbackCreatedBackupStorages(ctx context.Context, kubeC
func (e *EverestServer) deleteK8SMonitoringConfig(
ctx context.Context, kubeClient *kubernetes.Kubernetes, name string,
) {
defer e.waitGroup.Done()
i, err := e.storage.GetMonitoringInstance(name)
if err != nil {
e.l.Error(errors.Wrap(err, "could get monitoring instance"))
Expand All @@ -284,6 +288,7 @@ func (e *EverestServer) deleteK8SMonitoringConfig(
func (e *EverestServer) deleteK8SBackupStorages(
ctx context.Context, kubeClient *kubernetes.Kubernetes, names map[string]struct{},
) {
defer e.waitGroup.Done()
for name := range names {
bs, err := e.storage.GetBackupStorage(ctx, nil, name)
if err != nil {
Expand Down Expand Up @@ -351,6 +356,7 @@ func (e *EverestServer) deleteBackupStoragesOnUpdate(
oldDB *everestv1alpha1.DatabaseCluster,
newNames map[string]struct{},
) {
defer e.waitGroup.Done()
oldNames := withBackupStorageNamesFromDBCluster(make(map[string]struct{}), *oldDB)
toDelete := uniqueKeys(newNames, oldNames)
for name := range toDelete {
Expand Down Expand Up @@ -393,6 +399,7 @@ func (e *EverestServer) deleteMonitoringInstanceOnUpdate(
oldDB *everestv1alpha1.DatabaseCluster,
newName string,
) {
defer e.waitGroup.Done()
oldName := ""
if oldDB.Spec.Monitoring != nil {
oldName = oldDB.Spec.Monitoring.MonitoringConfigName
Expand Down
1 change: 1 addition & 0 deletions api/database_cluster_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func (e *EverestServer) DeleteDatabaseClusterBackup(ctx echo.Context, kubernetes
bsNames := map[string]struct{}{
backup.Spec.BackupStorageName: {},
}
e.waitGroup.Add(1)
go e.deleteK8SBackupStorages(context.Background(), kubeClient, bsNames)
}

Expand Down
3 changes: 2 additions & 1 deletion api/database_cluster_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (e *EverestServer) DeleteDatabaseClusterRestore(ctx echo.Context, kubernete
bsNames := map[string]struct{}{
restore.Spec.DataSource.BackupSource.BackupStorageName: {},
}
e.waitGroup.Add(1)
go e.deleteK8SBackupStorages(context.Background(), kubeClient, bsNames)
}

Expand Down Expand Up @@ -184,7 +185,7 @@ func (e *EverestServer) UpdateDatabaseClusterRestore(ctx echo.Context, kubernete
toDeleteNames := map[string]struct{}{
oldRestore.Spec.DataSource.BackupSource.BackupStorageName: {},
}

e.waitGroup.Add(1)
go e.deleteK8SBackupStorages(context.Background(), kubeClient, toDeleteNames)
return nil
}
20 changes: 11 additions & 9 deletions api/everest.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type EverestServer struct {
l *zap.SugaredLogger
storage storage
secretsStorage secretsStorage
waitGroup *sync.WaitGroup
echo *echo.Echo
}

Expand All @@ -60,9 +61,10 @@ type List struct {
// NewEverestServer creates and configures everest API.
func NewEverestServer(c *config.EverestConfig, l *zap.SugaredLogger) (*EverestServer, error) {
e := &EverestServer{
config: c,
l: l,
echo: echo.New(),
config: c,
l: l,
echo: echo.New(),
waitGroup: &sync.WaitGroup{},
}
if err := e.initHTTPServer(); err != nil {
return e, err
Expand Down Expand Up @@ -159,11 +161,11 @@ func (e *EverestServer) Shutdown(ctx context.Context) error {
}

e.l.Info("Shutting down Everest")
wg := sync.WaitGroup{}
e.waitGroup.Wait()

wg.Add(1)
e.waitGroup.Add(1)
go func() {
defer wg.Done()
defer e.waitGroup.Done()
e.l.Info("Shutting down database storage")
if err := e.storage.Close(); err != nil {
e.l.Error(errors.Wrap(err, "could not shut down database storage"))
Expand All @@ -172,9 +174,9 @@ func (e *EverestServer) Shutdown(ctx context.Context) error {
}
}()

wg.Add(1)
e.waitGroup.Add(1)
go func() {
defer wg.Done()
defer e.waitGroup.Done()
e.l.Info("Shutting down secrets storage")
if err := e.secretsStorage.Close(); err != nil {
e.l.Error(errors.Wrap(err, "could not shut down secret storage"))
Expand All @@ -185,7 +187,7 @@ func (e *EverestServer) Shutdown(ctx context.Context) error {

done := make(chan struct{}, 1)
go func() {
wg.Wait()
e.waitGroup.Wait()
close(done)
}()

Expand Down
39 changes: 23 additions & 16 deletions api/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,21 @@ func (e *EverestServer) ListKubernetesClusters(ctx echo.Context) error {

// RegisterKubernetesCluster registers a k8s cluster in Everest server.
func (e *EverestServer) RegisterKubernetesCluster(ctx echo.Context) error {
list, err := e.storage.ListKubernetesClusters(ctx.Request().Context())
if err != nil {
e.l.Error(err)
return ctx.JSON(http.StatusBadRequest, Error{Message: pointer.ToString("Could not list Kubernetes clusters")})
}
if len(list) != 0 {
return ctx.JSON(http.StatusBadRequest, Error{Message: pointer.ToString("Everest does not support multiple kubernetes clusters right now. Please delete the existing cluster before registering a new one")})
}
var params CreateKubernetesClusterParams
if err := ctx.Bind(&params); err != nil {
return ctx.JSON(http.StatusBadRequest, Error{Message: pointer.ToString(err.Error())})
}
c := ctx.Request().Context()

_, err := clientcmd.BuildConfigFromKubeconfigGetter("", newConfigGetter(params.Kubeconfig).loadFromString)
_, err = clientcmd.BuildConfigFromKubeconfigGetter("", newConfigGetter(params.Kubeconfig).loadFromString)
if err != nil {
e.l.Error(err)
return ctx.JSON(http.StatusInternalServerError, Error{
Expand Down Expand Up @@ -257,24 +265,23 @@ func (e *EverestServer) disableK8sClusterMonitoring(ctx echo.Context, kubeClient
})
}

go func() {
ctx := context.Background()
for _, s := range kubeClient.SecretNamesFromVMAgent(vmAgent) {
mcs, err := kubeClient.GetMonitoringConfigsBySecretName(ctx, s)
if err != nil {
e.l.Error(errors.Wrapf(err, "could not list monitoring configs by secret name %s", s))
continue
}
for _, s := range kubeClient.SecretNamesFromVMAgent(vmAgent) {
mcs, err := kubeClient.GetMonitoringConfigsBySecretName(ctx.Request().Context(), s)
if err != nil {
err = errors.Wrapf(err, "could not list monitoring configs by secret name %s", s)
e.l.Error(err)
return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString(err.Error())})
}

for _, mc := range mcs {
err = kubeClient.DeleteMonitoringConfig(ctx, mc.Name, mc.Spec.CredentialsSecretName)
if err != nil && !errors.Is(err, kubernetes.ErrMonitoringConfigInUse) {
e.l.Error(errors.Wrapf(err, "could not delete monitoring config %s from Kubernetes", mc.Name))
continue
}
for _, mc := range mcs {
err = kubeClient.DeleteMonitoringConfig(ctx.Request().Context(), mc.Name, mc.Spec.CredentialsSecretName)
if err != nil && !errors.Is(err, kubernetes.ErrMonitoringConfigInUse) {
err = errors.Wrapf(err, "could not delete monitoring config %s from Kubernetes", mc.Name)
e.l.Error(err)
return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString(err.Error())})
}
}
}()
}

return ctx.NoContent(http.StatusOK)
}
Expand Down
Loading

0 comments on commit 2195205

Please sign in to comment.