From b955e982565b794eeba12322fa73d9b581cfa993 Mon Sep 17 00:00:00 2001 From: Ian Bolliger Date: Wed, 22 Dec 2021 17:52:42 +0000 Subject: [PATCH 1/3] add client_kwargs arg --- rhg_compute_tools/kubernetes.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/rhg_compute_tools/kubernetes.py b/rhg_compute_tools/kubernetes.py index d58b161..c332734 100644 --- a/rhg_compute_tools/kubernetes.py +++ b/rhg_compute_tools/kubernetes.py @@ -47,16 +47,19 @@ def decorator(func): return decorator -def _get_cluster_dask_gateway(**kwargs): +def _get_cluster_dask_gateway(client_kwargs={}, **kwargs): """ Start dask.kubernetes cluster and dask.distributed client All arguments are optional. If not provided, defaults will be used. To view defaults, instantiate a :class:`dask_gateway.Gateway` object and call - `gateway.cluster_options()`. + `gateway.cluster_options()`. Kwargs are passed to the + :class:`dask_gateway.GatewayCluster` object, except for ``client_kwargs`` which are + passed to the :class:`distributed.Client` object. Parameters ---------- + client name : str, optional Name of worker image to use (e.g. ``rhodium/worker:latest``). If ``None`` (default), default to worker specified in ``template_path``. @@ -166,7 +169,7 @@ def _get_cluster_dask_gateway(**kwargs): elif k == "extra_pod_tolerations": if ( "keep_default_tolerations" in kwargs.keys() - and kwargs["keep_default_tolerations"] == False + and not kwargs["keep_default_tolerations"] ): base_tols = {} else: @@ -191,7 +194,12 @@ def _get_cluster_dask_gateway(**kwargs): del new_kwargs["tag"] cluster = gateway.new_cluster(**new_kwargs) - client = cluster.get_client() + + # this snippet replicates the GatewayCluster.get_client functionality but with the + # option to pass extra kwargs to the Client instantiation + client = dd.Client(cluster, **client_kwargs) + if not cluster.asynchronous: + cluster._clients.add(client) return client, cluster From 94622f97883bd00775ff87263af83dd2c0719787 Mon Sep 17 00:00:00 2001 From: Ian Bolliger Date: Wed, 22 Dec 2021 17:54:27 +0000 Subject: [PATCH 2/3] fix client kwargs in rhgx --- rhg_compute_tools/xarray.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rhg_compute_tools/xarray.py b/rhg_compute_tools/xarray.py index c996a16..3ccce6e 100644 --- a/rhg_compute_tools/xarray.py +++ b/rhg_compute_tools/xarray.py @@ -243,7 +243,7 @@ def datasets_from_delayed(futures, client=None, **client_kwargs): delayed_arrays = [ { - k: client.submit(lambda x: x[k].data, futures[i], client_kwargs) + k: client.submit(lambda x: x[k].data, futures[i], **client_kwargs) for k in data_var_keys[i] } for i in range(len(futures)) From d21b15869d80c849dfd78d6b02a561bd5953cd6a Mon Sep 17 00:00:00 2001 From: Ian Bolliger Date: Wed, 22 Dec 2021 18:01:04 +0000 Subject: [PATCH 3/3] update history --- HISTORY.rst | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/HISTORY.rst b/HISTORY.rst index c3bb677..ca18809 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -4,6 +4,14 @@ History .. current developments +v1.3 +------ +New features: +* Add `client_kwargs` option when starting a cluster with ``rhg_compute_tools.kubernetes.get_*_cluster`` + +Bug fixes: +* Fix typo preventing ``rhg_compute_tools.xarray.datasets_from_delayed`` from executing + v1.2.1 ------ Bug fixes: @@ -14,7 +22,6 @@ v1.2 New features: * Adds google storage directory marker utilities and rctools gcs mkdirs command line app - v1.1.4 ------ * Add ``dask_kwargs`` to the ``rhg_compute_tools.xarray`` functions