Skip to content
This repository has been archived by the owner on Mar 4, 2024. It is now read-only.

Commit

Permalink
Fix graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
gen1us2k committed Sep 14, 2023
1 parent 72a120c commit b1ae742
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 66 deletions.
28 changes: 14 additions & 14 deletions api/backup_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,10 @@ func (e *EverestServer) DeleteBackupStorage(ctx echo.Context, backupStorageName
if err != nil {
return errors.Wrap(err, "Could not list Kubernetes clusters")
}

e.waitGroup.Add(1)
go configs.DeleteConfigFromK8sClusters( //nolint:contextcheck
context.Background(), ks, bs,
e.initKubeClient, kubernetes.IsBackupStorageConfigInUse, e.l,
e.initKubeClient, kubernetes.IsBackupStorageConfigInUse, e.l, e.waitGroup,
)

return nil
Expand All @@ -176,17 +176,17 @@ func (e *EverestServer) DeleteBackupStorage(ctx echo.Context, backupStorageName
})
}

go func() {
if _, err := e.secretsStorage.DeleteSecret(c, bs.AccessKeyID); err != nil {
e.l.Error(errors.Wrap(err, "could not delete access key from secrets storage"))
}
}()
if _, err := e.secretsStorage.DeleteSecret(c, bs.AccessKeyID); err != nil {
err = errors.Wrap(err, "could not delete access key from secrets storage")
e.l.Error(err)
return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString(err.Error())})
}

go func() {
if _, err := e.secretsStorage.DeleteSecret(c, bs.SecretKeyID); err != nil {
e.l.Error(errors.Wrap(err, "could not delete secret key from secrets storage"))
}
}()
if _, err := e.secretsStorage.DeleteSecret(c, bs.SecretKeyID); err != nil {
err = errors.Wrap(err, "could not delete secret key from secrets storage")
e.l.Error(err)
return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString(err.Error())})
}

return ctx.NoContent(http.StatusNoContent)
}
Expand Down Expand Up @@ -274,10 +274,10 @@ func (e *EverestServer) performBackupStorageUpdate(
if err != nil {
return errors.Wrap(err, "Could not list Kubernetes clusters")
}

e.waitGroup.Add(1)
go configs.UpdateConfigInAllK8sClusters( //nolint:contextcheck
context.Background(), ks, bs,
e.secretsStorage.GetSecret, e.initKubeClient, e.l,
e.secretsStorage.GetSecret, e.initKubeClient, e.l, e.waitGroup,
)

return nil
Expand Down
9 changes: 8 additions & 1 deletion api/database_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,11 @@ func (e *EverestServer) DeleteDatabaseCluster(ctx echo.Context, kubernetesID str
}

names := kubernetes.BackupStorageNamesFromDBCluster(db)
e.waitGroup.Add(1)
go e.deleteK8SBackupStorages(context.Background(), kubeClient, names)

if db.Spec.Monitoring != nil && db.Spec.Monitoring.MonitoringConfigName != "" {
e.waitGroup.Add(1)
go e.deleteK8SMonitoringConfig(context.Background(), kubeClient, db.Spec.Monitoring.MonitoringConfigName)
}

Expand Down Expand Up @@ -181,8 +183,9 @@ func (e *EverestServer) UpdateDatabaseCluster(ctx echo.Context, kubernetesID str
if ctx.Response().Status >= http.StatusMultipleChoices {
return nil
}

e.waitGroup.Add(1)
go e.deleteBackupStoragesOnUpdate(context.Background(), kubeClient, oldDB, newBackupNames)
e.waitGroup.Add(1)
go e.deleteMonitoringInstanceOnUpdate(context.Background(), kubeClient, oldDB, newMonitoringName)

return nil
Expand Down Expand Up @@ -266,6 +269,7 @@ func (e *EverestServer) rollbackCreatedBackupStorages(ctx context.Context, kubeC
func (e *EverestServer) deleteK8SMonitoringConfig(
ctx context.Context, kubeClient *kubernetes.Kubernetes, name string,
) {
defer e.waitGroup.Done()
i, err := e.storage.GetMonitoringInstance(name)
if err != nil {
e.l.Error(errors.Wrap(err, "could get monitoring instance"))
Expand All @@ -284,6 +288,7 @@ func (e *EverestServer) deleteK8SMonitoringConfig(
func (e *EverestServer) deleteK8SBackupStorages(
ctx context.Context, kubeClient *kubernetes.Kubernetes, names map[string]struct{},
) {
defer e.waitGroup.Done()
for name := range names {
bs, err := e.storage.GetBackupStorage(ctx, nil, name)
if err != nil {
Expand Down Expand Up @@ -351,6 +356,7 @@ func (e *EverestServer) deleteBackupStoragesOnUpdate(
oldDB *everestv1alpha1.DatabaseCluster,
newNames map[string]struct{},
) {
defer e.waitGroup.Done()
oldNames := withBackupStorageNamesFromDBCluster(make(map[string]struct{}), *oldDB)
toDelete := uniqueKeys(newNames, oldNames)
for name := range toDelete {
Expand Down Expand Up @@ -393,6 +399,7 @@ func (e *EverestServer) deleteMonitoringInstanceOnUpdate(
oldDB *everestv1alpha1.DatabaseCluster,
newName string,
) {
defer e.waitGroup.Done()
oldName := ""
if oldDB.Spec.Monitoring != nil {
oldName = oldDB.Spec.Monitoring.MonitoringConfigName
Expand Down
1 change: 1 addition & 0 deletions api/database_cluster_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func (e *EverestServer) DeleteDatabaseClusterBackup(ctx echo.Context, kubernetes
bsNames := map[string]struct{}{
backup.Spec.BackupStorageName: {},
}
e.waitGroup.Add(1)
go e.deleteK8SBackupStorages(context.Background(), kubeClient, bsNames)
}

Expand Down
3 changes: 2 additions & 1 deletion api/database_cluster_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (e *EverestServer) DeleteDatabaseClusterRestore(ctx echo.Context, kubernete
bsNames := map[string]struct{}{
restore.Spec.DataSource.BackupSource.BackupStorageName: {},
}
e.waitGroup.Add(1)
go e.deleteK8SBackupStorages(context.Background(), kubeClient, bsNames)
}

Expand Down Expand Up @@ -184,7 +185,7 @@ func (e *EverestServer) UpdateDatabaseClusterRestore(ctx echo.Context, kubernete
toDeleteNames := map[string]struct{}{
oldRestore.Spec.DataSource.BackupSource.BackupStorageName: {},
}

e.waitGroup.Add(1)
go e.deleteK8SBackupStorages(context.Background(), kubeClient, toDeleteNames)
return nil
}
20 changes: 11 additions & 9 deletions api/everest.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type EverestServer struct {
l *zap.SugaredLogger
storage storage
secretsStorage secretsStorage
waitGroup *sync.WaitGroup
echo *echo.Echo
}

Expand All @@ -60,9 +61,10 @@ type List struct {
// NewEverestServer creates and configures everest API.
func NewEverestServer(c *config.EverestConfig, l *zap.SugaredLogger) (*EverestServer, error) {
e := &EverestServer{
config: c,
l: l,
echo: echo.New(),
config: c,
l: l,
echo: echo.New(),
waitGroup: &sync.WaitGroup{},
}
if err := e.initHTTPServer(); err != nil {
return e, err
Expand Down Expand Up @@ -159,11 +161,11 @@ func (e *EverestServer) Shutdown(ctx context.Context) error {
}

e.l.Info("Shutting down Everest")
wg := sync.WaitGroup{}
e.waitGroup.Wait()

wg.Add(1)
e.waitGroup.Add(1)
go func() {
defer wg.Done()
defer e.waitGroup.Done()
e.l.Info("Shutting down database storage")
if err := e.storage.Close(); err != nil {
e.l.Error(errors.Wrap(err, "could not shut down database storage"))
Expand All @@ -172,9 +174,9 @@ func (e *EverestServer) Shutdown(ctx context.Context) error {
}
}()

wg.Add(1)
e.waitGroup.Add(1)
go func() {
defer wg.Done()
defer e.waitGroup.Done()
e.l.Info("Shutting down secrets storage")
if err := e.secretsStorage.Close(); err != nil {
e.l.Error(errors.Wrap(err, "could not shut down secret storage"))
Expand All @@ -185,7 +187,7 @@ func (e *EverestServer) Shutdown(ctx context.Context) error {

done := make(chan struct{}, 1)
go func() {
wg.Wait()
e.waitGroup.Wait()
close(done)
}()

Expand Down
29 changes: 14 additions & 15 deletions api/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,24 +257,23 @@ func (e *EverestServer) disableK8sClusterMonitoring(ctx echo.Context, kubeClient
})
}

go func() {
ctx := context.Background()
for _, s := range kubeClient.SecretNamesFromVMAgent(vmAgent) {
mcs, err := kubeClient.GetMonitoringConfigsBySecretName(ctx, s)
if err != nil {
e.l.Error(errors.Wrapf(err, "could not list monitoring configs by secret name %s", s))
continue
}
for _, s := range kubeClient.SecretNamesFromVMAgent(vmAgent) {
mcs, err := kubeClient.GetMonitoringConfigsBySecretName(ctx.Request().Context(), s)
if err != nil {
err = errors.Wrapf(err, "could not list monitoring configs by secret name %s", s)
e.l.Error(err)
return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString(err.Error())})
}

for _, mc := range mcs {
err = kubeClient.DeleteMonitoringConfig(ctx, mc.Name, mc.Spec.CredentialsSecretName)
if err != nil && !errors.Is(err, kubernetes.ErrMonitoringConfigInUse) {
e.l.Error(errors.Wrapf(err, "could not delete monitoring config %s from Kubernetes", mc.Name))
continue
}
for _, mc := range mcs {
err = kubeClient.DeleteMonitoringConfig(ctx.Request().Context(), mc.Name, mc.Spec.CredentialsSecretName)
if err != nil && !errors.Is(err, kubernetes.ErrMonitoringConfigInUse) {
err = errors.Wrapf(err, "could not delete monitoring config %s from Kubernetes", mc.Name)
e.l.Error(err)
return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString(err.Error())})
}
}
}()
}

return ctx.NoContent(http.StatusOK)
}
Expand Down
46 changes: 24 additions & 22 deletions api/monitoring_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,13 @@ func (e *EverestServer) CreateMonitoringInstance(ctx echo.Context) error {
APIKeySecretID: apiKeyID,
})
if err != nil {
go func() {
_, err := e.secretsStorage.DeleteSecret(ctx.Request().Context(), apiKeyID)
if err != nil {
e.l.Warnf("Could not delete secret %s from secret storage due to error: %s", apiKeyID, err)
}
}()

e.l.Error(err)

_, err := e.secretsStorage.DeleteSecret(ctx.Request().Context(), apiKeyID)
if err != nil {
e.l.Warnf("Could not delete secret %s from secret storage due to error: %s", apiKeyID, err)
}

return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString("Could not save monitoring instance")})
}

Expand Down Expand Up @@ -174,9 +173,10 @@ func (e *EverestServer) DeleteMonitoringInstance(ctx echo.Context, name string)
return errors.Wrap(err, "Could not list Kubernetes clusters")
}

e.waitGroup.Add(1)
go configs.DeleteConfigFromK8sClusters(
context.Background(), ks, i,
e.initKubeClient, kubernetes.IsMonitoringConfigInUse, e.l,
e.initKubeClient, kubernetes.IsMonitoringConfigInUse, e.l, e.waitGroup,
)

return nil
Expand All @@ -187,12 +187,12 @@ func (e *EverestServer) DeleteMonitoringInstance(ctx echo.Context, name string)
})
}

go func() {
_, err := e.secretsStorage.DeleteSecret(context.Background(), i.APIKeySecretID)
if err != nil {
e.l.Warn(errors.Wrapf(err, "could not delete monitoring instance API key secret %s", i.APIKeySecretID))
}
}()
_, err = e.secretsStorage.DeleteSecret(context.Background(), i.APIKeySecretID)
if err != nil {
err = errors.Wrapf(err, "could not delete monitoring instance API key secret %s", i.APIKeySecretID)
e.l.Warn(err)
return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString(err.Error())})
}

return ctx.NoContent(http.StatusNoContent)
}
Expand Down Expand Up @@ -241,7 +241,9 @@ func (e *EverestServer) performMonitoringInstanceUpdate(
APIKeySecretID: apiKeyID,
})
if err != nil {
e.waitGroup.Add(1)
go func() {
defer e.waitGroup.Done()
_, err := e.secretsStorage.DeleteSecret(ctx.Request().Context(), *apiKeyID)
if err != nil {
e.l.Warnf("Could not delete secret %s from secret storage due to error: %s", apiKeyID, err)
Expand All @@ -262,10 +264,10 @@ func (e *EverestServer) performMonitoringInstanceUpdate(
if err != nil {
return errors.Wrap(err, "Could not list Kubernetes clusters")
}

e.waitGroup.Add(1)
go configs.UpdateConfigInAllK8sClusters(
context.Background(), ks, monitoringInstance,
e.secretsStorage.GetSecret, e.initKubeClient, e.l,
e.secretsStorage.GetSecret, e.initKubeClient, e.l, e.waitGroup,
)

return nil
Expand All @@ -277,12 +279,12 @@ func (e *EverestServer) performMonitoringInstanceUpdate(
}

if apiKeyID != nil {
go func() {
_, err := e.secretsStorage.DeleteSecret(context.Background(), previousAPIKeyID)
if err != nil {
e.l.Warn(errors.Wrapf(err, "could not delete monitoring instance api key secret %s", previousAPIKeyID))
}
}()
_, err := e.secretsStorage.DeleteSecret(context.Background(), previousAPIKeyID)
if err != nil {
err = errors.Wrapf(err, "could not delete monitoring instance api key secret %s", previousAPIKeyID)
e.l.Warn(err)
return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString(err.Error())})
}
}

return ctx.JSON(http.StatusOK, e.monitoringInstanceToAPIJson(monitoringInstance))
Expand Down
8 changes: 4 additions & 4 deletions pkg/configs/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ func DeleteConfigFromK8sClusters(
initKubeClient initKubeClientFn,
isInUse isInUseFn,
l *zap.SugaredLogger,
wg *sync.WaitGroup,
) {
wg := &sync.WaitGroup{}
defer wg.Done()

// Delete configs in all k8s clusters
for _, k := range kubernetesClusters {
Expand Down Expand Up @@ -75,10 +76,9 @@ func DeleteConfigFromK8sClusters(
// UpdateConfigInAllK8sClusters updates config resources in all the provided Kubernetes clusters.
func UpdateConfigInAllK8sClusters(
ctx context.Context, kubernetesClusters []model.KubernetesCluster, cfg kubernetes.ConfigK8sResourcer,
getSecret getSecretFn, initKubeClient initKubeClientFn, l *zap.SugaredLogger,
getSecret getSecretFn, initKubeClient initKubeClientFn, l *zap.SugaredLogger, wg *sync.WaitGroup,
) {
wg := &sync.WaitGroup{}

defer wg.Done()
// Update configs in all k8s clusters
for _, k := range kubernetesClusters {
k := k
Expand Down

0 comments on commit b1ae742

Please sign in to comment.