From e4bc3050ca0fca6458f705cef4bdcd3182809691 Mon Sep 17 00:00:00 2001 From: Saad Zaher Date: Mon, 14 Apr 2025 14:20:34 +0100 Subject: [PATCH] Auto-Detect kuberay-operator namespace Enable auto-detection of the namespace where kuberay-operator is deployed so we can enable network traffic between raycluster namespace and kuberay-operator namespace. Signed-off-by: Saad Zaher --- pkg/controllers/raycluster_controller.go | 48 ++++++++++++++++++++---- 1 file changed, 41 insertions(+), 7 deletions(-) diff --git a/pkg/controllers/raycluster_controller.go b/pkg/controllers/raycluster_controller.go index f22d24d4..92f8cf0c 100644 --- a/pkg/controllers/raycluster_controller.go +++ b/pkg/controllers/raycluster_controller.go @@ -270,8 +270,16 @@ func (r *RayClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) // - First try to get the ODH / RHOAI application namespace from the DSCInitialization // - Or fallback to the well-known defaults // add check if running on openshift or vanilla kubernetes + kubeRayNamespace, err := r.getKubeRayOperatorNamespace(ctx) + if err != nil { + logger.Error(err, "Failed to get KubeRay operator namespace") + + return ctrl.Result{RequeueAfter: requeueTime}, err + } + logger.Info("Detected KubeRay operator namespace", "namespace", kubeRayNamespace) + var kubeRayNamespaces []string - kubeRayNamespaces = []string{kubeRayDefaultNamespace} + kubeRayNamespaces = []string{kubeRayNamespace} if r.IsOpenShift { dsci := &dsciv1.DSCInitialization{} @@ -287,7 +295,7 @@ func (r *RayClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) } - _, err := r.kubeClient.NetworkingV1().NetworkPolicies(cluster.Namespace).Apply(ctx, desiredHeadNetworkPolicy(cluster, r.Config, kubeRayNamespaces), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) + _, err = r.kubeClient.NetworkingV1().NetworkPolicies(cluster.Namespace).Apply(ctx, desiredHeadNetworkPolicy(cluster, r.Config, kubeRayNamespaces), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) if err != nil { logger.Error(err, "Failed to update NetworkPolicy") } @@ -319,6 +327,32 @@ func isMTLSEnabled(cfg *config.KubeRayConfiguration) bool { return cfg == nil || ptr.Deref(cfg.MTLSEnabled, true) } +// getKubeRayOperatorNamespace tries to get the namespace of the KubeRay operator +func (r *RayClusterReconciler) getKubeRayOperatorNamespace(ctx context.Context) (string, error) { + logger := ctrl.LoggerFrom(ctx) + + pods, err := r.kubeClient.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{ + LabelSelector: "app.kubernetes.io/component=kuberay-operator", + }) + if err != nil { + logger.Error(err, "failed to get kuberay-operator namespace") + + return kubeRayDefaultNamespace, err + } + + if len(pods.Items) == 0 { + logger.Info( + "No kuberay-operator pods found, using default kuberay-operator namespace", + "namespace", + kubeRayDefaultNamespace, + ) + + return kubeRayDefaultNamespace, nil + } + + return pods.Items[0].Namespace, nil +} + func isRayClusterSuspended(cluster *rayv1.RayCluster) bool { return cluster.Spec.Suspend != nil && ptr.Deref(cluster.Spec.Suspend, false) } @@ -549,8 +583,8 @@ func desiredHeadNetworkPolicy(cluster *rayv1.RayCluster, cfg *config.KubeRayConf ), networkingv1ac.NetworkPolicyIngressRule(). WithPorts( - networkingv1ac.NetworkPolicyPort().WithProtocol(corev1.ProtocolTCP).WithPort(intstr.FromInt(10001)), - networkingv1ac.NetworkPolicyPort().WithProtocol(corev1.ProtocolTCP).WithPort(intstr.FromInt(8265)), + networkingv1ac.NetworkPolicyPort().WithProtocol(corev1.ProtocolTCP).WithPort(intstr.FromInt32(10001)), + networkingv1ac.NetworkPolicyPort().WithProtocol(corev1.ProtocolTCP).WithPort(intstr.FromInt32(8265)), ).WithFrom( networkingv1ac.NetworkPolicyPeer().WithPodSelector(metav1ac.LabelSelector()), ), @@ -564,12 +598,12 @@ func desiredHeadNetworkPolicy(cluster *rayv1.RayCluster, cfg *config.KubeRayConf WithOperator(metav1.LabelSelectorOpIn). WithValues(kubeRayNamespaces...)))). WithPorts( - networkingv1ac.NetworkPolicyPort().WithProtocol(corev1.ProtocolTCP).WithPort(intstr.FromInt(8265)), - networkingv1ac.NetworkPolicyPort().WithProtocol(corev1.ProtocolTCP).WithPort(intstr.FromInt(10001)), + networkingv1ac.NetworkPolicyPort().WithProtocol(corev1.ProtocolTCP).WithPort(intstr.FromInt32(8265)), + networkingv1ac.NetworkPolicyPort().WithProtocol(corev1.ProtocolTCP).WithPort(intstr.FromInt32(10001)), ), networkingv1ac.NetworkPolicyIngressRule(). WithPorts( - networkingv1ac.NetworkPolicyPort().WithProtocol(corev1.ProtocolTCP).WithPort(intstr.FromInt(8080)), + networkingv1ac.NetworkPolicyPort().WithProtocol(corev1.ProtocolTCP).WithPort(intstr.FromInt32(8080)), ). WithFrom( networkingv1ac.NetworkPolicyPeer().WithNamespaceSelector(metav1ac.LabelSelector().