
[Feature][RayCluster]: Implement the HeadReady condition #2261


Merged · 21 commits · Jul 28, 2024

16 changes: 16 additions & 0 deletions helm-chart/kuberay-operator/templates/_helpers.tpl
@@ -55,6 +55,22 @@ Create the name of the service account to use
{{- end -}}
{{- end -}}


{{/*
FeatureGates
*/}}
{{- define "kuberay.featureGates" -}}
{{- $features := "" }}
{{- range .Values.featureGates }}
{{- $str := printf "%s=%t," .name .enabled }}
{{- $features = print $features $str }}
{{- end }}
{{- with .Values.featureGates }}
--feature-gates={{ $features | trimSuffix "," }}
{{- end }}
{{- end }}


{{/*
Create a template to ensure consistency for Role and ClusterRole.
*/}}
1 change: 1 addition & 0 deletions helm-chart/kuberay-operator/templates/deployment.yaml
@@ -56,6 +56,7 @@ spec:
- /manager
args:
{{- $argList := list -}}
{{- $argList = append $argList (include "kuberay.featureGates" . | trim) -}}
{{- if .Values.batchScheduler.enabled -}}
{{- $argList = append $argList "--enable-batch-scheduler" -}}
{{- end -}}
5 changes: 5 additions & 0 deletions helm-chart/kuberay-operator/values.yaml
@@ -58,6 +58,11 @@ readinessProbe:
batchScheduler:
enabled: false

featureGates:
- name: RayClusterStatusConditions
enabled: false


# Set up `securityContext` to improve Pod security.
# See https://github.com/ray-project/kuberay/blob/master/docs/guidance/pod-security.md for further guidance.
podSecurityContext: {}
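
For orientation, a minimal sketch (not part of this diff) of what the new featureGates helper renders into the operator container args with the chart defaults above; the flag flips to true when featureGates[0].enabled is set to true (for example via Helm's --set featureGates[0].enabled=true):

	args:
	  - --feature-gates=RayClusterStatusConditions=false
	  # ...the other operator flags appended by the deployment template follow
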
12 changes: 10 additions & 2 deletions ray-operator/apis/ray/v1/raycluster_types.go
@@ -166,9 +166,17 @@ type RayClusterStatus struct {

type RayClusterConditionType string

// Custom Reason for RayClusterCondition
const (
// HeadReady is added in a RayCluster when its Head Pod is ready for requests.
HeadReady RayClusterConditionType = "HeadReady"
// PodRunningAndReady says that the pod is running and ready.
PodRunningAndReady = "PodRunningAndReady"
// UnknownReason says that the reason for the condition is unknown.
UnknownReason = "Unknown"
)

const (
// HeadPodReady is added in a RayCluster when its Head Pod is ready for requests.
HeadPodReady RayClusterConditionType = "HeadPodReady"
// RayClusterReplicaFailure is added in a RayCluster when one of its pods fails to be created or deleted.
RayClusterReplicaFailure RayClusterConditionType = "ReplicaFailure"
)
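
A minimal, self-contained sketch (not from this PR) of how a client built against these types could read the new condition, using the same k8s.io/apimachinery meta helpers the controller relies on below:

	package main

	import (
		"fmt"

		"k8s.io/apimachinery/pkg/api/meta"

		rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
	)

	// headPodReady reports whether the RayCluster's HeadPodReady condition is currently True.
	func headPodReady(cluster *rayv1.RayCluster) bool {
		return meta.IsStatusConditionTrue(cluster.Status.Conditions, string(rayv1.HeadPodReady))
	}

	func main() {
		cluster := &rayv1.RayCluster{}        // no conditions reported yet
		fmt.Println(headPodReady(cluster))    // false until the operator sets HeadPodReady=True
	}
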
20 changes: 20 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller.go
@@ -1201,6 +1201,26 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra
newInstance.Status.State = rayv1.Ready
}

// Check if the head node is running and ready by checking the head pod's status.
if features.Enabled(features.RayClusterStatusConditions) {
headPod, err := common.GetRayClusterHeadPod(ctx, r, newInstance)

Member: It's possible for GetRayClusterHeadPod to return nil, nil when no head Pod exists. We need to handle that case.

Member: cc @rueian — maybe we can return a non-nil error when there is no head Pod? I found that we always return a non-nil error.

	headPod, err := common.GetRayClusterHeadPod(ctx, r, newInstance)
	if headPod == nil {
		// return an error
	}

if err != nil {
return nil, err
}
// GetRayClusterHeadPod can return nil, nil when pod is not found, we handle it separately.
if headPod == nil {
meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
Type: string(rayv1.HeadPodReady),
Status: metav1.ConditionFalse,
Reason: "HeadPodNotFound",
Message: "Head Pod not found",
})
} else {
replicaHeadPodReadyCondition := utils.FindPodReadyCondition(headPod, rayv1.HeadPodReady)
meta.SetStatusCondition(&newInstance.Status.Conditions, replicaHeadPodReadyCondition)
}
}

if newInstance.Spec.Suspend != nil && *newInstance.Spec.Suspend && len(runtimePods.Items) == 0 {
newInstance.Status.State = rayv1.Suspended
}
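
The hunk above calls meta.SetStatusCondition on every reconcile. A small, self-contained sketch (not from the PR) of why that is safe: apimachinery merges conditions by Type and only moves LastTransitionTime when the Status value actually changes, so repeated reconciles with an unchanged head Pod do not churn the RayCluster status:

	package main

	import (
		"fmt"

		"k8s.io/apimachinery/pkg/api/meta"
		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	)

	func main() {
		var conditions []metav1.Condition

		// First reconcile: the head Pod has not been created yet.
		meta.SetStatusCondition(&conditions, metav1.Condition{
			Type:    "HeadPodReady",
			Status:  metav1.ConditionFalse,
			Reason:  "HeadPodNotFound",
			Message: "Head Pod not found",
		})
		first := meta.FindStatusCondition(conditions, "HeadPodReady").LastTransitionTime

		// A later reconcile observes the same state; the existing condition is updated
		// in place and LastTransitionTime is preserved because the Status did not change.
		meta.SetStatusCondition(&conditions, metav1.Condition{
			Type:    "HeadPodReady",
			Status:  metav1.ConditionFalse,
			Reason:  "HeadPodNotFound",
			Message: "Head Pod not found",
		})
		second := meta.FindStatusCondition(conditions, "HeadPodReady").LastTransitionTime

		fmt.Println(first.Equal(&second)) // true
	}
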
41 changes: 40 additions & 1 deletion ray-operator/controllers/ray/raycluster_controller_unit_test.go
@@ -1676,6 +1676,12 @@ func TestCalculateStatus(t *testing.T) {
Status: corev1.PodStatus{
PodIP: headNodeIP,
Phase: corev1.PodRunning,
Conditions: []corev1.PodCondition{
{
Type: corev1.PodReady,
Status: corev1.ConditionTrue,
},
},
},
}
runtimeObjects := []runtime.Object{headPod, headService}
@@ -1705,8 +1711,41 @@
assert.Nil(t, err)
assert.Empty(t, newInstance.Status.Conditions)

// Test reconcilePodsErr with the feature gate enabled
// enable feature gate for the following tests
defer features.SetFeatureGateDuringTest(t, features.RayClusterStatusConditions, true)()

// Test CheckRayHeadRunningAndReady with head pod running and ready
newInstance, _ = r.calculateStatus(ctx, testRayCluster, nil)
assert.True(t, meta.IsStatusConditionPresentAndEqual(newInstance.Status.Conditions, string(rayv1.HeadPodReady), metav1.ConditionTrue))
condition := meta.FindStatusCondition(newInstance.Status.Conditions, string(rayv1.HeadPodReady))
assert.Equal(t, metav1.ConditionTrue, condition.Status)

// Test CheckRayHeadRunningAndReady with head pod not ready
headPod.Status.Conditions = []corev1.PodCondition{
{
Type: corev1.PodReady,
Status: corev1.ConditionFalse,
},
}
runtimeObjects = []runtime.Object{headPod, headService}
fakeClient = clientFake.NewClientBuilder().WithScheme(newScheme).WithRuntimeObjects(runtimeObjects...).Build()
r.Client = fakeClient
newInstance, _ = r.calculateStatus(ctx, testRayCluster, nil)
assert.True(t, meta.IsStatusConditionPresentAndEqual(newInstance.Status.Conditions, string(rayv1.HeadPodReady), metav1.ConditionFalse))
condition = meta.FindStatusCondition(newInstance.Status.Conditions, string(rayv1.HeadPodReady))
assert.Equal(t, metav1.ConditionFalse, condition.Status)

// Test CheckRayHeadRunningAndReady with head pod not running
headPod.Status.Phase = corev1.PodFailed
runtimeObjects = []runtime.Object{headPod, headService}
fakeClient = clientFake.NewClientBuilder().WithScheme(newScheme).WithRuntimeObjects(runtimeObjects...).Build()
r.Client = fakeClient
newInstance, _ = r.calculateStatus(ctx, testRayCluster, nil)
assert.True(t, meta.IsStatusConditionPresentAndEqual(newInstance.Status.Conditions, string(rayv1.HeadPodReady), metav1.ConditionFalse))
condition = meta.FindStatusCondition(newInstance.Status.Conditions, string(rayv1.HeadPodReady))
assert.Equal(t, metav1.ConditionFalse, condition.Status)

// Test reconcilePodsErr with the feature gate enabled
newInstance, err = r.calculateStatus(ctx, testRayCluster, utils.ErrFailedCreateHeadPod)
assert.Nil(t, err)
assert.True(t, meta.IsStatusConditionPresentAndEqual(newInstance.Status.Conditions, string(rayv1.RayClusterReplicaFailure), metav1.ConditionTrue))
28 changes: 20 additions & 8 deletions ray-operator/controllers/ray/rayservice_controller.go
@@ -15,23 +15,26 @@ import (
networkingv1 "k8s.io/api/networking/v1"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
"github.com/ray-project/kuberay/ray-operator/pkg/features"

cmap "github.com/orcaman/concurrent-map/v2"

"github.com/go-logr/logr"
fmtErrors "github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"

"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"

"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -1055,13 +1058,22 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
// after the head pod is running and ready. Hence, some requests to the Dashboard (e.g. `UpdateDeployments`) may fail.
// This is not an issue since `UpdateDeployments` is an idempotent operation.
logger.Info("Check the head Pod status of the pending RayCluster", "RayCluster name", rayClusterInstance.Name)
if isRunningAndReady, err := r.isHeadPodRunningAndReady(ctx, rayClusterInstance); err != nil || !isRunningAndReady {
if err != nil {
logger.Error(err, "Failed to check if head Pod is running and ready!")
} else {
logger.Info("Skipping the update of Serve deployments because the Ray head Pod is not ready.")

// check the latest condition of the head Pod to see if it is ready.
if features.Enabled(features.RayClusterStatusConditions) {
if !meta.IsStatusConditionTrue(rayClusterInstance.Status.Conditions, string(rayv1.HeadPodReady)) {
logger.Info("The head Pod is not ready, requeue the resource event to avoid redundant custom resource status updates.")
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, false, nil
}
} else {
if isRunningAndReady, err := r.isHeadPodRunningAndReady(ctx, rayClusterInstance); err != nil || !isRunningAndReady {
if err != nil {
logger.Error(err, "Failed to check if head Pod is running and ready!")
} else {
logger.Info("Skipping the update of Serve deployments because the Ray head Pod is not ready.")
}
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, false, err
}
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, false, err
}

// TODO(architkulkarni): Check the RayVersion. If < 2.8.0, error.
32 changes: 32 additions & 0 deletions ray-operator/controllers/ray/utils/util.go
@@ -71,6 +71,38 @@ func IsCreated(pod *corev1.Pod) bool {
return pod.Status.Phase != ""
}

func FindPodReadyCondition(pod *corev1.Pod, condType rayv1.RayClusterConditionType) metav1.Condition {
replicaPodReadyCondition := metav1.Condition{

Member: Is it possible for pod.Status.Conditions not to have corev1.PodReady? If so, we will create a condition with an empty Reason.

Contributor Author: Yes, you are right. I updated the logic here to start from an initial Condition:

	replicaPodReadyCondition := metav1.Condition{
		Type:   string(condType),
		Status: metav1.ConditionFalse,
		Reason: rayv1.UnknownReason,
	}

plus this logic:

	// Update the reason if it's not empty
	if reason != "" {
		replicaPodReadyCondition.Reason = reason
	}

to prevent an empty Reason from sneaking into the later SetStatusCondition call.

Type: string(condType),
Status: metav1.ConditionFalse,
Reason: rayv1.UnknownReason,
}

for _, cond := range pod.Status.Conditions {
if cond.Type != corev1.PodReady {
continue
}
// Set the status based on the PodReady condition
replicaPodReadyCondition.Status = metav1.ConditionStatus(cond.Status)
replicaPodReadyCondition.Message = cond.Message

// Determine the reason; default to PodRunningAndReady if the pod is ready but no specific reason is provided
reason := cond.Reason
if cond.Status == corev1.ConditionTrue && reason == "" {
reason = rayv1.PodRunningAndReady
}

// Update the reason if it's not empty
if reason != "" {

Member: This seems to be unnecessary?

Contributor Author (@cchen777, Jul 28, 2024): I think it could happen when cond.Status != corev1.ConditionTrue and cond.Reason, for whatever reason, gives us an empty string. That should be rare when we are checking cond.Type == PodReady, but to be safe I added this check so it falls back to the non-empty default rayv1.UnknownReason.

replicaPodReadyCondition.Reason = reason
}

// Since we're only interested in the PodReady condition, break after processing it
break
}
return replicaPodReadyCondition
}

// IsRunningAndReady returns true if pod is in the PodRunning Phase, if it has a condition of PodReady.
func IsRunningAndReady(pod *corev1.Pod) bool {
if pod.Status.Phase != corev1.PodRunning {
61 changes: 61 additions & 0 deletions ray-operator/controllers/ray/utils/util_test.go
@@ -265,6 +265,31 @@ func createSomePodWithCondition(typ corev1.PodConditionType, status corev1.Condi
}
}

func createRayHeadPodWithPhaseAndCondition(phase corev1.PodPhase, typ corev1.PodConditionType, status corev1.ConditionStatus) (pod *corev1.Pod) {
return &corev1.Pod{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Pod",
},
ObjectMeta: metav1.ObjectMeta{
Name: "raycluster-sample-head",
Namespace: "default",
Labels: map[string]string{
"ray.io/node-type": string(rayv1.HeadNode),
},
},
Status: corev1.PodStatus{
Phase: phase,
Conditions: []corev1.PodCondition{
{
Type: typ,
Status: status,
},
},
},
}
}

func TestGetHeadGroupServiceAccountName(t *testing.T) {
tests := map[string]struct {
input *rayv1.RayCluster
@@ -588,6 +613,42 @@ env_vars:
}
}

func TestFindHeadPodReadyCondition(t *testing.T) {
tests := map[string]struct {
pod *corev1.Pod
expected metav1.Condition
}{
"condition true if Ray head pod is running and ready": {
pod: createRayHeadPodWithPhaseAndCondition(corev1.PodRunning, corev1.PodReady, corev1.ConditionTrue),
expected: metav1.Condition{
Type: string(rayv1.HeadPodReady),
Status: metav1.ConditionTrue,
},
},
"condition false if Ray head pod is not running": {
pod: createRayHeadPodWithPhaseAndCondition(corev1.PodPending, corev1.PodReady, corev1.ConditionFalse),
expected: metav1.Condition{
Type: string(rayv1.HeadPodReady),
Status: metav1.ConditionFalse,
},
},
"condition false if Ray head pod is not ready": {
pod: createRayHeadPodWithPhaseAndCondition(corev1.PodRunning, corev1.PodReady, corev1.ConditionFalse),
expected: metav1.Condition{
Type: string(rayv1.HeadPodReady),
Status: metav1.ConditionFalse,
},
},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
replicaHeadPodReadyCondition := FindPodReadyCondition(tc.pod, rayv1.HeadPodReady)
assert.Equal(t, tc.expected.Status, replicaHeadPodReadyCondition.Status)
})
}
}

func TestErrRayClusterReplicaFailureReason(t *testing.T) {
assert.Equal(t, RayClusterReplicaFailureReason(ErrFailedDeleteAllPods), "FailedDeleteAllPods")
assert.Equal(t, RayClusterReplicaFailureReason(ErrFailedDeleteHeadPod), "FailedDeleteHeadPod")