From 91f5d8fce1d31b1cf71afa021200ef52d86741e7 Mon Sep 17 00:00:00 2001 From: Meng Yan Date: Tue, 14 Jan 2025 11:43:35 +0800 Subject: [PATCH] :sparkles: Init postgres user and database from annotation (#1322) * add postgres annotation Signed-off-by: myan * add the update Signed-off-by: myan * add it Signed-off-by: myan * fix the sonar Signed-off-by: myan * add review Signed-off-by: myan --------- Signed-off-by: myan --- operator/pkg/config/storage_config.go | 4 + operator/pkg/constants/constants.go | 3 + .../controllers/storage/storage_reconciler.go | 271 +++++++++++++++--- .../operator/controllers/storage_test.go | 37 ++- 4 files changed, 270 insertions(+), 45 deletions(-) diff --git a/operator/pkg/config/storage_config.go b/operator/pkg/config/storage_config.go index 2a68e58ad..a17707c3c 100644 --- a/operator/pkg/config/storage_config.go +++ b/operator/pkg/config/storage_config.go @@ -65,6 +65,10 @@ func IsBYOPostgres() bool { return isBYOPostgres } +func SetBYOPostgres(byo bool) { + isBYOPostgres = byo +} + func SetDatabaseReady(ready bool) { databaseReady = ready } diff --git a/operator/pkg/constants/constants.go b/operator/pkg/constants/constants.go index 09f23111c..7dbdeae3f 100644 --- a/operator/pkg/constants/constants.go +++ b/operator/pkg/constants/constants.go @@ -75,6 +75,9 @@ const ( // development environments, where is is convenient to reduce the poll interval. The value should be a string // that can be parsed with the time.ParseDuration function. AnnotationMGHWithStackroxPollInterval = "global-hub.open-cluster-management.io/with-stackrox-poll-interval" + + // AnnotationBuiltInPostgresUser indicates to create the postgres users and databases based on the value configuration + AnnotationBuiltInPostgresUsers = "global-hub.open-cluster-management.io/postgres-users" ) // hub installation constants diff --git a/operator/pkg/controllers/storage/storage_reconciler.go b/operator/pkg/controllers/storage/storage_reconciler.go index 3199b451d..e283721fc 100644 --- a/operator/pkg/controllers/storage/storage_reconciler.go +++ b/operator/pkg/controllers/storage/storage_reconciler.go @@ -4,6 +4,7 @@ import ( "context" "crypto/rand" "embed" + "encoding/json" "fmt" iofs "io/fs" "math/big" @@ -22,12 +23,14 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" "github.com/stolostron/multicluster-global-hub/operator/api/operator/v1alpha4" "github.com/stolostron/multicluster-global-hub/operator/pkg/config" + operatorconstants "github.com/stolostron/multicluster-global-hub/operator/pkg/constants" "github.com/stolostron/multicluster-global-hub/operator/pkg/utils" "github.com/stolostron/multicluster-global-hub/pkg/constants" "github.com/stolostron/multicluster-global-hub/pkg/database" @@ -78,8 +81,10 @@ var WatchedConfigMap = sets.NewString( ) var ( - storageReconciler *StorageReconciler - updateConnection bool + storageReconciler *StorageReconciler + updateConnection bool + appliedPgUserValue string + postgresUserNameTemplate = "postgresql-user-%s" ) func (r *StorageReconciler) IsResourceRemoved() bool { @@ -207,7 +212,7 @@ func (r *StorageReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct } updateConnection = config.SetStorageConnection(storageConn) - needRequeue, err := r.reconcileDatabase(ctx, mgh) + needRequeue, err := r.ReconcileDatabase(ctx, mgh) if err != nil { reconcileErr = fmt.Errorf("database not ready, Error: %v", err) return ctrl.Result{}, reconcileErr @@ -282,84 +287,261 @@ func (r *StorageReconciler) ReconcileStorage(ctx context.Context, return pgConnection, nil } -func (r *StorageReconciler) reconcileDatabase(ctx context.Context, mgh *v1alpha4.MulticlusterGlobalHub) (bool, error) { - var reconcileErr error +func (r *StorageReconciler) ReconcileDatabase(ctx context.Context, mgh *v1alpha4.MulticlusterGlobalHub) (bool, error) { + var conn *pgx.Conn + var err error + + postgresUsersValue := mgh.Annotations[operatorconstants.AnnotationBuiltInPostgresUsers] + // Don't reconcile, or create the connection, when + // 1. postgres users isn't updated + // 2. database has been initialized + if (postgresUsersValue == "" || postgresUsersValue == appliedPgUserValue) && r.databaseReconcileCount > 0 { + return false, nil + } + + defer func() { + if conn != nil { + if err := conn.Close(ctx); err != nil { + log.Error(err, "failed to close connection to database") + } + } + }() + storageConn := config.GetStorageConnection() if storageConn == nil { - reconcileErr = fmt.Errorf("storage connection is nil") - return true, reconcileErr + return false, fmt.Errorf("storage connection is nil") } - // if the operator is restarted, reconcile the database again - if r.databaseReconcileCount > 0 { - return false, nil + conn, err = database.PostgresConnection(ctx, storageConn.SuperuserDatabaseURI, storageConn.CACert) + if err != nil { + log.Infof("wait database ready, failed to connect database: %v", err) + return true, nil + } + + // apply the init users + if !config.IsBYOPostgres() && postgresUsersValue != "" && postgresUsersValue != appliedPgUserValue { + if err = r.applyPostgresUsers(ctx, conn, postgresUsersValue, mgh); err != nil { + return false, err + } + log.Info("applied the annotation postgres users successfully!") + appliedPgUserValue = postgresUsersValue + } + + // apply the global hub init SQL when the operator restarted + if r.databaseReconcileCount == 0 { + err = r.applyGlobalHubInitSQL(ctx, conn, storageConn.ReadonlyUserDatabaseURI) + if err != nil { + return false, err + } + log.Debug("global hub database initialized") + r.databaseReconcileCount++ + } + + return false, nil +} + +func (r *StorageReconciler) applyPostgresUsers(ctx context.Context, conn *pgx.Conn, pgUsersValue string, + mgh *v1alpha4.MulticlusterGlobalHub, +) error { + var postgresUsers []AnnotationPGUser + err := json.Unmarshal([]byte(pgUsersValue), &postgresUsers) + if err != nil { + return fmt.Errorf("failed to unmarshal postgres users from annotations: %v", err) } - conn, err := database.PostgresConnection(ctx, storageConn.SuperuserDatabaseURI, storageConn.CACert) + for _, user := range postgresUsers { + // create postgres user + pwd, err := r.createPostgresUser(ctx, conn, user) + if err != nil { + return fmt.Errorf("error creating postgres user %s: %v", user.Name, err) + } + // create database and add permission for the user + for _, db := range user.Databases { + err = r.createDatabaseIfNotExists(ctx, conn, db) + if err != nil { + return fmt.Errorf("error creating database %s: %v", db, err) + } + err = r.grantPermissions(ctx, conn, user.Name, db) + if err != nil { + return fmt.Errorf("failed to grant permissions to user %s on database %s: %v", user.Name, db, err) + } + } + // create the secret for the postgres user and databases + if err = r.createPostgresUserSecret(ctx, user, mgh, pwd); err != nil { + return fmt.Errorf("error creating postgres user secret %v", err) + } + } + return nil +} + +func (r *StorageReconciler) createDatabaseIfNotExists(ctx context.Context, conn *pgx.Conn, dbName string) error { + var exists bool + err := conn.QueryRow(ctx, `SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = $1);`, dbName).Scan(&exists) if err != nil { - reconcileErr = fmt.Errorf("failed to connect to database: %v", err) - log.Infof("wait database ready, %v", reconcileErr) - return true, nil + return fmt.Errorf("error checking if database %s exists: %v", dbName, err) } - defer func() { - if err := conn.Close(ctx); err != nil { - log.Error(err, "failed to close connection to database") + if !exists { + createDBQuery := fmt.Sprintf("CREATE DATABASE %s;", dbName) + _, err := conn.Exec(ctx, createDBQuery) + if err != nil { + return fmt.Errorf("error creating database %s: %v", dbName, err) } - }() + log.Infof("database %s created.", dbName) + } else { + log.Infof("database %s already exists.", dbName) + } + + return nil +} +// createPostgresUser return the password of the created user, if the password is empty if the user is already existing +func (r *StorageReconciler) createPostgresUser(ctx context.Context, conn *pgx.Conn, user AnnotationPGUser, +) (string, error) { + var roleExists bool + err := conn.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM pg_catalog.pg_roles WHERE rolname = $1)", + user.Name).Scan(&roleExists) + if err != nil { + return "", fmt.Errorf("error checking if role exists: %v", err) + } + + password := "" + if roleExists { + // updatePasswordQuery := fmt.Sprintf("ALTER ROLE \"%s\" WITH PASSWORD '%s';", user.Name, password) + // _, err = conn.Exec(ctx, updatePasswordQuery) + // if err != nil { + // return fmt.Errorf("error updating password for role %s: %v", user.Name, err) + // } + log.Infof("postgres user already exist: %s", user.Name) + } else { + password = generatePassword(16) + createRoleQuery := fmt.Sprintf("CREATE ROLE \"%s\" LOGIN PASSWORD '%s';", user.Name, password) + _, err = conn.Exec(ctx, createRoleQuery) + if err != nil { + return "", fmt.Errorf("error creating role %s password: %v", user.Name, err) + } + log.Infof("create postgres user: %s", user.Name) + } + + return password, nil +} + +func (r *StorageReconciler) createPostgresUserSecret(ctx context.Context, user AnnotationPGUser, + mgh *v1alpha4.MulticlusterGlobalHub, password string, +) error { + userSecret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(postgresUserNameTemplate, user.Name), + Namespace: mgh.Namespace, + }, + } + err := r.GetClient().Get(ctx, client.ObjectKeyFromObject(userSecret), userSecret) + if err != nil && !errors.IsNotFound(err) { + return err + } + + // update the databases if exists + if err == nil { + log.Infof("the postgresql user secret already exists: %s", userSecret.Name) + previousDatabases := userSecret.Data["databases"] + currentDatabases := strings.Join(user.Databases, ",") + if string(previousDatabases) != currentDatabases { + userSecret.Data["databases"] = []byte(currentDatabases) + err = r.GetClient().Update(ctx, userSecret) + if err != nil { + return fmt.Errorf("failed to updating postgres user secret %s, err %v", user.Name, err) + } + log.Infof("update the postgres user secret databases: %s", userSecret.Name) + } + return nil + } + + // create secret + storageConn := config.GetStorageConnection() + pgConfig, err := pgx.ParseConfig(storageConn.SuperuserDatabaseURI) + if err != nil { + return fmt.Errorf("failed the parse the supper user database URI") + } + userSecret.Data = map[string][]byte{ + "db.host": []byte(pgConfig.Host), + "db.port": []byte(fmt.Sprintf("%d", pgConfig.Port)), + "db.user": []byte(user.Name), + "databases": []byte(strings.Join(user.Databases, ",")), + "ca": storageConn.CACert, + } + if password != "" { + userSecret.Data["db.password"] = []byte(password) + } + err = controllerutil.SetControllerReference(mgh, userSecret, r.Manager.GetScheme()) + if err != nil { + return fmt.Errorf("failed to add the owner reference to the user secret: %s", userSecret.Name) + } + err = r.GetClient().Create(ctx, userSecret) + log.Infof("create the postgresql user secret: %s", userSecret.Name) + if err != nil { + return fmt.Errorf("failed to create the postgresql user secret: %s", userSecret.Name) + } + + return nil +} + +func (r *StorageReconciler) grantPermissions(ctx context.Context, conn *pgx.Conn, user, dbName string) error { + grantQuery := fmt.Sprintf("GRANT ALL PRIVILEGES ON DATABASE %s TO %s;", dbName, user) + _, err := conn.Exec(ctx, grantQuery) + if err != nil { + return fmt.Errorf("error granting permissions to user %s on database %s: %v", user, dbName, err) + } + log.Infof("granted all privileges to user %s on database %s.\n", user, dbName) + return nil +} + +func (r *StorageReconciler) applyGlobalHubInitSQL(ctx context.Context, conn *pgx.Conn, readonlyUserURI string) error { // Check if backup is enabled var backupEnabled bool - backupEnabled, reconcileErr = commonutils.IsBackupEnabled(ctx, r.GetClient()) - if reconcileErr != nil { - log.Error(reconcileErr, "failed to get backup status") - return true, reconcileErr + backupEnabled, err := commonutils.IsBackupEnabled(ctx, r.GetClient()) + if err != nil { + return fmt.Errorf("failed to get the backup status: %v", err) } if backupEnabled || !r.upgrade { lockSql := fmt.Sprintf("select pg_advisory_lock(%s)", constants.LockId) unLockSql := fmt.Sprintf("select pg_advisory_unlock(%s)", constants.LockId) defer func() { - _, reconcileErr = conn.Exec(ctx, unLockSql) - if reconcileErr != nil { - log.Error(reconcileErr, "failed to unlock db") + _, err = conn.Exec(ctx, unLockSql) + if err != nil { + log.Errorf("failed to unlock db: %v", err) } }() - _, reconcileErr = conn.Exec(ctx, lockSql) - if reconcileErr != nil { - log.Error(reconcileErr, "failed to parse database_uri_with_readonlyuser") - return true, reconcileErr + _, err = conn.Exec(ctx, lockSql) + if err != nil { + return fmt.Errorf("failed to parse database_uri_with_readonlyuser: %v", err) } } - objURI, err := url.Parse(storageConn.ReadonlyUserDatabaseURI) + objURI, err := url.Parse(readonlyUserURI) if err != nil { log.Error(err, "failed to parse database_uri_with_readonlyuser") } readonlyUsername := objURI.User.Username() - if reconcileErr = applySQL(ctx, conn, databaseFS, "database", readonlyUsername); reconcileErr != nil { - return true, reconcileErr + if err = applySQL(ctx, conn, databaseFS, "database", readonlyUsername); err != nil { + return fmt.Errorf("failed to apply the database sql: %v", err) } if r.enableGlobalResource { - if reconcileErr = applySQL(ctx, conn, databaseOldFS, "database.old", readonlyUsername); reconcileErr != nil { - return true, reconcileErr + if err = applySQL(ctx, conn, databaseOldFS, "database.old", readonlyUsername); err != nil { + return fmt.Errorf("failed to apply the database.old sql: %v", err) } } if !r.upgrade { - reconcileErr = applySQL(ctx, conn, upgradeFS, "upgrade", readonlyUsername) - if reconcileErr != nil { - log.Error(reconcileErr, "failed to exec the upgrade sql files") - return true, reconcileErr + err = applySQL(ctx, conn, upgradeFS, "upgrade", readonlyUsername) + if err != nil { + return fmt.Errorf("failed to apply the upgrade sql: %v", err) } r.upgrade = true } - log.Debug("database initialized") - r.databaseReconcileCount++ - - return false, nil + return nil } func applySQL(ctx context.Context, conn *pgx.Conn, databaseFS embed.FS, rootDir, username string) error { @@ -442,3 +624,8 @@ func getDatabaseComponentStatus(ctx context.Context, c client.Client, } return config.GetStatefulSetComponentStatus(ctx, c, namespace, name) } + +type AnnotationPGUser struct { + Name string `json:"name"` + Databases []string `json:"databases"` +} diff --git a/test/integration/operator/controllers/storage_test.go b/test/integration/operator/controllers/storage_test.go index f7b6b78bb..c8c437971 100644 --- a/test/integration/operator/controllers/storage_test.go +++ b/test/integration/operator/controllers/storage_test.go @@ -23,12 +23,13 @@ import ( "github.com/stolostron/multicluster-global-hub/operator/pkg/controllers/storage" operatorutils "github.com/stolostron/multicluster-global-hub/operator/pkg/utils" "github.com/stolostron/multicluster-global-hub/pkg/constants" + "github.com/stolostron/multicluster-global-hub/pkg/utils" testutils "github.com/stolostron/multicluster-global-hub/test/integration/utils" ) -// go test ./test/integration/operator -ginkgo.focus "storage" -v +// go test ./test/integration/operator/controllers -ginkgo.focus "storage" -v var _ = Describe("storage", Ordered, func() { - It("should init database with BYO", func() { + It("should init database", func() { namespace := fmt.Sprintf("namespace-%s", rand.String(6)) mghName := "test-mgh" @@ -52,7 +53,7 @@ var _ = Describe("storage", Ordered, func() { Expect(runtimeClient.Create(ctx, mgh)).To(Succeed()) Expect(runtimeClient.Get(ctx, client.ObjectKeyFromObject(mgh), mgh)).To(Succeed()) - // storage secret + // storage secret - BYO // pgURI := strings.Replace(testPostgres.URI, "sslmode=verify-ca", "sslmode=require", -1) storageSecret := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ @@ -85,6 +86,36 @@ var _ = Describe("storage", Ordered, func() { err = runtimeClient.Get(ctx, client.ObjectKeyFromObject(mgh), mgh) Expect(err).To(Succeed()) + // reconcile database(annotation) -> mock builtin + config.SetBYOPostgres(false) + // add 1 test user + mgh.Annotations = map[string]string{ + "global-hub.open-cluster-management.io/postgres-users": "[{\"name\": \"testuser1\", \"databases\": [\"test1\"]}]", + } + _, err = storageReconciler.ReconcileDatabase(ctx, mgh) + Expect(err).To(Succeed()) + secret := &corev1.Secret{} + err = runtimeClient.Get(ctx, types.NamespacedName{ + Namespace: mgh.Namespace, + Name: "postgresql-user-testuser1", + }, secret) + Expect(err).To(Succeed()) + + // add 2 test users + mgh.Annotations = map[string]string{ + "global-hub.open-cluster-management.io/postgres-users": "[{\"name\": \"testuser1\", \"databases\": [\"test1\"]}, {\"name\": \"testuser2\", \"databases\": [\"test2\"]}]", + } + _, err = storageReconciler.ReconcileDatabase(ctx, mgh) + Expect(err).To(Succeed()) + secret = &corev1.Secret{} + err = runtimeClient.Get(ctx, types.NamespacedName{ + Namespace: mgh.Namespace, + Name: "postgresql-user-testuser2", + }, secret) + Expect(err).To(Succeed()) + utils.PrettyPrint(secret) + config.SetBYOPostgres(true) + err = runtimeClient.Delete(ctx, storageSecret) Expect(err).To(Succeed()) Eventually(func() error {