diff --git a/docs/proposals/2170-kubeflow-training-v2/README.md b/docs/proposals/2170-kubeflow-training-v2/README.md index cb829345ab..c04b615dcc 100644 --- a/docs/proposals/2170-kubeflow-training-v2/README.md +++ b/docs/proposals/2170-kubeflow-training-v2/README.md @@ -281,6 +281,8 @@ type TrainJob struct { type TrainJobSpec struct { // Reference to the training runtime. + // The field is immutable. + // +kubebuilder:validation:XValidation:rule="self == oldSelf", message="runtimeRef is immutable" RuntimeRef RuntimeRef `json:"runtimeRef"` // Configuration of the desired trainer. diff --git a/manifests/v2/base/crds/kubeflow.org_trainjobs.yaml b/manifests/v2/base/crds/kubeflow.org_trainjobs.yaml index feb9fdaee8..8307cdc51e 100644 --- a/manifests/v2/base/crds/kubeflow.org_trainjobs.yaml +++ b/manifests/v2/base/crds/kubeflow.org_trainjobs.yaml @@ -2747,7 +2747,9 @@ spec: type: object type: array runtimeRef: - description: Reference to the training runtime. + description: |- + Reference to the training runtime. + The field is immutable. properties: apiGroup: default: kubeflow.org @@ -2770,6 +2772,9 @@ spec: required: - name type: object + x-kubernetes-validations: + - message: runtimeRef is immutable + rule: self == oldSelf suspend: default: false description: |- diff --git a/pkg/apis/kubeflow.org/v2alpha1/openapi_generated.go b/pkg/apis/kubeflow.org/v2alpha1/openapi_generated.go index 5c91173a72..24f7b3d19f 100644 --- a/pkg/apis/kubeflow.org/v2alpha1/openapi_generated.go +++ b/pkg/apis/kubeflow.org/v2alpha1/openapi_generated.go @@ -1010,7 +1010,7 @@ func schema_pkg_apis_kubefloworg_v2alpha1_TrainJobSpec(ref common.ReferenceCallb Properties: map[string]spec.Schema{ "runtimeRef": { SchemaProps: spec.SchemaProps{ - Description: "Reference to the training runtime.", + Description: "Reference to the training runtime. The field is immutable.", Default: map[string]interface{}{}, Ref: ref("github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v2alpha1.RuntimeRef"), }, diff --git a/pkg/apis/kubeflow.org/v2alpha1/trainjob_types.go b/pkg/apis/kubeflow.org/v2alpha1/trainjob_types.go index 3e698429fa..49c25449c6 100644 --- a/pkg/apis/kubeflow.org/v2alpha1/trainjob_types.go +++ b/pkg/apis/kubeflow.org/v2alpha1/trainjob_types.go @@ -66,6 +66,8 @@ type TrainJobList struct { // TrainJobSpec represents specification of the desired TrainJob. type TrainJobSpec struct { // Reference to the training runtime. + // The field is immutable. + // +kubebuilder:validation:XValidation:rule="self == oldSelf", message="runtimeRef is immutable" RuntimeRef RuntimeRef `json:"runtimeRef"` // Configuration of the desired trainer. diff --git a/pkg/util.v2/testing/errormatcher.go b/pkg/util.v2/testing/errormatcher.go new file mode 100644 index 0000000000..772daf1000 --- /dev/null +++ b/pkg/util.v2/testing/errormatcher.go @@ -0,0 +1,82 @@ +/* +Copyright 2024 The Kubeflow Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "fmt" + + "github.com/onsi/gomega/format" + "github.com/onsi/gomega/types" + apierrors "k8s.io/apimachinery/pkg/api/errors" +) + +func BeNotFoundError() types.GomegaMatcher { + return BeAPIError(NotFoundError) +} + +func BeForbiddenError() types.GomegaMatcher { + return BeAPIError(ForbiddenError) +} + +func BeInvalidError() types.GomegaMatcher { + return BeAPIError(InvalidError) +} + +type errorMatcher int + +const ( + NotFoundError errorMatcher = iota + ForbiddenError + InvalidError +) + +func (em errorMatcher) String() string { + return []string{"NotFoundError", "ForbiddenError", "InvalidError"}[em] +} + +type apiError func(error) bool + +func (em errorMatcher) isAPIError(err error) bool { + return []apiError{apierrors.IsNotFound, apierrors.IsForbidden, apierrors.IsInvalid}[em](err) +} + +type isErrorMatch struct { + name errorMatcher +} + +func BeAPIError(name errorMatcher) types.GomegaMatcher { + return &isErrorMatch{ + name: name, + } +} + +func (matcher *isErrorMatch) Match(actual interface{}) (success bool, err error) { + err, ok := actual.(error) + if !ok { + return false, fmt.Errorf("%s expects an error", matcher.name.String()) + } + + return err != nil && matcher.name.isAPIError(err), nil +} + +func (matcher *isErrorMatch) FailureMessage(actual interface{}) (message string) { + return format.Message(actual, "to be a %s", matcher.name.String()) +} + +func (matcher *isErrorMatch) NegatedFailureMessage(actual interface{}) (message string) { + return format.Message(actual, "not to be %s", matcher.name.String()) +} diff --git a/pkg/util.v2/testing/wrapper.go b/pkg/util.v2/testing/wrapper.go index de83294aae..4ffad88742 100644 --- a/pkg/util.v2/testing/wrapper.go +++ b/pkg/util.v2/testing/wrapper.go @@ -236,11 +236,16 @@ func (t *TrainJobWrapper) SpecAnnotation(key, value string) *TrainJobWrapper { } func (t *TrainJobWrapper) RuntimeRef(gvk schema.GroupVersionKind, name string) *TrainJobWrapper { - t.Spec.RuntimeRef = kubeflowv2.RuntimeRef{ - APIGroup: &gvk.Group, - Kind: &gvk.Kind, - Name: name, + runtimeRef := kubeflowv2.RuntimeRef{ + Name: name, } + if gvk.Group != "" { + runtimeRef.APIGroup = &gvk.Group + } + if gvk.Kind != "" { + runtimeRef.Kind = &gvk.Kind + } + t.Spec.RuntimeRef = runtimeRef return t } @@ -249,6 +254,11 @@ func (t *TrainJobWrapper) Trainer(trainer *kubeflowv2.Trainer) *TrainJobWrapper return t } +func (t *TrainJobWrapper) ManagedBy(m string) *TrainJobWrapper { + t.Spec.ManagedBy = &m + return t +} + func (t *TrainJobWrapper) Obj() *kubeflowv2.TrainJob { return &t.TrainJob } diff --git a/test/integration/controller.v2/trainjob_controller_test.go b/test/integration/controller.v2/trainjob_controller_test.go index 24f3d401f2..098ae39c39 100644 --- a/test/integration/controller.v2/trainjob_controller_test.go +++ b/test/integration/controller.v2/trainjob_controller_test.go @@ -23,6 +23,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" jobsetv1alpha2 "sigs.k8s.io/jobset/api/jobset/v1alpha2" @@ -241,93 +242,156 @@ var _ = ginkgo.Describe("TrainJob controller", ginkgo.Ordered, func() { }, util.Timeout, util.Interval).Should(gomega.Succeed()) }) }) +}) - ginkgo.When("TrainJob CR Validation", func() { - ginkgo.AfterEach(func() { - gomega.Expect(k8sClient.DeleteAllOf(ctx, &kubeflowv2.TrainJob{}, client.InNamespace(ns.Name))).Should( - gomega.Succeed()) - }) +var _ = ginkgo.Describe("TrainJob marker validations and defaulting", ginkgo.Ordered, func() { + var ns *corev1.Namespace - ginkgo.It("Should succeed in creating TrainJob", func() { + ginkgo.BeforeAll(func() { + fwk = &framework.Framework{} + cfg = fwk.Init() + ctx, k8sClient = fwk.RunManager(cfg) + }) + ginkgo.AfterAll(func() { + fwk.Teardown() + }) - managedBy := "kubeflow.org/trainjob-controller" + ginkgo.BeforeEach(func() { + ns = &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + APIVersion: corev1.SchemeGroupVersion.String(), + Kind: "Namespace", + }, + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "trainjob-marker-", + }, + } + gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed()) + }) + ginkgo.AfterEach(func() { + gomega.Expect(k8sClient.DeleteAllOf(ctx, &kubeflowv2.TrainJob{}, client.InNamespace(ns.Name))).Should(gomega.Succeed()) + }) - trainingRuntimeRef := kubeflowv2.RuntimeRef{ - Name: "TorchRuntime", - APIGroup: ptr.To(kubeflowv2.GroupVersion.Group), - Kind: ptr.To(kubeflowv2.TrainingRuntimeKind), - } - jobSpec := kubeflowv2.TrainJobSpec{ - RuntimeRef: trainingRuntimeRef, - ManagedBy: &managedBy, - } - trainJob := &kubeflowv2.TrainJob{ - TypeMeta: metav1.TypeMeta{ - APIVersion: kubeflowv2.SchemeGroupVersion.String(), - Kind: kubeflowv2.TrainJobKind, + ginkgo.When("Creating TrainJob", func() { + ginkgo.DescribeTable("Validate TrainJob on creation", func(trainJob func() *kubeflowv2.TrainJob, errorMatcher gomega.OmegaMatcher) { + gomega.Expect(k8sClient.Create(ctx, trainJob())).Should(errorMatcher) + }, + ginkgo.Entry("Should succeed to create TrainJob with 'managedBy: kubeflow.org/trainjob-conteroller'", + func() *kubeflowv2.TrainJob { + return testingutil.MakeTrainJobWrapper(ns.Name, "managed-by-trainjob-controller"). + ManagedBy("kubeflow.org/trainjob-controller"). + RuntimeRef(kubeflowv2.GroupVersion.WithKind(kubeflowv2.TrainingRuntimeKind), "testing"). + Obj() }, - ObjectMeta: metav1.ObjectMeta{ - GenerateName: "valid-trainjob-", - Namespace: ns.Name, + gomega.Succeed()), + ginkgo.Entry("Should succeed to create TrainJob with 'managedBy: kueue.x-k8s.io/multukueue'", + func() *kubeflowv2.TrainJob { + return testingutil.MakeTrainJobWrapper(ns.Name, "managed-by-trainjob-controller"). + ManagedBy("kueue.x-k8s.io/multikueue"). + RuntimeRef(kubeflowv2.GroupVersion.WithKind(kubeflowv2.TrainingRuntimeKind), "testing"). + Obj() }, - Spec: jobSpec, - } - - err := k8sClient.Create(ctx, trainJob) - gomega.Expect(err).Should(gomega.Succeed()) - }) - - ginkgo.It("Should fail in creating TrainJob with invalid spec.managedBy", func() { - managedBy := "invalidManagedBy" - jobSpec := kubeflowv2.TrainJobSpec{ - ManagedBy: &managedBy, - } - trainJob := &kubeflowv2.TrainJob{ - TypeMeta: metav1.TypeMeta{ - APIVersion: kubeflowv2.SchemeGroupVersion.String(), - Kind: kubeflowv2.TrainJobKind, + gomega.Succeed()), + ginkgo.Entry("Should fail to create TrainJob with invalid managedBy", + func() *kubeflowv2.TrainJob { + return testingutil.MakeTrainJobWrapper(ns.Name, "invalid-managed-by"). + ManagedBy("invalid"). + RuntimeRef(kubeflowv2.GroupVersion.WithKind(kubeflowv2.TrainingRuntimeKind), "testing"). + Obj() }, - ObjectMeta: metav1.ObjectMeta{ - Name: "invalid-trainjob", - Namespace: ns.Name, + testingutil.BeInvalidError()), + ) + ginkgo.DescribeTable("Defaulting TrainJob on creation", func(trainJob func() *kubeflowv2.TrainJob, wantTrainJob func() *kubeflowv2.TrainJob) { + created := trainJob() + gomega.Expect(k8sClient.Create(ctx, created)).Should(gomega.Succeed()) + gomega.Expect(created).Should(gomega.BeComparableTo(wantTrainJob(), util.IgnoreObjectMetadata)) + }, + ginkgo.Entry("Should succeed to default suspend=false", + func() *kubeflowv2.TrainJob { + return testingutil.MakeTrainJobWrapper(ns.Name, "null-suspend"). + ManagedBy("kueue.x-k8s.io/multikueue"). + RuntimeRef(kubeflowv2.SchemeGroupVersion.WithKind(kubeflowv2.ClusterTrainingRuntimeKind), "testing"). + Obj() }, - Spec: jobSpec, - } - gomega.Expect(k8sClient.Create(ctx, trainJob)).To(gomega.MatchError( - gomega.ContainSubstring("spec.managedBy: Invalid value"))) - }) - - ginkgo.It("Should fail in updating spec.managedBy", func() { - - managedBy := "kubeflow.org/trainjob-controller" - - trainingRuntimeRef := kubeflowv2.RuntimeRef{ - Name: "TorchRuntime", - APIGroup: ptr.To(kubeflowv2.GroupVersion.Group), - Kind: ptr.To(kubeflowv2.TrainingRuntimeKind), - } - jobSpec := kubeflowv2.TrainJobSpec{ - RuntimeRef: trainingRuntimeRef, - ManagedBy: &managedBy, - } - trainJob := &kubeflowv2.TrainJob{ - TypeMeta: metav1.TypeMeta{ - APIVersion: kubeflowv2.SchemeGroupVersion.String(), - Kind: kubeflowv2.TrainJobKind, + func() *kubeflowv2.TrainJob { + return testingutil.MakeTrainJobWrapper(ns.Name, "null-suspend"). + ManagedBy("kueue.x-k8s.io/multikueue"). + RuntimeRef(kubeflowv2.SchemeGroupVersion.WithKind(kubeflowv2.ClusterTrainingRuntimeKind), "testing"). + Suspend(false). + Obj() + }), + ginkgo.Entry("Should succeed to default managedBy=kubeflow.org/trainjob-controller", + func() *kubeflowv2.TrainJob { + return testingutil.MakeTrainJobWrapper(ns.Name, "null-managed-by"). + RuntimeRef(kubeflowv2.SchemeGroupVersion.WithKind(kubeflowv2.TrainingRuntimeKind), "testing"). + Suspend(true). + Obj() }, - ObjectMeta: metav1.ObjectMeta{ - Name: "job-with-failed-update", - Namespace: ns.Name, + func() *kubeflowv2.TrainJob { + return testingutil.MakeTrainJobWrapper(ns.Name, "null-managed-by"). + ManagedBy("kubeflow.org/trainjob-controller"). + RuntimeRef(kubeflowv2.SchemeGroupVersion.WithKind(kubeflowv2.TrainingRuntimeKind), "testing"). + Suspend(true). + Obj() + }), + ginkgo.Entry("Should succeed to default runtimeRef.apiGroup", + func() *kubeflowv2.TrainJob { + return testingutil.MakeTrainJobWrapper(ns.Name, "empty-api-group"). + RuntimeRef(schema.GroupVersionKind{Group: "", Version: "", Kind: kubeflowv2.TrainingRuntimeKind}, "testing"). + Obj() + }, + func() *kubeflowv2.TrainJob { + return testingutil.MakeTrainJobWrapper(ns.Name, "empty-api-group"). + ManagedBy("kubeflow.org/trainjob-controller"). + RuntimeRef(kubeflowv2.SchemeGroupVersion.WithKind(kubeflowv2.TrainingRuntimeKind), "testing"). + Suspend(false). + Obj() + }), + ginkgo.Entry("Should succeed to default runtimeRef.kind", + func() *kubeflowv2.TrainJob { + return testingutil.MakeTrainJobWrapper(ns.Name, "empty-kind"). + RuntimeRef(kubeflowv2.SchemeGroupVersion.WithKind(""), "testing"). + Obj() }, - Spec: jobSpec, - } + func() *kubeflowv2.TrainJob { + return testingutil.MakeTrainJobWrapper(ns.Name, "empty-kind"). + ManagedBy("kubeflow.org/trainjob-controller"). + RuntimeRef(kubeflowv2.SchemeGroupVersion.WithKind(kubeflowv2.ClusterTrainingRuntimeKind), "testing"). + Suspend(false). + Obj() + }), + ) + }) - gomega.Expect(k8sClient.Create(ctx, trainJob)).Should(gomega.Succeed()) - updatedManagedBy := "kueue.x-k8s.io/multikueue" - jobSpec.ManagedBy = &updatedManagedBy - trainJob.Spec = jobSpec - gomega.Expect(k8sClient.Update(ctx, trainJob)).To(gomega.MatchError( - gomega.ContainSubstring("ManagedBy value is immutable"))) - }) + ginkgo.When("Updating TrainJob", func() { + ginkgo.DescribeTable("Validate TrainJob on update", func(old func() *kubeflowv2.TrainJob, new func(*kubeflowv2.TrainJob) *kubeflowv2.TrainJob, errorMatcher gomega.OmegaMatcher) { + oldTrainJob := old() + gomega.Expect(k8sClient.Create(ctx, oldTrainJob)).Should(gomega.Succeed()) + gomega.Expect(k8sClient.Update(ctx, new(oldTrainJob))).Should(errorMatcher) + }, + ginkgo.Entry("Should fail to update TrainJob managedBy", + func() *kubeflowv2.TrainJob { + return testingutil.MakeTrainJobWrapper(ns.Name, "valid-managed-by"). + ManagedBy("kubeflow.org/trainjob-controller"). + RuntimeRef(kubeflowv2.SchemeGroupVersion.WithKind(kubeflowv2.TrainingRuntimeKind), "testing"). + Obj() + }, + func(job *kubeflowv2.TrainJob) *kubeflowv2.TrainJob { + job.Spec.ManagedBy = ptr.To("kueue.x-k8s.io/multikueue") + return job + }, + testingutil.BeInvalidError()), + ginkgo.Entry("Should fail to update runtimeRef", + func() *kubeflowv2.TrainJob { + return testingutil.MakeTrainJobWrapper(ns.Name, "valid-runtimeref"). + RuntimeRef(kubeflowv2.SchemeGroupVersion.WithKind(kubeflowv2.TrainJobKind), "testing"). + Obj() + }, + func(job *kubeflowv2.TrainJob) *kubeflowv2.TrainJob { + job.Spec.RuntimeRef.Name = "forbidden-update" + return job + }, + testingutil.BeInvalidError()), + ) }) })