Skip to content

Commit

Permalink
Merge branch 'main' into K8SPSMDB-965
Browse files Browse the repository at this point in the history
  • Loading branch information
hors authored Sep 6, 2023
2 parents 905c353 + 79f2809 commit 909c0ec
Show file tree
Hide file tree
Showing 34 changed files with 1,157 additions and 296 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ percona-server-mongodb-operator
mongodb-healthcheck

!cmd/percona-server-mongodb-operator
!cmd/mongodb-healthcheck

# End of https://www.gitignore.io/api/go,vim,emacs,visualstudiocode

Expand Down
19 changes: 12 additions & 7 deletions cmd/mongodb-healthcheck/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ package main
import (
"context"
"os"
"os/signal"
"strconv"
"strings"
"syscall"

uzap "go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand All @@ -36,6 +38,9 @@ var (
)

func main() {
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, os.Interrupt)
defer stop()

app := tool.New("Performs health and readiness checks for MongoDB", GitCommit, GitBranch)

k8sCmd := app.Command("k8s", "Performs liveness check for MongoDB on Kubernetes")
Expand Down Expand Up @@ -77,14 +82,14 @@ func main() {
os.Exit(1)
}

client, err := db.Dial(cnf)
client, err := db.Dial(ctx, cnf)
if err != nil {
log.Error(err, "connection error")
os.Exit(1)
}

defer func() {
if err := client.Disconnect(context.TODO()); err != nil {
if err := client.Disconnect(ctx); err != nil {
log.Error(err, "failed to disconnect")
os.Exit(1)
}
Expand All @@ -99,7 +104,7 @@ func main() {
case "mongod":
memberState, err := healthcheck.HealthCheckMongodLiveness(client, int64(*startupDelaySeconds))
if err != nil {
client.Disconnect(context.TODO()) // nolint:golint,errcheck
client.Disconnect(ctx) // nolint:golint,errcheck
log.Error(err, "Member failed Kubernetes liveness check")
os.Exit(1)
}
Expand All @@ -108,7 +113,7 @@ func main() {
case "mongos":
err := healthcheck.HealthCheckMongosLiveness(client)
if err != nil {
client.Disconnect(context.TODO()) // nolint:golint,errcheck
client.Disconnect(ctx) // nolint:golint,errcheck
log.Error(err, "Member failed Kubernetes liveness check")
os.Exit(1)
}
Expand All @@ -120,14 +125,14 @@ func main() {
switch *component {

case "mongod":
client.Disconnect(context.TODO()) // nolint:golint,errcheck
client.Disconnect(ctx) // nolint:golint,errcheck
log.Error(err, "readiness check for mongod is not implemented")
os.Exit(1)

case "mongos":
err := healthcheck.MongosReadinessCheck(client)
err := healthcheck.MongosReadinessCheck(ctx, client)
if err != nil {
client.Disconnect(context.TODO()) // nolint:golint,errcheck
client.Disconnect(ctx) // nolint:golint,errcheck
log.Error(err, "Member failed Kubernetes readiness check")
os.Exit(1)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ spec:
cpu: 100m
memory: 0.1G
expose:
exposeType: ClusterIP
exposeType: LoadBalancer
configsvrReplSet:
affinity:
antiAffinityTopologyKey: none
Expand Down
19 changes: 19 additions & 0 deletions e2e-tests/demand-backup-physical-sharded/run
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ run_recovery_check() {
set -o xtrace
}

# check_exported_mongos_service_endpoint verifies that the cluster's exposed
# endpoint (.status.host) still matches the pre-restore endpoint, i.e. that a
# physical restore did not change the exported mongos service address.
#   $1 - the endpoint captured before the restore
# Relies on the global $cluster variable naming the psmdb custom resource.
check_exported_mongos_service_endpoint() {
	local host=$1
	local current

	# Capture into a variable first: if kubectl fails, we compare against an
	# empty string explicitly instead of silently passing/failing inside [ ].
	current=$(kubectl_bin get psmdb "${cluster}" -o=jsonpath='{.status.host}')

	if [ "$host" != "$current" ]; then
		echo "Exported host is not correct after the restore: expected '$host', got '$current'"
		exit 1
	fi
}

create_infra "${namespace}"

deploy_minio
Expand All @@ -68,6 +77,13 @@ wait_for_running ${cluster}-cfg 3
wait_for_running ${cluster}-mongos 3
wait_cluster_consistency ${cluster}

# Capture the LoadBalancer endpoint assigned to the mongos service so it can be
# compared after each physical restore. Some providers publish the ingress as
# an IP, others (e.g. AWS ELB) as a hostname — accept either via jq's
# alternative operator (// falls back to .hostname when .ip is null).
lbEndpoint=$(kubectl_bin get svc "${cluster}-mongos" -o=jsonpath='{.status}' \
	| jq -r 'select(.loadBalancer != null and .loadBalancer.ingress != null and .loadBalancer.ingress != []) | .loadBalancer.ingress[0] | .ip // .hostname')
# Quote the expansion: an unquoted empty/whitespace value breaks the [ -z ] test.
if [ -z "$lbEndpoint" ]; then
	echo "mongos service not exported correctly"
	exit 1
fi

run_mongos \
'db.createUser({user:"myApp",pwd:"myPass",roles:[{db:"myApp",role:"readWrite"}]})' \
"userAdmin:userAdmin123456@${cluster}-mongos.${namespace}"
Expand Down Expand Up @@ -102,18 +118,21 @@ if [ -z "$SKIP_BACKUPS_TO_AWS_GCP_AZURE" ]; then
echo 'check backup and restore -- aws-s3'
run_restore ${backup_name_aws} "_restore_sharded"
run_recovery_check ${backup_name_aws} "_restore_sharded"
check_exported_mongos_service_endpoint "$lbEndpoint"

echo "drop collection"
run_mongos 'use myApp\n db.test.drop()' "myApp:myPass@${cluster}-mongos.${namespace}"
echo 'check backup and restore -- gcp-cs'
run_restore ${backup_name_gcp} "_restore_sharded"
run_recovery_check ${backup_name_gcp} "_restore_sharded"
check_exported_mongos_service_endpoint "$lbEndpoint"

echo "drop collection"
run_mongos 'use myApp\n db.test.drop()' "myApp:myPass@${cluster}-mongos.${namespace}"
echo 'check backup and restore -- azure-blob'
run_restore ${backup_name_azure} "_restore_sharded"
run_recovery_check ${backup_name_azure} "_restore_sharded"
check_exported_mongos_service_endpoint "$lbEndpoint"
fi

echo "drop collection"
Expand Down
6 changes: 3 additions & 3 deletions e2e-tests/demand-backup-sharded/run
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ if [ -z "$SKIP_BACKUPS_TO_AWS_GCP_AZURE" ]; then
insert_data "100501"
check_data "-2nd"
run_restore "$backup_name_gcp"
wait_restore "$backup_name_aws" "$cluster"
wait_restore "$backup_name_gcp" "$cluster"
check_data

desc 'check backup and restore -- azure-blob'
Expand All @@ -142,7 +142,7 @@ if [ -z "$SKIP_BACKUPS_TO_AWS_GCP_AZURE" ]; then
insert_data "100501"
check_data "-2nd"
run_restore "$backup_name_azure"
wait_restore "$backup_name_aws" "$cluster"
wait_restore "$backup_name_azure" "$cluster"
check_data
fi

Expand All @@ -155,7 +155,7 @@ kubectl_bin run -i --rm aws-cli --image=perconalab/awscli --restart=Never -- \
insert_data "100501"
check_data "-2nd"
run_restore "$backup_name_minio"
wait_restore "$backup_name_aws" "$cluster"
wait_restore "$backup_name_minio" "$cluster"
check_data

desc 'delete backup and check if it is removed from bucket -- minio'
Expand Down
15 changes: 7 additions & 8 deletions healthcheck/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
mgo "go.mongodb.org/mongo-driver/mongo"
)

// OkMemberStates is a slice of acceptable replication member states
Expand Down Expand Up @@ -57,8 +56,8 @@ func isStateOk(memberState *mongo.MemberState, okMemberStates []mongo.MemberStat
}

// HealthCheck checks the replication member state of the local MongoDB member
func HealthCheck(client *mgo.Client, okMemberStates []mongo.MemberState) (State, *mongo.MemberState, error) {
rsStatus, err := mongo.RSStatus(context.TODO(), client)
func HealthCheck(client mongo.Client, okMemberStates []mongo.MemberState) (State, *mongo.MemberState, error) {
rsStatus, err := client.RSStatus(context.TODO())
if err != nil {
return StateFailed, nil, errors.Wrap(err, "get replica set status")
}
Expand All @@ -74,8 +73,8 @@ func HealthCheck(client *mgo.Client, okMemberStates []mongo.MemberState) (State,
return StateFailed, state, errors.Errorf("member has unhealthy replication state: %d", state)
}

func HealthCheckMongosLiveness(client *mgo.Client) error {
isMasterResp, err := mongo.IsMaster(context.TODO(), client)
func HealthCheckMongosLiveness(client mongo.Client) error {
isMasterResp, err := client.IsMaster(context.TODO())
if err != nil {
return errors.Wrap(err, "get isMaster response")
}
Expand All @@ -87,13 +86,13 @@ func HealthCheckMongosLiveness(client *mgo.Client) error {
return nil
}

func HealthCheckMongodLiveness(client *mgo.Client, startupDelaySeconds int64) (*mongo.MemberState, error) {
isMasterResp, err := mongo.IsMaster(context.TODO(), client)
func HealthCheckMongodLiveness(client mongo.Client, startupDelaySeconds int64) (*mongo.MemberState, error) {
isMasterResp, err := client.IsMaster(context.TODO())
if err != nil {
return nil, errors.Wrap(err, "get isMaster response")
}

buildInfo, err := mongo.RSBuildInfo(context.TODO(), client)
buildInfo, err := client.RSBuildInfo(context.TODO())
if err != nil {
return nil, errors.Wrap(err, "get buildInfo response")
}
Expand Down
11 changes: 6 additions & 5 deletions healthcheck/readiness.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,23 @@ import (

"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/readpref"

"github.com/percona/percona-server-mongodb-operator/pkg/psmdb/mongo"
)

// ReadinessCheck pings the MongoDB server (primary read preference) to verify it is ready to serve requests.
func ReadinessCheck(client *mongo.Client) (State, error) {
if err := client.Ping(context.TODO(), readpref.Primary()); err != nil {
func ReadinessCheck(ctx context.Context, client mongo.Client) (State, error) {
if err := client.Ping(ctx, readpref.Primary()); err != nil {
return StateFailed, errors.Wrap(err, "ping")
}

return StateOk, nil
}

func MongosReadinessCheck(client *mongo.Client) error {
func MongosReadinessCheck(ctx context.Context, client mongo.Client) error {
ss := ServerStatus{}
cur := client.Database("admin").RunCommand(context.TODO(), bson.D{
cur := client.Database("admin").RunCommand(ctx, bson.D{
{Key: "listDatabases", Value: 1},
{Key: "filter", Value: bson.D{{Key: "name", Value: "admin"}}},
{Key: "nameOnly", Value: true}})
Expand Down
20 changes: 11 additions & 9 deletions healthcheck/tools/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@ import (
log "github.com/sirupsen/logrus"
mgo "go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"

"github.com/percona/percona-server-mongodb-operator/pkg/psmdb/mongo"
)

var (
ErrMsgAuthFailedStr string = "server returned error on SASL authentication step: Authentication failed."
ErrNoReachableServersStr string = "no reachable servers"
ErrMsgAuthFailedStr = "server returned error on SASL authentication step: Authentication failed."
ErrNoReachableServersStr = "no reachable servers"
)

func Dial(conf *Config) (*mgo.Client, error) {
func Dial(ctx context.Context, conf *Config) (mongo.Client, error) {
log.WithFields(log.Fields{
"hosts": conf.Hosts,
"ssl": conf.SSL.Enabled,
Expand All @@ -52,13 +54,13 @@ func Dial(conf *Config) (*mgo.Client, error) {
log.WithFields(log.Fields{"user": conf.Username}).Debug("Enabling authentication for session")
}

client, err := mgo.Connect(context.TODO(), opts)
client, err := mgo.Connect(ctx, opts)
if err != nil {
return nil, errors.Wrap(err, "connect to mongo replica set")
}

if err := client.Ping(context.TODO(), nil); err != nil {
if err := client.Disconnect(context.TODO()); err != nil {
if err := client.Ping(ctx, nil); err != nil {
if err := client.Disconnect(ctx); err != nil {
return nil, errors.Wrap(err, "disconnect client")
}

Expand All @@ -69,15 +71,15 @@ func Dial(conf *Config) (*mgo.Client, error) {
SetServerSelectionTimeout(10 * time.Second).
SetDirect(true)

client, err = mgo.Connect(context.TODO(), opts)
client, err = mgo.Connect(ctx, opts)
if err != nil {
return nil, errors.Wrap(err, "connect to mongo replica set with direct")
}

if err := client.Ping(context.TODO(), nil); err != nil {
if err := client.Ping(ctx, nil); err != nil {
return nil, errors.Wrap(err, "ping mongo")
}
}

return client, nil
return mongo.ToInterface(client), nil
}
28 changes: 14 additions & 14 deletions pkg/controller/perconaservermongodb/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func (r *ReconcilePerconaServerMongoDB) updatePITR(ctx context.Context, cr *api.
}
}

val, err := pbm.C.GetConfigVar("pitr.enabled")
val, err := pbm.GetConfigVar("pitr.enabled")
if err != nil {
if !errors.Is(err, mongo.ErrNoDocuments) {
return errors.Wrap(err, "get pitr.enabled")
Expand All @@ -374,7 +374,7 @@ func (r *ReconcilePerconaServerMongoDB) updatePITR(ctx context.Context, cr *api.
if enabled != cr.Spec.Backup.PITR.Enabled {
val := strconv.FormatBool(cr.Spec.Backup.PITR.Enabled)
log.Info("Setting pitr.enabled in PBM config", "enabled", val)
if err := pbm.C.SetConfigVar("pitr.enabled", val); err != nil {
if err := pbm.SetConfigVar("pitr.enabled", val); err != nil {
return errors.Wrap(err, "update pitr.enabled")
}
}
Expand All @@ -383,7 +383,7 @@ func (r *ReconcilePerconaServerMongoDB) updatePITR(ctx context.Context, cr *api.
return nil
}

val, err = pbm.C.GetConfigVar("pitr.oplogSpanMin")
val, err = pbm.GetConfigVar("pitr.oplogSpanMin")
if err != nil {
if !errors.Is(err, mongo.ErrNoDocuments) {
return errors.Wrap(err, "get pitr.oplogSpanMin")
Expand All @@ -399,12 +399,12 @@ func (r *ReconcilePerconaServerMongoDB) updatePITR(ctx context.Context, cr *api.

if oplogSpanMin != cr.Spec.Backup.PITR.OplogSpanMin.Float64() {
val := cr.Spec.Backup.PITR.OplogSpanMin.String()
if err := pbm.C.SetConfigVar("pitr.oplogSpanMin", val); err != nil {
if err := pbm.SetConfigVar("pitr.oplogSpanMin", val); err != nil {
return errors.Wrap(err, "update pitr.oplogSpanMin")
}
}

val, err = pbm.C.GetConfigVar("pitr.compression")
val, err = pbm.GetConfigVar("pitr.compression")
var compression = ""
if err != nil {
if errors.Is(err, mongo.ErrNoDocuments) {
Expand All @@ -421,23 +421,23 @@ func (r *ReconcilePerconaServerMongoDB) updatePITR(ctx context.Context, cr *api.

if compression != string(cr.Spec.Backup.PITR.CompressionType) {
if string(cr.Spec.Backup.PITR.CompressionType) == "" {
if err := pbm.C.DeleteConfigVar("pitr.compression"); err != nil {
if err := pbm.DeleteConfigVar("pitr.compression"); err != nil {
return errors.Wrap(err, "delete pitr.compression")
}
} else if err := pbm.C.SetConfigVar("pitr.compression", string(cr.Spec.Backup.PITR.CompressionType)); err != nil {
} else if err := pbm.SetConfigVar("pitr.compression", string(cr.Spec.Backup.PITR.CompressionType)); err != nil {
return errors.Wrap(err, "update pitr.compression")
}

// PBM requires PITR to be disabled and re-enabled for a compression type change to take effect
if err := pbm.C.SetConfigVar("pitr.enabled", "false"); err != nil {
if err := pbm.SetConfigVar("pitr.enabled", "false"); err != nil {
return errors.Wrap(err, "disable pitr")
}
if err := pbm.C.SetConfigVar("pitr.enabled", "true"); err != nil {
if err := pbm.SetConfigVar("pitr.enabled", "true"); err != nil {
return errors.Wrap(err, "enable pitr")
}
}

val, err = pbm.C.GetConfigVar("pitr.compressionLevel")
val, err = pbm.GetConfigVar("pitr.compressionLevel")
var compressionLevel *int = nil
if err != nil {
if errors.Is(err, mongo.ErrNoDocuments) {
Expand All @@ -455,18 +455,18 @@ func (r *ReconcilePerconaServerMongoDB) updatePITR(ctx context.Context, cr *api.

if !reflect.DeepEqual(compressionLevel, cr.Spec.Backup.PITR.CompressionLevel) {
if cr.Spec.Backup.PITR.CompressionLevel == nil {
if err := pbm.C.DeleteConfigVar("pitr.compressionLevel"); err != nil {
if err := pbm.DeleteConfigVar("pitr.compressionLevel"); err != nil {
return errors.Wrap(err, "delete pitr.compressionLevel")
}
} else if err := pbm.C.SetConfigVar("pitr.compressionLevel", strconv.FormatInt(int64(*cr.Spec.Backup.PITR.CompressionLevel), 10)); err != nil {
} else if err := pbm.SetConfigVar("pitr.compressionLevel", strconv.FormatInt(int64(*cr.Spec.Backup.PITR.CompressionLevel), 10)); err != nil {
return errors.Wrap(err, "update pitr.compressionLevel")
}

// PBM requires PITR to be disabled and re-enabled for a compression level change to take effect
if err := pbm.C.SetConfigVar("pitr.enabled", "false"); err != nil {
if err := pbm.SetConfigVar("pitr.enabled", "false"); err != nil {
return errors.Wrap(err, "disable pitr")
}
if err := pbm.C.SetConfigVar("pitr.enabled", "true"); err != nil {
if err := pbm.SetConfigVar("pitr.enabled", "true"); err != nil {
return errors.Wrap(err, "enable pitr")
}
}
Expand Down
Loading

0 comments on commit 909c0ec

Please sign in to comment.