From 4bc7f900ee4b4232b9fa8c2ac21547a466e13703 Mon Sep 17 00:00:00 2001 From: Alexandria Barghi Date: Wed, 27 Dec 2023 10:28:44 -0800 Subject: [PATCH 1/4] remove experimental warnings --- .../bulk_sampling/cugraph_bulk_sampling.py | 2 +- .../bulk_sampling/mg_utils/README.md | 6 + .../bulk_sampling/mg_utils/default-config.sh | 39 +++ .../bulk_sampling/mg_utils/functions.sh | 66 +++++ .../mg_utils/run-dask-process.sh | 247 ++++++++++++++++++ .../mg_utils/wait_for_workers.py | 124 +++++++++ .../cugraph_dgl/dataloading/dataloader.py | 2 +- .../cugraph-pyg/cugraph_pyg/data/__init__.py | 6 +- .../cugraph_pyg/data/cugraph_store.py | 2 +- .../cugraph_pyg/loader/__init__.py | 12 +- .../cugraph_pyg/loader/cugraph_node_loader.py | 8 +- .../cugraph/experimental/gnn/__init__.py | 6 +- python/cugraph/cugraph/gnn/__init__.py | 1 + .../cugraph/gnn/data_loading/__init__.py | 2 +- .../cugraph/gnn/data_loading/bulk_sampler.py | 4 +- .../tests/sampling/test_bulk_sampler.py | 2 +- .../tests/sampling/test_bulk_sampler_mg.py | 2 +- 17 files changed, 501 insertions(+), 30 deletions(-) create mode 100644 benchmarks/cugraph/standalone/bulk_sampling/mg_utils/README.md create mode 100755 benchmarks/cugraph/standalone/bulk_sampling/mg_utils/default-config.sh create mode 100644 benchmarks/cugraph/standalone/bulk_sampling/mg_utils/functions.sh create mode 100755 benchmarks/cugraph/standalone/bulk_sampling/mg_utils/run-dask-process.sh create mode 100644 benchmarks/cugraph/standalone/bulk_sampling/mg_utils/wait_for_workers.py diff --git a/benchmarks/cugraph/standalone/bulk_sampling/cugraph_bulk_sampling.py b/benchmarks/cugraph/standalone/bulk_sampling/cugraph_bulk_sampling.py index 1ca5d6db637..9de6c3a2b01 100644 --- a/benchmarks/cugraph/standalone/bulk_sampling/cugraph_bulk_sampling.py +++ b/benchmarks/cugraph/standalone/bulk_sampling/cugraph_bulk_sampling.py @@ -28,7 +28,7 @@ ) from cugraph.structure.symmetrize import symmetrize -from cugraph.experimental.gnn import BulkSampler +from cugraph.gnn import BulkSampler import cugraph diff --git a/benchmarks/cugraph/standalone/bulk_sampling/mg_utils/README.md b/benchmarks/cugraph/standalone/bulk_sampling/mg_utils/README.md new file mode 100644 index 00000000000..26dbbd5e705 --- /dev/null +++ b/benchmarks/cugraph/standalone/bulk_sampling/mg_utils/README.md @@ -0,0 +1,6 @@ +This directory contains various scripts helpful for cugraph users and developers. + +The following scripts were copied from https://github.com/rapidsai/multi-gpu-tools and are useful for starting a dask cluster, which is needed by cugraph for multi-GPU support. +* `run-dask-process.sh` +* `functions.sh` +* `default-config.sh` diff --git a/benchmarks/cugraph/standalone/bulk_sampling/mg_utils/default-config.sh b/benchmarks/cugraph/standalone/bulk_sampling/mg_utils/default-config.sh new file mode 100755 index 00000000000..26cef2aee78 --- /dev/null +++ b/benchmarks/cugraph/standalone/bulk_sampling/mg_utils/default-config.sh @@ -0,0 +1,39 @@ +# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +THIS_DIR=$(cd $(dirname ${BASH_SOURCE[0]}) && pwd) + +# Most are defined using the bash := or :- syntax, which means they +# will be set only if they were previously unset. The project config +# is loaded first, which gives it the opportunity to override anything +# in this file that uses that syntax. If there are variables in this +# file that should not be overridded by a project, then they will +# simply not use that syntax and override, since these variables are +# read last. +SCRIPTS_DIR=$THIS_DIR +WORKSPACE=$THIS_DIR + +# These really should be oerridden by the project config! +CONDA_ENV=${CONDA_ENV:-rapids} + +GPUS_PER_NODE=${GPUS_PER_NODE:-8} +WORKER_RMM_POOL_SIZE=${WORKER_RMM_POOL_SIZE:-12G} +DASK_CUDA_INTERFACE=${DASK_CUDA_INTERFACE:-ib0} +DASK_SCHEDULER_PORT=${DASK_SCHEDULER_PORT:-8792} +DASK_DEVICE_MEMORY_LIMIT=${DASK_DEVICE_MEMORY_LIMIT:-auto} +DASK_HOST_MEMORY_LIMIT=${DASK_HOST_MEMORY_LIMIT:-auto} + +BUILD_LOG_FILE=${BUILD_LOG_FILE:-${RESULTS_DIR}/build_log.txt} +SCHEDULER_FILE=${SCHEDULER_FILE:-${WORKSPACE}/dask-scheduler.json} +DATE=${DATE:-$(date --utc "+%Y-%m-%d_%H:%M:%S")_UTC} +ENV_EXPORT_FILE=${ENV_EXPORT_FILE:-${WORKSPACE}/$(basename ${CONDA_ENV})-${DATE}.txt} diff --git a/benchmarks/cugraph/standalone/bulk_sampling/mg_utils/functions.sh b/benchmarks/cugraph/standalone/bulk_sampling/mg_utils/functions.sh new file mode 100644 index 00000000000..7eedb5f1b1f --- /dev/null +++ b/benchmarks/cugraph/standalone/bulk_sampling/mg_utils/functions.sh @@ -0,0 +1,66 @@ +# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This file is source'd from script-env.sh to add functions to the +# calling environment, hence no #!/bin/bash as the first line. This +# also assumes the variables used in this file have been defined +# elsewhere. + +NUMARGS=$# +ARGS=$* +function hasArg { + (( ${NUMARGS} != 0 )) && (echo " ${ARGS} " | grep -q " $1 ") +} + +function logger { + echo -e ">>>> $@" +} + +# Calling "setTee outfile" will cause all stdout and stderr of the +# current script to be output to "tee", which outputs to stdout and +# "outfile" simultaneously. This is useful by allowing a script to +# "tee" itself at any point without being called with tee. +_origFileDescriptorsSaved=0 +function setTee { + if [[ $_origFileDescriptorsSaved == 0 ]]; then + # Save off the original file descr 1 and 2 as 3 and 4 + exec 3>&1 4>&2 + _origFileDescriptorsSaved=1 + fi + teeFile=$1 + # Create a named pipe. + pipeName=$(mktemp -u) + mkfifo $pipeName + # Close the currnet 1 and 2 and restore to original (3, 4) in the + # event this function is called repeatedly. + exec 1>&- 2>&- + exec 1>&3 2>&4 + # Start a tee process reading from the named pipe. Redirect stdout + # and stderr to the named pipe which goes to the tee process. The + # named pipe "file" can be removed and the tee process stays alive + # until the fd is closed. + tee -a < $pipeName $teeFile & + exec > $pipeName 2>&1 + rm $pipeName +} + +# Call this to stop script output from going to "tee" after a prior +# call to setTee. +function unsetTee { + if [[ $_origFileDescriptorsSaved == 1 ]]; then + # Close the current fd 1 and 2 which should stop the tee + # process, then restore 1 and 2 to original (saved as 3, 4). + exec 1>&- 2>&- + exec 1>&3 2>&4 + fi +} diff --git a/benchmarks/cugraph/standalone/bulk_sampling/mg_utils/run-dask-process.sh b/benchmarks/cugraph/standalone/bulk_sampling/mg_utils/run-dask-process.sh new file mode 100755 index 00000000000..b88abb685ec --- /dev/null +++ b/benchmarks/cugraph/standalone/bulk_sampling/mg_utils/run-dask-process.sh @@ -0,0 +1,247 @@ +#!/bin/bash +# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +THIS_DIR=$(cd $(dirname ${BASH_SOURCE[0]}) && pwd) + +source ${THIS_DIR}/default-config.sh +source ${THIS_DIR}/functions.sh + +# Logs can be written to a specific location by setting the LOGS_DIR +# env var. +LOGS_DIR=${LOGS_DIR:-dask_logs-$$} + +######################################## +NUMARGS=$# +ARGS=$* +function hasArg { + (( ${NUMARGS} != 0 )) && (echo " ${ARGS} " | grep -q " $1 ") +} +VALIDARGS="-h --help scheduler workers --tcp --ucx --ucxib --ucx-ib" +HELP="$0 [ ...] [ ...] + where is: + scheduler - start dask scheduler + workers - start dask workers + and is: + --tcp - initalize a tcp cluster (default) + --ucx - initialize a ucx cluster with NVLink + --ucxib | --ucx-ib - initialize a ucx cluster with IB+NVLink + -h | --help - print this text + + The cluster config order of precedence is any specification on the + command line (--tcp, --ucx, etc.) if provided, then the value of the + env var CLUSTER_CONFIG_TYPE if set, then the default value of tcp. + +" + +# CLUSTER_CONFIG_TYPE defaults to the env var value if set, else TCP +CLUSTER_CONFIG_TYPE=${CLUSTER_CONFIG_TYPE:-TCP} +START_SCHEDULER=0 +START_WORKERS=0 + +if (( ${NUMARGS} == 0 )); then + echo "${HELP}" + exit 0 +else + if hasArg -h || hasArg --help; then + echo "${HELP}" + exit 0 + fi + for a in ${ARGS}; do + if ! (echo " ${VALIDARGS} " | grep -q " ${a} "); then + echo "Invalid option: ${a}" + exit 1 + fi + done +fi + +if hasArg scheduler; then + START_SCHEDULER=1 +fi +if hasArg workers; then + START_WORKERS=1 +fi +# Allow the command line to take precedence +if hasArg --tcp; then + CLUSTER_CONFIG_TYPE=TCP +elif hasArg --ucx; then + CLUSTER_CONFIG_TYPE=UCX +elif hasArg --ucxib || hasArg --ucx-ib; then + CLUSTER_CONFIG_TYPE=UCXIB +fi + +######################################## + +#export DASK_LOGGING__DISTRIBUTED="DEBUG" + +#ulimit -n 100000 + +SCHEDULER_LOG=${LOGS_DIR}/scheduler_log.txt +WORKERS_LOG=${LOGS_DIR}/worker-${HOSTNAME}_log.txt + + +function buildTcpArgs { + export DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT="100s" + export DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP="600s" + export DASK_DISTRIBUTED__COMM__RETRY__DELAY__MIN="1s" + export DASK_DISTRIBUTED__COMM__RETRY__DELAY__MAX="60s" + export DASK_DISTRIBUTED__WORKER__MEMORY__Terminate="False" + + SCHEDULER_ARGS="--protocol=tcp + --scheduler-file $SCHEDULER_FILE + " + + WORKER_ARGS="--rmm-pool-size=$WORKER_RMM_POOL_SIZE + --rmm-async + --local-directory=/tmp/$LOGNAME + --scheduler-file=$SCHEDULER_FILE + --memory-limit=$DASK_HOST_MEMORY_LIMIT + --device-memory-limit=$DASK_DEVICE_MEMORY_LIMIT + " + +} + +function buildUCXWithInfinibandArgs { + + export UCX_MAX_RNDV_RAILS=1 + export UCX_MEMTYPE_REG_WHOLE_ALLOC_TYPES=cuda + export DASK_RMM__POOL_SIZE=0.5GB + export DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT=True + + SCHEDULER_ARGS="--protocol=ucx + --interface=$DASK_CUDA_INTERFACE + --scheduler-file $SCHEDULER_FILE + " + + WORKER_ARGS="--interface=$DASK_CUDA_INTERFACE + --rmm-pool-size=$WORKER_RMM_POOL_SIZE + --rmm-maximum-pool-size=$WORKER_RMM_POOL_SIZE + --local-directory=/tmp/$LOGNAME + --scheduler-file=$SCHEDULER_FILE + --memory-limit=$DASK_HOST_MEMORY_LIMIT + --device-memory-limit=$DASK_DEVICE_MEMORY_LIMIT + --enable-jit-unspill + " +} + + +function buildUCXwithoutInfinibandArgs { + + export UCX_TCP_CM_REUSEADDR=y + export UCX_MAX_RNDV_RAILS=1 + export UCX_TCP_TX_SEG_SIZE=8M + export UCX_TCP_RX_SEG_SIZE=8M + + export DASK_DISTRIBUTED__COMM__UCX__CUDA_COPY=True + export DASK_DISTRIBUTED__COMM__UCX__TCP=True + export DASK_DISTRIBUTED__COMM__UCX__NVLINK=True + export DASK_DISTRIBUTED__COMM__UCX__INFINIBAND=False + export DASK_DISTRIBUTED__COMM__UCX__RDMACM=False + export DASK_RMM__POOL_SIZE=0.5GB + + + SCHEDULER_ARGS="--protocol=ucx + --scheduler-file $SCHEDULER_FILE + " + + WORKER_ARGS="--enable-tcp-over-ucx + --enable-nvlink + --disable-infiniband + --disable-rdmacm + --rmm-pool-size=$WORKER_RMM_POOL_SIZE + --rmm-maximum-pool-size=$WORKER_RMM_POOL_SIZE + --local-directory=/tmp/$LOGNAME + --scheduler-file=$SCHEDULER_FILE + --memory-limit=$DASK_HOST_MEMORY_LIMIT + --device-memory-limit=$DASK_DEVICE_MEMORY_LIMIT + --enable-jit-unspill + " +} + +if [[ "$CLUSTER_CONFIG_TYPE" == "UCX" ]]; then + logger "Using cluster configurtion for UCX" + buildUCXwithoutInfinibandArgs +elif [[ "$CLUSTER_CONFIG_TYPE" == "UCXIB" ]]; then + logger "Using cluster configurtion for UCX with Infiniband" + buildUCXWithInfinibandArgs +else + logger "Using cluster configurtion for TCP" + buildTcpArgs +fi + + +######################################## + +scheduler_pid="" +worker_pid="" +num_scheduler_tries=0 + +function startScheduler { + mkdir -p $(dirname $SCHEDULER_FILE) + echo "RUNNING: \"python -m distributed.cli.dask_scheduler $SCHEDULER_ARGS\"" > $SCHEDULER_LOG + dask-scheduler $SCHEDULER_ARGS >> $SCHEDULER_LOG 2>&1 & + scheduler_pid=$! +} + +mkdir -p $LOGS_DIR +logger "Logs written to: $LOGS_DIR" + +if [[ $START_SCHEDULER == 1 ]]; then + rm -f $SCHEDULER_FILE $SCHEDULER_LOG $WORKERS_LOG + + startScheduler + sleep 6 + num_scheduler_tries=$(python -c "print($num_scheduler_tries+1)") + + # Wait for the scheduler to start first before proceeding, since + # it may require several retries (if prior run left ports open + # that need time to close, etc.) + while [ ! -f "$SCHEDULER_FILE" ]; do + scheduler_alive=$(ps -p $scheduler_pid > /dev/null ; echo $?) + if [[ $scheduler_alive != 0 ]]; then + if [[ $num_scheduler_tries != 30 ]]; then + echo "scheduler failed to start, retry #$num_scheduler_tries" + startScheduler + sleep 6 + num_scheduler_tries=$(echo $num_scheduler_tries+1 | bc) + else + echo "could not start scheduler, exiting." + exit 1 + fi + fi + done + echo "scheduler started." +fi + +if [[ $START_WORKERS == 1 ]]; then + rm -f $WORKERS_LOG + while [ ! -f "$SCHEDULER_FILE" ]; do + echo "run-dask-process.sh: $SCHEDULER_FILE not present - waiting to start workers..." + sleep 2 + done + echo "RUNNING: \"python -m dask_cuda.cli.dask_cuda_worker $WORKER_ARGS\"" > $WORKERS_LOG + dask-cuda-worker $WORKER_ARGS >> $WORKERS_LOG 2>&1 & + worker_pid=$! + echo "worker(s) started." +fi + +# This script will not return until the following background process +# have been completed/killed. +if [[ $worker_pid != "" ]]; then + echo "waiting for worker pid $worker_pid to finish before exiting script..." + wait $worker_pid +fi +if [[ $scheduler_pid != "" ]]; then + echo "waiting for scheduler pid $scheduler_pid to finish before exiting script..." + wait $scheduler_pid +fi diff --git a/benchmarks/cugraph/standalone/bulk_sampling/mg_utils/wait_for_workers.py b/benchmarks/cugraph/standalone/bulk_sampling/mg_utils/wait_for_workers.py new file mode 100644 index 00000000000..29d5cb7fbd7 --- /dev/null +++ b/benchmarks/cugraph/standalone/bulk_sampling/mg_utils/wait_for_workers.py @@ -0,0 +1,124 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys +import time +import yaml + +from dask.distributed import Client + + +def initialize_dask_cuda(communication_type): + communication_type = communication_type.lower() + if "ucx" in communication_type: + os.environ["UCX_MAX_RNDV_RAILS"] = "1" + + if communication_type == "ucx-ib": + os.environ["UCX_MEMTYPE_REG_WHOLE_ALLOC_TYPES"]="cuda" + os.environ["DASK_RMM__POOL_SIZE"]="0.5GB" + os.environ["DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT"]="True" + + +def wait_for_workers( + num_expected_workers, scheduler_file_path, communication_type, timeout_after=0 +): + """ + Waits until num_expected_workers workers are available based on + the workers managed by scheduler_file_path, then returns 0. If + timeout_after is specified, will return 1 if num_expected_workers + workers are not available before the timeout. + """ + # FIXME: use scheduler file path from global environment if none + # supplied in configuration yaml + + print("wait_for_workers.py - initializing client...", end="") + sys.stdout.flush() + initialize_dask_cuda(communication_type) + print("done.") + sys.stdout.flush() + + ready = False + start_time = time.time() + while not ready: + if timeout_after and ((time.time() - start_time) >= timeout_after): + print( + f"wait_for_workers.py timed out after {timeout_after} seconds before finding {num_expected_workers} workers." + ) + sys.stdout.flush() + break + with Client(scheduler_file=scheduler_file_path) as client: + num_workers = len(client.scheduler_info()["workers"]) + if num_workers < num_expected_workers: + print( + f"wait_for_workers.py expected {num_expected_workers} but got {num_workers}, waiting..." + ) + sys.stdout.flush() + time.sleep(5) + else: + print(f"wait_for_workers.py got {num_workers} workers, done.") + sys.stdout.flush() + ready = True + + if ready is False: + return 1 + return 0 + + +if __name__ == "__main__": + import argparse + + ap = argparse.ArgumentParser() + ap.add_argument( + "--num-expected-workers", + type=int, + required=False, + help="Number of workers to wait for. If not specified, " + "uses the NUM_WORKERS env var if set, otherwise defaults " + "to 16.", + ) + ap.add_argument( + "--scheduler-file-path", + type=str, + required=True, + help="Path to shared scheduler file to read.", + ) + ap.add_argument( + "--communication-type", + type=str, + default="tcp", + required=False, + help="Initiliaze dask_cuda based on the cluster communication type." + "Supported values are tcp(default), ucx, ucxib, ucx-ib.", + ) + ap.add_argument( + "--timeout-after", + type=int, + default=0, + required=False, + help="Number of seconds to wait for workers. " + "Default is 0 which means wait forever.", + ) + args = ap.parse_args() + + if args.num_expected_workers is None: + args.num_expected_workers = os.environ.get("NUM_WORKERS", 16) + + exitcode = wait_for_workers( + num_expected_workers=args.num_expected_workers, + scheduler_file_path=args.scheduler_file_path, + communication_type=args.communication_type, + timeout_after=args.timeout_after, + ) + + sys.exit(exitcode) diff --git a/python/cugraph-dgl/cugraph_dgl/dataloading/dataloader.py b/python/cugraph-dgl/cugraph_dgl/dataloading/dataloader.py index f154b096256..11139910931 100644 --- a/python/cugraph-dgl/cugraph_dgl/dataloading/dataloader.py +++ b/python/cugraph-dgl/cugraph_dgl/dataloading/dataloader.py @@ -17,7 +17,7 @@ import cupy as cp import cudf from cugraph.utilities.utils import import_optional -from cugraph.experimental import BulkSampler +from cugraph.gnn import BulkSampler from dask.distributed import default_client, Event from cugraph_dgl.dataloading import ( HomogenousBulkSamplerDataset, diff --git a/python/cugraph-pyg/cugraph_pyg/data/__init__.py b/python/cugraph-pyg/cugraph_pyg/data/__init__.py index 0567b69ecf2..66a9843c047 100644 --- a/python/cugraph-pyg/cugraph_pyg/data/__init__.py +++ b/python/cugraph-pyg/cugraph_pyg/data/__init__.py @@ -11,8 +11,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -from cugraph.utilities.api_tools import experimental_warning_wrapper - -from cugraph_pyg.data.cugraph_store import EXPERIMENTAL__CuGraphStore - -CuGraphStore = experimental_warning_wrapper(EXPERIMENTAL__CuGraphStore) +from cugraph_pyg.data.cugraph_store import CuGraphStore diff --git a/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py b/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py index 14dc5d84f90..05d540b7c45 100644 --- a/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py +++ b/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py @@ -199,7 +199,7 @@ def cast(cls, *args, **kwargs): return cls(*args, **kwargs) -class EXPERIMENTAL__CuGraphStore: +class CuGraphStore: """ Duck-typed version of PyG's GraphStore and FeatureStore. """ diff --git a/python/cugraph-pyg/cugraph_pyg/loader/__init__.py b/python/cugraph-pyg/cugraph_pyg/loader/__init__.py index 0682c598fdf..2c3d7eff89e 100644 --- a/python/cugraph-pyg/cugraph_pyg/loader/__init__.py +++ b/python/cugraph-pyg/cugraph_pyg/loader/__init__.py @@ -11,14 +11,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from cugraph.utilities.api_tools import experimental_warning_wrapper +from cugraph_pyg.loader.cugraph_node_loader import CuGraphNeighborLoader -from cugraph_pyg.loader.cugraph_node_loader import EXPERIMENTAL__CuGraphNeighborLoader - -CuGraphNeighborLoader = experimental_warning_wrapper( - EXPERIMENTAL__CuGraphNeighborLoader -) - -from cugraph_pyg.loader.cugraph_node_loader import EXPERIMENTAL__BulkSampleLoader - -BulkSampleLoader = experimental_warning_wrapper(EXPERIMENTAL__BulkSampleLoader) +from cugraph_pyg.loader.cugraph_node_loader import BulkSampleLoader diff --git a/python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py b/python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py index 200a82b460b..8a1db4edf29 100644 --- a/python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py +++ b/python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py @@ -20,7 +20,7 @@ import cupy import cudf -from cugraph.experimental.gnn import BulkSampler +from cugraph.gnn import BulkSampler from cugraph.utilities.utils import import_optional, MissingModule from cugraph_pyg.data import CuGraphStore @@ -42,7 +42,7 @@ ) -class EXPERIMENTAL__BulkSampleLoader: +class BulkSampleLoader: __ex_parquet_file = re.compile(r"batch=([0-9]+)\-([0-9]+)\.parquet") @@ -478,7 +478,7 @@ def __iter__(self): return self -class EXPERIMENTAL__CuGraphNeighborLoader: +class CuGraphNeighborLoader: def __init__( self, data: Union[CuGraphStore, Tuple[CuGraphStore, CuGraphStore]], @@ -527,7 +527,7 @@ def batch_size(self) -> int: return self.__batch_size def __iter__(self): - self.current_loader = EXPERIMENTAL__BulkSampleLoader( + self.current_loader = BulkSampleLoader( self.__feature_store, self.__graph_store, self.__input_nodes, diff --git a/python/cugraph/cugraph/experimental/gnn/__init__.py b/python/cugraph/cugraph/experimental/gnn/__init__.py index 2f06bb20abe..9c366a2ee28 100644 --- a/python/cugraph/cugraph/experimental/gnn/__init__.py +++ b/python/cugraph/cugraph/experimental/gnn/__init__.py @@ -11,7 +11,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from cugraph.gnn.data_loading import EXPERIMENTAL__BulkSampler -from cugraph.utilities.api_tools import experimental_warning_wrapper +from cugraph.gnn.data_loading import BulkSampler +from cugraph.utilities.api_tools import promoted_experimental_warning_wrapper -BulkSampler = experimental_warning_wrapper(EXPERIMENTAL__BulkSampler) +BulkSampler = promoted_experimental_warning_wrapper(BulkSampler) diff --git a/python/cugraph/cugraph/gnn/__init__.py b/python/cugraph/cugraph/gnn/__init__.py index a62e0cbd242..f8a3035440b 100644 --- a/python/cugraph/cugraph/gnn/__init__.py +++ b/python/cugraph/cugraph/gnn/__init__.py @@ -12,3 +12,4 @@ # limitations under the License. from .feature_storage.feat_storage import FeatureStore +from .data_loading.bulk_sampler import BulkSampler diff --git a/python/cugraph/cugraph/gnn/data_loading/__init__.py b/python/cugraph/cugraph/gnn/data_loading/__init__.py index 6150bf5b422..4b725fba75a 100644 --- a/python/cugraph/cugraph/gnn/data_loading/__init__.py +++ b/python/cugraph/cugraph/gnn/data_loading/__init__.py @@ -11,4 +11,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -from cugraph.gnn.data_loading.bulk_sampler import EXPERIMENTAL__BulkSampler +from cugraph.gnn.data_loading.bulk_sampler import BulkSampler diff --git a/python/cugraph/cugraph/gnn/data_loading/bulk_sampler.py b/python/cugraph/cugraph/gnn/data_loading/bulk_sampler.py index dbfcb124ce5..ff72e0ea2d6 100644 --- a/python/cugraph/cugraph/gnn/data_loading/bulk_sampler.py +++ b/python/cugraph/cugraph/gnn/data_loading/bulk_sampler.py @@ -31,7 +31,7 @@ import time -class EXPERIMENTAL__BulkSampler: +class BulkSampler: """ Performs sampling based on input seeds grouped into batches by a batch id. Writes the output minibatches to parquet, with @@ -158,7 +158,7 @@ def add_batches( Examples -------- >>> import cudf - >>> from cugraph.experimental.gnn import BulkSampler + >>> from cugraph.gnn import BulkSampler >>> from cugraph.datasets import karate >>> import tempfile >>> df = cudf.DataFrame({ diff --git a/python/cugraph/cugraph/tests/sampling/test_bulk_sampler.py b/python/cugraph/cugraph/tests/sampling/test_bulk_sampler.py index a945881394b..943681fb6ff 100644 --- a/python/cugraph/cugraph/tests/sampling/test_bulk_sampler.py +++ b/python/cugraph/cugraph/tests/sampling/test_bulk_sampler.py @@ -17,7 +17,7 @@ import cupy import cugraph from cugraph.datasets import karate, email_Eu_core -from cugraph.experimental.gnn import BulkSampler +from cugraph.gnn import BulkSampler from cugraph.utilities.utils import create_directory_with_overwrite import os diff --git a/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_mg.py b/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_mg.py index aee81e5ffed..1f7c4277773 100644 --- a/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_mg.py +++ b/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_mg.py @@ -22,7 +22,7 @@ import cugraph import dask_cudf from cugraph.datasets import karate, email_Eu_core -from cugraph.experimental import BulkSampler +from cugraph.gnn import BulkSampler from cugraph.utilities.utils import create_directory_with_overwrite From ec0417e1f7041f1f57b00efd7398a7d9c09d1f5f Mon Sep 17 00:00:00 2001 From: Alexandria Barghi Date: Fri, 8 Mar 2024 11:08:23 -0800 Subject: [PATCH 2/4] update examples --- python/cugraph-pyg/cugraph_pyg/examples/graph_sage_mg.py | 2 +- python/cugraph-pyg/cugraph_pyg/examples/graph_sage_sg.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/cugraph-pyg/cugraph_pyg/examples/graph_sage_mg.py b/python/cugraph-pyg/cugraph_pyg/examples/graph_sage_mg.py index 9c0adaad879..9afe7341dda 100644 --- a/python/cugraph-pyg/cugraph_pyg/examples/graph_sage_mg.py +++ b/python/cugraph-pyg/cugraph_pyg/examples/graph_sage_mg.py @@ -21,7 +21,7 @@ import torch import numpy as np -from torch_geometric.nn import CuGraphSAGEConv +from cugraph_pyg.nn import SAGEConv as CuGraphSAGEConv import torch.nn as nn import torch.nn.functional as F diff --git a/python/cugraph-pyg/cugraph_pyg/examples/graph_sage_sg.py b/python/cugraph-pyg/cugraph_pyg/examples/graph_sage_sg.py index 82f5e7ea67d..eb5f43fad34 100644 --- a/python/cugraph-pyg/cugraph_pyg/examples/graph_sage_sg.py +++ b/python/cugraph-pyg/cugraph_pyg/examples/graph_sage_sg.py @@ -18,7 +18,7 @@ import torch -from torch_geometric.nn import CuGraphSAGEConv +from cugraph_pyg.nn import SAGEConv as CuGraphSAGEConv import torch.nn as nn import torch.nn.functional as F From 90ca821703439f87c9f4a0be1ed57a19210d4085 Mon Sep 17 00:00:00 2001 From: Alexandria Barghi Date: Fri, 8 Mar 2024 11:09:03 -0800 Subject: [PATCH 3/4] style --- python/cugraph-pyg/cugraph_pyg/examples/graph_sage_mg.py | 2 +- python/cugraph-pyg/cugraph_pyg/examples/graph_sage_sg.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/cugraph-pyg/cugraph_pyg/examples/graph_sage_mg.py b/python/cugraph-pyg/cugraph_pyg/examples/graph_sage_mg.py index 9afe7341dda..4ca573504a1 100644 --- a/python/cugraph-pyg/cugraph_pyg/examples/graph_sage_mg.py +++ b/python/cugraph-pyg/cugraph_pyg/examples/graph_sage_mg.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. +# Copyright (c) 2023-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at diff --git a/python/cugraph-pyg/cugraph_pyg/examples/graph_sage_sg.py b/python/cugraph-pyg/cugraph_pyg/examples/graph_sage_sg.py index eb5f43fad34..9c96a707e4d 100644 --- a/python/cugraph-pyg/cugraph_pyg/examples/graph_sage_sg.py +++ b/python/cugraph-pyg/cugraph_pyg/examples/graph_sage_sg.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. +# Copyright (c) 2023-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at From ffe45a302cb79fb70b79b0c207f2d40b18288c4c Mon Sep 17 00:00:00 2001 From: Alexandria Barghi Date: Fri, 8 Mar 2024 11:11:33 -0800 Subject: [PATCH 4/4] remove mg utils --- .../bulk_sampling/mg_utils/README.md | 6 - .../bulk_sampling/mg_utils/default-config.sh | 39 --- .../bulk_sampling/mg_utils/functions.sh | 66 ----- .../mg_utils/run-dask-process.sh | 247 ------------------ .../mg_utils/wait_for_workers.py | 124 --------- 5 files changed, 482 deletions(-) delete mode 100644 benchmarks/cugraph/standalone/bulk_sampling/mg_utils/README.md delete mode 100755 benchmarks/cugraph/standalone/bulk_sampling/mg_utils/default-config.sh delete mode 100644 benchmarks/cugraph/standalone/bulk_sampling/mg_utils/functions.sh delete mode 100755 benchmarks/cugraph/standalone/bulk_sampling/mg_utils/run-dask-process.sh delete mode 100644 benchmarks/cugraph/standalone/bulk_sampling/mg_utils/wait_for_workers.py diff --git a/benchmarks/cugraph/standalone/bulk_sampling/mg_utils/README.md b/benchmarks/cugraph/standalone/bulk_sampling/mg_utils/README.md deleted file mode 100644 index 26dbbd5e705..00000000000 --- a/benchmarks/cugraph/standalone/bulk_sampling/mg_utils/README.md +++ /dev/null @@ -1,6 +0,0 @@ -This directory contains various scripts helpful for cugraph users and developers. - -The following scripts were copied from https://github.com/rapidsai/multi-gpu-tools and are useful for starting a dask cluster, which is needed by cugraph for multi-GPU support. -* `run-dask-process.sh` -* `functions.sh` -* `default-config.sh` diff --git a/benchmarks/cugraph/standalone/bulk_sampling/mg_utils/default-config.sh b/benchmarks/cugraph/standalone/bulk_sampling/mg_utils/default-config.sh deleted file mode 100755 index 26cef2aee78..00000000000 --- a/benchmarks/cugraph/standalone/bulk_sampling/mg_utils/default-config.sh +++ /dev/null @@ -1,39 +0,0 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -THIS_DIR=$(cd $(dirname ${BASH_SOURCE[0]}) && pwd) - -# Most are defined using the bash := or :- syntax, which means they -# will be set only if they were previously unset. The project config -# is loaded first, which gives it the opportunity to override anything -# in this file that uses that syntax. If there are variables in this -# file that should not be overridded by a project, then they will -# simply not use that syntax and override, since these variables are -# read last. -SCRIPTS_DIR=$THIS_DIR -WORKSPACE=$THIS_DIR - -# These really should be oerridden by the project config! -CONDA_ENV=${CONDA_ENV:-rapids} - -GPUS_PER_NODE=${GPUS_PER_NODE:-8} -WORKER_RMM_POOL_SIZE=${WORKER_RMM_POOL_SIZE:-12G} -DASK_CUDA_INTERFACE=${DASK_CUDA_INTERFACE:-ib0} -DASK_SCHEDULER_PORT=${DASK_SCHEDULER_PORT:-8792} -DASK_DEVICE_MEMORY_LIMIT=${DASK_DEVICE_MEMORY_LIMIT:-auto} -DASK_HOST_MEMORY_LIMIT=${DASK_HOST_MEMORY_LIMIT:-auto} - -BUILD_LOG_FILE=${BUILD_LOG_FILE:-${RESULTS_DIR}/build_log.txt} -SCHEDULER_FILE=${SCHEDULER_FILE:-${WORKSPACE}/dask-scheduler.json} -DATE=${DATE:-$(date --utc "+%Y-%m-%d_%H:%M:%S")_UTC} -ENV_EXPORT_FILE=${ENV_EXPORT_FILE:-${WORKSPACE}/$(basename ${CONDA_ENV})-${DATE}.txt} diff --git a/benchmarks/cugraph/standalone/bulk_sampling/mg_utils/functions.sh b/benchmarks/cugraph/standalone/bulk_sampling/mg_utils/functions.sh deleted file mode 100644 index 7eedb5f1b1f..00000000000 --- a/benchmarks/cugraph/standalone/bulk_sampling/mg_utils/functions.sh +++ /dev/null @@ -1,66 +0,0 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# This file is source'd from script-env.sh to add functions to the -# calling environment, hence no #!/bin/bash as the first line. This -# also assumes the variables used in this file have been defined -# elsewhere. - -NUMARGS=$# -ARGS=$* -function hasArg { - (( ${NUMARGS} != 0 )) && (echo " ${ARGS} " | grep -q " $1 ") -} - -function logger { - echo -e ">>>> $@" -} - -# Calling "setTee outfile" will cause all stdout and stderr of the -# current script to be output to "tee", which outputs to stdout and -# "outfile" simultaneously. This is useful by allowing a script to -# "tee" itself at any point without being called with tee. -_origFileDescriptorsSaved=0 -function setTee { - if [[ $_origFileDescriptorsSaved == 0 ]]; then - # Save off the original file descr 1 and 2 as 3 and 4 - exec 3>&1 4>&2 - _origFileDescriptorsSaved=1 - fi - teeFile=$1 - # Create a named pipe. - pipeName=$(mktemp -u) - mkfifo $pipeName - # Close the currnet 1 and 2 and restore to original (3, 4) in the - # event this function is called repeatedly. - exec 1>&- 2>&- - exec 1>&3 2>&4 - # Start a tee process reading from the named pipe. Redirect stdout - # and stderr to the named pipe which goes to the tee process. The - # named pipe "file" can be removed and the tee process stays alive - # until the fd is closed. - tee -a < $pipeName $teeFile & - exec > $pipeName 2>&1 - rm $pipeName -} - -# Call this to stop script output from going to "tee" after a prior -# call to setTee. -function unsetTee { - if [[ $_origFileDescriptorsSaved == 1 ]]; then - # Close the current fd 1 and 2 which should stop the tee - # process, then restore 1 and 2 to original (saved as 3, 4). - exec 1>&- 2>&- - exec 1>&3 2>&4 - fi -} diff --git a/benchmarks/cugraph/standalone/bulk_sampling/mg_utils/run-dask-process.sh b/benchmarks/cugraph/standalone/bulk_sampling/mg_utils/run-dask-process.sh deleted file mode 100755 index b88abb685ec..00000000000 --- a/benchmarks/cugraph/standalone/bulk_sampling/mg_utils/run-dask-process.sh +++ /dev/null @@ -1,247 +0,0 @@ -#!/bin/bash -# Copyright (c) 2022-2023, NVIDIA CORPORATION. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -THIS_DIR=$(cd $(dirname ${BASH_SOURCE[0]}) && pwd) - -source ${THIS_DIR}/default-config.sh -source ${THIS_DIR}/functions.sh - -# Logs can be written to a specific location by setting the LOGS_DIR -# env var. -LOGS_DIR=${LOGS_DIR:-dask_logs-$$} - -######################################## -NUMARGS=$# -ARGS=$* -function hasArg { - (( ${NUMARGS} != 0 )) && (echo " ${ARGS} " | grep -q " $1 ") -} -VALIDARGS="-h --help scheduler workers --tcp --ucx --ucxib --ucx-ib" -HELP="$0 [ ...] [ ...] - where is: - scheduler - start dask scheduler - workers - start dask workers - and is: - --tcp - initalize a tcp cluster (default) - --ucx - initialize a ucx cluster with NVLink - --ucxib | --ucx-ib - initialize a ucx cluster with IB+NVLink - -h | --help - print this text - - The cluster config order of precedence is any specification on the - command line (--tcp, --ucx, etc.) if provided, then the value of the - env var CLUSTER_CONFIG_TYPE if set, then the default value of tcp. - -" - -# CLUSTER_CONFIG_TYPE defaults to the env var value if set, else TCP -CLUSTER_CONFIG_TYPE=${CLUSTER_CONFIG_TYPE:-TCP} -START_SCHEDULER=0 -START_WORKERS=0 - -if (( ${NUMARGS} == 0 )); then - echo "${HELP}" - exit 0 -else - if hasArg -h || hasArg --help; then - echo "${HELP}" - exit 0 - fi - for a in ${ARGS}; do - if ! (echo " ${VALIDARGS} " | grep -q " ${a} "); then - echo "Invalid option: ${a}" - exit 1 - fi - done -fi - -if hasArg scheduler; then - START_SCHEDULER=1 -fi -if hasArg workers; then - START_WORKERS=1 -fi -# Allow the command line to take precedence -if hasArg --tcp; then - CLUSTER_CONFIG_TYPE=TCP -elif hasArg --ucx; then - CLUSTER_CONFIG_TYPE=UCX -elif hasArg --ucxib || hasArg --ucx-ib; then - CLUSTER_CONFIG_TYPE=UCXIB -fi - -######################################## - -#export DASK_LOGGING__DISTRIBUTED="DEBUG" - -#ulimit -n 100000 - -SCHEDULER_LOG=${LOGS_DIR}/scheduler_log.txt -WORKERS_LOG=${LOGS_DIR}/worker-${HOSTNAME}_log.txt - - -function buildTcpArgs { - export DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT="100s" - export DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP="600s" - export DASK_DISTRIBUTED__COMM__RETRY__DELAY__MIN="1s" - export DASK_DISTRIBUTED__COMM__RETRY__DELAY__MAX="60s" - export DASK_DISTRIBUTED__WORKER__MEMORY__Terminate="False" - - SCHEDULER_ARGS="--protocol=tcp - --scheduler-file $SCHEDULER_FILE - " - - WORKER_ARGS="--rmm-pool-size=$WORKER_RMM_POOL_SIZE - --rmm-async - --local-directory=/tmp/$LOGNAME - --scheduler-file=$SCHEDULER_FILE - --memory-limit=$DASK_HOST_MEMORY_LIMIT - --device-memory-limit=$DASK_DEVICE_MEMORY_LIMIT - " - -} - -function buildUCXWithInfinibandArgs { - - export UCX_MAX_RNDV_RAILS=1 - export UCX_MEMTYPE_REG_WHOLE_ALLOC_TYPES=cuda - export DASK_RMM__POOL_SIZE=0.5GB - export DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT=True - - SCHEDULER_ARGS="--protocol=ucx - --interface=$DASK_CUDA_INTERFACE - --scheduler-file $SCHEDULER_FILE - " - - WORKER_ARGS="--interface=$DASK_CUDA_INTERFACE - --rmm-pool-size=$WORKER_RMM_POOL_SIZE - --rmm-maximum-pool-size=$WORKER_RMM_POOL_SIZE - --local-directory=/tmp/$LOGNAME - --scheduler-file=$SCHEDULER_FILE - --memory-limit=$DASK_HOST_MEMORY_LIMIT - --device-memory-limit=$DASK_DEVICE_MEMORY_LIMIT - --enable-jit-unspill - " -} - - -function buildUCXwithoutInfinibandArgs { - - export UCX_TCP_CM_REUSEADDR=y - export UCX_MAX_RNDV_RAILS=1 - export UCX_TCP_TX_SEG_SIZE=8M - export UCX_TCP_RX_SEG_SIZE=8M - - export DASK_DISTRIBUTED__COMM__UCX__CUDA_COPY=True - export DASK_DISTRIBUTED__COMM__UCX__TCP=True - export DASK_DISTRIBUTED__COMM__UCX__NVLINK=True - export DASK_DISTRIBUTED__COMM__UCX__INFINIBAND=False - export DASK_DISTRIBUTED__COMM__UCX__RDMACM=False - export DASK_RMM__POOL_SIZE=0.5GB - - - SCHEDULER_ARGS="--protocol=ucx - --scheduler-file $SCHEDULER_FILE - " - - WORKER_ARGS="--enable-tcp-over-ucx - --enable-nvlink - --disable-infiniband - --disable-rdmacm - --rmm-pool-size=$WORKER_RMM_POOL_SIZE - --rmm-maximum-pool-size=$WORKER_RMM_POOL_SIZE - --local-directory=/tmp/$LOGNAME - --scheduler-file=$SCHEDULER_FILE - --memory-limit=$DASK_HOST_MEMORY_LIMIT - --device-memory-limit=$DASK_DEVICE_MEMORY_LIMIT - --enable-jit-unspill - " -} - -if [[ "$CLUSTER_CONFIG_TYPE" == "UCX" ]]; then - logger "Using cluster configurtion for UCX" - buildUCXwithoutInfinibandArgs -elif [[ "$CLUSTER_CONFIG_TYPE" == "UCXIB" ]]; then - logger "Using cluster configurtion for UCX with Infiniband" - buildUCXWithInfinibandArgs -else - logger "Using cluster configurtion for TCP" - buildTcpArgs -fi - - -######################################## - -scheduler_pid="" -worker_pid="" -num_scheduler_tries=0 - -function startScheduler { - mkdir -p $(dirname $SCHEDULER_FILE) - echo "RUNNING: \"python -m distributed.cli.dask_scheduler $SCHEDULER_ARGS\"" > $SCHEDULER_LOG - dask-scheduler $SCHEDULER_ARGS >> $SCHEDULER_LOG 2>&1 & - scheduler_pid=$! -} - -mkdir -p $LOGS_DIR -logger "Logs written to: $LOGS_DIR" - -if [[ $START_SCHEDULER == 1 ]]; then - rm -f $SCHEDULER_FILE $SCHEDULER_LOG $WORKERS_LOG - - startScheduler - sleep 6 - num_scheduler_tries=$(python -c "print($num_scheduler_tries+1)") - - # Wait for the scheduler to start first before proceeding, since - # it may require several retries (if prior run left ports open - # that need time to close, etc.) - while [ ! -f "$SCHEDULER_FILE" ]; do - scheduler_alive=$(ps -p $scheduler_pid > /dev/null ; echo $?) - if [[ $scheduler_alive != 0 ]]; then - if [[ $num_scheduler_tries != 30 ]]; then - echo "scheduler failed to start, retry #$num_scheduler_tries" - startScheduler - sleep 6 - num_scheduler_tries=$(echo $num_scheduler_tries+1 | bc) - else - echo "could not start scheduler, exiting." - exit 1 - fi - fi - done - echo "scheduler started." -fi - -if [[ $START_WORKERS == 1 ]]; then - rm -f $WORKERS_LOG - while [ ! -f "$SCHEDULER_FILE" ]; do - echo "run-dask-process.sh: $SCHEDULER_FILE not present - waiting to start workers..." - sleep 2 - done - echo "RUNNING: \"python -m dask_cuda.cli.dask_cuda_worker $WORKER_ARGS\"" > $WORKERS_LOG - dask-cuda-worker $WORKER_ARGS >> $WORKERS_LOG 2>&1 & - worker_pid=$! - echo "worker(s) started." -fi - -# This script will not return until the following background process -# have been completed/killed. -if [[ $worker_pid != "" ]]; then - echo "waiting for worker pid $worker_pid to finish before exiting script..." - wait $worker_pid -fi -if [[ $scheduler_pid != "" ]]; then - echo "waiting for scheduler pid $scheduler_pid to finish before exiting script..." - wait $scheduler_pid -fi diff --git a/benchmarks/cugraph/standalone/bulk_sampling/mg_utils/wait_for_workers.py b/benchmarks/cugraph/standalone/bulk_sampling/mg_utils/wait_for_workers.py deleted file mode 100644 index 29d5cb7fbd7..00000000000 --- a/benchmarks/cugraph/standalone/bulk_sampling/mg_utils/wait_for_workers.py +++ /dev/null @@ -1,124 +0,0 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import sys -import time -import yaml - -from dask.distributed import Client - - -def initialize_dask_cuda(communication_type): - communication_type = communication_type.lower() - if "ucx" in communication_type: - os.environ["UCX_MAX_RNDV_RAILS"] = "1" - - if communication_type == "ucx-ib": - os.environ["UCX_MEMTYPE_REG_WHOLE_ALLOC_TYPES"]="cuda" - os.environ["DASK_RMM__POOL_SIZE"]="0.5GB" - os.environ["DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT"]="True" - - -def wait_for_workers( - num_expected_workers, scheduler_file_path, communication_type, timeout_after=0 -): - """ - Waits until num_expected_workers workers are available based on - the workers managed by scheduler_file_path, then returns 0. If - timeout_after is specified, will return 1 if num_expected_workers - workers are not available before the timeout. - """ - # FIXME: use scheduler file path from global environment if none - # supplied in configuration yaml - - print("wait_for_workers.py - initializing client...", end="") - sys.stdout.flush() - initialize_dask_cuda(communication_type) - print("done.") - sys.stdout.flush() - - ready = False - start_time = time.time() - while not ready: - if timeout_after and ((time.time() - start_time) >= timeout_after): - print( - f"wait_for_workers.py timed out after {timeout_after} seconds before finding {num_expected_workers} workers." - ) - sys.stdout.flush() - break - with Client(scheduler_file=scheduler_file_path) as client: - num_workers = len(client.scheduler_info()["workers"]) - if num_workers < num_expected_workers: - print( - f"wait_for_workers.py expected {num_expected_workers} but got {num_workers}, waiting..." - ) - sys.stdout.flush() - time.sleep(5) - else: - print(f"wait_for_workers.py got {num_workers} workers, done.") - sys.stdout.flush() - ready = True - - if ready is False: - return 1 - return 0 - - -if __name__ == "__main__": - import argparse - - ap = argparse.ArgumentParser() - ap.add_argument( - "--num-expected-workers", - type=int, - required=False, - help="Number of workers to wait for. If not specified, " - "uses the NUM_WORKERS env var if set, otherwise defaults " - "to 16.", - ) - ap.add_argument( - "--scheduler-file-path", - type=str, - required=True, - help="Path to shared scheduler file to read.", - ) - ap.add_argument( - "--communication-type", - type=str, - default="tcp", - required=False, - help="Initiliaze dask_cuda based on the cluster communication type." - "Supported values are tcp(default), ucx, ucxib, ucx-ib.", - ) - ap.add_argument( - "--timeout-after", - type=int, - default=0, - required=False, - help="Number of seconds to wait for workers. " - "Default is 0 which means wait forever.", - ) - args = ap.parse_args() - - if args.num_expected_workers is None: - args.num_expected_workers = os.environ.get("NUM_WORKERS", 16) - - exitcode = wait_for_workers( - num_expected_workers=args.num_expected_workers, - scheduler_file_path=args.scheduler_file_path, - communication_type=args.communication_type, - timeout_after=args.timeout_after, - ) - - sys.exit(exitcode)