Skip to content

Commit

Permalink
K8SPSMDB-926: Implement PiTR for physical restores
Browse files Browse the repository at this point in the history
  • Loading branch information
egegunes committed Aug 17, 2023
1 parent 5e071b9 commit 60d59ae
Show file tree
Hide file tree
Showing 13 changed files with 475 additions and 260 deletions.
9 changes: 5 additions & 4 deletions clientcmd/clientcmd.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package clientcmd

import (
"context"
"io"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -42,9 +43,9 @@ func NewClient() (*Client, error) {
}, nil
}

func (c *Client) Exec(pod *corev1.Pod, containerName string, command []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
// Prepare the API URL used to execute another process within the Pod. In
// this case, we'll run a remote shell.
func (c *Client) Exec(ctx context.Context, pod *corev1.Pod, containerName string, command []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
// Prepare the API URL used to execute another process within the Pod.
// In this case, we'll run a remote shell.
req := c.client.RESTClient().
Post().
Namespace(pod.Namespace).
Expand All @@ -66,7 +67,7 @@ func (c *Client) Exec(pod *corev1.Pod, containerName string, command []string, s
}

// Connect this process' std{in,out,err} to the remote shell process.
return exec.Stream(remotecommand.StreamOptions{
return exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
Expand Down
42 changes: 36 additions & 6 deletions pkg/apis/psmdb/v1/psmdb_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,12 +284,6 @@ func (pmm *PMMSpec) HasSecret(secret *corev1.Secret) bool {
return false
}

const (
PMMUserKey = "PMM_SERVER_USER"
PMMPasswordKey = "PMM_SERVER_PASSWORD"
PMMAPIKey = "PMM_SERVER_API_KEY"
)

func (spec *PMMSpec) ShouldUseAPIKeyAuth(secret *corev1.Secret) bool {
if _, ok := secret.Data[PMMAPIKey]; !ok {
_, okl := secret.Data[PMMUserKey]
Expand Down Expand Up @@ -524,6 +518,10 @@ type ReplsetSpec struct {
NonVoting NonVotingSpec `json:"nonvoting,omitempty"`
}

func (r *ReplsetSpec) PodName(cr *PerconaServerMongoDB, idx int) string {
return fmt.Sprintf("%s-%s-%d", cr.Name, r.Name, idx)
}

func (r *ReplsetSpec) ServiceName(cr *PerconaServerMongoDB) string {
return cr.Name + "-" + r.Name
}
Expand Down Expand Up @@ -899,6 +897,38 @@ const (
userPostfix = "-users"
)

const (
PMMUserKey = "PMM_SERVER_USER"
PMMPasswordKey = "PMM_SERVER_PASSWORD"
PMMAPIKey = "PMM_SERVER_API_KEY"
)

const (
EnvMongoDBDatabaseAdminUser = "MONGODB_DATABASE_ADMIN_USER"
EnvMongoDBDatabaseAdminPassword = "MONGODB_DATABASE_ADMIN_PASSWORD"
EnvMongoDBClusterAdminUser = "MONGODB_CLUSTER_ADMIN_USER"
EnvMongoDBClusterAdminPassword = "MONGODB_CLUSTER_ADMIN_PASSWORD"
EnvMongoDBUserAdminUser = "MONGODB_USER_ADMIN_USER"
EnvMongoDBUserAdminPassword = "MONGODB_USER_ADMIN_PASSWORD"
EnvMongoDBBackupUser = "MONGODB_BACKUP_USER"
EnvMongoDBBackupPassword = "MONGODB_BACKUP_PASSWORD"
EnvMongoDBClusterMonitorUser = "MONGODB_CLUSTER_MONITOR_USER"
EnvMongoDBClusterMonitorPassword = "MONGODB_CLUSTER_MONITOR_PASSWORD"
EnvPMMServerUser = PMMUserKey
EnvPMMServerPassword = PMMPasswordKey
EnvPMMServerAPIKey = PMMAPIKey
)

type UserRole string

const (
RoleDatabaseAdmin UserRole = "databaseAdmin"
RoleClusterAdmin UserRole = "clusterAdmin"
RoleUserAdmin UserRole = "userAdmin"
RoleClusterMonitor UserRole = "clusterMonitor"
RoleBackup UserRole = "backup"
)

func InternalUserSecretName(cr *PerconaServerMongoDB) string {
return internalPrefix + cr.Name + userPostfix
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/perconaservermongodb/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ func (r *ReconcilePerconaServerMongoDB) resyncPBMIfNeeded(ctx context.Context, c
command := []string{"pbm", "config", "--force-resync"}
log.Info("Starting PBM resync", "command", command)

err = r.clientcmd.Exec(pod, "backup-agent", command, nil, &stdoutBuffer, &stderrBuffer, false)
err = r.clientcmd.Exec(ctx, pod, "backup-agent", command, nil, &stdoutBuffer, &stderrBuffer, false)
if err != nil {
return errors.Wrapf(err, "start PBM resync: run %v", command)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/perconaservermongodb/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (r *ReconcilePerconaServerMongoDB) enableBalancerIfNeeded(ctx context.Conte
}
}

mongosSession, err := r.mongosClientWithRole(ctx, cr, roleClusterAdmin)
mongosSession, err := r.mongosClientWithRole(ctx, cr, api.RoleClusterAdmin)
if err != nil {
return errors.Wrap(err, "failed to get mongos connection")
}
Expand Down Expand Up @@ -121,7 +121,7 @@ func (r *ReconcilePerconaServerMongoDB) disableBalancer(ctx context.Context, cr
return errors.Wrapf(err, "get mongos statefulset %s", msSts.Name)
}

mongosSession, err := r.mongosClientWithRole(ctx, cr, roleClusterAdmin)
mongosSession, err := r.mongosClientWithRole(ctx, cr, api.RoleClusterAdmin)
if err != nil {
return errors.Wrap(err, "failed to get mongos connection")
}
Expand Down
7 changes: 3 additions & 4 deletions pkg/controller/perconaservermongodb/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ import (
"github.com/percona/percona-server-mongodb-operator/pkg/psmdb"
)

func (r *ReconcilePerconaServerMongoDB) mongoClientWithRole(ctx context.Context, cr *api.PerconaServerMongoDB, rs api.ReplsetSpec,
role UserRole) (*mgo.Client, error) {
func (r *ReconcilePerconaServerMongoDB) mongoClientWithRole(ctx context.Context, cr *api.PerconaServerMongoDB, rs api.ReplsetSpec, role api.UserRole) (*mgo.Client, error) {

c, err := r.getInternalCredentials(ctx, cr, role)
if err != nil {
Expand All @@ -21,7 +20,7 @@ func (r *ReconcilePerconaServerMongoDB) mongoClientWithRole(ctx context.Context,
return psmdb.MongoClient(ctx, r.client, cr, rs, c)
}

func (r *ReconcilePerconaServerMongoDB) mongosClientWithRole(ctx context.Context, cr *api.PerconaServerMongoDB, role UserRole) (*mgo.Client, error) {
func (r *ReconcilePerconaServerMongoDB) mongosClientWithRole(ctx context.Context, cr *api.PerconaServerMongoDB, role api.UserRole) (*mgo.Client, error) {
c, err := r.getInternalCredentials(ctx, cr, role)
if err != nil {
return nil, errors.Wrap(err, "failed to get credentials")
Expand All @@ -30,7 +29,7 @@ func (r *ReconcilePerconaServerMongoDB) mongosClientWithRole(ctx context.Context
return psmdb.MongosClient(ctx, r.client, cr, c)
}

func (r *ReconcilePerconaServerMongoDB) standaloneClientWithRole(ctx context.Context, cr *api.PerconaServerMongoDB, role UserRole, host string) (*mgo.Client, error) {
func (r *ReconcilePerconaServerMongoDB) standaloneClientWithRole(ctx context.Context, cr *api.PerconaServerMongoDB, role api.UserRole, host string) (*mgo.Client, error) {
c, err := r.getInternalCredentials(ctx, cr, role)
if err != nil {
return nil, errors.Wrap(err, "failed to get credentials")
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/perconaservermongodb/fcv.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

func (r *ReconcilePerconaServerMongoDB) getFCV(ctx context.Context, cr *api.PerconaServerMongoDB) (string, error) {
c, err := r.mongoClientWithRole(ctx, cr, *cr.Spec.Replsets[0], roleClusterAdmin)
c, err := r.mongoClientWithRole(ctx, cr, *cr.Spec.Replsets[0], api.RoleClusterAdmin)
if err != nil {
return "", errors.Wrap(err, "failed to get connection")
}
Expand Down Expand Up @@ -41,9 +41,9 @@ func (r *ReconcilePerconaServerMongoDB) setFCV(ctx context.Context, cr *api.Perc
var connErr error

if cr.Spec.Sharding.Enabled {
cli, connErr = r.mongosClientWithRole(ctx, cr, roleClusterAdmin)
cli, connErr = r.mongosClientWithRole(ctx, cr, api.RoleClusterAdmin)
} else {
cli, connErr = r.mongoClientWithRole(ctx, cr, *cr.Spec.Replsets[0], roleClusterAdmin)
cli, connErr = r.mongoClientWithRole(ctx, cr, *cr.Spec.Replsets[0], api.RoleClusterAdmin)
}

if connErr != nil {
Expand Down
36 changes: 18 additions & 18 deletions pkg/controller/perconaservermongodb/mgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (r *ReconcilePerconaServerMongoDB) reconcileCluster(ctx context.Context, cr
}
}

cli, err := r.mongoClientWithRole(ctx, cr, *replset, roleClusterAdmin)
cli, err := r.mongoClientWithRole(ctx, cr, *replset, api.RoleClusterAdmin)
if err != nil {
if cr.Spec.Unmanaged {
return api.AppStateInit, nil
Expand Down Expand Up @@ -169,7 +169,7 @@ func (r *ReconcilePerconaServerMongoDB) reconcileCluster(ctx context.Context, cr
replset.ClusterRole == api.ClusterRoleShardSvr &&
len(mongosPods) > 0 {

mongosSession, err := r.mongosClientWithRole(ctx, cr, roleClusterAdmin)
mongosSession, err := r.mongosClientWithRole(ctx, cr, api.RoleClusterAdmin)
if err != nil {
return api.AppStateError, errors.Wrap(err, "failed to get mongos connection")
}
Expand Down Expand Up @@ -447,7 +447,7 @@ func (r *ReconcilePerconaServerMongoDB) removeRSFromShard(ctx context.Context, c
return nil
}

cli, err := r.mongosClientWithRole(ctx, cr, roleClusterAdmin)
cli, err := r.mongosClientWithRole(ctx, cr, api.RoleClusterAdmin)
if err != nil {
return errors.Errorf("failed to get mongos connection: %v", err)
}
Expand Down Expand Up @@ -491,7 +491,7 @@ func (r *ReconcilePerconaServerMongoDB) handleRsAddToShard(ctx context.Context,
return errors.Wrapf(err, "get rsPod %s host", rspod.Name)
}

cli, err := r.mongosClientWithRole(ctx, cr, roleClusterAdmin)
cli, err := r.mongosClientWithRole(ctx, cr, api.RoleClusterAdmin)
if err != nil {
return errors.Wrap(err, "failed to get mongos client")
}
Expand Down Expand Up @@ -531,7 +531,7 @@ func (r *ReconcilePerconaServerMongoDB) handleReplsetInit(ctx context.Context, c
}
var errb, outb bytes.Buffer

err = r.clientcmd.Exec(&pod, "mongod", []string{"mongod", "--version"}, nil, &outb, &errb, false)
err = r.clientcmd.Exec(ctx, &pod, "mongod", []string{"mongod", "--version"}, nil, &outb, &errb, false)
if err != nil {
return fmt.Errorf("exec --version: %v / %s / %s", err, outb.String(), errb.String())
}
Expand Down Expand Up @@ -565,22 +565,22 @@ func (r *ReconcilePerconaServerMongoDB) handleReplsetInit(ctx context.Context, c

errb.Reset()
outb.Reset()
err = r.clientcmd.Exec(&pod, "mongod", cmd, nil, &outb, &errb, false)
err = r.clientcmd.Exec(ctx, &pod, "mongod", cmd, nil, &outb, &errb, false)
if err != nil {
return fmt.Errorf("exec rs.initiate: %v / %s / %s", err, outb.String(), errb.String())
}

time.Sleep(time.Second * 5)

userAdmin, err := r.getInternalCredentials(ctx, cr, roleUserAdmin)
userAdmin, err := r.getInternalCredentials(ctx, cr, api.RoleUserAdmin)
if err != nil {
return errors.Wrap(err, "failed to get userAdmin credentials")
}

cmd[2] = fmt.Sprintf(`%s --eval %s`, mongoCmd, mongoInitAdminUser(userAdmin.Username, userAdmin.Password))
errb.Reset()
outb.Reset()
err = r.clientcmd.Exec(&pod, "mongod", cmd, nil, &outb, &errb, false)
err = r.clientcmd.Exec(ctx, &pod, "mongod", cmd, nil, &outb, &errb, false)
if err != nil {
return fmt.Errorf("exec add admin user: %v / %s / %s", err, outb.String(), errb.String())
}
Expand All @@ -593,29 +593,29 @@ func (r *ReconcilePerconaServerMongoDB) handleReplsetInit(ctx context.Context, c
return errNoRunningMongodContainers
}

func getRoles(cr *api.PerconaServerMongoDB, role UserRole) []map[string]interface{} {
func getRoles(cr *api.PerconaServerMongoDB, role api.UserRole) []map[string]interface{} {
roles := make([]map[string]interface{}, 0)
switch role {
case roleDatabaseAdmin:
case api.RoleDatabaseAdmin:
return []map[string]interface{}{
{"role": "readWriteAnyDatabase", "db": "admin"},
{"role": "readAnyDatabase", "db": "admin"},
{"role": "restore", "db": "admin"},
{"role": "backup", "db": "admin"},
{"role": "dbAdminAnyDatabase", "db": "admin"},
{"role": string(roleClusterMonitor), "db": "admin"},
{"role": string(api.RoleClusterMonitor), "db": "admin"},
}
case roleClusterMonitor:
case api.RoleClusterMonitor:
if cr.CompareVersion("1.12.0") >= 0 {
roles = []map[string]interface{}{
{"db": "admin", "role": "explainRole"},
{"db": "local", "role": "read"},
}
}
case roleBackup:
case api.RoleBackup:
roles = []map[string]interface{}{
{"db": "admin", "role": "readWrite"},
{"db": "admin", "role": string(roleClusterMonitor)},
{"db": "admin", "role": string(api.RoleClusterMonitor)},
{"db": "admin", "role": "restore"},
{"db": "admin", "role": "pbmAnyAction"},
}
Expand Down Expand Up @@ -699,7 +699,7 @@ func compareRoles(x []map[string]interface{}, y []map[string]interface{}) bool {
func (r *ReconcilePerconaServerMongoDB) createOrUpdateSystemUsers(ctx context.Context, cr *api.PerconaServerMongoDB, replset *api.ReplsetSpec) error {
log := logf.FromContext(ctx)

cli, err := r.mongoClientWithRole(ctx, cr, *replset, roleUserAdmin)
cli, err := r.mongoClientWithRole(ctx, cr, *replset, api.RoleUserAdmin)
if err != nil {
return errors.Wrap(err, "failed to get mongo client")
}
Expand Down Expand Up @@ -741,9 +741,9 @@ func (r *ReconcilePerconaServerMongoDB) createOrUpdateSystemUsers(ctx context.Co
return errors.Wrap(err, "create or update system role")
}

users := []UserRole{roleClusterAdmin, roleClusterMonitor, roleBackup}
users := []api.UserRole{api.RoleClusterAdmin, api.RoleClusterMonitor, api.RoleBackup}
if cr.CompareVersion("1.13.0") >= 0 {
users = append(users, roleDatabaseAdmin)
users = append(users, api.RoleDatabaseAdmin)
}

for _, role := range users {
Expand Down Expand Up @@ -779,7 +779,7 @@ func (r *ReconcilePerconaServerMongoDB) recoverReplsetNoPrimary(ctx context.Cont
return errors.Wrapf(err, "get mongo hostname for pod/%s", pod.Name)
}

cli, err := r.standaloneClientWithRole(ctx, cr, roleClusterAdmin, host)
cli, err := r.standaloneClientWithRole(ctx, cr, api.RoleClusterAdmin, host)
if err != nil {
return errors.Wrap(err, "get standalone client")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/perconaservermongodb/psmdb_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ func (r *ReconcilePerconaServerMongoDB) checkIfPossibleToRemove(ctx context.Cont
"config": {},
}

client, err := r.mongoClientWithRole(ctx, cr, api.ReplsetSpec{Name: rsName}, roleClusterAdmin)
client, err := r.mongoClientWithRole(ctx, cr, api.ReplsetSpec{Name: rsName}, api.RoleClusterAdmin)
if err != nil {
return errors.Wrap(err, "dial:")
}
Expand Down
Loading

0 comments on commit 60d59ae

Please sign in to comment.