This repository has been archived by the owner on Mar 4, 2024. It is now read-only.

EVEREST-827-pg-repos-validation #429

Merged
16 commits merged on Feb 12, 2024
Changes from 9 commits
119 changes: 115 additions & 4 deletions api/validation.go
@@ -36,6 +36,7 @@ import (
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/percona/percona-everest-backend/cmd/config"
"github.com/percona/percona-everest-backend/pkg/kubernetes"
@@ -46,6 +47,7 @@ const (
psmdbDeploymentName = "percona-server-mongodb-operator"
pgDeploymentName = "percona-postgresql-operator"
dateFormat = "2006-01-02T15:04:05Z"
pgReposLimit = 3
)

var (
@@ -55,7 +57,9 @@ var (

errDBCEmptyMetadata = errors.New("databaseCluster's Metadata should not be empty")
errDBCNameEmpty = errors.New("databaseCluster's metadata.name should not be empty")
errDBCNamespaceEmpty = errors.New("databaseCluster's metadata.namespace should not be empty")
errDBCNameWrongFormat = errors.New("databaseCluster's metadata.name should be a string")
errDBCNamespaceWrongFormat = errors.New("databaseCluster's metadata.namespace should be a string")
errNotEnoughMemory = fmt.Errorf("memory limits should be above %s", minMemQuantity.String())
errInt64NotSupported = errors.New("specifying resources using int64 data type is not supported. Please use string format for that")
errNotEnoughCPU = fmt.Errorf("CPU limits should be above %s", minCPUQuantity.String())
@@ -79,6 +83,9 @@ var (
errDataSourceNoPath = errors.New("'path' should be specified in .Spec.DataSource.BackupSource")
errIncorrectDataSourceStruct = errors.New("incorrect data source struct")
errUnsupportedPitrType = errors.New("the given point-in-time recovery type is not supported")
errTooManyPGSchedules = fmt.Errorf("only %d schedules are allowed", pgReposLimit)
errTooManyPGStorages = fmt.Errorf("only %d different storages are allowed to be used for a PostgreSQL cluster", pgReposLimit)

//nolint:gochecknoglobals
operatorEngine = map[everestv1alpha1.EngineType]string{
everestv1alpha1.DatabaseEnginePXC: pxcDeploymentName,
@@ -453,22 +460,42 @@ func validateUpdateMonitoringInstanceType(params UpdateMonitoringInstanceJSONReq
}

func validateCreateDatabaseClusterRequest(dbc DatabaseCluster) error {
name, _, err := nameFromDatabaseCluster(dbc)
if err != nil {
return err
}

return validateRFC1035(name, "metadata.name")
}

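// nameFromDatabaseCluster extracts the name and namespace from the DatabaseCluster
// metadata, validating that both are present and are strings.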
func nameFromDatabaseCluster(dbc DatabaseCluster) (string, string, error) {
if dbc.Metadata == nil {
return errDBCEmptyMetadata
return "", "", errDBCEmptyMetadata
}

md := *dbc.Metadata
name, ok := md["name"]
if !ok {
return errDBCNameEmpty
return "", "", errDBCNameEmpty
}

strName, ok := name.(string)
if !ok {
return errDBCNameWrongFormat
return "", "", errDBCNameWrongFormat
}

return validateRFC1035(strName, "metadata.name")
md = *dbc.Metadata
ns, ok := md["namespace"]
if !ok {
return "", "", errDBCNamespaceEmpty
}

strNS, ok := ns.(string)
if !ok {
return "", "", errDBCNamespaceWrongFormat
}

return strName, strNS, nil
}

func (e *EverestServer) validateDatabaseClusterCR(ctx echo.Context, namespace string, databaseCluster *DatabaseCluster) error { //nolint:cyclop
@@ -514,6 +541,12 @@ func (e *EverestServer) validateDatabaseClusterCR(ctx echo.Context, namespace st
}
}

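// PostgreSQL only: enforce the backup repository limit across schedules and storages.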
if databaseCluster.Spec.Engine.Type == DatabaseClusterSpecEngineType(everestv1alpha1.DatabaseEnginePostgresql) {
if err = validatePGReposForAPIDB(ctx.Request().Context(), databaseCluster, e.kubeClient.ListDatabaseClusterBackups); err != nil {
return err
}
}

return validateResourceLimits(databaseCluster)
}

@@ -866,6 +899,10 @@ func validateDatabaseClusterBackup(ctx context.Context, namespace string, backup
return err
}

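// PostgreSQL only: make sure the new backup does not push the cluster over the repo limit.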
if err = validatePGReposForBackup(ctx, *db, kubeClient, *b); err != nil {
return err
}

if db.Spec.Engine.Type == everestv1alpha1.DatabaseEnginePSMDB {
if db.Status.ActiveStorage != "" && db.Status.ActiveStorage != b.Spec.BackupStorageName {
return errPSMDBViolateActiveStorage
@@ -874,6 +911,37 @@
return nil
}

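// validatePGReposForBackup checks that creating the given backup for a PostgreSQL
// cluster keeps the number of backup repositories within pgReposLimit. For other
// engine types it is a no-op.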
func validatePGReposForBackup(ctx context.Context, db everestv1alpha1.DatabaseCluster, kubeClient *kubernetes.Kubernetes, newBackup everestv1alpha1.DatabaseClusterBackup) error {
if db.Spec.Engine.Type != everestv1alpha1.DatabaseEnginePostgresql {
return nil
}

// convert the k8s structure to the API structure
str, err := json.Marshal(db)
if err != nil {
return err
}
apiDB := &DatabaseCluster{}
if err := json.Unmarshal(str, apiDB); err != nil {
return err
}

// add the backup being validated to the list of existing backups so the limit check takes it into account
getBackupsFunc := func(ctx context.Context, namespace string, options metav1.ListOptions) (*everestv1alpha1.DatabaseClusterBackupList, error) {
list, err := kubeClient.ListDatabaseClusterBackups(ctx, namespace, options)
if err != nil {
return nil, err
}
list.Items = append(list.Items, newBackup)
return list, nil
}

return validatePGReposForAPIDB(ctx, apiDB, getBackupsFunc)
}

func validateDatabaseClusterRestore(ctx context.Context, namespace string, restore *DatabaseClusterRestore, kubeClient *kubernetes.Kubernetes) error {
if restore == nil {
return errors.New("restore cannot be empty")
@@ -933,3 +1001,46 @@ type dataSourceStruct struct {
Type *string `json:"type,omitempty"`
} `json:"pitr,omitempty"`
}

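// validatePGReposForAPIDB checks that a PostgreSQL cluster stays within pgReposLimit:
// each schedule takes its own repo, and every distinct backup storage used by existing
// backups that is not already covered by a schedule takes another one.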
func validatePGReposForAPIDB(ctx context.Context, dbc *DatabaseCluster, getBackupsFunc func(context.Context, string, metav1.ListOptions) (*everestv1alpha1.DatabaseClusterBackupList, error)) error {
bs := make(map[string]bool)
var reposCount int
if dbc.Spec != nil && dbc.Spec.Backup != nil && dbc.Spec.Backup.Schedules != nil {
for _, sched := range *dbc.Spec.Backup.Schedules {
bs[sched.BackupStorageName] = true
}
// each schedule counts as a separate repo regardless of the backup storage it uses
reposCount = len(*dbc.Spec.Backup.Schedules)
// first check: the number of schedules alone must not exceed the limit, since each schedule is configured in its own repo
if reposCount > pgReposLimit {
return errTooManyPGSchedules
}
}

dbcName, dbcNamespace, err := nameFromDatabaseCluster(*dbc)
if err != nil {
return err
}

backups, err := getBackupsFunc(ctx, dbcNamespace, metav1.ListOptions{
LabelSelector: fmt.Sprintf("clusterName=%s", dbcName),
})
if err != nil {
return err
}

for _, backup := range backups.Items {
// the repo count increases only if this backup storage has not been counted yet
if _, ok := bs[backup.Spec.BackupStorageName]; !ok {
bs[backup.Spec.BackupStorageName] = true
reposCount++
}
}

// second check: the total number of repos (schedules plus distinct backup storages) must not exceed the limit
if reposCount > pgReposLimit {
return errTooManyPGStorages
}

return nil
}
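
A minimal usage sketch for reviewers: the function below shows one way the new validation could be exercised with a stubbed backup lister. It assumes it sits in the same package as this file (so the unexported identifiers and existing imports are in scope), that the generated DatabaseCluster type keeps Metadata as a *map[string]interface{} (as the dereference in nameFromDatabaseCluster implies), and that DatabaseClusterBackup exposes Spec.BackupStorageName as used above; the function name and storage names are hypothetical.

func examplePGRepoLimit(ctx context.Context) error {
	meta := map[string]interface{}{"name": "pg-cluster", "namespace": "everest"}
	dbc := &DatabaseCluster{Metadata: &meta}

	// A stubbed lister returning four backups, each stored in a different backup storage.
	listBackups := func(_ context.Context, _ string, _ metav1.ListOptions) (*everestv1alpha1.DatabaseClusterBackupList, error) {
		list := &everestv1alpha1.DatabaseClusterBackupList{}
		for _, storage := range []string{"storage-a", "storage-b", "storage-c", "storage-d"} {
			b := everestv1alpha1.DatabaseClusterBackup{}
			b.Spec.BackupStorageName = storage
			list.Items = append(list.Items, b)
		}
		return list, nil
	}

	// Four distinct storages exceed pgReposLimit (3), so errTooManyPGStorages is expected here.
	return validatePGReposForAPIDB(ctx, dbc, listBackups)
}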