Add arguments to pass Ray cluster head and worker templates #570

Draft · wants to merge 1 commit into base: main
13 changes: 12 additions & 1 deletion poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -29,6 +29,7 @@ cryptography = "40.0.2"
executing = "1.2.0"
pydantic = "< 2"
ipywidgets = "8.1.2"
mergedeep = "1.3.4"

[tool.poetry.group.docs]
optional = true
10 changes: 5 additions & 5 deletions src/codeflare_sdk/cluster/cluster.py
@@ -18,11 +18,9 @@
cluster setup queue, a list of all existing clusters, and the user's working namespace.
"""

import re
from time import sleep
from typing import List, Optional, Tuple, Dict

from kubernetes import config
from ray.job_submission import JobSubmissionClient

from .auth import config_check, api_config_handler
@@ -41,13 +39,11 @@
RayCluster,
RayClusterStatus,
)
from kubernetes import client, config
from kubernetes.utils import parse_quantity
import yaml
import os
import requests

from kubernetes import config
from kubernetes import client, config
from kubernetes.client.rest import ApiException


@@ -145,6 +141,8 @@ def create_app_wrapper(self):
        gpu = self.config.num_gpus
        workers = self.config.num_workers
        template = self.config.template
        head_template = self.config.head_template
        worker_template = self.config.worker_template
        image = self.config.image
        appwrapper = self.config.appwrapper
        env = self.config.envs
@@ -167,6 +165,8 @@
            gpu=gpu,
            workers=workers,
            template=template,
            head_template=head_template,
            worker_template=worker_template,
            image=image,
            appwrapper=appwrapper,
            env=env,
4 changes: 4 additions & 0 deletions src/codeflare_sdk/cluster/config.py
@@ -22,6 +22,8 @@
import pathlib
import typing

import kubernetes

dir = pathlib.Path(__file__).parent.parent.resolve()


@@ -46,6 +48,8 @@ class ClusterConfiguration:
    max_memory: typing.Union[int, str] = 2
    num_gpus: int = 0
    template: str = f"{dir}/templates/base-template.yaml"
    head_template: kubernetes.client.V1PodTemplateSpec = None
    worker_template: kubernetes.client.V1PodTemplateSpec = None
    appwrapper: bool = False
    envs: dict = field(default_factory=dict)
    image: str = ""
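For context, a minimal sketch (not part of the diff) of how the new head_template field might be set; the cluster name, namespace, and toleration values are illustrative:

from kubernetes.client import V1PodSpec, V1PodTemplateSpec, V1Toleration
from codeflare_sdk.cluster.config import ClusterConfiguration

# Hypothetical: overlay a GPU toleration onto the head pod via head_template.
config = ClusterConfiguration(
    name="demo",
    namespace="ns",
    head_template=V1PodTemplateSpec(
        spec=V1PodSpec(
            containers=[],  # required by V1PodSpec; left empty so nothing extra is appended
            tolerations=[V1Toleration(key="nvidia.com/gpu", operator="Exists", effect="NoSchedule")],
        )
    ),
)

worker_template works the same way; the unit test added below exercises it.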
24 changes: 19 additions & 5 deletions src/codeflare_sdk/utils/generate_yaml.py
@@ -20,16 +20,12 @@
from typing import Optional
import typing
import yaml
import sys
import os
import argparse
import uuid
from kubernetes import client, config
from .kube_api_helpers import _kube_api_error_handling
from ..cluster.auth import api_config_handler, config_check
from os import urandom
from base64 import b64encode
from urllib3.util import parse_url
from mergedeep import merge, Strategy


def read_template(template):
@@ -278,6 +274,16 @@ def write_user_yaml(user_yaml, output_file_name):
    print(f"Written to: {output_file_name}")


def apply_head_template(cluster_yaml: dict, head_template: client.V1PodTemplateSpec):
    head = cluster_yaml.get("spec").get("headGroupSpec")
    merge(head["template"], head_template.to_dict(), strategy=Strategy.ADDITIVE)


def apply_worker_template(cluster_yaml: dict, worker_template: client.V1PodTemplateSpec):
    worker = cluster_yaml.get("spec").get("workerGroupSpecs")[0]
    merge(worker["template"], worker_template.to_dict(), strategy=Strategy.ADDITIVE)
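To illustrate what the additive merge above does (not part of the diff), here is a small sketch using plain dicts in place of the head group template and the user-supplied overlay; the container, node selector, and toleration values are illustrative:

from mergedeep import merge, Strategy

# Simplified stand-in for cluster_yaml["spec"]["headGroupSpec"]["template"].
template = {"spec": {"containers": [{"name": "ray-head", "image": "ray:2.20.0"}]}}

# Partial overlay carrying only scheduling hints.
overlay = {"spec": {
    "nodeSelector": {"nvidia.com/gpu.present": "true"},
    "tolerations": [{"key": "nvidia.com/gpu", "operator": "Exists", "effect": "NoSchedule"}],
}}

merge(template, overlay, strategy=Strategy.ADDITIVE)

# Dicts are merged recursively; list entries from the overlay are appended,
# and existing list entries are kept as-is.
assert template["spec"]["nodeSelector"] == {"nvidia.com/gpu.present": "true"}
assert len(template["spec"]["containers"]) == 1
assert len(template["spec"]["tolerations"]) == 1

The review thread below turns on the case where the overlay itself contains a partial containers entry, which this strategy appends as a new list item rather than merging into the existing container.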
Collaborator:
How should a user edit the container spec? Specifying a partial container spec will only add a new partial entry to the containers list. This becomes an issue when specifying something like extra volume mounts.

Contributor Author (@astefanutti, Jun 24, 2024):

Right, ideally we'd want to leverage strategic merge patch for that: https://kubernetes.io/docs/tasks/manage-kubernetes-objects/update-api-object-kubectl-patch/#notes-on-the-strategic-merge-patch. It's easy to do in Go by using https://github.com/kubernetes/apimachinery/blob/master/pkg/util/strategicpatch/patch.go, but I don't know if that's possible in Python.

With mergedeep and the ADDITIVE strategy, lists are not replaced; their elements are appended. The problem is that something like:

cluster = Cluster(ClusterConfiguration(
    head_template=V1PodTemplateSpec(
        spec=V1PodSpec(
            containers=[
                V1Container(name="ray-head", volume_mounts=[
                    V1VolumeMount(name="config", mount_path="path"),
                ])
            ],
            volumes=[
                V1Volume(name="config", config_map=V1ConfigMapVolumeSource(name="config")),
            ],
        )
    ),
    # ...
))

It appends an entire new container, while it should only append the extra volume mount to the existing container.

Strategic merge patch solves this by relying on information like patchMergeKey in Go structs (x-kubernetes-patch-merge-key in CRDs), so it knows how to match list items by key.

Collaborator:
I couldn't seem to find anything for merging V1PodTemplateSpec objects, only for patching namespaced pods by sending API requests. There's the https://pypi.org/project/jsonmerge/ package, which we could use; it would require maintaining a bit of redundant config, however 😒.

Contributor Author:

Yes, I actually stumbled upon jsonmerge this morning as well. It seems it would be possible to provide a per-key merge strategy for lists. Let's give it a try? I agree we'd have to maintain some config, unless we figure out a way to leverage the Kubernetes JSON schemas at https://github.com/yannh/kubernetes-json-schema?tab=readme-ov-file, which do contain the x-kubernetes-patch-merge-key information.
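For illustration (not part of the comment thread), a rough sketch of that idea, assuming jsonmerge's arrayMergeById strategy with the ID reference given as a JSON pointer under mergeOptions; the schema fragment and document values are made up:

from jsonmerge import Merger

# Merge the containers list by the "name" key, mirroring x-kubernetes-patch-merge-key.
schema = {
    "properties": {
        "spec": {
            "properties": {
                "containers": {
                    "mergeStrategy": "arrayMergeById",
                    "mergeOptions": {"idRef": "/name"},
                }
            }
        }
    }
}

base = {"spec": {"containers": [{"name": "ray-head", "image": "ray:2.20.0"}]}}
overlay = {"spec": {"containers": [
    {"name": "ray-head", "volumeMounts": [{"name": "config", "mountPath": "/etc/config"}]},
]}}

merged = Merger(schema).merge(base, overlay)
# The volume mount lands on the existing "ray-head" item instead of a second
# partial container being appended.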

Contributor Author:
@varshaprasad96 maybe you would have some ideas?

Contributor:
One step better than just additive merging could be calculating the diff and then merging it (deepdiff followed by deepmerge, with custom merge strategies if needed). But this would still not solve the problem of conflicts, at the very least. It looks like if we want to leverage the JSON schema, we either need to use a live client or load the config to guide the merging process.

Collaborator (@KPostOffice, Jul 9, 2024):
Here's a script that we could use as a starting point for generating a jsonmerge schema from a Kubernetes schema. I haven't gotten around to testing it yet; the output looks right based on my understanding of the jsonmerge package documentation, not on testing with actual objects. Sorry for the messiness.

def merge_schema_from_k8_schema(k8_schema):
    """Derive a jsonmerge schema from a Kubernetes OpenAPI schema fragment."""
    to_return = {}
    k8_properties = {}
    # Recurse into object properties, and into array items unless the list is atomic.
    if "array" in k8_schema.get("type", []) and k8_schema.get("x-kubernetes-list-type") != "atomic":
        to_return["items"] = {}
        to_return["items"]["properties"] = {}
        toret_properties = to_return["items"]["properties"]
        k8_properties = k8_schema.get("items", {}).get("properties", {})
    elif "object" in k8_schema.get("type", []):
        to_return["properties"] = {}
        toret_properties = to_return["properties"]
        k8_properties = k8_schema.get("properties", {})
    for key, value in k8_properties.items():
        if "object" in value.get("type", []) and "properties" in value:
            toret_properties[key] = merge_schema_from_k8_schema(value)
        elif "array" in value.get("type", []) and "items" in value:
            toret_properties[key] = merge_schema_from_k8_schema(value)
        else:
            toret_properties[key] = {"mergeStrategy": "overwrite"}

    # Map x-kubernetes-list-type onto a jsonmerge strategy. jsonmerge reads strategy
    # options from "mergeOptions"; idRef is a JSON pointer into each list item.
    if "array" in k8_schema.get("type", []):
        if k8_schema.get("x-kubernetes-list-type") == "set":
            to_return["mergeStrategy"] = "arrayMergeById"
            to_return["mergeOptions"] = {"idRef": "/"}  # the item itself is the ID
        elif k8_schema.get("x-kubernetes-list-type") == "atomic":
            to_return["mergeStrategy"] = "overwrite"
        elif k8_schema.get("x-kubernetes-list-type") == "map":
            if k8_schema.get("x-kubernetes-patch-merge-key"):
                to_return["mergeStrategy"] = "arrayMergeById"
                to_return["mergeOptions"] = {"idRef": "/" + k8_schema["x-kubernetes-patch-merge-key"]}
            else:
                to_return["mergeStrategy"] = "overwrite"
    elif "object" in k8_schema.get("type", []):
        to_return["mergeStrategy"] = "objectMerge"
    else:
        to_return["mergeStrategy"] = "overwrite"
    return to_return
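For illustration (not part of the original comment), a hypothetical call on a made-up containers schema fragment, showing the kind of jsonmerge schema the helper is meant to emit:

from jsonmerge import Merger

# Hypothetical Kubernetes-style schema fragment for a list merged by "name".
containers_schema = {
    "type": "array",
    "x-kubernetes-list-type": "map",
    "x-kubernetes-patch-merge-key": "name",
    "items": {
        "type": "object",
        "properties": {"name": {"type": "string"}, "image": {"type": "string"}},
    },
}

merge_schema = merge_schema_from_k8_schema(containers_schema)
# Expected shape: arrayMergeById keyed on each item's "name", with per-property
# strategies generated under "items".
merger = Merger({"properties": {"containers": merge_schema}})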



def generate_appwrapper(
    name: str,
    namespace: str,
@@ -291,6 +297,8 @@
    gpu: int,
    workers: int,
    template: str,
    head_template: client.V1PodTemplateSpec,
    worker_template: client.V1PodTemplateSpec,
    image: str,
    appwrapper: bool,
    env,
@@ -302,6 +310,12 @@
    volume_mounts: list[client.V1VolumeMount],
):
    cluster_yaml = read_template(template)

    if head_template:
        apply_head_template(cluster_yaml, head_template)
    if worker_template:
        apply_worker_template(cluster_yaml, worker_template)

    appwrapper_name, cluster_name = gen_names(name)
    update_names(cluster_yaml, cluster_name, namespace)
    update_nodes(
42 changes: 38 additions & 4 deletions tests/unit_test.py
@@ -20,8 +20,6 @@
import re
import uuid

from codeflare_sdk.cluster import cluster

parent = Path(__file__).resolve().parents[1]
aw_dir = os.path.expanduser("~/.codeflare/resources/")
sys.path.append(str(parent) + "/src")
@@ -69,17 +67,18 @@
createClusterConfig,
)

import codeflare_sdk.utils.kube_api_helpers
from codeflare_sdk.utils.generate_yaml import (
gen_names,
is_openshift_cluster,
)

import openshift
from openshift.selector import Selector
import ray
import pytest
import yaml

from kubernetes.client import V1PodTemplateSpec, V1PodSpec, V1Toleration

from unittest.mock import MagicMock
from pytest_mock import MockerFixture
from ray.job_submission import JobSubmissionClient
@@ -268,6 +267,41 @@ def test_config_creation():
    assert config.appwrapper == True


def test_cluster_config_with_worker_template(mocker):
    mocker.patch("kubernetes.client.ApisApi.get_api_versions")
    mocker.patch(
        "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
        return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"),
    )

    cluster = Cluster(ClusterConfiguration(
        name="unit-test-cluster",
        namespace="ns",
        num_workers=2,
        min_cpus=3,
        max_cpus=4,
        min_memory=5,
        max_memory=6,
        num_gpus=7,
        image="test/ray:2.20.0-py39-cu118",
        worker_template=V1PodTemplateSpec(
            spec=V1PodSpec(
                containers=[],
                tolerations=[V1Toleration(
                    key="nvidia.com/gpu",
                    operator="Exists",
                    effect="NoSchedule",
                )],
                node_selector={
                    "nvidia.com/gpu.present": "true",
                },
            )
        ),
    ))

    assert cluster


def test_cluster_creation(mocker):
    # Create AppWrapper containing a Ray Cluster with no local queue specified
    mocker.patch("kubernetes.client.ApisApi.get_api_versions")