Skip to content

Commit

Permalink
✨ Init postgres user and database from annotation (#1322)
Browse files Browse the repository at this point in the history
* add postgres annotation

Signed-off-by: myan <[email protected]>

* add the update

Signed-off-by: myan <[email protected]>

* add it

Signed-off-by: myan <[email protected]>

* fix the sonar

Signed-off-by: myan <[email protected]>

* add review

Signed-off-by: myan <[email protected]>

---------

Signed-off-by: myan <[email protected]>
  • Loading branch information
yanmxa authored Jan 14, 2025
1 parent 4c38e5c commit 91f5d8f
Show file tree
Hide file tree
Showing 4 changed files with 270 additions and 45 deletions.
4 changes: 4 additions & 0 deletions operator/pkg/config/storage_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ func IsBYOPostgres() bool {
return isBYOPostgres
}

func SetBYOPostgres(byo bool) {
isBYOPostgres = byo
}

func SetDatabaseReady(ready bool) {
databaseReady = ready
}
Expand Down
3 changes: 3 additions & 0 deletions operator/pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ const (
// development environments, where is is convenient to reduce the poll interval. The value should be a string
// that can be parsed with the time.ParseDuration function.
AnnotationMGHWithStackroxPollInterval = "global-hub.open-cluster-management.io/with-stackrox-poll-interval"

// AnnotationBuiltInPostgresUser indicates to create the postgres users and databases based on the value configuration
AnnotationBuiltInPostgresUsers = "global-hub.open-cluster-management.io/postgres-users"
)

// hub installation constants
Expand Down
271 changes: 229 additions & 42 deletions operator/pkg/controllers/storage/storage_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/rand"
"embed"
"encoding/json"
"fmt"
iofs "io/fs"
"math/big"
Expand All @@ -22,12 +23,14 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/stolostron/multicluster-global-hub/operator/api/operator/v1alpha4"
"github.com/stolostron/multicluster-global-hub/operator/pkg/config"
operatorconstants "github.com/stolostron/multicluster-global-hub/operator/pkg/constants"
"github.com/stolostron/multicluster-global-hub/operator/pkg/utils"
"github.com/stolostron/multicluster-global-hub/pkg/constants"
"github.com/stolostron/multicluster-global-hub/pkg/database"
Expand Down Expand Up @@ -78,8 +81,10 @@ var WatchedConfigMap = sets.NewString(
)

var (
storageReconciler *StorageReconciler
updateConnection bool
storageReconciler *StorageReconciler
updateConnection bool
appliedPgUserValue string
postgresUserNameTemplate = "postgresql-user-%s"
)

func (r *StorageReconciler) IsResourceRemoved() bool {
Expand Down Expand Up @@ -207,7 +212,7 @@ func (r *StorageReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}
updateConnection = config.SetStorageConnection(storageConn)

needRequeue, err := r.reconcileDatabase(ctx, mgh)
needRequeue, err := r.ReconcileDatabase(ctx, mgh)
if err != nil {
reconcileErr = fmt.Errorf("database not ready, Error: %v", err)
return ctrl.Result{}, reconcileErr
Expand Down Expand Up @@ -282,84 +287,261 @@ func (r *StorageReconciler) ReconcileStorage(ctx context.Context,
return pgConnection, nil
}

func (r *StorageReconciler) reconcileDatabase(ctx context.Context, mgh *v1alpha4.MulticlusterGlobalHub) (bool, error) {
var reconcileErr error
func (r *StorageReconciler) ReconcileDatabase(ctx context.Context, mgh *v1alpha4.MulticlusterGlobalHub) (bool, error) {
var conn *pgx.Conn
var err error

postgresUsersValue := mgh.Annotations[operatorconstants.AnnotationBuiltInPostgresUsers]
// Don't reconcile, or create the connection, when
// 1. postgres users isn't updated
// 2. database has been initialized
if (postgresUsersValue == "" || postgresUsersValue == appliedPgUserValue) && r.databaseReconcileCount > 0 {
return false, nil
}

defer func() {
if conn != nil {
if err := conn.Close(ctx); err != nil {
log.Error(err, "failed to close connection to database")
}
}
}()

storageConn := config.GetStorageConnection()
if storageConn == nil {
reconcileErr = fmt.Errorf("storage connection is nil")
return true, reconcileErr
return false, fmt.Errorf("storage connection is nil")
}
// if the operator is restarted, reconcile the database again
if r.databaseReconcileCount > 0 {
return false, nil
conn, err = database.PostgresConnection(ctx, storageConn.SuperuserDatabaseURI, storageConn.CACert)
if err != nil {
log.Infof("wait database ready, failed to connect database: %v", err)
return true, nil
}

// apply the init users
if !config.IsBYOPostgres() && postgresUsersValue != "" && postgresUsersValue != appliedPgUserValue {
if err = r.applyPostgresUsers(ctx, conn, postgresUsersValue, mgh); err != nil {
return false, err
}
log.Info("applied the annotation postgres users successfully!")
appliedPgUserValue = postgresUsersValue
}

// apply the global hub init SQL when the operator restarted
if r.databaseReconcileCount == 0 {
err = r.applyGlobalHubInitSQL(ctx, conn, storageConn.ReadonlyUserDatabaseURI)
if err != nil {
return false, err
}
log.Debug("global hub database initialized")
r.databaseReconcileCount++
}

return false, nil
}

func (r *StorageReconciler) applyPostgresUsers(ctx context.Context, conn *pgx.Conn, pgUsersValue string,
mgh *v1alpha4.MulticlusterGlobalHub,
) error {
var postgresUsers []AnnotationPGUser
err := json.Unmarshal([]byte(pgUsersValue), &postgresUsers)
if err != nil {
return fmt.Errorf("failed to unmarshal postgres users from annotations: %v", err)
}

conn, err := database.PostgresConnection(ctx, storageConn.SuperuserDatabaseURI, storageConn.CACert)
for _, user := range postgresUsers {
// create postgres user
pwd, err := r.createPostgresUser(ctx, conn, user)
if err != nil {
return fmt.Errorf("error creating postgres user %s: %v", user.Name, err)
}
// create database and add permission for the user
for _, db := range user.Databases {
err = r.createDatabaseIfNotExists(ctx, conn, db)
if err != nil {
return fmt.Errorf("error creating database %s: %v", db, err)
}
err = r.grantPermissions(ctx, conn, user.Name, db)
if err != nil {
return fmt.Errorf("failed to grant permissions to user %s on database %s: %v", user.Name, db, err)
}
}
// create the secret for the postgres user and databases
if err = r.createPostgresUserSecret(ctx, user, mgh, pwd); err != nil {
return fmt.Errorf("error creating postgres user secret %v", err)
}
}
return nil
}

func (r *StorageReconciler) createDatabaseIfNotExists(ctx context.Context, conn *pgx.Conn, dbName string) error {
var exists bool
err := conn.QueryRow(ctx, `SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = $1);`, dbName).Scan(&exists)
if err != nil {
reconcileErr = fmt.Errorf("failed to connect to database: %v", err)
log.Infof("wait database ready, %v", reconcileErr)
return true, nil
return fmt.Errorf("error checking if database %s exists: %v", dbName, err)
}

defer func() {
if err := conn.Close(ctx); err != nil {
log.Error(err, "failed to close connection to database")
if !exists {
createDBQuery := fmt.Sprintf("CREATE DATABASE %s;", dbName)
_, err := conn.Exec(ctx, createDBQuery)
if err != nil {
return fmt.Errorf("error creating database %s: %v", dbName, err)
}
}()
log.Infof("database %s created.", dbName)
} else {
log.Infof("database %s already exists.", dbName)
}

return nil
}

// createPostgresUser return the password of the created user, if the password is empty if the user is already existing
func (r *StorageReconciler) createPostgresUser(ctx context.Context, conn *pgx.Conn, user AnnotationPGUser,
) (string, error) {
var roleExists bool
err := conn.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM pg_catalog.pg_roles WHERE rolname = $1)",
user.Name).Scan(&roleExists)
if err != nil {
return "", fmt.Errorf("error checking if role exists: %v", err)
}

password := ""
if roleExists {
// updatePasswordQuery := fmt.Sprintf("ALTER ROLE \"%s\" WITH PASSWORD '%s';", user.Name, password)
// _, err = conn.Exec(ctx, updatePasswordQuery)
// if err != nil {
// return fmt.Errorf("error updating password for role %s: %v", user.Name, err)
// }
log.Infof("postgres user already exist: %s", user.Name)
} else {
password = generatePassword(16)
createRoleQuery := fmt.Sprintf("CREATE ROLE \"%s\" LOGIN PASSWORD '%s';", user.Name, password)
_, err = conn.Exec(ctx, createRoleQuery)
if err != nil {
return "", fmt.Errorf("error creating role %s password: %v", user.Name, err)
}
log.Infof("create postgres user: %s", user.Name)
}

return password, nil
}

func (r *StorageReconciler) createPostgresUserSecret(ctx context.Context, user AnnotationPGUser,
mgh *v1alpha4.MulticlusterGlobalHub, password string,
) error {
userSecret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf(postgresUserNameTemplate, user.Name),
Namespace: mgh.Namespace,
},
}
err := r.GetClient().Get(ctx, client.ObjectKeyFromObject(userSecret), userSecret)
if err != nil && !errors.IsNotFound(err) {
return err
}

// update the databases if exists
if err == nil {
log.Infof("the postgresql user secret already exists: %s", userSecret.Name)
previousDatabases := userSecret.Data["databases"]
currentDatabases := strings.Join(user.Databases, ",")
if string(previousDatabases) != currentDatabases {
userSecret.Data["databases"] = []byte(currentDatabases)
err = r.GetClient().Update(ctx, userSecret)
if err != nil {
return fmt.Errorf("failed to updating postgres user secret %s, err %v", user.Name, err)
}
log.Infof("update the postgres user secret databases: %s", userSecret.Name)
}
return nil
}

// create secret
storageConn := config.GetStorageConnection()
pgConfig, err := pgx.ParseConfig(storageConn.SuperuserDatabaseURI)
if err != nil {
return fmt.Errorf("failed the parse the supper user database URI")
}
userSecret.Data = map[string][]byte{
"db.host": []byte(pgConfig.Host),
"db.port": []byte(fmt.Sprintf("%d", pgConfig.Port)),
"db.user": []byte(user.Name),
"databases": []byte(strings.Join(user.Databases, ",")),
"ca": storageConn.CACert,
}
if password != "" {
userSecret.Data["db.password"] = []byte(password)
}
err = controllerutil.SetControllerReference(mgh, userSecret, r.Manager.GetScheme())
if err != nil {
return fmt.Errorf("failed to add the owner reference to the user secret: %s", userSecret.Name)
}
err = r.GetClient().Create(ctx, userSecret)
log.Infof("create the postgresql user secret: %s", userSecret.Name)
if err != nil {
return fmt.Errorf("failed to create the postgresql user secret: %s", userSecret.Name)
}

return nil
}

func (r *StorageReconciler) grantPermissions(ctx context.Context, conn *pgx.Conn, user, dbName string) error {
grantQuery := fmt.Sprintf("GRANT ALL PRIVILEGES ON DATABASE %s TO %s;", dbName, user)
_, err := conn.Exec(ctx, grantQuery)
if err != nil {
return fmt.Errorf("error granting permissions to user %s on database %s: %v", user, dbName, err)
}
log.Infof("granted all privileges to user %s on database %s.\n", user, dbName)
return nil
}

func (r *StorageReconciler) applyGlobalHubInitSQL(ctx context.Context, conn *pgx.Conn, readonlyUserURI string) error {
// Check if backup is enabled
var backupEnabled bool
backupEnabled, reconcileErr = commonutils.IsBackupEnabled(ctx, r.GetClient())
if reconcileErr != nil {
log.Error(reconcileErr, "failed to get backup status")
return true, reconcileErr
backupEnabled, err := commonutils.IsBackupEnabled(ctx, r.GetClient())
if err != nil {
return fmt.Errorf("failed to get the backup status: %v", err)
}

if backupEnabled || !r.upgrade {
lockSql := fmt.Sprintf("select pg_advisory_lock(%s)", constants.LockId)
unLockSql := fmt.Sprintf("select pg_advisory_unlock(%s)", constants.LockId)
defer func() {
_, reconcileErr = conn.Exec(ctx, unLockSql)
if reconcileErr != nil {
log.Error(reconcileErr, "failed to unlock db")
_, err = conn.Exec(ctx, unLockSql)
if err != nil {
log.Errorf("failed to unlock db: %v", err)
}
}()
_, reconcileErr = conn.Exec(ctx, lockSql)
if reconcileErr != nil {
log.Error(reconcileErr, "failed to parse database_uri_with_readonlyuser")
return true, reconcileErr
_, err = conn.Exec(ctx, lockSql)
if err != nil {
return fmt.Errorf("failed to parse database_uri_with_readonlyuser: %v", err)
}
}

objURI, err := url.Parse(storageConn.ReadonlyUserDatabaseURI)
objURI, err := url.Parse(readonlyUserURI)
if err != nil {
log.Error(err, "failed to parse database_uri_with_readonlyuser")
}
readonlyUsername := objURI.User.Username()

if reconcileErr = applySQL(ctx, conn, databaseFS, "database", readonlyUsername); reconcileErr != nil {
return true, reconcileErr
if err = applySQL(ctx, conn, databaseFS, "database", readonlyUsername); err != nil {
return fmt.Errorf("failed to apply the database sql: %v", err)
}

if r.enableGlobalResource {
if reconcileErr = applySQL(ctx, conn, databaseOldFS, "database.old", readonlyUsername); reconcileErr != nil {
return true, reconcileErr
if err = applySQL(ctx, conn, databaseOldFS, "database.old", readonlyUsername); err != nil {
return fmt.Errorf("failed to apply the database.old sql: %v", err)
}
}

if !r.upgrade {
reconcileErr = applySQL(ctx, conn, upgradeFS, "upgrade", readonlyUsername)
if reconcileErr != nil {
log.Error(reconcileErr, "failed to exec the upgrade sql files")
return true, reconcileErr
err = applySQL(ctx, conn, upgradeFS, "upgrade", readonlyUsername)
if err != nil {
return fmt.Errorf("failed to apply the upgrade sql: %v", err)
}
r.upgrade = true
}

log.Debug("database initialized")
r.databaseReconcileCount++

return false, nil
return nil
}

func applySQL(ctx context.Context, conn *pgx.Conn, databaseFS embed.FS, rootDir, username string) error {
Expand Down Expand Up @@ -442,3 +624,8 @@ func getDatabaseComponentStatus(ctx context.Context, c client.Client,
}
return config.GetStatefulSetComponentStatus(ctx, c, namespace, name)
}

type AnnotationPGUser struct {
Name string `json:"name"`
Databases []string `json:"databases"`
}
Loading

0 comments on commit 91f5d8f

Please sign in to comment.