diff --git a/api-tests/tests/database-cluster.spec.ts b/api-tests/tests/database-cluster.spec.ts index bf82252c..557f219d 100644 --- a/api-tests/tests/database-cluster.spec.ts +++ b/api-tests/tests/database-cluster.spec.ts @@ -20,8 +20,8 @@ const testPrefix = `${(Math.random() + 1).toString(36).substring(10)}` let kubernetesId let recommendedVersion -const monitoringConfigName1 = `${testPrefix}-1` -const monitoringConfigName2 = `${testPrefix}-2` +const monitoringConfigName1 = `a${testPrefix}-1` +const monitoringConfigName2 = `b${testPrefix}-2` test.setTimeout(360 * 1000) @@ -62,7 +62,6 @@ test.beforeAll(async ({ request }) => { test.afterAll(async ({ request }) => { let res = await request.delete(`/v1/monitoring-instances/${monitoringConfigName1}`) - expect(res.ok()).toBeTruthy() res = await request.delete(`/v1/monitoring-instances/${monitoringConfigName2}`) diff --git a/api/backup_storage.go b/api/backup_storage.go index fd78cccd..51ad06e8 100644 --- a/api/backup_storage.go +++ b/api/backup_storage.go @@ -28,7 +28,6 @@ import ( "github.com/pkg/errors" "github.com/percona/percona-everest-backend/model" - "github.com/percona/percona-everest-backend/pkg/configs" "github.com/percona/percona-everest-backend/pkg/kubernetes" ) @@ -138,7 +137,7 @@ func (e *EverestServer) CreateBackupStorage(ctx echo.Context) error { //nolint:f } // DeleteBackupStorage deletes the specified backup storage. -func (e *EverestServer) DeleteBackupStorage(ctx echo.Context, backupStorageName string) error { +func (e *EverestServer) DeleteBackupStorage(ctx echo.Context, backupStorageName string) error { //nolint:cyclop c := ctx.Request().Context() bs, err := e.storage.GetBackupStorage(c, nil, backupStorageName) if err != nil { @@ -151,22 +150,42 @@ func (e *EverestServer) DeleteBackupStorage(ctx echo.Context, backupStorageName }) } + ks, err := e.storage.ListKubernetesClusters(c) + if err != nil { + e.l.Error(errors.Wrap(err, "Could not list Kubernetes clusters")) + return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString("Could not list Kubernetes clusters")}) + } + if len(ks) == 0 { + return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString("No registered kubernetes clusters available")}) + } + // FIXME: Revisit it once multi k8s support will be enabled + _, kubeClient, _, err := e.initKubeClient(ctx.Request().Context(), ks[0].ID) + if err != nil { + e.l.Error(errors.Wrap(err, "could not init kube client for config")) + return nil + } + + err = kubeClient.DeleteConfig(ctx.Request().Context(), bs, func(ctx context.Context, name string) (bool, error) { + return kubernetes.IsBackupStorageConfigInUse(ctx, name, kubeClient) + }) + if err != nil && !errors.Is(err, kubernetes.ErrConfigInUse) { + e.l.Error(errors.Wrap(err, "could not delete config")) + return nil + } + err = e.storage.Transaction(func(tx *gorm.DB) error { err := e.storage.DeleteBackupStorage(c, backupStorageName, tx) if err != nil { e.l.Error(err) return errors.New("Could not delete backup storage") } - - ks, err := e.storage.ListKubernetesClusters(c) - if err != nil { - return errors.Wrap(err, "Could not list Kubernetes clusters") + if _, err := e.secretsStorage.DeleteSecret(c, bs.AccessKeyID); err != nil { + return errors.Wrap(err, "could not delete access key from secrets storage") } - go configs.DeleteConfigFromK8sClusters( //nolint:contextcheck - context.Background(), ks, bs, - e.initKubeClient, kubernetes.IsBackupStorageConfigInUse, e.l, - ) + if _, err := e.secretsStorage.DeleteSecret(c, bs.SecretKeyID); err != nil { + return errors.Wrap(err, "could not delete secret key from secrets storage") + } return nil }) @@ -176,18 +195,6 @@ func (e *EverestServer) DeleteBackupStorage(ctx echo.Context, backupStorageName }) } - go func() { - if _, err := e.secretsStorage.DeleteSecret(c, bs.AccessKeyID); err != nil { - e.l.Error(errors.Wrap(err, "could not delete access key from secrets storage")) - } - }() - - go func() { - if _, err := e.secretsStorage.DeleteSecret(c, bs.SecretKeyID); err != nil { - e.l.Error(errors.Wrap(err, "could not delete secret key from secrets storage")) - } - }() - return ctx.NoContent(http.StatusNoContent) } @@ -256,7 +263,6 @@ func (e *EverestServer) performBackupStorageUpdate( c := ctx.Request().Context() httpStatusCode := http.StatusInternalServerError - var bs *model.BackupStorage err := e.storage.Transaction(func(tx *gorm.DB) error { var err error httpStatusCode, err = e.updateBackupStorage(c, tx, backupStorageName, params, newAccessKeyID, newSecretKeyID) @@ -264,22 +270,6 @@ func (e *EverestServer) performBackupStorageUpdate( return err } - bs, err = e.storage.GetBackupStorage(c, tx, backupStorageName) - if err != nil { - e.l.Error(err) - return errors.New("Could not find updated backup storage") - } - - ks, err := e.storage.ListKubernetesClusters(c) - if err != nil { - return errors.Wrap(err, "Could not list Kubernetes clusters") - } - - go configs.UpdateConfigInAllK8sClusters( //nolint:contextcheck - context.Background(), ks, bs, - e.secretsStorage.GetSecret, e.initKubeClient, e.l, - ) - return nil }) if err != nil { @@ -287,6 +277,30 @@ func (e *EverestServer) performBackupStorageUpdate( Message: pointer.ToString(err.Error()), }) } + bs, err := e.storage.GetBackupStorage(c, nil, backupStorageName) + if err != nil { + e.l.Error(err) + return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString("Could not find updated backup storage")}) + } + ks, err := e.storage.ListKubernetesClusters(c) + if err != nil { + e.l.Error(err) + return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString("Could not list Kubernetes clusters")}) + } + if len(ks) == 0 { + return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString("No registered kubernetes clusters available")}) + } + // FIXME: Revisit it once multi k8s support will be enabled + _, kubeClient, _, err := e.initKubeClient(ctx.Request().Context(), ks[0].ID) + if err != nil { + e.l.Error(errors.Wrap(err, "could not init kube client to update config")) + return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString("Could not init kubernetes client to update config")}) + } + + if err := kubeClient.UpdateConfig(ctx.Request().Context(), bs, e.secretsStorage.GetSecret); err != nil { + e.l.Error(errors.Wrap(err, "could not update config")) + return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString("Could not update config on the kubernetes cluster")}) + } e.deleteOldSecretsAfterUpdate(c, params, s) diff --git a/api/database_cluster.go b/api/database_cluster.go index 4787f218..4798e76d 100644 --- a/api/database_cluster.go +++ b/api/database_cluster.go @@ -108,9 +108,11 @@ func (e *EverestServer) DeleteDatabaseCluster(ctx echo.Context, kubernetesID str } names := kubernetes.BackupStorageNamesFromDBCluster(db) + e.waitGroup.Add(1) go e.deleteK8SBackupStorages(context.Background(), kubeClient, names) if db.Spec.Monitoring != nil && db.Spec.Monitoring.MonitoringConfigName != "" { + e.waitGroup.Add(1) go e.deleteK8SMonitoringConfig(context.Background(), kubeClient, db.Spec.Monitoring.MonitoringConfigName) } @@ -181,8 +183,9 @@ func (e *EverestServer) UpdateDatabaseCluster(ctx echo.Context, kubernetesID str if ctx.Response().Status >= http.StatusMultipleChoices { return nil } - + e.waitGroup.Add(1) go e.deleteBackupStoragesOnUpdate(context.Background(), kubeClient, oldDB, newBackupNames) + e.waitGroup.Add(1) go e.deleteMonitoringInstanceOnUpdate(context.Background(), kubeClient, oldDB, newMonitoringName) return nil @@ -266,6 +269,7 @@ func (e *EverestServer) rollbackCreatedBackupStorages(ctx context.Context, kubeC func (e *EverestServer) deleteK8SMonitoringConfig( ctx context.Context, kubeClient *kubernetes.Kubernetes, name string, ) { + defer e.waitGroup.Done() i, err := e.storage.GetMonitoringInstance(name) if err != nil { e.l.Error(errors.Wrap(err, "could get monitoring instance")) @@ -284,6 +288,7 @@ func (e *EverestServer) deleteK8SMonitoringConfig( func (e *EverestServer) deleteK8SBackupStorages( ctx context.Context, kubeClient *kubernetes.Kubernetes, names map[string]struct{}, ) { + defer e.waitGroup.Done() for name := range names { bs, err := e.storage.GetBackupStorage(ctx, nil, name) if err != nil { @@ -351,6 +356,7 @@ func (e *EverestServer) deleteBackupStoragesOnUpdate( oldDB *everestv1alpha1.DatabaseCluster, newNames map[string]struct{}, ) { + defer e.waitGroup.Done() oldNames := withBackupStorageNamesFromDBCluster(make(map[string]struct{}), *oldDB) toDelete := uniqueKeys(newNames, oldNames) for name := range toDelete { @@ -393,6 +399,7 @@ func (e *EverestServer) deleteMonitoringInstanceOnUpdate( oldDB *everestv1alpha1.DatabaseCluster, newName string, ) { + defer e.waitGroup.Done() oldName := "" if oldDB.Spec.Monitoring != nil { oldName = oldDB.Spec.Monitoring.MonitoringConfigName diff --git a/api/database_cluster_backup.go b/api/database_cluster_backup.go index edad7351..01efde6d 100644 --- a/api/database_cluster_backup.go +++ b/api/database_cluster_backup.go @@ -113,6 +113,7 @@ func (e *EverestServer) DeleteDatabaseClusterBackup(ctx echo.Context, kubernetes bsNames := map[string]struct{}{ backup.Spec.BackupStorageName: {}, } + e.waitGroup.Add(1) go e.deleteK8SBackupStorages(context.Background(), kubeClient, bsNames) } diff --git a/api/database_cluster_restore.go b/api/database_cluster_restore.go index 82e98ece..5736e364 100644 --- a/api/database_cluster_restore.go +++ b/api/database_cluster_restore.go @@ -114,6 +114,7 @@ func (e *EverestServer) DeleteDatabaseClusterRestore(ctx echo.Context, kubernete bsNames := map[string]struct{}{ restore.Spec.DataSource.BackupSource.BackupStorageName: {}, } + e.waitGroup.Add(1) go e.deleteK8SBackupStorages(context.Background(), kubeClient, bsNames) } @@ -184,7 +185,7 @@ func (e *EverestServer) UpdateDatabaseClusterRestore(ctx echo.Context, kubernete toDeleteNames := map[string]struct{}{ oldRestore.Spec.DataSource.BackupSource.BackupStorageName: {}, } - + e.waitGroup.Add(1) go e.deleteK8SBackupStorages(context.Background(), kubeClient, toDeleteNames) return nil } diff --git a/api/everest.go b/api/everest.go index 5c994dcf..872b5263 100644 --- a/api/everest.go +++ b/api/everest.go @@ -49,6 +49,7 @@ type EverestServer struct { l *zap.SugaredLogger storage storage secretsStorage secretsStorage + waitGroup *sync.WaitGroup echo *echo.Echo } @@ -60,9 +61,10 @@ type List struct { // NewEverestServer creates and configures everest API. func NewEverestServer(c *config.EverestConfig, l *zap.SugaredLogger) (*EverestServer, error) { e := &EverestServer{ - config: c, - l: l, - echo: echo.New(), + config: c, + l: l, + echo: echo.New(), + waitGroup: &sync.WaitGroup{}, } if err := e.initHTTPServer(); err != nil { return e, err @@ -159,11 +161,11 @@ func (e *EverestServer) Shutdown(ctx context.Context) error { } e.l.Info("Shutting down Everest") - wg := sync.WaitGroup{} + e.waitGroup.Wait() - wg.Add(1) + e.waitGroup.Add(1) go func() { - defer wg.Done() + defer e.waitGroup.Done() e.l.Info("Shutting down database storage") if err := e.storage.Close(); err != nil { e.l.Error(errors.Wrap(err, "could not shut down database storage")) @@ -172,9 +174,9 @@ func (e *EverestServer) Shutdown(ctx context.Context) error { } }() - wg.Add(1) + e.waitGroup.Add(1) go func() { - defer wg.Done() + defer e.waitGroup.Done() e.l.Info("Shutting down secrets storage") if err := e.secretsStorage.Close(); err != nil { e.l.Error(errors.Wrap(err, "could not shut down secret storage")) @@ -185,7 +187,7 @@ func (e *EverestServer) Shutdown(ctx context.Context) error { done := make(chan struct{}, 1) go func() { - wg.Wait() + e.waitGroup.Wait() close(done) }() diff --git a/api/kubernetes.go b/api/kubernetes.go index 260b35c7..f9abc75e 100644 --- a/api/kubernetes.go +++ b/api/kubernetes.go @@ -57,13 +57,21 @@ func (e *EverestServer) ListKubernetesClusters(ctx echo.Context) error { // RegisterKubernetesCluster registers a k8s cluster in Everest server. func (e *EverestServer) RegisterKubernetesCluster(ctx echo.Context) error { + list, err := e.storage.ListKubernetesClusters(ctx.Request().Context()) + if err != nil { + e.l.Error(err) + return ctx.JSON(http.StatusBadRequest, Error{Message: pointer.ToString("Could not list Kubernetes clusters")}) + } + if len(list) != 0 { + return ctx.JSON(http.StatusBadRequest, Error{Message: pointer.ToString("Everest does not support multiple kubernetes clusters right now. Please delete the existing cluster before registering a new one")}) + } var params CreateKubernetesClusterParams if err := ctx.Bind(¶ms); err != nil { return ctx.JSON(http.StatusBadRequest, Error{Message: pointer.ToString(err.Error())}) } c := ctx.Request().Context() - _, err := clientcmd.BuildConfigFromKubeconfigGetter("", newConfigGetter(params.Kubeconfig).loadFromString) + _, err = clientcmd.BuildConfigFromKubeconfigGetter("", newConfigGetter(params.Kubeconfig).loadFromString) if err != nil { e.l.Error(err) return ctx.JSON(http.StatusInternalServerError, Error{ @@ -257,24 +265,23 @@ func (e *EverestServer) disableK8sClusterMonitoring(ctx echo.Context, kubeClient }) } - go func() { - ctx := context.Background() - for _, s := range kubeClient.SecretNamesFromVMAgent(vmAgent) { - mcs, err := kubeClient.GetMonitoringConfigsBySecretName(ctx, s) - if err != nil { - e.l.Error(errors.Wrapf(err, "could not list monitoring configs by secret name %s", s)) - continue - } + for _, s := range kubeClient.SecretNamesFromVMAgent(vmAgent) { + mcs, err := kubeClient.GetMonitoringConfigsBySecretName(ctx.Request().Context(), s) + if err != nil { + err = errors.Wrapf(err, "could not list monitoring configs by secret name %s", s) + e.l.Error(err) + return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString(err.Error())}) + } - for _, mc := range mcs { - err = kubeClient.DeleteMonitoringConfig(ctx, mc.Name, mc.Spec.CredentialsSecretName) - if err != nil && !errors.Is(err, kubernetes.ErrMonitoringConfigInUse) { - e.l.Error(errors.Wrapf(err, "could not delete monitoring config %s from Kubernetes", mc.Name)) - continue - } + for _, mc := range mcs { + err = kubeClient.DeleteMonitoringConfig(ctx.Request().Context(), mc.Name, mc.Spec.CredentialsSecretName) + if err != nil && !errors.Is(err, kubernetes.ErrMonitoringConfigInUse) { + err = errors.Wrapf(err, "could not delete monitoring config %s from Kubernetes", mc.Name) + e.l.Error(err) + return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString(err.Error())}) } } - }() + } return ctx.NoContent(http.StatusOK) } diff --git a/api/monitoring_instance.go b/api/monitoring_instance.go index aab180fc..c07bacc8 100644 --- a/api/monitoring_instance.go +++ b/api/monitoring_instance.go @@ -28,7 +28,6 @@ import ( "github.com/pkg/errors" "github.com/percona/percona-everest-backend/model" - "github.com/percona/percona-everest-backend/pkg/configs" "github.com/percona/percona-everest-backend/pkg/kubernetes" "github.com/percona/percona-everest-backend/pkg/pmm" ) @@ -70,14 +69,13 @@ func (e *EverestServer) CreateMonitoringInstance(ctx echo.Context) error { APIKeySecretID: apiKeyID, }) if err != nil { - go func() { - _, err := e.secretsStorage.DeleteSecret(ctx.Request().Context(), apiKeyID) - if err != nil { - e.l.Warnf("Could not delete secret %s from secret storage due to error: %s", apiKeyID, err) - } - }() - e.l.Error(err) + + _, err := e.secretsStorage.DeleteSecret(ctx.Request().Context(), apiKeyID) + if err != nil { + e.l.Warnf("Could not delete secret %s from secret storage due to error: %s", apiKeyID, err) + } + return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString("Could not save monitoring instance")}) } @@ -149,7 +147,7 @@ func (e *EverestServer) UpdateMonitoringInstance(ctx echo.Context, name string) } // DeleteMonitoringInstance deletes a monitoring instance. -func (e *EverestServer) DeleteMonitoringInstance(ctx echo.Context, name string) error { +func (e *EverestServer) DeleteMonitoringInstance(ctx echo.Context, name string) error { //nolint:cyclop i, err := e.storage.GetMonitoringInstance(name) if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { e.l.Error(err) @@ -163,22 +161,39 @@ func (e *EverestServer) DeleteMonitoringInstance(ctx echo.Context, name string) }) } + ks, err := e.storage.ListKubernetesClusters(ctx.Request().Context()) + if err != nil { + return errors.Wrap(err, "Could not list Kubernetes clusters") + } + if len(ks) == 0 { + return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString("No registered kubernetes clusters available")}) + } + // FIXME: Revisit it once multi k8s support will be enabled + _, kubeClient, _, err := e.initKubeClient(ctx.Request().Context(), ks[0].ID) + if err != nil { + e.l.Error(errors.Wrap(err, "could not init kube client")) + return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString("Could not make connection to the kubernetes cluster")}) + } + + err = kubeClient.DeleteConfig(ctx.Request().Context(), i, func(ctx context.Context, name string) (bool, error) { + return kubernetes.IsMonitoringConfigInUse(ctx, name, kubeClient) + }) + if err != nil && !errors.Is(err, kubernetes.ErrConfigInUse) { + e.l.Error(errors.Wrap(err, "could not delete monitoring config from kubernetes cluster")) + return ctx.JSON(http.StatusInternalServerError, Error{Message: pointer.ToString("Could not delete monitoring config from the Kubernetes cluster")}) + } + err = e.storage.Transaction(func(tx *gorm.DB) error { if err := e.storage.DeleteMonitoringInstance(i.Name, tx); err != nil { e.l.Error(err) return errors.New("Could not delete monitoring instance") } - ks, err := e.storage.ListKubernetesClusters(ctx.Request().Context()) + _, err = e.secretsStorage.DeleteSecret(context.Background(), i.APIKeySecretID) if err != nil { - return errors.Wrap(err, "Could not list Kubernetes clusters") + return errors.Wrapf(err, "could not delete monitoring instance API key secret %s", i.APIKeySecretID) } - go configs.DeleteConfigFromK8sClusters( - context.Background(), ks, i, - e.initKubeClient, kubernetes.IsMonitoringConfigInUse, e.l, - ) - return nil }) if err != nil { @@ -187,13 +202,6 @@ func (e *EverestServer) DeleteMonitoringInstance(ctx echo.Context, name string) }) } - go func() { - _, err := e.secretsStorage.DeleteSecret(context.Background(), i.APIKeySecretID) - if err != nil { - e.l.Warn(errors.Wrapf(err, "could not delete monitoring instance API key secret %s", i.APIKeySecretID)) - } - }() - return ctx.NoContent(http.StatusNoContent) } @@ -229,24 +237,28 @@ func (e *EverestServer) createAndStorePMMApiKey(ctx context.Context, name, url, return apiKeyID, nil } -func (e *EverestServer) performMonitoringInstanceUpdate( +func (e *EverestServer) performMonitoringInstanceUpdate( //nolint:cyclop ctx echo.Context, name string, apiKeyID *string, previousAPIKeyID string, params *UpdateMonitoringInstanceJSONRequestBody, ) error { var monitoringInstance *model.MonitoringInstance err := e.storage.Transaction(func(tx *gorm.DB) error { - err := e.storage.UpdateMonitoringInstance(name, model.UpdateMonitoringInstanceParams{ + ks, err := e.storage.ListKubernetesClusters(ctx.Request().Context()) + if err != nil { + return errors.Wrap(err, "Could not list Kubernetes clusters") + } + if len(ks) == 0 { + return errors.New("No registered Kubernetes clusters available") + } + err = e.storage.UpdateMonitoringInstance(name, model.UpdateMonitoringInstanceParams{ Type: (*model.MonitoringInstanceType)(¶ms.Type), URL: ¶ms.Url, APIKeySecretID: apiKeyID, }) if err != nil { - go func() { - _, err := e.secretsStorage.DeleteSecret(ctx.Request().Context(), *apiKeyID) - if err != nil { - e.l.Warnf("Could not delete secret %s from secret storage due to error: %s", apiKeyID, err) - } - }() + if _, err := e.secretsStorage.DeleteSecret(ctx.Request().Context(), *apiKeyID); err != nil { + return errors.Wrapf(err, "Could not delete secret %s from secret storage", *apiKeyID) + } e.l.Error(err) return errors.New("Could not update monitoring instance") @@ -257,16 +269,24 @@ func (e *EverestServer) performMonitoringInstanceUpdate( e.l.Error(err) return errors.New("Could not find updated monitoring instance") } - - ks, err := e.storage.ListKubernetesClusters(ctx.Request().Context()) + // FIXME: Revisit it once multi k8s support will be enabled + // FIXME: This is not recommended to do network calls in a database transaction + // This will be removed during the implementation of multi k8s support + // However, right now it guarantees data consistency + _, kubeClient, _, err := e.initKubeClient(ctx.Request().Context(), ks[0].ID) if err != nil { - return errors.Wrap(err, "Could not list Kubernetes clusters") + return errors.Wrap(err, "could not init kube client to update config") } - go configs.UpdateConfigInAllK8sClusters( - context.Background(), ks, monitoringInstance, - e.secretsStorage.GetSecret, e.initKubeClient, e.l, - ) + if err := kubeClient.UpdateConfig(ctx.Request().Context(), monitoringInstance, e.secretsStorage.GetSecret); err != nil { + return errors.Wrap(err, "could not update config") + } + + if apiKeyID != nil { + if _, err := e.secretsStorage.DeleteSecret(context.Background(), previousAPIKeyID); err != nil { + return errors.Wrapf(err, "could not delete monitoring instance api key secret %s", previousAPIKeyID) + } + } return nil }) @@ -276,14 +296,5 @@ func (e *EverestServer) performMonitoringInstanceUpdate( }) } - if apiKeyID != nil { - go func() { - _, err := e.secretsStorage.DeleteSecret(context.Background(), previousAPIKeyID) - if err != nil { - e.l.Warn(errors.Wrapf(err, "could not delete monitoring instance api key secret %s", previousAPIKeyID)) - } - }() - } - return ctx.JSON(http.StatusOK, e.monitoringInstanceToAPIJson(monitoringInstance)) } diff --git a/pkg/configs/configs.go b/pkg/configs/configs.go index 37e2250d..9805335e 100644 --- a/pkg/configs/configs.go +++ b/pkg/configs/configs.go @@ -43,8 +43,9 @@ func DeleteConfigFromK8sClusters( initKubeClient initKubeClientFn, isInUse isInUseFn, l *zap.SugaredLogger, + wg *sync.WaitGroup, ) { - wg := &sync.WaitGroup{} + defer wg.Done() // Delete configs in all k8s clusters for _, k := range kubernetesClusters { @@ -75,10 +76,9 @@ func DeleteConfigFromK8sClusters( // UpdateConfigInAllK8sClusters updates config resources in all the provided Kubernetes clusters. func UpdateConfigInAllK8sClusters( ctx context.Context, kubernetesClusters []model.KubernetesCluster, cfg kubernetes.ConfigK8sResourcer, - getSecret getSecretFn, initKubeClient initKubeClientFn, l *zap.SugaredLogger, + getSecret getSecretFn, initKubeClient initKubeClientFn, l *zap.SugaredLogger, wg *sync.WaitGroup, ) { - wg := &sync.WaitGroup{} - + defer wg.Done() // Update configs in all k8s clusters for _, k := range kubernetesClusters { k := k diff --git a/pkg/kubernetes/client/client.go b/pkg/kubernetes/client/client.go index c27a5681..88dfeb24 100644 --- a/pkg/kubernetes/client/client.go +++ b/pkg/kubernetes/client/client.go @@ -17,6 +17,8 @@ package client import ( + "time" + "github.com/pkg/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -65,6 +67,7 @@ func NewFromKubeConfig(kubeconfig []byte, namespace string) (*Client, error) { config.QPS = defaultQPSLimit config.Burst = defaultBurstLimit + config.Timeout = 10 * time.Second clientset, err := kubernetes.NewForConfig(config) if err != nil { return nil, err