diff --git a/docs/user-guide/CPUvsGPU.rst b/docs/user-guide/CPUvsGPU.rst
index 5fd901d19..fa5ea6aa8 100644
--- a/docs/user-guide/CPUvsGPU.rst
+++ b/docs/user-guide/CPUvsGPU.rst
@@ -96,3 +96,11 @@ Every SLURM cluster is different, so make sure you understand how your SLURM clu
 Our Python examples are designed to work such that they can be run locally on their own, or easily substituted into the ``start-slurm.sh`` to run on multiple nodes.
 You can adapt your scripts easily too by simply following the pattern of adding ``get_client`` with ``add_distributed_args``.
+
+-----------------------------------------
+Dask with K8s
+-----------------------------------------
+
+We also provide an example guide for how to get started with NeMo Curator on a Kubernetes cluster.
+
+Please visit :ref:`curator_kubernetes` for more information.
diff --git a/docs/user-guide/KubernetesCurator.rst b/docs/user-guide/KubernetesCurator.rst
new file mode 100644
index 000000000..c7f727df0
--- /dev/null
+++ b/docs/user-guide/KubernetesCurator.rst
@@ -0,0 +1,386 @@
+.. _curator_kubernetes:
+
+======================================
+Running NeMo Curator on Kubernetes
+======================================
+The following example demonstrates how to run NeMo Curator with NVIDIA GPUs on a Kubernetes cluster,
+with PersistentVolumeClaims as the storage option.
+
+.. note::
+   This project will download and install additional third-party open source software projects. Review the license terms of these open source projects before use.
+
+Prerequisites
+-------------
+* Kubernetes cluster
+
+  * `GPU operator <https://docs.nvidia.com/datacenter/cloud-native/gpu-operator/latest/index.html>`__
+  * `Dask Operator <https://kubernetes.dask.org/en/latest/operator.html>`__
+
+* `kubectl <https://kubernetes.io/docs/reference/kubectl/>`__: the Kubernetes cluster CLI
+
+  * Reach out to your Kubernetes cluster admin for how to set up your ``kubectl`` KUBECONFIG
+
+* `ReadWriteMany <https://kubernetes.io/docs/concepts/storage/persistent-volumes/#access-modes>`__ `StorageClass <https://kubernetes.io/docs/concepts/storage/storage-classes/>`__ (set up by the Kubernetes cluster admin)
+
+Storage
+-------
+To run NeMo Curator, we need to set up storage to upload and store the input
+files, as well as any processed outputs.
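Before provisioning storage, it can help to confirm the prerequisites above are in place. A quick, optional sketch of read-only checks (resource names and output vary by cluster):

```shell
# The Dask Operator installs this CRD; it should be listed if the operator is present
kubectl get crd daskclusters.kubernetes.dask.org

# With the GPU operator installed, nodes advertise allocatable nvidia.com/gpu resources
kubectl describe nodes | grep -A2 'nvidia.com/gpu'

# List StorageClasses; pick one whose provisioner supports ReadWriteMany
kubectl get storageclass
```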
+
+Here is an example of how to create a dynamic PV from a StorageClass set up
+by your cluster admin. Replace ``STORAGE_CLASS=<...>`` with the name of
+your StorageClass.
+
+This example requests ``150Gi`` of space. Adjust that number for your
+workloads and be aware that not all storage provisioners support volume
+resizing.
+
+.. code-block:: bash
+
+   STORAGE_CLASS=<...>
+   PVC_NAME=nemo-workspace
+
+   kubectl apply -f - <<EOF
+   apiVersion: v1
+   kind: PersistentVolumeClaim
+   metadata:
+     name: ${PVC_NAME}
+   spec:
+     accessModes:
+       - ReadWriteMany
+     storageClassName: ${STORAGE_CLASS}
+     resources:
+       requests:
+         storage: 150Gi
+   EOF
+
+We also deploy a ``busybox`` Pod with the PVC mounted at ``/nemo-workspace``,
+which we will use to copy data in and out of the volume:
+
+.. code-block:: bash
+
+   kubectl apply -f - <<EOF
+   apiVersion: v1
+   kind: Pod
+   metadata:
+     name: nemo-workspace-busybox
+   spec:
+     containers:
+     - name: busybox
+       image: busybox
+       command: ["sleep", "infinity"]
+       volumeMounts:
+       - name: nemo-workspace
+         mountPath: /nemo-workspace
+     volumes:
+     - name: nemo-workspace
+       persistentVolumeClaim:
+         claimName: nemo-workspace
+   EOF
+
+Secrets
+-------
+To pull the NeMo container image from NGC, create an image pull secret
+(replace ``<NGC KEY HERE>`` with your NGC secret key. Note that if you have any
+special characters in your key you might need to wrap the key in single quotes
+(``'``) so it can be parsed correctly by k8s)::
+
+   kubectl create secret docker-registry ngc-registry --docker-server=nvcr.io --docker-username=\$oauthtoken --docker-password=<NGC KEY HERE>
+
+Setup Python Environment
+------------------------
+
+The environment to run the provided scripts in this example does not need the full
+``nemo_curator`` package, so you can create a virtual environment with just the
+required packages as follows:
+
+.. code-block:: bash
+
+   python3 -m venv venv
+   source venv/bin/activate
+
+   pip install 'dask_kubernetes>=2024.4.1'
+
+Upload Data to PVC
+------------------
+
+To copy data into the ``nemo-workspace`` PVC, we use ``kubectl exec`` through the
+``nemo-workspace-busybox`` Pod created above. You may also use ``kubectl cp``, but
+``exec`` has fewer surprises regarding compressed files:
+
+.. code-block:: bash
+
+   # Replace <...> with a path on your local machine
+   LOCAL_WORKSPACE=<...>
+
+   # This copies $LOCAL_WORKSPACE/my_dataset to /nemo-workspace/my_dataset within the PVC.
+   # Change my_dataset to the directory or file you wish to upload.
+   ( cd $LOCAL_WORKSPACE; tar cf - my_dataset | kubectl exec -i nemo-workspace-busybox -- tar xf - -C /nemo-workspace )
+
+.. note::
+   See :ref:`data-curator-download` for an example of how to download local data that can be uploaded to the PVC
+   with the above instruction.
+
+Create a Dask Cluster
+---------------------
+
+Use ``create_dask_cluster.py`` to create a CPU or GPU Dask cluster.
+.. note::
+   If you are creating another Dask cluster with the same ``--name <name>``, first delete it via::
+
+      kubectl delete daskcluster <name>
+
+.. code-block:: bash
+
+   # Creates a CPU Dask cluster with 1 worker
+   python create_dask_cluster.py \
+     --name rapids-dask \
+     --n_workers 1 \
+     --image nvcr.io/nvidian/bignlp-train:nemofw-nightly \
+     --image_pull_secret ngc-registry \
+     --pvcs nemo-workspace:/nemo-workspace
+
+   #╭───────────────────── Creating KubeCluster 'rapids-dask' ─────────────────────╮
+   #│                                                                              │
+   #│   DaskCluster                                                      Running   │
+   #│   Scheduler Pod                                                    Running   │
+   #│   Scheduler Service                                                Created   │
+   #│   Default Worker Group                                             Created   │
+   #│                                                                              │
+   #│ ⠧ Getting dashboard URL                                                      │
+   #╰──────────────────────────────────────────────────────────────────────────────╯
+   #cluster = KubeCluster(rapids-dask, 'tcp://localhost:61757', workers=2, threads=510, memory=3.94 TiB)
+
+   # Creates a GPU Dask cluster with 2 workers with 1 GPU each
+   python create_dask_cluster.py \
+     --name rapids-dask \
+     --n_workers 2 \
+     --n_gpus_per_worker 1 \
+     --image nvcr.io/nvidian/bignlp-train:nemofw-nightly \
+     --image_pull_secret ngc-registry \
+     --pvcs nemo-workspace:/nemo-workspace
+
+After creating a cluster, you should be able to proceed after confirming that the scheduler and the workers are all ``Running``:
+
+.. code-block:: bash
+
+   # Set DASK_CLUSTER_NAME to the value of --name
+   DASK_CLUSTER_NAME=rapids-dask
+   kubectl get pods -l "dask.org/cluster-name=$DASK_CLUSTER_NAME"
+
+   # NAME                                                     READY   STATUS    RESTARTS      AGE
+   # rapids-dask-default-worker-587238cf2c-7d685f4d75-k6rnq   1/1     Running   0             57m
+   # rapids-dask-default-worker-f8ff963886-5577fff76b-qmvcd   1/1     Running   3 (52m ago)   57m
+   # rapids-dask-scheduler-654799869d-9bw4z                   1/1     Running   0             57m
+
+(Opt #1) Running Existing Module
+--------------------------------
+
+Here is an example of running the existing ``gpu_exact_dedup`` Curator module. The arguments and script name
+will need to be changed according to the module you wish to run:
+.. code-block:: bash
+
+   # Set DASK_CLUSTER_NAME to the value of --name
+   DASK_CLUSTER_NAME=rapids-dask
+   SCHEDULER_POD=$(kubectl get pods -l "dask.org/cluster-name=$DASK_CLUSTER_NAME,dask.org/component=scheduler" -o name)
+   # Starts an interactive shell session in the scheduler pod
+   kubectl exec -it $SCHEDULER_POD -- bash
+
+   ########################
+   # Inside SCHEDULER_POD #
+   ########################
+   # Run the following inside the interactive shell to launch the script in the background and
+   # tee the logs to the /nemo-workspace PVC that was mounted in for persistence.
+   # The command-line flags will need to be replaced with whatever the module script accepts.
+   # Recall that the PVC is mounted at /nemo-workspace, so any outputs should be written
+   # to somewhere under /nemo-workspace.
+
+   mkdir -p /nemo-workspace/curator/{output,log,profile}
+   # Write logs to script.log and to a log file with a date suffix
+   LOGS="/nemo-workspace/curator/script.log /nemo-workspace/curator/script.log.$(date +%y_%m_%d-%H-%M-%S)"
+   (
+     echo "Writing to: $LOGS"
+     gpu_exact_dedup \
+       --input-data-dirs /nemo-workspace/my_dataset \
+       --output-dir /nemo-workspace/curator/output \
+       --hash-method md5 \
+       --log-dir /nemo-workspace/curator/log \
+       --num-files -1 \
+       --files-per-partition 1 \
+       --profile-path /nemo-workspace/curator/profile \
+       --log-frequency 250 \
+       --scheduler-address localhost:8786 \
+       2>&1
+     echo "Finished!"
+   ) | tee $LOGS &
+
+   # At this point, feel free to disconnect the shell via Ctrl+D or simply
+   exit
+
+At this point you can tail the logs and look for ``Finished!`` in ``/nemo-workspace/curator/script.log``:
+.. code-block:: bash
+
+   # Command will follow the logs of the running module (Press Ctrl+C to close)
+   kubectl exec -it $SCHEDULER_POD -- tail -f /nemo-workspace/curator/script.log
+
+   # Writing to: /nemo-workspace/curator/script.log /nemo-workspace/curator/script.log.24_03_27-15-52-31
+   # Computing hashes for /nemo-workspace/my_dataset
+   #                  adlr_id                           _hashes
+   # 0  cc-2023-14-0397113620  91b77eae49c10a65d485ac8ca18d6c43
+   # 1  cc-2023-14-0397113621  a266f0794cc8ffbd431823e6930e4f80
+   # 2  cc-2023-14-0397113622  baee533e2eddae764de2cd6faaa1286c
+   # 3  cc-2023-14-0397113623  87dd52a468448b99078f97e76f528eab
+   # 4  cc-2023-14-0397113624  a17664daf4f24be58e0e3a3dcf81124a
+   # Finished!
+
+(Opt #2) Running Custom Module
+------------------------------
+
+In this example, we'll demonstrate how to run a NeMo Curator module that you have defined locally.
+
+Since your curator module may depend on a version of the Curator that differs from what is in the
+container, we will need to build a custom image with your code installed:
+
+.. code-block:: bash
+
+   # Clone your repo. This example uses the official repo
+   git clone https://github.com/NVIDIA/NeMo-Curator.git NeMo-Curator-dev
+
+   # Check out a specific ref. This example uses a commit in the main branch
+   git -C NeMo-Curator-dev checkout fc167a6edffd38a55c333742972a5a25b901cb26
+
+   # Example NeMo base image. Change it according to your requirements
+   BASE_IMAGE=nvcr.io/nvidian/bignlp-train:nemofw-nightly
+   docker build -t nemo-curator-custom ./NeMo-Curator-dev -f - <<EOF
+   FROM $BASE_IMAGE
+   COPY . /NeMo-Curator-dev
+   RUN pip install /NeMo-Curator-dev
+   EOF
+
+   # Push the image to your registry. Fill in <registry>/<image>:<tag> accordingly
+   docker tag nemo-curator-custom <registry>/<image>:<tag>
+   docker push <registry>/<image>:<tag>
+
+.. note::
+   When using a custom image, you will likely need to create a different secret unless you pushed to a public registry:
+
+   .. code-block:: bash
+
+      # Fill in <docker-server>/<username>/<password>
+      kubectl create secret docker-registry my-private-registry --docker-server=<docker-server> --docker-username=<username> --docker-password=<password>
+
+   And with this new secret, you create your new Dask cluster:
+   .. code-block:: bash
+
+      # Fill in <registry>/<image>:<tag>
+      python create_dask_cluster.py \
+        --name rapids-dask \
+        --n_workers 2 \
+        --n_gpus_per_worker 1 \
+        --image <registry>/<image>:<tag> \
+        --image_pull_secret my-private-registry \
+        --pvcs nemo-workspace:/nemo-workspace
+
+After the Dask cluster is deployed, you can proceed to run your module. In this example we'll use
+the ``NeMo-Curator/nemo_curator/scripts/find_exact_duplicates.py`` module, but you can find other templates
+in `NeMo-Curator/examples <https://github.com/NVIDIA/NeMo-Curator/tree/main/examples>`__:
+
+.. code-block:: bash
+
+   # Set DASK_CLUSTER_NAME to the value of --name
+   DASK_CLUSTER_NAME=rapids-dask
+   SCHEDULER_POD=$(kubectl get pods -l "dask.org/cluster-name=$DASK_CLUSTER_NAME,dask.org/component=scheduler" -o name)
+   # Starts an interactive shell session in the scheduler pod
+   kubectl exec -it $SCHEDULER_POD -- bash
+
+   ########################
+   # Inside SCHEDULER_POD #
+   ########################
+   # Run the following inside the interactive shell to launch the script in the background and
+   # tee the logs to the /nemo-workspace PVC that was mounted in for persistence.
+   # The command-line flags will need to be replaced with whatever the module script accepts.
+   # Recall that the PVC is mounted at /nemo-workspace, so any outputs should be written
+   # to somewhere under /nemo-workspace.
+
+   mkdir -p /nemo-workspace/curator/{output,log,profile}
+   # Write logs to script.log and to a log file with a date suffix
+   LOGS="/nemo-workspace/curator/script.log /nemo-workspace/curator/script.log.$(date +%y_%m_%d-%H-%M-%S)"
+   (
+     echo "Writing to: $LOGS"
+     # Recall that /NeMo-Curator-dev was copied and installed in the Dockerfile above
+     python3 -u /NeMo-Curator-dev/nemo_curator/scripts/find_exact_duplicates.py \
+       --input-data-dirs /nemo-workspace/my_dataset \
+       --output-dir /nemo-workspace/curator/output \
+       --hash-method md5 \
+       --log-dir /nemo-workspace/curator/log \
+       --files-per-partition 1 \
+       --profile-path /nemo-workspace/curator/profile \
+       --log-frequency 250 \
+       --scheduler-address localhost:8786 \
+       2>&1
+     echo "Finished!"
+   ) | tee $LOGS &
+
+   # At this point, feel free to disconnect the shell via Ctrl+D or simply
+   exit
+
+At this point you can tail the logs and look for ``Finished!`` in ``/nemo-workspace/curator/script.log``:
+
+.. code-block:: bash
+
+   # Command will follow the logs of the running module (Press Ctrl+C to close)
+   kubectl exec -it $SCHEDULER_POD -- tail -f /nemo-workspace/curator/script.log
+
+   # Writing to: /nemo-workspace/curator/script.log /nemo-workspace/curator/script.log.24_03_27-20-52-07
+   # Reading 2 files
+   # /NeMo-Curator-dev/nemo_curator/modules/exact_dedup.py:157: UserWarning: Output path f/nemo-workspace/curator/output/_exact_duplicates.parquet already exists and will be overwritten
+   #   warnings.warn(
+   # Finished!
+
+Deleting Cluster
+----------------
+After you have finished using the created Dask cluster, you can delete it to release the resources:
+
+.. code-block:: bash
+
+   # Where <name> is the value passed to the --name flag of create_dask_cluster.py. Example: `--name <name>`
+   kubectl delete daskcluster <name>
+
+Download Data from PVC
+----------------------
+
+To download data from your PVC, you can use the ``nemo-workspace-busybox`` Pod created earlier:
+.. code-block:: bash
+
+   # Replace <...> with a path on your local machine
+   LOCAL_WORKSPACE=<...>
+
+   # Tar will fail if LOCAL_WORKSPACE doesn't exist
+   mkdir -p $LOCAL_WORKSPACE
+
+   # Copy file in PVC at /nemo-workspace/foobar.txt to local file-system at $LOCAL_WORKSPACE/nemo-workspace/foobar.txt
+   kubectl exec nemo-workspace-busybox -- tar cf - /nemo-workspace/foobar.txt | tar xf - -C $LOCAL_WORKSPACE
+
+   # Copy directory in PVC at /nemo-workspace/fizzbuzz to local file-system at $LOCAL_WORKSPACE/nemo-workspace/fizzbuzz
+   kubectl exec nemo-workspace-busybox -- tar cf - /nemo-workspace/fizzbuzz | tar xf - -C $LOCAL_WORKSPACE
diff --git a/docs/user-guide/index.rst b/docs/user-guide/index.rst
index 278e47ab3..7ba84c03e 100644
--- a/docs/user-guide/index.rst
+++ b/docs/user-guide/index.rst
@@ -27,6 +27,9 @@
 :ref:`Personally Identifiable Information Identification and Removal `
    The purpose of the personally identifiable information (PII) redaction tool is to help scrub sensitive data out of training datasets
 
+:ref:`curator_kubernetes`
+   Demonstration of how to run NeMo Curator on a Dask cluster deployed on top of Kubernetes
+
 .. toctree::
    :maxdepth: 4
    :titlesonly:

@@ -41,3 +44,4 @@
    TaskDecontamination.rst
    PersonalIdentifiableInformationIdentificationAndRemoval.rst
    DistributedDataClassification.rst
+   KubernetesCurator.rst
diff --git a/examples/k8s/create_dask_cluster.py b/examples/k8s/create_dask_cluster.py
new file mode 100644
index 000000000..28be575eb
--- /dev/null
+++ b/examples/k8s/create_dask_cluster.py
@@ -0,0 +1,134 @@
+import argparse
+
+from dask_kubernetes.operator.kubecluster import KubeCluster, make_cluster_spec
+
+
+def create_cluster(
+    name: str,
+    n_workers: int,
+    n_gpus_per_worker: int,
+    n_cpus_per_worker: int,
+    image: str,
+    image_pull_secret: str,
+    pvcs: dict[str, str],
+):
+    dask_worker_command = "dask-worker"
+    if n_gpus_per_worker and n_gpus_per_worker > 0:
+        dask_worker_command = "dask-cuda-worker"
+
+    custom_cluster_spec = make_cluster_spec(
+        name=name,
+        worker_command=dask_worker_command,
+        n_workers=n_workers,
+        image=image,
+    )
+    scheduler_spec = custom_cluster_spec["spec"]["scheduler"]["spec"]
+    worker_spec = custom_cluster_spec["spec"]["worker"]["spec"]
+    if image_pull_secret:
+        scheduler_spec["imagePullSecrets"] = [{"name": image_pull_secret}]
+        worker_spec["imagePullSecrets"] = [{"name": image_pull_secret}]
+
+    obj_vols = []
+    obj_vol_mounts = []
+    for pvc_name, mount_path in pvcs.items():
+        obj_vols.append(
+            {
+                "name": pvc_name,
+                "persistentVolumeClaim": {
+                    "claimName": pvc_name,
+                },
+            }
+        )
+        obj_vol_mounts.append(
+            {
+                "name": pvc_name,
+                "mountPath": mount_path,
+            }
+        )
+
+    scheduler_spec["volumes"] = obj_vols
+    for ctr in scheduler_spec["containers"]:
+        ctr["volumeMounts"] = obj_vol_mounts
+
+    worker_spec["volumes"] = obj_vols
+    for ctr in worker_spec["containers"]:
+        ctr["volumeMounts"] = obj_vol_mounts
+        # Resources are added to only the worker, since the scheduler doesn't need GPUs
+        if n_gpus_per_worker or n_cpus_per_worker:
+            if not ctr["resources"]:
+                ctr["resources"] = {"limits": {}}
+            if n_gpus_per_worker:
+                ctr["resources"]["limits"]["nvidia.com/gpu"] = str(n_gpus_per_worker)
+            if n_cpus_per_worker:
+                ctr["resources"]["limits"]["cpu"] = str(n_cpus_per_worker)
+
+    cluster = KubeCluster(
+        custom_cluster_spec=custom_cluster_spec, shutdown_on_close=False
+    )
+    print(f"{cluster = }")
+
+
+if __name__ == "__main__":
+
+    def parse_pvcs(specs: str) -> dict[str, str]:
+        name_to_path = {}
+        for pvc in specs.split(","):
+            # Can be empty
+            if not pvc:
+                continue
+            name, _, path = pvc.partition(":")
+            name_to_path[name] = path
+        return name_to_path
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "-n",
+        "--name",
+        type=str,
+        default="rapids-dask",
+        help="The name of the DaskCluster, which you can inspect via `kubectl describe daskcluster <name>`.",
+    )
+    parser.add_argument(
+        "-w", "--n_workers", type=int, default=2, help="Number of workers"
+    )
+    parser.add_argument(
+        "-g",
+        "--n_gpus_per_worker",
+        type=int,
+        default=None,
+        help="Number of GPUs per worker. If not specified, the Dask cluster defaults to a CPU cluster.",
+    )
+    parser.add_argument(
+        "-c",
+        "--n_cpus_per_worker",
+        type=int,
+        default=None,
+        help="Number of CPUs per worker. Provide this flag if you want to limit your CPU resources, and K8s will throttle the workers to make sure this limit is satisfied.",
+    )
+    parser.add_argument(
+        "-i",
+        "--image",
+        type=str,
+        default="nvcr.io/nvidia/nemo:24.03.framework",
+        help="The image used for the Dask cluster scheduler and workers.",
+    )
+    parser.add_argument(
+        "-s",
+        "--image_pull_secret",
+        type=str,
+        default=None,
+        help="If --image is from a private registry, specify the pull secret you created to allow it to be pulled.",
+    )
+    parser.add_argument(
+        "-p",
+        "--pvcs",
+        type=parse_pvcs,
+        default="",
+        help="Comma-separated PVC specification of $pvc_name_1:$mount_path_1,$pvc_name_2:$mount_path_2. Example: foo:/foo,bar:/bar mounts PVCs named foo and bar to /foo and /bar respectively.",
+    )
+
+    args = parser.parse_args()
+
+    create_cluster(
+        **vars(args),
+    )
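The PVC plumbing in `create_cluster` can be exercised without a cluster or the `dask_kubernetes` package: each `name:path` pair parsed from `--pvcs` yields one `persistentVolumeClaim` volume and one matching `volumeMount`. A minimal standalone sketch (`build_pvc_wiring` is an illustrative helper mirroring the loop above; it is not part of the script):

```python
def parse_pvcs(specs: str) -> dict:
    """Parse 'name1:/path1,name2:/path2' into {name: path}, as the script's --pvcs flag does."""
    name_to_path = {}
    for pvc in filter(None, specs.split(",")):  # skip empty entries
        name, _, path = pvc.partition(":")
        name_to_path[name] = path
    return name_to_path


def build_pvc_wiring(pvcs: dict):
    """Return (volumes, volumeMounts) lists mirroring create_cluster's per-PVC loop."""
    volumes = [
        {"name": name, "persistentVolumeClaim": {"claimName": name}} for name in pvcs
    ]
    mounts = [{"name": name, "mountPath": path} for name, path in pvcs.items()]
    return volumes, mounts


volumes, mounts = build_pvc_wiring(parse_pvcs("nemo-workspace:/nemo-workspace"))
print(volumes)  # [{'name': 'nemo-workspace', 'persistentVolumeClaim': {'claimName': 'nemo-workspace'}}]
print(mounts)   # [{'name': 'nemo-workspace', 'mountPath': '/nemo-workspace'}]
```

These are exactly the structures attached to the scheduler and worker Pod specs, which is why every PVC listed in `--pvcs` appears at the same mount path in all Pods of the cluster.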