diff --git a/CHANGELOG.md b/CHANGELOG.md index a9cd7bb29..92700d296 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,8 @@ - Documentation improvements and cleanup [PR #521](https://github.com/aai-institute/pyDVL/pull/521), [PR #522](https://github.com/aai-institute/pyDVL/pull/522) +- Simplified parallel backend configuration + [PR #549](https://github.com/mkdocstrings/mkdocstrings/issues/615) ## 0.8.1 - 🆕 🏗 New method and notebook, Games with exact shapley values, bug fixes and cleanup diff --git a/docs/getting-started/advanced-usage.md b/docs/getting-started/advanced-usage.md index 17bdf674f..f068161b7 100644 --- a/docs/getting-started/advanced-usage.md +++ b/docs/getting-started/advanced-usage.md @@ -16,7 +16,7 @@ keep in mind when using pyDVL namely Parallelization and Caching. pyDVL uses parallelization to scale and speed up computations. It does so using one of Dask, Ray or Joblib. The first is used in the [influence][pydvl.influence] package whereas the other two -are used in the [value][pydvl.value] package. +are used in the [value][pydvl.value] package. ### Data valuation @@ -37,6 +37,33 @@ and to provide a running cluster (or run ray in local mode). if the re-training only happens on a subset of the data. This means that you should make sure that each worker has enough memory to handle the whole dataset. +We use backend classes for both joblib and ray as well as two types +of executors for the different algorithms: the first uses a map reduce pattern as seen in +the [MapReduceJob][pydvl.parallel.map_reduce.MapReduceJob] class +and the second implements the futures executor interface from [concurrent.futures][]. + +As a convenience, you can also instantiate a parallel backend class +by using the [init_parallel_backend][pydvl.parallel.init_parallel_backend] +function: + +```python +from pydvl.parallel import init_parallel_backend +parallel_backend = init_parallel_backend(backend_name="joblib") +``` + +!!! info + + The executor classes are not meant to be instantiated and used by users + of pyDVL. They are used internally as part of the computations of the + different methods. + +!!! danger "Deprecation notice" + + We are currently planning to deprecate + [MapReduceJob][pydvl.parallel.map_reduce.MapReduceJob] in favour of the + futures executor interface because it allows for more diverse computation + patterns with interruptions. + #### Joblib Please follow the instructions in Joblib's documentation @@ -48,19 +75,24 @@ to compute exact shapley values you would use: ```python import joblib -from pydvl.parallel import ParallelConfig +from pydvl.parallel import JoblibParallelBackend from pydvl.value.shapley import combinatorial_exact_shapley from pydvl.utils.utility import Utility -config = ParallelConfig(backend="joblib") +parallel_backend = JoblibParallelBackend() u = Utility(...) with joblib.parallel_config(backend="loky", verbose=100): - combinatorial_exact_shapley(u, config=config) + values = combinatorial_exact_shapley(u, parallel_backend=parallel_backend) ``` #### Ray +!!! warning "Additional dependencies" + + The Ray parallel backend requires optional dependencies. + See [Extras][installation-extras] for more information. + Please follow the instructions in Ray's documentation to [set up a remote cluster](https://docs.ray.io/en/latest/cluster/key-concepts.html). You could alternatively use a local cluster and in that case you don't have to set @@ -90,14 +122,58 @@ To use the ray parallel backend to compute exact shapley values you would use: ```python import ray -from pydvl.parallel import ParallelConfig +from pydvl.parallel import RayParallelBackend from pydvl.value.shapley import combinatorial_exact_shapley from pydvl.utils.utility import Utility ray.init() -config = ParallelConfig(backend="ray") +parallel_backend = RayParallelBackend() u = Utility(...) -combinatorial_exact_shapley(u, config=config) +vaues = combinatorial_exact_shapley(u, parallel_backend=parallel_backend) +``` + +#### Futures executor + +For the futures executor interface, we have implemented an executor +class for ray in [RayExecutor][pydvl.parallel.futures.ray.RayExecutor] +and rely on joblib's loky [get_reusable_executor][loky.get_reusable_executor] +function to instantiate an executor for local parallelization. + +They are both compatibles with the builtin +[ThreadPoolExecutor][concurrent.futures.ThreadPoolExecutor] +and [ProcessPoolExecutor][concurrent.futures.ProcessPoolExecutor] +classes. + +```pycon +>>> from joblib.externals.loky import _ReusablePoolExecutor +>>> from pydvl.parallel import JoblibParallelBackend +>>> parallel_backend = JoblibParallelBackend() +>>> with parallel_backend.executor() as executor: +... results = list(executor.map(lambda x: x + 1, range(3))) +... +>>> results +[1, 2, 3] +``` + +#### Map-reduce + +The map-reduce interface is older and more limited in the patterns +it allows us to use. + +To reproduce the previous example using +[MapReduceJob][pydvl.parallel.map_reduce.MapReduceJob], we would use: + +```pycon +>>> from pydvl.parallel import JoblibParallelBackend, MapReduceJob +>>> parallel_backend = JoblibParallelBackend() +>>> map_reduce_job = MapReduceJob( +... list(range(3)), +... map_func=lambda x: x[0] + 1, +... parallel_backend=parallel_backend, +... ) +>>> results = map_reduce_job() +>>> results +[1, 2, 3] ``` ### Influence functions diff --git a/mkdocs.yml b/mkdocs.yml index 1e4738720..df34dda4e 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -108,6 +108,7 @@ plugins: - https://pytorch.org/docs/stable/objects.inv - https://pymemcache.readthedocs.io/en/latest/objects.inv - https://joblib.readthedocs.io/en/stable/objects.inv + - https://loky.readthedocs.io/en/stable/objects.inv - https://docs.dask.org/en/latest/objects.inv - https://distributed.dask.org/en/latest/objects.inv - https://docs.ray.io/en/latest/objects.inv diff --git a/src/pydvl/parallel/__init__.py b/src/pydvl/parallel/__init__.py index c8feca005..a3e776943 100644 --- a/src/pydvl/parallel/__init__.py +++ b/src/pydvl/parallel/__init__.py @@ -1,37 +1,48 @@ """ This module provides a common interface to parallelization backends. The list of -supported backends is [here][pydvl.parallel.backends]. Backends can be -selected with the `backend` argument of an instance of -[ParallelConfig][pydvl.utils.config.ParallelConfig], as seen in the examples -below. +supported backends is [here][pydvl.parallel.backends]. Backends should be +instantiated directly and passed to the respective valuation method. -We use [executors][concurrent.futures.Executor] to submit tasks in parallel. The -basic high-level pattern is +We use executors that implement the [Executor][concurrent.futures.Executor] +interface to submit tasks in parallel. +The basic high-level pattern is: ```python -from pydvl.parallel import init_executor, ParallelConfig +from pydvl.parallel import JoblibParallelBackend -config = ParallelConfig(backend="ray") -with init_executor(max_workers=1, config=config) as executor: +parallel_backend = JoblibParallelBackend() +with parallel_backend.executor(max_workers=2) as executor: future = executor.submit(lambda x: x + 1, 1) result = future.result() assert result == 2 ``` -Running a map-reduce job is also easy: +Running a map-style job is also easy: ```python -from pydvl.parallel import init_executor, ParallelConfig +from pydvl.parallel import JoblibParallelBackend -config = ParallelConfig(backend="joblib") -with init_executor(config=config) as executor: +parallel_backend = JoblibParallelBackend() +with parallel_backend.executor(max_workers=2) as executor: results = list(executor.map(lambda x: x + 1, range(5))) assert results == [1, 2, 3, 4, 5] ``` - +!!! tip "Passsing large objects" + When running tasks which accept heavy inputs, it is important + to first use `put()` on the object and use the returned reference + as argument to the callable within `submit()`. For example: + ```python + u_ref = parallel_backend.put(u) + ... + executor.submit(task, utility=u) + ``` + Note that `task()` does not need to be changed in any way: + the backend will `get()` the object and pass it to the function + upon invocation. There is an alternative map-reduce implementation [MapReduceJob][pydvl.parallel.map_reduce.MapReduceJob] which internally -uses joblib's higher level API with `Parallel()` +uses joblib's higher level API with `Parallel()` which then indirectly also +supports the use of Dask and Ray. """ # HACK to avoid circular imports from ..utils.types import * # pylint: disable=wrong-import-order @@ -41,5 +52,5 @@ from .futures import * from .map_reduce import * -if len(BaseParallelBackend.BACKENDS) == 0: +if len(ParallelBackend.BACKENDS) == 0: raise ImportError("No parallel backend found. Please install ray or joblib.") diff --git a/src/pydvl/parallel/backend.py b/src/pydvl/parallel/backend.py index 84f885a7b..52f08a46c 100644 --- a/src/pydvl/parallel/backend.py +++ b/src/pydvl/parallel/backend.py @@ -2,19 +2,21 @@ import logging import os +import warnings from abc import abstractmethod from concurrent.futures import Executor from enum import Flag, auto from typing import Any, Callable, Type -from ..utils.types import NoPublicConstructor +from deprecate import deprecated + from .config import ParallelConfig __all__ = [ "init_parallel_backend", - "effective_n_jobs", + "_maybe_init_parallel_backend", "available_cpus", - "BaseParallelBackend", + "ParallelBackend", "CancellationPolicy", ] @@ -41,25 +43,26 @@ class CancellationPolicy(Flag): ALL = PENDING | RUNNING -class BaseParallelBackend(metaclass=NoPublicConstructor): +class ParallelBackend: """Abstract base class for all parallel backends.""" config: dict[str, Any] = {} - BACKENDS: dict[str, "Type[BaseParallelBackend]"] = {} + BACKENDS: dict[str, "Type[ParallelBackend]"] = {} def __init_subclass__(cls, *, backend_name: str, **kwargs): super().__init_subclass__(**kwargs) - BaseParallelBackend.BACKENDS[backend_name] = cls + ParallelBackend.BACKENDS[backend_name] = cls @classmethod @abstractmethod def executor( cls, max_workers: int | None = None, - config: ParallelConfig = ParallelConfig(), - cancel_futures: CancellationPolicy = CancellationPolicy.PENDING, + *, + config: ParallelConfig | None = None, + cancel_futures: CancellationPolicy | bool = CancellationPolicy.PENDING, ) -> Executor: - """Returns an executor for the parallel backend.""" + """Returns a futures executor for the parallel backend.""" ... @abstractmethod @@ -92,39 +95,90 @@ def __repr__(self) -> str: return f"<{self.__class__.__name__}: {self.config}>" -def init_parallel_backend(config: ParallelConfig) -> BaseParallelBackend: +@deprecated( + target=True, + args_mapping={"config": "config"}, + deprecated_in="0.9.0", + remove_in="0.10.0", +) +def init_parallel_backend( + config: ParallelConfig | None = None, backend_name: str | None = None +) -> ParallelBackend: """Initializes the parallel backend and returns an instance of it. The following example creates a parallel backend instance with the default configuration, which is a local joblib backend. + If you don't pass any arguments, then by default it will instantiate + the JoblibParallelBackend: + ??? Example - ``` python - config = ParallelConfig() - parallel_backend = init_parallel_backend(config) + ```python + parallel_backend = init_parallel_backend() ``` - To create a parallel backend instance with a different backend, e.g. ray, - you can pass the backend name as a string to the constructor of - [ParallelConfig][pydvl.utils.config.ParallelConfig]. + To create a parallel backend instance with for example `ray` as a backend, + you can pass the backend name as a string:. ??? Example ```python - config = ParallelConfig(backend="ray") + parallel_backend = init_parallel_backend(backend_name="ray") + ``` + + + The following is an example of the deprecated + way for instantiating a parallel backend: + + ??? Example + ``` python + config = ParallelConfig() parallel_backend = init_parallel_backend(config) ``` Args: - config: instance of [ParallelConfig][pydvl.utils.config.ParallelConfig] + backend_name: Name of the backend to instantiate. + config: (**DEPRECATED**) Object configuring parallel computation, with cluster address, number of cpus, etc. """ + if backend_name is None: + if config is None: + backend_name = "joblib" + else: + backend_name = config.backend + try: - parallel_backend_cls = BaseParallelBackend.BACKENDS[config.backend] + parallel_backend_cls = ParallelBackend.BACKENDS[backend_name] except KeyError: - raise NotImplementedError(f"Unexpected parallel backend {config.backend}") - return parallel_backend_cls.create(config) # type: ignore + raise NotImplementedError(f"Unexpected parallel backend {backend_name}") + return parallel_backend_cls(config) # type: ignore + + +# TODO: delete this class once it's made redundant in v0.10.0 +# This string for the benefit of deprecation searches: +# remove_in="0.10.0" +def _maybe_init_parallel_backend( + parallel_backend: ParallelBackend | None = None, + config: ParallelConfig | None = None, +) -> ParallelBackend: + """Helper function inside during the deprecation period of + [][pydvl.parallel.backend.init_parallel_backend] and should be removed in v0.10.0 + """ + if parallel_backend is not None: + if config is not None: + warnings.warn( + "You should not set both `config` and `parallel_backend`. The former will be ignored.", + UserWarning, + ) + else: + if config is not None: + parallel_backend = init_parallel_backend(config) + else: + from pydvl.parallel.backends import JoblibParallelBackend + + parallel_backend = JoblibParallelBackend() + return parallel_backend def available_cpus() -> int: @@ -140,30 +194,3 @@ def available_cpus() -> int: if system() != "Linux": return os.cpu_count() or 1 return len(os.sched_getaffinity(0)) # type: ignore - - -def effective_n_jobs(n_jobs: int, config: ParallelConfig = ParallelConfig()) -> int: - """Returns the effective number of jobs. - - This number may vary depending on the parallel backend and the resources - available. - - Args: - n_jobs: the number of jobs requested. If -1, the number of available - CPUs is returned. - config: instance of [ParallelConfig][pydvl.utils.config.ParallelConfig] with - cluster address, number of cpus, etc. - - Returns: - The effective number of jobs, guaranteed to be >= 1. - - Raises: - RuntimeError: if the effective number of jobs returned by the backend - is < 1. - """ - parallel_backend = init_parallel_backend(config) - if (eff_n_jobs := parallel_backend.effective_n_jobs(n_jobs)) < 1: - raise RuntimeError( - f"Invalid number of jobs {eff_n_jobs} obtained from parallel backend {config.backend}" - ) - return eff_n_jobs diff --git a/src/pydvl/parallel/backends/joblib.py b/src/pydvl/parallel/backends/joblib.py index 264fc2a0b..10f0a0cdf 100644 --- a/src/pydvl/parallel/backends/joblib.py +++ b/src/pydvl/parallel/backends/joblib.py @@ -5,11 +5,12 @@ from concurrent.futures import Executor from typing import Callable, TypeVar, cast -import joblib -from joblib import delayed +from deprecate import deprecated +from joblib import delayed, effective_n_jobs from joblib.externals.loky import get_reusable_executor +from joblib.parallel import get_active_backend -from pydvl.parallel.backend import BaseParallelBackend, CancellationPolicy +from pydvl.parallel.backend import CancellationPolicy, ParallelBackend from pydvl.parallel.config import ParallelConfig __all__ = ["JoblibParallelBackend"] @@ -19,48 +20,72 @@ logger = logging.getLogger(__name__) -class JoblibParallelBackend(BaseParallelBackend, backend_name="joblib"): +class JoblibParallelBackend(ParallelBackend, backend_name="joblib"): """Class used to wrap joblib to make it transparent to algorithms. - It shouldn't be initialized directly. You should instead call - [init_parallel_backend()][pydvl.parallel.backend.init_parallel_backend]. - - ??? Example - ``` python - from pydvl.parallel import init_paralle_backend, ParallelConfig - config = ParallelConfig(backend="joblib") - parallel_backend = init_parallel_backend(config) - ``` - - ??? Example + !!! Example ``` python - import joblib - from pydvl.parallel import init_paralle_backend, ParallelConfig - with joblib.parallel_config(verbose=100): - config = ParallelConfig(backend="joblib") - parallel_backend = init_parallel_backend(config) + from pydvl.parallel import JoblibParallelBackend + parallel_backend = JoblibParallelBackend() ``` - - Args: - config: instance of [ParallelConfig][pydvl.utils.config.ParallelConfig] - with cluster address, number of cpus, etc. """ - def __init__(self, config: ParallelConfig): + _joblib_backend_name: str = "loky" + """Name of the backend to use for joblib inside [MapReduceJob][pydvl.parallel.mapreduce.MapReduceJob].""" + + @deprecated( + target=True, + args_mapping={"config": None}, + deprecated_in="0.9.0", + remove_in="0.10.0", + ) + def __init__(self, config: ParallelConfig | None = None) -> None: + n_jobs: int | None = None + if config is not None: + n_jobs = config.n_cpus_local self.config = { - "n_jobs": config.n_cpus_local, + "n_jobs": n_jobs, } @classmethod def executor( cls, max_workers: int | None = None, - config: ParallelConfig = ParallelConfig(), - cancel_futures: CancellationPolicy = CancellationPolicy.NONE, + *, + config: ParallelConfig | None = None, + cancel_futures: CancellationPolicy | bool = CancellationPolicy.NONE, ) -> Executor: + """Returns a futures executor for the parallel backend. + + !!! Example + ``` python + from pydvl.parallel import JoblibParallelBackend + parallel_backend = JoblibParallelBackend() + with parallel_backend.executor() as executor: + executor.submit(...) + ``` + + Args: + max_workers: Maximum number of parallel workers. + config: (**DEPRECATED**) Object configuring parallel computation, + with cluster address, number of cpus, etc. + cancel_futures: Policy to use when cancelling futures + after exiting an Executor. + + Returns: + Instance of [_ReusablePoolExecutor][joblib.externals.loky.reusable_executor._ReusablePoolExecutor]. + """ + if config is not None: + warnings.warn( + "The `JoblibParallelBackend` uses deprecated arguments: " + "`config`. They were deprecated since v0.9.0 " + "and will be removed in v0.10.0.", + FutureWarning, + ) + if cancel_futures not in (CancellationPolicy.NONE, False): warnings.warn( - "Cancellation of futures is not supported by the joblib backend" + "Cancellation of futures is not supported by the joblib backend", ) return cast(Executor, get_reusable_executor(max_workers=max_workers)) @@ -85,8 +110,11 @@ def wait(self, v: list[T], *args, **kwargs) -> tuple[list[T], list[T]]: return v, [] def _effective_n_jobs(self, n_jobs: int) -> int: - eff_n_jobs: int = joblib.effective_n_jobs(n_jobs) + eff_n_jobs: int = effective_n_jobs(n_jobs) + _, backend_n_jobs = get_active_backend() if self.config["n_jobs"] is not None: maximum_n_jobs = self.config["n_jobs"] eff_n_jobs = min(eff_n_jobs, maximum_n_jobs) + if backend_n_jobs is not None: + eff_n_jobs = min(eff_n_jobs, backend_n_jobs) return eff_n_jobs diff --git a/src/pydvl/parallel/backends/ray.py b/src/pydvl/parallel/backends/ray.py index 3f9cc3f50..70953e9c4 100644 --- a/src/pydvl/parallel/backends/ray.py +++ b/src/pydvl/parallel/backends/ray.py @@ -1,14 +1,15 @@ from __future__ import annotations -import logging +import warnings from concurrent.futures import Executor from typing import Any, Callable, Iterable, TypeVar import ray +from deprecate import deprecated from ray import ObjectRef from ray.util.joblib import register_ray -from pydvl.parallel.backend import BaseParallelBackend, CancellationPolicy +from pydvl.parallel.backend import CancellationPolicy, ParallelBackend from pydvl.parallel.config import ParallelConfig __all__ = ["RayParallelBackend"] @@ -17,27 +18,28 @@ T = TypeVar("T") -class RayParallelBackend(BaseParallelBackend, backend_name="ray"): +class RayParallelBackend(ParallelBackend, backend_name="ray"): """Class used to wrap ray to make it transparent to algorithms. - It shouldn't be initialized directly. You should instead call - [init_parallel_backend()][pydvl.parallel.backend.init_parallel_backend]. - - ??? Example + !!! Example ``` python import ray - from pydvl.parallel import init_parallel_backend, ParallelConfig + from pydvl.parallel import RayParallelBackend ray.init() - config = ParallelConfig(backend="ray") - parallel_backend = init_parallel_backend(config) + parallel_backend = RayParallelBackend() ``` - - Args: - config: instance of [ParallelConfig][pydvl.utils.config.ParallelConfig] - with cluster address, number of cpus, etc. """ - def __init__(self, config: ParallelConfig): + _joblib_backend_name: str = "ray" + """Name of the backend to use for joblib inside [MapReduceJob][pydvl.parallel.mapreduce.MapReduceJob].""" + + @deprecated( + target=True, + args_mapping={"config": None}, + deprecated_in="0.9.0", + remove_in="0.10.0", + ) + def __init__(self, config: ParallelConfig | None = None) -> None: if not ray.is_initialized(): raise RuntimeError( "Starting from v0.9.0, ray is no longer automatically initialized. " @@ -51,12 +53,44 @@ def __init__(self, config: ParallelConfig): def executor( cls, max_workers: int | None = None, - config: ParallelConfig = ParallelConfig(), - cancel_futures: CancellationPolicy = CancellationPolicy.PENDING, + *, + config: ParallelConfig | None = None, + cancel_futures: CancellationPolicy | bool = CancellationPolicy.PENDING, ) -> Executor: + """Returns a futures executor for the parallel backend. + + !!! Example + ``` python + import ray + from pydvl.parallel import RayParallelBackend + ray.init() + parallel_backend = RayParallelBackend() + with parallel_backend.executor() as executor: + executor.submit(...) + ``` + + Args: + max_workers: Maximum number of parallel workers. + config: (**DEPRECATED**) Object configuring parallel computation, + with cluster address, number of cpus, etc. + cancel_futures: Policy to use when cancelling futures + after exiting an Executor. + + Returns: + Instance of [RayExecutor][pydvl.parallel.futures.ray.RayExecutor]. + """ + # Imported here to avoid circular import errors from pydvl.parallel.futures.ray import RayExecutor - return RayExecutor(max_workers, config=config, cancel_futures=cancel_futures) # type: ignore + if config is not None: + warnings.warn( + "The `RayParallelBackend` uses deprecated arguments: " + "`config`. They were deprecated since v0.9.0 " + "and will be removed in v0.10.0.", + FutureWarning, + ) + + return RayExecutor(max_workers, cancel_futures=cancel_futures) # type: ignore def get(self, v: ObjectRef | Iterable[ObjectRef] | T, *args, **kwargs) -> T | Any: timeout: float | None = kwargs.get("timeout", None) diff --git a/src/pydvl/parallel/futures/__init__.py b/src/pydvl/parallel/futures/__init__.py index ce42ecc91..c75d04299 100644 --- a/src/pydvl/parallel/futures/__init__.py +++ b/src/pydvl/parallel/futures/__init__.py @@ -2,18 +2,23 @@ from contextlib import contextmanager from typing import Generator, Optional -from pydvl.parallel.backend import BaseParallelBackend -from pydvl.parallel.config import ParallelConfig +from deprecate import deprecated -try: - from pydvl.parallel.futures.ray import RayExecutor -except ModuleNotFoundError: - pass +from pydvl.parallel.backend import ParallelBackend +from pydvl.parallel.config import ParallelConfig __all__ = ["init_executor"] +# TODO: delete this function once it's made redundant in v0.10.0 +# This string for the benefit of deprecation searches: +# remove_in="0.10.0" @contextmanager +@deprecated( + target=None, + deprecated_in="0.9.0", + remove_in="0.10.0", +) def init_executor( max_workers: Optional[int] = None, config: ParallelConfig = ParallelConfig(), @@ -46,7 +51,7 @@ def init_executor( ``` """ try: - cls = BaseParallelBackend.BACKENDS[config.backend] + cls = ParallelBackend.BACKENDS[config.backend] with cls.executor(max_workers=max_workers, config=config, **kwargs) as e: yield e except KeyError: diff --git a/src/pydvl/parallel/futures/ray.py b/src/pydvl/parallel/futures/ray.py index 0aa97f152..17d302a87 100644 --- a/src/pydvl/parallel/futures/ray.py +++ b/src/pydvl/parallel/futures/ray.py @@ -5,9 +5,11 @@ import time import types from concurrent.futures import Executor, Future -from typing import Any, Callable, Optional, TypeVar +from typing import Any, Callable, Optional, TypeVar, Union from weakref import WeakSet, ref +from deprecate import deprecated + try: import ray except ModuleNotFoundError as e: @@ -30,17 +32,12 @@ class RayExecutor(Executor): """Asynchronous executor using Ray that implements the concurrent.futures API. - It shouldn't be initialized directly. You should instead call - [init_executor()][pydvl.parallel.futures.init_executor]. - Args: max_workers: Maximum number of concurrent tasks. Each task can request itself any number of vCPUs. You must ensure the product of this value and the n_cpus_per_job parameter passed to submit() does not exceed available cluster resources. If set to `None`, it will default to the total number of vCPUs in the ray cluster. - config: instance of [ParallelConfig][pydvl.utils.config.ParallelConfig] - with cluster address, number of cpus, etc. cancel_futures: Select which futures will be cancelled when exiting this context manager. `Pending` is the default, which will cancel all pending futures, but not running ones, as done by @@ -49,17 +46,19 @@ class RayExecutor(Executor): any. See [CancellationPolicy][pydvl.parallel.backend.CancellationPolicy] """ + @deprecated( + target=True, + args_mapping={"config": None}, + deprecated_in="0.9.0", + remove_in="0.10.0", + ) def __init__( self, max_workers: Optional[int] = None, *, - config: ParallelConfig = ParallelConfig(), - cancel_futures: CancellationPolicy = CancellationPolicy.ALL, + config: Optional[ParallelConfig] = None, + cancel_futures: Union[CancellationPolicy, bool] = CancellationPolicy.ALL, ): - if config.backend != "ray": - raise ValueError( - f"Parallel backend must be set to 'ray' and not '{config.backend}'" - ) if max_workers is not None: if max_workers <= 0: raise ValueError("max_workers must be greater than 0") diff --git a/src/pydvl/parallel/map_reduce.py b/src/pydvl/parallel/map_reduce.py index a4cfd272a..937674141 100644 --- a/src/pydvl/parallel/map_reduce.py +++ b/src/pydvl/parallel/map_reduce.py @@ -6,17 +6,19 @@ This interface might be deprecated or changed in a future release before 1.0 """ +import warnings from functools import reduce from itertools import accumulate, repeat from typing import Any, Collection, Dict, Generic, List, Optional, TypeVar, Union +from deprecate import deprecated from joblib import Parallel, delayed from numpy.random import SeedSequence from numpy.typing import NDArray from ..utils.functional import maybe_add_argument from ..utils.types import MapFunction, ReduceFunction, Seed, ensure_seed_sequence -from .backend import init_parallel_backend +from .backend import ParallelBackend, _maybe_init_parallel_backend from .config import ParallelConfig __all__ = ["MapReduceJob"] @@ -46,7 +48,12 @@ class MapReduceJob(Generic[T, R]): each job. Alternatively, one can use [functools.partial][]. reduce_kwargs: Keyword arguments that will be passed to `reduce_func` in each job. Alternatively, one can use [functools.partial][]. - config: Instance of [ParallelConfig][pydvl.utils.config.ParallelConfig] + parallel_backend: Parallel backend instance to use + for parallelizing computations. If `None`, + use [JoblibParallelBackend][pydvl.parallel.backends.JoblibParallelBackend] backend. + See the [Parallel Backends][pydvl.parallel.backends] package + for available options. + config: (**DEPRECATED**) Object configuring parallel computation, with cluster address, number of cpus, etc. n_jobs: Number of parallel jobs to run. Does not accept 0 @@ -81,24 +88,32 @@ class MapReduceJob(Generic[T, R]): ``` """ + @deprecated( + target=True, + args_mapping={"config": "config"}, + deprecated_in="0.9.0", + remove_in="0.10.0", + ) def __init__( self, inputs: Union[Collection[T], T], map_func: MapFunction[R], reduce_func: ReduceFunction[R] = identity, + parallel_backend: Optional[ParallelBackend] = None, + config: Optional[ParallelConfig] = None, + *, map_kwargs: Optional[Dict] = None, reduce_kwargs: Optional[Dict] = None, - config: ParallelConfig = ParallelConfig(), - *, n_jobs: int = -1, timeout: Optional[float] = None, ): - self.config = config - parallel_backend = init_parallel_backend(self.config) + parallel_backend = _maybe_init_parallel_backend(parallel_backend, config) + self.parallel_backend = parallel_backend self.timeout = timeout + self._n_jobs = -1 # This uses the setter defined below self.n_jobs = n_jobs @@ -125,7 +140,20 @@ def __call__( The result of the reduce function. """ seed_seq = ensure_seed_sequence(seed) - with Parallel() as parallel: + + if hasattr(self.parallel_backend, "_joblib_backend_name"): + backend = getattr(self.parallel_backend, "_joblib_backend_name") + else: + warnings.warn( + "Parallel backend " + f"{self.parallel_backend.__class__.__name__}. " + "should have a `_joblib_backend_name` attribute in order to work " + "property with MapReduceJob. " + "Defaulting to joblib loky backend" + ) + backend = "loky" + + with Parallel(backend=backend, prefer="processes") as parallel: chunks = self._chunkify(self.inputs_, n_chunks=self.n_jobs) map_results: List[R] = parallel( delayed(self._map_func)( diff --git a/src/pydvl/utils/types.py b/src/pydvl/utils/types.py index 18a22bd26..c7c2e88cd 100644 --- a/src/pydvl/utils/types.py +++ b/src/pydvl/utils/types.py @@ -16,7 +16,6 @@ "IndexT", "NameT", "MapFunction", - "NoPublicConstructor", "ReduceFunction", "Seed", "SupervisedModel", @@ -84,32 +83,6 @@ def score(self, x: NDArray, y: NDArray) -> float: pass -class NoPublicConstructor(ABCMeta): - """Metaclass that ensures a private constructor - - If a class uses this metaclass like this: - - class SomeClass(metaclass=NoPublicConstructor): - pass - - If you try to instantiate your class (`SomeClass()`), - a `TypeError` will be thrown. - - Taken almost verbatim from: - [https://stackoverflow.com/a/64682734](https://stackoverflow.com/a/64682734) - """ - - def __call__(cls, *args, **kwargs): - raise TypeError( - f"{cls.__module__}.{cls.__qualname__} cannot be initialized directly. " - "Use the proper factory instead." - ) - - def create(cls, *args: Any, **kwargs: Any): - """Create an instance of the class""" - return super().__call__(*args, **kwargs) - - def ensure_seed_sequence( seed: Optional[Union[Seed, SeedSequence]] = None ) -> SeedSequence: diff --git a/src/pydvl/value/least_core/__init__.py b/src/pydvl/value/least_core/__init__.py index 6facf9396..02007f8bd 100644 --- a/src/pydvl/value/least_core/__init__.py +++ b/src/pydvl/value/least_core/__init__.py @@ -87,7 +87,7 @@ def compute_least_core_values( progress = False if n_iterations is None: raise ValueError("n_iterations cannot be None for Monte Carlo Least Core") - return montecarlo_least_core( + return montecarlo_least_core( # type: ignore u=u, n_iterations=n_iterations, n_jobs=n_jobs, diff --git a/src/pydvl/value/least_core/common.py b/src/pydvl/value/least_core/common.py index 984930217..5f3359fd0 100644 --- a/src/pydvl/value/least_core/common.py +++ b/src/pydvl/value/least_core/common.py @@ -5,9 +5,15 @@ import cvxpy as cp import numpy as np +from deprecate import deprecated from numpy.typing import NDArray -from pydvl.parallel import MapReduceJob, ParallelConfig +from pydvl.parallel import ( + MapReduceJob, + ParallelBackend, + ParallelConfig, + _maybe_init_parallel_backend, +) from pydvl.utils import Status, Utility from pydvl.value import ValuationResult @@ -144,11 +150,18 @@ def lc_solve_problem( ) +@deprecated( + target=True, + args_mapping={"config": "config"}, + deprecated_in="0.9.0", + remove_in="0.10.0", +) def lc_solve_problems( problems: Sequence[LeastCoreProblem], u: Utility, algorithm: str, - config: ParallelConfig = ParallelConfig(), + parallel_backend: Optional[ParallelBackend] = None, + config: Optional[ParallelConfig] = None, n_jobs: int = 1, non_negative_subsidy: bool = True, solver_options: Optional[dict] = None, @@ -161,8 +174,13 @@ def lc_solve_problems( problems: Least Core problems to solve, as returned by [mclc_prepare_problem()][pydvl.value.least_core.montecarlo.mclc_prepare_problem]. algorithm: Name of the valuation algorithm. - config: Object configuring parallel computation, with cluster address, - number of cpus, etc. + parallel_backend: Parallel backend instance to use + for parallelizing computations. If `None`, + use [JoblibParallelBackend][pydvl.parallel.backends.JoblibParallelBackend] backend. + See the [Parallel Backends][pydvl.parallel.backends] package + for available options. + config: (**DEPRECATED**) Object configuring parallel computation, + with cluster address, number of cpus, etc. n_jobs: Number of parallel jobs to run. non_negative_subsidy: If True, the least core subsidy $e$ is constrained to be non-negative. @@ -177,6 +195,8 @@ def _map_func( ) -> List[ValuationResult]: return [lc_solve_problem(p, *args, **kwargs) for p in problems] + parallel_backend = _maybe_init_parallel_backend(parallel_backend, config) + map_reduce_job: MapReduceJob[ "LeastCoreProblem", "List[ValuationResult]" ] = MapReduceJob( @@ -190,7 +210,7 @@ def _map_func( **options, ), reduce_func=lambda x: list(itertools.chain(*x)), - config=config, + parallel_backend=parallel_backend, n_jobs=n_jobs, ) solutions = map_reduce_job() diff --git a/src/pydvl/value/least_core/montecarlo.py b/src/pydvl/value/least_core/montecarlo.py index 5a7a3c883..2fee3b3f9 100644 --- a/src/pydvl/value/least_core/montecarlo.py +++ b/src/pydvl/value/least_core/montecarlo.py @@ -3,10 +3,16 @@ from typing import Iterable, Optional import numpy as np +from deprecate import deprecated from numpy.typing import NDArray from tqdm.auto import tqdm -from pydvl.parallel import MapReduceJob, ParallelConfig, effective_n_jobs +from pydvl.parallel import ( + MapReduceJob, + ParallelBackend, + ParallelConfig, + _maybe_init_parallel_backend, +) from pydvl.utils.numeric import random_powerset from pydvl.utils.types import Seed from pydvl.utils.utility import Utility @@ -19,12 +25,19 @@ __all__ = ["montecarlo_least_core", "mclc_prepare_problem"] +@deprecated( + target=True, + args_mapping={"config": "config"}, + deprecated_in="0.9.0", + remove_in="0.10.0", +) def montecarlo_least_core( u: Utility, n_iterations: int, *, n_jobs: int = 1, - config: ParallelConfig = ParallelConfig(), + parallel_backend: Optional[ParallelBackend] = None, + config: Optional[ParallelConfig] = None, non_negative_subsidy: bool = False, solver_options: Optional[dict] = None, progress: bool = False, @@ -51,8 +64,13 @@ def montecarlo_least_core( u: Utility object with model, data, and scoring function n_iterations: total number of iterations to use n_jobs: number of jobs across which to distribute the computation - config: Object configuring parallel computation, with cluster - address, number of cpus, etc. + parallel_backend: Parallel backend instance to use + for parallelizing computations. If `None`, + use [JoblibParallelBackend][pydvl.parallel.backends.JoblibParallelBackend] backend. + See the [Parallel Backends][pydvl.parallel.backends] package + for available options. + config: (**DEPRECATED**) Object configuring parallel computation, + with cluster address, number of cpus, etc. non_negative_subsidy: If True, the least core subsidy $e$ is constrained to be non-negative. solver_options: Dictionary of options that will be used to select a solver @@ -64,9 +82,20 @@ def montecarlo_least_core( Returns: Object with the data values and the least core value. + + !!! tip "Changed in version 0.9.0" + Deprecated `config` argument and added a `parallel_backend` + argument to allow users to pass the Parallel Backend instance + directly. """ problem = mclc_prepare_problem( - u, n_iterations, n_jobs=n_jobs, config=config, progress=progress, seed=seed + u, + n_iterations, + n_jobs=n_jobs, + parallel_backend=parallel_backend, + config=config, + progress=progress, + seed=seed, ) return lc_solve_problem( problem, @@ -77,12 +106,19 @@ def montecarlo_least_core( ) +@deprecated( + target=True, + args_mapping={"config": "config"}, + deprecated_in="0.9.0", + remove_in="0.10.0", +) def mclc_prepare_problem( u: Utility, n_iterations: int, *, n_jobs: int = 1, - config: ParallelConfig = ParallelConfig(), + parallel_backend: Optional[ParallelBackend] = None, + config: Optional[ParallelConfig] = None, progress: bool = False, seed: Optional[Seed] = None, ) -> LeastCoreProblem: @@ -94,6 +130,11 @@ def mclc_prepare_problem( See [montecarlo_least_core][pydvl.value.least_core.montecarlo.montecarlo_least_core] for argument descriptions. + + !!! note "Changed in version 0.9.0" + Deprecated `config` argument and added a `parallel_backend` + argument to allow users to pass the Parallel Backend instance + directly. """ n = len(u.data) @@ -112,7 +153,11 @@ def mclc_prepare_problem( ) n_iterations = 2**n - iterations_per_job = max(1, n_iterations // effective_n_jobs(n_jobs, config)) + parallel_backend = _maybe_init_parallel_backend(parallel_backend, config) + + iterations_per_job = max( + 1, n_iterations // parallel_backend.effective_n_jobs(n_jobs) + ) map_reduce_job: MapReduceJob["Utility", "LeastCoreProblem"] = MapReduceJob( inputs=u, @@ -120,7 +165,7 @@ def mclc_prepare_problem( reduce_func=_reduce_func, map_kwargs=dict(n_iterations=iterations_per_job, progress=progress), n_jobs=n_jobs, - config=config, + parallel_backend=parallel_backend, ) return map_reduce_job(seed=seed) diff --git a/src/pydvl/value/loo/loo.py b/src/pydvl/value/loo/loo.py index a507f6aad..24081f83e 100644 --- a/src/pydvl/value/loo/loo.py +++ b/src/pydvl/value/loo/loo.py @@ -1,21 +1,30 @@ from __future__ import annotations from concurrent.futures import FIRST_COMPLETED, Future, wait +from typing import Optional +from deprecate import deprecated from tqdm import tqdm -from pydvl.parallel import ParallelConfig, effective_n_jobs, init_executor +from pydvl.parallel import ParallelBackend, ParallelConfig, _maybe_init_parallel_backend from pydvl.utils import Utility from pydvl.value.result import ValuationResult __all__ = ["compute_loo"] +@deprecated( + target=True, + args_mapping={"config": "config"}, + deprecated_in="0.9.0", + remove_in="0.10.0", +) def compute_loo( u: Utility, *, n_jobs: int = 1, - config: ParallelConfig = ParallelConfig(), + parallel_backend: Optional[ParallelBackend] = None, + config: Optional[ParallelConfig] = None, progress: bool = True, ) -> ValuationResult: r"""Computes leave one out value: @@ -26,8 +35,13 @@ def compute_loo( u: Utility object with model, data, and scoring function progress: If True, display a progress bar n_jobs: Number of parallel jobs to use - config: Object configuring parallel computation, with cluster - address, number of cpus, etc. + parallel_backend: Parallel backend instance to use + for parallelizing computations. If `None`, + use [JoblibParallelBackend][pydvl.parallel.backends.JoblibParallelBackend] backend. + See the [Parallel Backends][pydvl.parallel.backends] package + for available options. + config: (**DEPRECATED**) Object configuring parallel computation, + with cluster address, number of cpus, etc. progress: If True, display a progress bar Returns: @@ -35,8 +49,12 @@ def compute_loo( !!! tip "New in version 0.7.0" Renamed from `naive_loo` and added parallel computation. - """ + !!! tip "Changed in version 0.9.0" + Deprecated `config` argument and added a `parallel_backend` + argument to allow users to pass the Parallel Backend instance + directly. + """ if len(u.data) < 3: raise ValueError("Dataset must have at least 2 elements") @@ -52,14 +70,15 @@ def compute_loo( def fun(idx: int) -> tuple[int, float]: return idx, total_utility - u(all_indices.difference({idx})) - max_workers = effective_n_jobs(n_jobs, config) + parallel_backend = _maybe_init_parallel_backend(parallel_backend, config) + max_workers = parallel_backend.effective_n_jobs(n_jobs) n_submitted_jobs = 2 * max_workers # number of jobs in the queue # NOTE: this could be done with a simple executor.map(), but we want to # display a progress bar - with init_executor( - max_workers=max_workers, config=config, cancel_futures=True + with parallel_backend.executor( + max_workers=max_workers, cancel_futures=True ) as executor: pending: set[Future] = set() index_it = iter(u.data.indices) diff --git a/src/pydvl/value/semivalues.py b/src/pydvl/value/semivalues.py index 841d25213..1407086a8 100644 --- a/src/pydvl/value/semivalues.py +++ b/src/pydvl/value/semivalues.py @@ -90,6 +90,7 @@ import logging import math import warnings +from concurrent.futures import FIRST_COMPLETED, Future, wait from enum import Enum from itertools import islice from typing import Iterable, List, Optional, Protocol, Tuple, Type, cast @@ -99,6 +100,11 @@ from deprecate import deprecated from tqdm import tqdm +from pydvl.parallel import ( + ParallelBackend, + _maybe_init_parallel_backend, + init_parallel_backend, +) from pydvl.parallel.config import ParallelConfig from pydvl.utils import Utility from pydvl.utils.types import IndexT, Seed @@ -172,6 +178,12 @@ def _marginal( # deprecated_in="0.8.0", # remove_in="0.9.0", # ) +@deprecated( + target=True, + args_mapping={"config": "config"}, + deprecated_in="0.9.0", + remove_in="0.10.0", +) def compute_generic_semivalues( sampler: PowersetSampler[IndexT], u: Utility, @@ -181,7 +193,8 @@ def compute_generic_semivalues( batch_size: int = 1, skip_converged: bool = False, n_jobs: int = 1, - config: ParallelConfig = ParallelConfig(), + parallel_backend: Optional[ParallelBackend] = None, + config: Optional[ParallelConfig] = None, progress: bool = False, ) -> ValuationResult: """Computes semi-values for a given utility function and subset sampler. @@ -202,8 +215,13 @@ def compute_generic_semivalues( [AbsoluteStandardError][pydvl.value.stopping.AbsoluteStandardError], you will probably have to carefully adjust the threshold). n_jobs: Number of parallel jobs to use. - config: Object configuring parallel computation, with cluster - address, number of cpus, etc. + parallel_backend: Parallel backend instance to use + for parallelizing computations. If `None`, + use [JoblibParallelBackend][pydvl.parallel.backends.JoblibParallelBackend] backend. + See the [Parallel Backends][pydvl.parallel.backends] package + for available options. + config: (**DEPRECATED**) Object configuring parallel computation, + with cluster address, number of cpus, etc. progress: Whether to display a progress bar. Returns: @@ -212,11 +230,12 @@ def compute_generic_semivalues( !!! warning "Deprecation notice" Parameter `batch_size` is for experimental use and will be removed in future versions. - """ - from concurrent.futures import FIRST_COMPLETED, Future, wait - - from pydvl.parallel import effective_n_jobs, init_executor, init_parallel_backend + !!! tip "Changed in version 0.9.0" + Deprecated `config` argument and added a `parallel_backend` + argument to allow users to pass the Parallel Backend instance + directly. + """ if isinstance(sampler, PermutationSampler) and u.cache is None: log.warning( "PermutationSampler requires caching to be enabled or computation " @@ -236,20 +255,20 @@ def compute_generic_semivalues( data_names=u.data.data_names, ) - parallel_backend = init_parallel_backend(config) + parallel_backend = _maybe_init_parallel_backend(parallel_backend, config) u = parallel_backend.put(u) correction = parallel_backend.put( lambda n, k: coefficient(n, k) * sampler.weight(n, k) ) - max_workers = effective_n_jobs(n_jobs, config) + max_workers = parallel_backend.effective_n_jobs(n_jobs) n_submitted_jobs = 2 * max_workers # number of jobs in the queue sampler_it = iter(sampler) pbar = tqdm(disable=not progress, total=100, unit="%") - with init_executor( - max_workers=max_workers, config=config, cancel_futures=True + with parallel_backend.executor( + max_workers=max_workers, cancel_futures=True ) as executor: pending: set[Future] = set() while True: @@ -322,6 +341,12 @@ def beta_coefficient_w(n: int, k: int) -> float: return cast(SVCoefficient, beta_coefficient_w) +@deprecated( + target=True, + args_mapping={"config": "config"}, + deprecated_in="0.9.0", + remove_in="0.10.0", +) def compute_shapley_semivalues( u: Utility, *, @@ -329,7 +354,8 @@ def compute_shapley_semivalues( sampler_t: Type[StochasticSampler] = PermutationSampler, batch_size: int = 1, n_jobs: int = 1, - config: ParallelConfig = ParallelConfig(), + parallel_backend: Optional[ParallelBackend] = None, + config: Optional[ParallelConfig] = None, progress: bool = False, seed: Optional[Seed] = None, ) -> ValuationResult: @@ -348,8 +374,13 @@ def compute_shapley_semivalues( [sampler][pydvl.value.sampler] module for a list. batch_size: Number of marginal evaluations per single parallel job. n_jobs: Number of parallel jobs to use. - config: Object configuring parallel computation, with cluster - address, number of cpus, etc. + parallel_backend: Parallel backend instance to use + for parallelizing computations. If `None`, + use [JoblibParallelBackend][pydvl.parallel.backends.JoblibParallelBackend] backend. + See the [Parallel Backends][pydvl.parallel.backends] package + for available options. + config: (**DEPRECATED**) Object configuring parallel computation, + with cluster address, number of cpus, etc. seed: Either an instance of a numpy random number generator or a seed for it. progress: Whether to display a progress bar. @@ -360,6 +391,11 @@ def compute_shapley_semivalues( !!! warning "Deprecation notice" Parameter `batch_size` is for experimental use and will be removed in future versions. + + !!! tip "Changed in version 0.9.0" + Deprecated `config` argument and added a `parallel_backend` + argument to allow users to pass the Parallel Backend instance + directly. """ # HACK: cannot infer return type because of useless IndexT, NameT return compute_generic_semivalues( # type: ignore @@ -369,11 +405,18 @@ def compute_shapley_semivalues( done, batch_size=batch_size, n_jobs=n_jobs, + parallel_backend=parallel_backend, config=config, progress=progress, ) +@deprecated( + target=True, + args_mapping={"config": "config"}, + deprecated_in="0.9.0", + remove_in="0.10.0", +) def compute_banzhaf_semivalues( u: Utility, *, @@ -381,7 +424,8 @@ def compute_banzhaf_semivalues( sampler_t: Type[StochasticSampler] = PermutationSampler, batch_size: int = 1, n_jobs: int = 1, - config: ParallelConfig = ParallelConfig(), + parallel_backend: Optional[ParallelBackend] = None, + config: Optional[ParallelConfig] = None, progress: bool = False, seed: Optional[Seed] = None, ) -> ValuationResult: @@ -400,8 +444,13 @@ def compute_banzhaf_semivalues( n_jobs: Number of parallel jobs to use. seed: Either an instance of a numpy random number generator or a seed for it. - config: Object configuring parallel computation, with cluster address, - number of cpus, etc. + parallel_backend: Parallel backend instance to use + for parallelizing computations. If `None`, + use [JoblibParallelBackend][pydvl.parallel.backends.JoblibParallelBackend] backend. + See the [Parallel Backends][pydvl.parallel.backends] package + for available options. + config: (**DEPRECATED**) Object configuring parallel computation, + with cluster address, number of cpus, etc. progress: Whether to display a progress bar. Returns: @@ -410,6 +459,11 @@ def compute_banzhaf_semivalues( !!! warning "Deprecation notice" Parameter `batch_size` is for experimental use and will be removed in future versions. + + !!! tip "Changed in version 0.9.0" + Deprecated `config` argument and added a `parallel_backend` + argument to allow users to pass the Parallel Backend instance + directly. """ # HACK: cannot infer return type because of useless IndexT, NameT return compute_generic_semivalues( # type: ignore @@ -419,11 +473,18 @@ def compute_banzhaf_semivalues( done, batch_size=batch_size, n_jobs=n_jobs, + parallel_backend=parallel_backend, config=config, progress=progress, ) +@deprecated( + target=True, + args_mapping={"config": "config"}, + deprecated_in="0.9.0", + remove_in="0.10.0", +) def compute_beta_shapley_semivalues( u: Utility, *, @@ -433,7 +494,8 @@ def compute_beta_shapley_semivalues( sampler_t: Type[StochasticSampler] = PermutationSampler, batch_size: int = 1, n_jobs: int = 1, - config: ParallelConfig = ParallelConfig(), + parallel_backend: Optional[ParallelBackend] = None, + config: Optional[ParallelConfig] = None, progress: bool = False, seed: Optional[Seed] = None, ) -> ValuationResult: @@ -453,8 +515,13 @@ def compute_beta_shapley_semivalues( batch_size: Number of marginal evaluations per (parallelized) task. n_jobs: Number of parallel jobs to use. seed: Either an instance of a numpy random number generator or a seed for it. - config: Object configuring parallel computation, with cluster address, number of - cpus, etc. + parallel_backend: Parallel backend instance to use + for parallelizing computations. If `None`, + use [JoblibParallelBackend][pydvl.parallel.backends.JoblibParallelBackend] backend. + See the [Parallel Backends][pydvl.parallel.backends] package + for available options. + config: (**DEPRECATED**) Object configuring parallel computation, + with cluster address, number of cpus, etc. progress: Whether to display a progress bar. Returns: @@ -463,6 +530,11 @@ def compute_beta_shapley_semivalues( !!! warning "Deprecation notice" Parameter `batch_size` is for experimental use and will be removed in future versions. + + !!! tip "Changed in version 0.9.0" + Deprecated `config` argument and added a `parallel_backend` + argument to allow users to pass the Parallel Backend instance + directly. """ # HACK: cannot infer return type because of useless IndexT, NameT return compute_generic_semivalues( # type: ignore @@ -472,6 +544,7 @@ def compute_beta_shapley_semivalues( done, batch_size=batch_size, n_jobs=n_jobs, + parallel_backend=parallel_backend, config=config, progress=progress, ) diff --git a/src/pydvl/value/shapley/classwise.py b/src/pydvl/value/shapley/classwise.py index f8fb0dbee..3dab20902 100644 --- a/src/pydvl/value/shapley/classwise.py +++ b/src/pydvl/value/shapley/classwise.py @@ -64,16 +64,12 @@ from typing import Callable, Optional, Set, Tuple, Union, cast import numpy as np +from deprecate import deprecated from numpy.random import SeedSequence from numpy.typing import NDArray from tqdm import tqdm -from pydvl.parallel import ( - ParallelConfig, - effective_n_jobs, - init_executor, - init_parallel_backend, -) +from pydvl.parallel import ParallelBackend, ParallelConfig, _maybe_init_parallel_backend from pydvl.utils import ( Dataset, Scorer, @@ -238,6 +234,12 @@ def estimate_in_class_and_out_of_class_score( return in_class_score, out_of_class_score +@deprecated( + target=True, + args_mapping={"config": "config"}, + deprecated_in="0.9.0", + remove_in="0.10.0", +) def compute_classwise_shapley_values( u: Utility, *, @@ -248,7 +250,8 @@ def compute_classwise_shapley_values( use_default_scorer_value: bool = True, min_elements_per_label: int = 1, n_jobs: int = 1, - config: ParallelConfig = ParallelConfig(), + parallel_backend: Optional[ParallelBackend] = None, + config: Optional[ParallelConfig] = None, progress: bool = False, seed: Optional[Seed] = None, ) -> ValuationResult: @@ -288,7 +291,13 @@ def compute_classwise_shapley_values( min_elements_per_label: The minimum number of elements for each opposite label. n_jobs: Number of parallel jobs to run. - config: Parallel configuration. + parallel_backend: Parallel backend instance to use + for parallelizing computations. If `None`, + use [JoblibParallelBackend][pydvl.parallel.backends.JoblibParallelBackend] backend. + See the [Parallel Backends][pydvl.parallel.backends] package + for available options. + config: (**DEPRECATED**) Object configuring parallel computation, + with cluster address, number of cpus, etc. progress: Whether to display a progress bar. seed: Either an instance of a numpy random number generator or a seed for it. @@ -314,9 +323,9 @@ def compute_classwise_shapley_values( " utility. See scoring argument of Utility." ) - parallel_backend = init_parallel_backend(config) + parallel_backend = _maybe_init_parallel_backend(parallel_backend, config) u_ref = parallel_backend.put(u) - n_jobs = effective_n_jobs(n_jobs, config) + n_jobs = parallel_backend.effective_n_jobs(n_jobs) n_submitted_jobs = 2 * n_jobs pbar = tqdm(disable=not progress, position=0, total=100, unit="%") @@ -327,7 +336,9 @@ def compute_classwise_shapley_values( terminate_exec = False seed_sequence = ensure_seed_sequence(seed) - with init_executor(max_workers=n_jobs, config=config) as executor: + parallel_backend = _maybe_init_parallel_backend(parallel_backend, config) + + with parallel_backend.executor(max_workers=n_jobs) as executor: pending: Set[Future] = set() while True: completed_futures, pending = wait( @@ -379,7 +390,6 @@ def _permutation_montecarlo_classwise_shapley_one_step( """Helper function for [compute_classwise_shapley_values()] [pydvl.value.shapley.classwise.compute_classwise_shapley_values]. - Args: u: Utility object containing model, data, and scoring function. The scorer must be of type [ClasswiseScorer] @@ -399,6 +409,10 @@ def _permutation_montecarlo_classwise_shapley_one_step( Returns: ValuationResult object containing computed data values. + + !!! tip "Changed in version 0.9.0" + Deprecated `config` argument and added a `parallel_backend` + argument to allow users to pass the Parallel Backend configuration. """ if done_sample_complements is None: done_sample_complements = MaxChecks(1) diff --git a/src/pydvl/value/shapley/common.py b/src/pydvl/value/shapley/common.py index 6eb07ae93..c407b3ba8 100644 --- a/src/pydvl/value/shapley/common.py +++ b/src/pydvl/value/shapley/common.py @@ -119,11 +119,11 @@ def compute_shapley_values( **kwargs, ) elif mode == ShapleyMode.CombinatorialMontecarlo: - return combinatorial_montecarlo_shapley( + return combinatorial_montecarlo_shapley( # type: ignore u, done=done, n_jobs=n_jobs, seed=seed, progress=progress ) elif mode == ShapleyMode.CombinatorialExact: - return combinatorial_exact_shapley(u, n_jobs=n_jobs, progress=progress) + return combinatorial_exact_shapley(u, n_jobs=n_jobs, progress=progress) # type: ignore elif mode == ShapleyMode.PermutationExact: return permutation_exact_shapley(u, progress=progress) elif mode == ShapleyMode.Owen or mode == ShapleyMode.OwenAntithetic: @@ -137,7 +137,7 @@ def compute_shapley_values( if mode == ShapleyMode.Owen else OwenAlgorithm.Antithetic ) - return owen_sampling_shapley( + return owen_sampling_shapley( # type: ignore u, n_samples=int(kwargs.get("n_samples", -1)), max_q=int(kwargs.get("max_q", -1)), @@ -155,7 +155,7 @@ def compute_shapley_values( if epsilon is None: raise ValueError("Group Testing requires error bound epsilon") delta = kwargs.pop("delta", 0.05) - return group_testing_shapley( + return group_testing_shapley( # type: ignore u, epsilon=float(epsilon), delta=delta, diff --git a/src/pydvl/value/shapley/gt.py b/src/pydvl/value/shapley/gt.py index d47056825..6ced22139 100644 --- a/src/pydvl/value/shapley/gt.py +++ b/src/pydvl/value/shapley/gt.py @@ -28,11 +28,17 @@ import cvxpy as cp import numpy as np +from deprecate import deprecated from numpy.random import SeedSequence from numpy.typing import NDArray from tqdm.auto import trange -from pydvl.parallel import MapReduceJob, ParallelConfig, effective_n_jobs +from pydvl.parallel import ( + MapReduceJob, + ParallelBackend, + ParallelConfig, + _maybe_init_parallel_backend, +) from pydvl.utils import Utility from pydvl.utils.numeric import random_subset_of_size from pydvl.utils.status import Status @@ -164,6 +170,12 @@ def _group_testing_shapley( return uu, betas +@deprecated( + target=True, + args_mapping={"config": "config"}, + deprecated_in="0.9.0", + remove_in="0.10.0", +) def group_testing_shapley( u: Utility, n_samples: int, @@ -171,7 +183,8 @@ def group_testing_shapley( delta: float, *, n_jobs: int = 1, - config: ParallelConfig = ParallelConfig(), + parallel_backend: Optional[ParallelBackend] = None, + config: Optional[ParallelConfig] = None, progress: bool = False, seed: Optional[Seed] = None, **options: dict, @@ -201,8 +214,13 @@ def group_testing_shapley( estimation of `n_iterations`. n_jobs: Number of parallel jobs to use. Each worker performs a chunk of all tests (i.e. utility evaluations). - config: Object configuring parallel computation, with cluster - address, number of cpus, etc. + parallel_backend: Parallel backend instance to use + for parallelizing computations. If `None`, + use [JoblibParallelBackend][pydvl.parallel.backends.JoblibParallelBackend] backend. + See the [Parallel Backends][pydvl.parallel.backends] package + for available options. + config: (**DEPRECATED**) Object configuring parallel computation, + with cluster address, number of cpus, etc. progress: Whether to display progress bars for each job. seed: Either an instance of a numpy random number generator or a seed for it. options: Additional options to pass to @@ -218,6 +236,11 @@ def group_testing_shapley( !!! tip "Changed in version 0.5.0" Changed the solver to cvxpy instead of scipy's linprog. Added the ability to pass arbitrary options to it. + + !!! tip "Changed in version 0.9.0" + Deprecated `config` argument and added a `parallel_backend` + argument to allow users to pass the Parallel Backend instance + directly. """ n = len(u.data.indices) @@ -235,7 +258,9 @@ def group_testing_shapley( f"ε={epsilon:.02f} guarantee at δ={1 - delta:.02f} probability" ) - samples_per_job = max(1, n_samples // effective_n_jobs(n_jobs, config)) + parallel_backend = _maybe_init_parallel_backend(parallel_backend, config) + + samples_per_job = max(1, n_samples // parallel_backend.effective_n_jobs(n_jobs)) def reducer( results_it: Iterable[Tuple[NDArray, NDArray]] @@ -252,7 +277,7 @@ def reducer( map_func=_group_testing_shapley, reduce_func=reducer, map_kwargs=dict(n_samples=samples_per_job, progress=progress), - config=config, + parallel_backend=parallel_backend, n_jobs=n_jobs, ) uu, betas = map_reduce_job(seed=map_reduce_seed_sequence) diff --git a/src/pydvl/value/shapley/montecarlo.py b/src/pydvl/value/shapley/montecarlo.py index c10d64df6..a660aed2d 100644 --- a/src/pydvl/value/shapley/montecarlo.py +++ b/src/pydvl/value/shapley/montecarlo.py @@ -50,6 +50,7 @@ from typing import Optional, Sequence, Union import numpy as np +from deprecate import deprecated from numpy.random import SeedSequence from numpy.typing import NDArray from tqdm.auto import tqdm @@ -57,10 +58,9 @@ from pydvl.parallel import ( CancellationPolicy, MapReduceJob, + ParallelBackend, ParallelConfig, - effective_n_jobs, - init_executor, - init_parallel_backend, + _maybe_init_parallel_backend, ) from pydvl.utils.numeric import random_powerset from pydvl.utils.progress import repeat_indices @@ -127,13 +127,20 @@ def _permutation_montecarlo_one_step( return result +@deprecated( + target=True, + args_mapping={"config": "config"}, + deprecated_in="0.9.0", + remove_in="0.10.0", +) def permutation_montecarlo_shapley( u: Utility, done: StoppingCriterion, *, truncation: TruncationPolicy = NoTruncation(), n_jobs: int = 1, - config: ParallelConfig = ParallelConfig(), + parallel_backend: Optional[ParallelBackend] = None, + config: Optional[ParallelConfig] = None, progress: bool = False, seed: Optional[Seed] = None, ) -> ValuationResult: @@ -179,19 +186,29 @@ def permutation_montecarlo_shapley( processing a permutation and set all subsequent marginals to zero. Typically used to stop computation when the marginal is small. n_jobs: number of jobs across which to distribute the computation. - config: Object configuring parallel computation, with cluster address, - number of cpus, etc. + parallel_backend: Parallel backend instance to use + for parallelizing computations. If `None`, + use [JoblibParallelBackend][pydvl.parallel.backends.JoblibParallelBackend] backend. + See the [Parallel Backends][pydvl.parallel.backends] package + for available options. + config: (**DEPRECATED**) Object configuring parallel computation, + with cluster address, number of cpus, etc. progress: Whether to display a progress bar. seed: Either an instance of a numpy random number generator or a seed for it. Returns: Object with the data values. + + !!! tip "Changed in version 0.9.0" + Deprecated `config` argument and added a `parallel_backend` + argument to allow users to pass the Parallel Backend instance + directly. """ algorithm = "permutation_montecarlo_shapley" - parallel_backend = init_parallel_backend(config) + parallel_backend = _maybe_init_parallel_backend(parallel_backend, config) u = parallel_backend.put(u) - max_workers = effective_n_jobs(n_jobs, config) + max_workers = parallel_backend.effective_n_jobs(n_jobs) n_submitted_jobs = 2 * max_workers # number of jobs in the executor's queue seed_sequence = ensure_seed_sequence(seed) @@ -201,17 +218,15 @@ def permutation_montecarlo_shapley( pbar = tqdm(disable=not progress, total=100, unit="%") - with init_executor( - max_workers=max_workers, config=config, cancel_futures=CancellationPolicy.ALL + with parallel_backend.executor( + max_workers=max_workers, cancel_futures=CancellationPolicy.ALL ) as executor: pending: set[Future] = set() while True: pbar.n = 100 * done.completion() pbar.refresh() - completed, pending = wait( - pending, timeout=config.wait_timeout, return_when=FIRST_COMPLETED - ) + completed, pending = wait(pending, timeout=1.0, return_when=FIRST_COMPLETED) for future in completed: result += future.result() # we could check outside the loop, but that means more @@ -288,12 +303,19 @@ def _combinatorial_montecarlo_shapley( return result +@deprecated( + target=True, + args_mapping={"config": "config"}, + deprecated_in="0.9.0", + remove_in="0.10.0", +) def combinatorial_montecarlo_shapley( u: Utility, done: StoppingCriterion, *, n_jobs: int = 1, - config: ParallelConfig = ParallelConfig(), + parallel_backend: Optional[ParallelBackend] = None, + config: Optional[ParallelConfig] = None, progress: bool = False, seed: Optional[Seed] = None, ) -> ValuationResult: @@ -321,14 +343,25 @@ def combinatorial_montecarlo_shapley( n_jobs: number of parallel jobs across which to distribute the computation. Each worker receives a chunk of [indices][pydvl.utils.dataset.Dataset.indices] - config: Object configuring parallel computation, with cluster address, - number of cpus, etc. + parallel_backend: Parallel backend instance to use + for parallelizing computations. If `None`, + use [JoblibParallelBackend][pydvl.parallel.backends.JoblibParallelBackend] backend. + See the [Parallel Backends][pydvl.parallel.backends] package + for available options. + config: (**DEPRECATED**) Object configuring parallel computation, + with cluster address, number of cpus, etc. progress: Whether to display progress bars for each job. seed: Either an instance of a numpy random number generator or a seed for it. Returns: Object with the data values. + + !!! tip "Changed in version 0.9.0" + Deprecated `config` argument and added a `parallel_backend` + argument to allow users to pass the Parallel Backend instance + directly. """ + parallel_backend = _maybe_init_parallel_backend(parallel_backend, config) map_reduce_job: MapReduceJob[NDArray, ValuationResult] = MapReduceJob( u.data.indices, @@ -336,6 +369,6 @@ def combinatorial_montecarlo_shapley( reduce_func=lambda results: reduce(operator.add, results), map_kwargs=dict(u=u, done=done, progress=progress), n_jobs=n_jobs, - config=config, + parallel_backend=parallel_backend, ) return map_reduce_job(seed=seed) diff --git a/src/pydvl/value/shapley/naive.py b/src/pydvl/value/shapley/naive.py index 8323d5582..ca06da15c 100644 --- a/src/pydvl/value/shapley/naive.py +++ b/src/pydvl/value/shapley/naive.py @@ -14,13 +14,19 @@ import math import warnings from itertools import permutations -from typing import List +from typing import List, Optional import numpy as np +from deprecate import deprecated from numpy.typing import NDArray from tqdm.auto import tqdm -from pydvl.parallel import MapReduceJob, ParallelConfig +from pydvl.parallel import ( + MapReduceJob, + ParallelBackend, + ParallelConfig, + _maybe_init_parallel_backend, +) from pydvl.utils import Utility, powerset from pydvl.utils.status import Status from pydvl.value.result import ValuationResult @@ -103,11 +109,18 @@ def _combinatorial_exact_shapley( return local_values / n +@deprecated( + target=True, + args_mapping={"config": "config"}, + deprecated_in="0.9.0", + remove_in="0.10.0", +) def combinatorial_exact_shapley( u: Utility, *, n_jobs: int = 1, - config: ParallelConfig = ParallelConfig(), + parallel_backend: Optional[ParallelBackend] = None, + config: Optional[ParallelConfig] = None, progress: bool = False, ) -> ValuationResult: r"""Computes the exact Shapley value using the combinatorial definition. @@ -127,12 +140,22 @@ def combinatorial_exact_shapley( Args: u: Utility object with model, data, and scoring function n_jobs: Number of parallel jobs to use - config: Object configuring parallel computation, with cluster address, - number of cpus, etc. + parallel_backend: Parallel backend instance to use + for parallelizing computations. If `None`, + use [JoblibParallelBackend][pydvl.parallel.backends.JoblibParallelBackend] backend. + See the [Parallel Backends][pydvl.parallel.backends] package + for available options. + config: (**DEPRECATED**) Object configuring parallel computation, + with cluster address, number of cpus, etc. progress: Whether to display progress bars for each job. Returns: Object with the data values. + + !!! tip "Changed in version 0.9.0" + Deprecated `config` argument and added a `parallel_backend` + argument to allow users to pass the Parallel Backend instance + directly. """ # Arbitrary choice, will depend on time required, caching, etc. if len(u.data) // n_jobs > 20: @@ -143,13 +166,15 @@ def combinatorial_exact_shapley( def reduce_fun(results: List[NDArray]) -> NDArray: return np.array(results).sum(axis=0) # type: ignore + parallel_backend = _maybe_init_parallel_backend(parallel_backend, config) + map_reduce_job: MapReduceJob[NDArray, NDArray] = MapReduceJob( u.data.indices, map_func=_combinatorial_exact_shapley, map_kwargs=dict(u=u, progress=progress), reduce_func=reduce_fun, n_jobs=n_jobs, - config=config, + parallel_backend=parallel_backend, ) values = map_reduce_job() return ValuationResult( diff --git a/src/pydvl/value/shapley/owen.py b/src/pydvl/value/shapley/owen.py index d13ed3934..68c76db56 100644 --- a/src/pydvl/value/shapley/owen.py +++ b/src/pydvl/value/shapley/owen.py @@ -12,9 +12,15 @@ from typing import Optional, Sequence import numpy as np +from deprecate import deprecated from numpy.typing import NDArray -from pydvl.parallel import MapReduceJob, ParallelConfig +from pydvl.parallel import ( + MapReduceJob, + ParallelBackend, + ParallelConfig, + _maybe_init_parallel_backend, +) from pydvl.utils import Utility, random_powerset from pydvl.utils.progress import repeat_indices from pydvl.utils.types import Seed @@ -106,6 +112,12 @@ def _owen_sampling_shapley( return result +@deprecated( + target=True, + args_mapping={"config": "config"}, + deprecated_in="0.9.0", + remove_in="0.10.0", +) def owen_sampling_shapley( u: Utility, n_samples: int, @@ -113,7 +125,8 @@ def owen_sampling_shapley( *, method: OwenAlgorithm = OwenAlgorithm.Standard, n_jobs: int = 1, - config: ParallelConfig = ParallelConfig(), + parallel_backend: Optional[ParallelBackend] = None, + config: Optional[ParallelConfig] = None, progress: bool = False, seed: Optional[Seed] = None ) -> ValuationResult: @@ -161,8 +174,13 @@ def owen_sampling_shapley( $q \in [0,0.5]$ and correlated samples n_jobs: Number of parallel jobs to use. Each worker receives a chunk of the total of `max_q` values for q. - config: Object configuring parallel computation, with cluster address, - number of cpus, etc. + parallel_backend: Parallel backend instance to use + for parallelizing computations. If `None`, + use [JoblibParallelBackend][pydvl.parallel.backends.JoblibParallelBackend] backend. + See the [Parallel Backends][pydvl.parallel.backends] package + for available options. + config: (**DEPRECATED**) Object configuring parallel computation, + with cluster address, number of cpus, etc. progress: Whether to display progress bars for each job. seed: Either an instance of a numpy random number generator or a seed for it. @@ -174,7 +192,14 @@ def owen_sampling_shapley( !!! tip "Changed in version 0.5.0" Support for parallel computation and enable antithetic sampling. + !!! tip "Changed in version 0.9.0" + Deprecated `config` argument and added a `parallel_backend` + argument to allow users to pass the Parallel Backend instance + directly. + """ + parallel_backend = _maybe_init_parallel_backend(parallel_backend, config) + map_reduce_job: MapReduceJob[NDArray, ValuationResult] = MapReduceJob( u.data.indices, map_func=_owen_sampling_shapley, @@ -187,7 +212,7 @@ def owen_sampling_shapley( progress=progress, ), n_jobs=n_jobs, - config=config, + parallel_backend=parallel_backend, ) return map_reduce_job(seed=seed) diff --git a/tests/parallel/conftest.py b/tests/parallel/conftest.py index 1326fa7ca..d6a489412 100644 --- a/tests/parallel/conftest.py +++ b/tests/parallel/conftest.py @@ -1,18 +1,20 @@ +import joblib import pytest -from pydvl.parallel.config import ParallelConfig +from pydvl.parallel import JoblibParallelBackend, RayParallelBackend from ..conftest import num_workers @pytest.fixture(scope="module", params=["joblib", "ray-local", "ray-external"]) -def parallel_config(request): +def parallel_backend(request): if request.param == "joblib": - yield ParallelConfig(backend="joblib", n_cpus_local=num_workers()) + with joblib.parallel_config(backend="loky", n_jobs=num_workers()): + yield JoblibParallelBackend() elif request.param == "ray-local": ray = pytest.importorskip("ray", reason="Ray not installed.") ray.init(num_cpus=num_workers()) - yield ParallelConfig(backend="ray") + yield RayParallelBackend() ray.shutdown() elif request.param == "ray-external": ray = pytest.importorskip("ray", reason="Ray not installed.") @@ -24,6 +26,6 @@ def parallel_config(request): initialize_head=True, head_node_args={"num_cpus": num_workers()} ) ray.init(cluster.address) - yield ParallelConfig(backend="ray", address=cluster.address) + yield RayParallelBackend() ray.shutdown() cluster.shutdown() diff --git a/tests/parallel/test_parallel.py b/tests/parallel/test_parallel.py index 9d27393f9..c71c8c743 100644 --- a/tests/parallel/test_parallel.py +++ b/tests/parallel/test_parallel.py @@ -7,35 +7,26 @@ import numpy as np import pytest -from pydvl.parallel import MapReduceJob, init_parallel_backend -from pydvl.parallel.backend import effective_n_jobs -from pydvl.parallel.futures import init_executor +from pydvl.parallel import MapReduceJob, RayParallelBackend, init_parallel_backend from pydvl.utils.types import Seed from ..conftest import num_workers -def test_effective_n_jobs(parallel_config): - parallel_backend = init_parallel_backend(parallel_config) +def test_effective_n_jobs(parallel_backend): assert parallel_backend.effective_n_jobs(1) == 1 assert parallel_backend.effective_n_jobs(4) == min(4, num_workers()) - if parallel_config.address is None: - assert parallel_backend.effective_n_jobs(-1) == num_workers() - else: - assert parallel_backend.effective_n_jobs(-1) == num_workers() + assert parallel_backend.effective_n_jobs(-1) == num_workers() for n_jobs in [-1, 1, 2]: - assert parallel_backend.effective_n_jobs(n_jobs) == effective_n_jobs( - n_jobs, parallel_config - ) - assert effective_n_jobs(n_jobs, parallel_config) > 0 + assert parallel_backend.effective_n_jobs(n_jobs) > 0 with pytest.raises(ValueError): parallel_backend.effective_n_jobs(0) @pytest.fixture() -def map_reduce_job_and_parameters(parallel_config, n_jobs, request): +def map_reduce_job_and_parameters(parallel_backend, n_jobs, request): try: kind, map_func, reduce_func = request.param assert kind == "custom" @@ -46,7 +37,7 @@ def map_reduce_job_and_parameters(parallel_config, n_jobs, request): MapReduceJob, map_func=np.sum, reduce_func=np.sum, - config=parallel_config, + parallel_backend=parallel_backend, n_jobs=n_jobs, ) elif kind == "list": @@ -54,7 +45,7 @@ def map_reduce_job_and_parameters(parallel_config, n_jobs, request): MapReduceJob, map_func=lambda x: x, reduce_func=lambda r: reduce(operator.add, r, []), - config=parallel_config, + parallel_backend=parallel_backend, n_jobs=n_jobs, ) elif kind == "range": @@ -62,7 +53,7 @@ def map_reduce_job_and_parameters(parallel_config, n_jobs, request): MapReduceJob, map_func=lambda x: list(x), reduce_func=lambda r: reduce(operator.add, list(r), []), - config=parallel_config, + parallel_backend=parallel_backend, n_jobs=n_jobs, ) elif kind == "custom": @@ -70,7 +61,7 @@ def map_reduce_job_and_parameters(parallel_config, n_jobs, request): MapReduceJob, map_func=map_func, reduce_func=reduce_func, - config=parallel_config, + parallel_backend=parallel_backend, n_jobs=n_jobs, ) else: @@ -78,7 +69,7 @@ def map_reduce_job_and_parameters(parallel_config, n_jobs, request): MapReduceJob, map_func=lambda x: x * x, reduce_func=lambda r: r, - config=parallel_config, + parallel_backend=parallel_backend, n_jobs=n_jobs, ) return map_reduce_job, n_jobs @@ -116,14 +107,16 @@ def test_map_reduce_job(map_reduce_job_and_parameters, indices, expected): (np.arange(10), 4, np.array_split(np.arange(10), 4)), ], ) -def test_chunkification(parallel_config, data, n_chunks, expected_chunks): - map_reduce_job = MapReduceJob([], map_func=lambda x: x, config=parallel_config) +def test_chunkification(parallel_backend, data, n_chunks, expected_chunks): + map_reduce_job = MapReduceJob( + [], map_func=lambda x: x, parallel_backend=parallel_backend + ) chunks = list(map_reduce_job._chunkify(data, n_chunks)) for x, y in zip(chunks, expected_chunks): assert np.all(x == y) -def test_map_reduce_job_partial_map_and_reduce_func(parallel_config): +def test_map_reduce_job_partial_map_and_reduce_func(parallel_backend): def map_func(x, y): return x + y @@ -137,7 +130,7 @@ def reduce_func(x, y): np.arange(10), map_func=map_func, reduce_func=reduce_func, - config=parallel_config, + parallel_backend=parallel_backend, ) result = map_reduce_job() assert result == 150 @@ -149,7 +142,7 @@ def reduce_func(x, y): (42, 12), ], ) -def test_map_reduce_seeding(parallel_config, seed_1, seed_2): +def test_map_reduce_seeding(parallel_backend, seed_1, seed_2): """Test that the same result is obtained when using the same seed. And that different results are obtained when using different seeds. """ @@ -163,7 +156,7 @@ def _sum_of_random_integers(x: None = None, seed: Optional[Seed] = None): None, map_func=_sum_of_random_integers, reduce_func=np.mean, - config=parallel_config, + parallel_backend=parallel_backend, ) result_1 = map_reduce_job(seed=seed_1) result_2 = map_reduce_job(seed=seed_1) @@ -172,15 +165,14 @@ def _sum_of_random_integers(x: None = None, seed: Optional[Seed] = None): assert result_1 != result_3 -def test_wrap_function(parallel_config): - if parallel_config.backend != "ray": +def test_wrap_function(parallel_backend): + if not isinstance(parallel_backend, RayParallelBackend): pytest.skip("Only makes sense for ray") def fun(x, **kwargs): return dict(x=x * x, **kwargs) - parallel_backend = init_parallel_backend(parallel_config) - # Try two kwargs for @ray.remote. Should be ignored in the sequential backend + # Try two kwargs for @ray.remote. Should be ignored in the joblib backend wrapped_func = parallel_backend.wrap(fun, num_cpus=1, max_calls=1) x = parallel_backend.put(2) ret = parallel_backend.get(wrapped_func(x)) @@ -198,29 +190,26 @@ def get_pid(): assert len(set(pids)) == num_workers() -def test_futures_executor_submit(parallel_config): - with init_executor(config=parallel_config) as executor: +def test_futures_executor_submit(parallel_backend): + with parallel_backend.executor() as executor: future = executor.submit(lambda x: x + 1, 1) result = future.result() assert result == 2 -def test_futures_executor_map(parallel_config): - with init_executor(config=parallel_config) as executor: +def test_futures_executor_map(parallel_backend): + with parallel_backend.executor() as executor: results = list(executor.map(lambda x: x + 1, range(3))) assert results == [1, 2, 3] -def test_futures_executor_map_with_max_workers(parallel_config): - if parallel_config.backend != "ray": - pytest.skip("Currently this test only works with Ray") - +def test_futures_executor_map_with_max_workers(parallel_backend): def func(_): time.sleep(1) return time.monotonic() start_time = time.monotonic() - with init_executor(config=parallel_config) as executor: + with parallel_backend.executor(max_workers=num_workers()) as executor: assert executor._max_workers == num_workers() list(executor.map(func, range(3))) end_time = time.monotonic() @@ -231,24 +220,20 @@ def func(_): @pytest.mark.timeout(30) @pytest.mark.tolerate(max_failures=1) -def test_future_cancellation(parallel_config): - if parallel_config.backend != "ray": +def test_future_cancellation(parallel_backend): + if not isinstance(parallel_backend, RayParallelBackend): pytest.skip("Currently this test only works with Ray") from pydvl.parallel import CancellationPolicy - with init_executor( - config=parallel_config, cancel_futures=CancellationPolicy.NONE - ) as executor: + with parallel_backend.executor(cancel_futures=CancellationPolicy.NONE) as executor: future = executor.submit(lambda x: x + 1, 1) assert future.result() == 2 from ray.exceptions import RayTaskError, TaskCancelledError - with init_executor( - config=parallel_config, cancel_futures=CancellationPolicy.ALL - ) as executor: + with parallel_backend.executor(cancel_futures=CancellationPolicy.ALL) as executor: future = executor.submit(lambda t: time.sleep(t), 5) while future._state != "FINISHED": diff --git a/tests/value/conftest.py b/tests/value/conftest.py index 139f0f5b6..1e3dad94e 100644 --- a/tests/value/conftest.py +++ b/tests/value/conftest.py @@ -1,3 +1,4 @@ +import joblib import numpy as np import pytest from numpy.typing import NDArray @@ -6,6 +7,7 @@ from sklearn.preprocessing import PolynomialFeatures from sklearn.utils import Bunch +from pydvl.parallel import JoblibParallelBackend from pydvl.parallel.config import ParallelConfig from pydvl.utils import Dataset, SupervisedModel, Utility from pydvl.utils.caching import InMemoryCacheBackend @@ -160,8 +162,9 @@ def linear_shapley(cache, linear_dataset, scorer, n_jobs): @pytest.fixture(scope="module") -def parallel_config(): - yield ParallelConfig(backend="joblib", n_cpus_local=num_workers(), wait_timeout=0.1) +def parallel_backend(): + with joblib.parallel_config(n_jobs=num_workers()): + yield JoblibParallelBackend() @pytest.fixture() diff --git a/tests/value/least_core/test_common.py b/tests/value/least_core/test_common.py index 6add2d12a..754666273 100644 --- a/tests/value/least_core/test_common.py +++ b/tests/value/least_core/test_common.py @@ -12,7 +12,7 @@ [("miner", {"n_players": 5})], indirect=True, ) -def test_lc_solve_problems(test_game, n_jobs, parallel_config): +def test_lc_solve_problems(test_game, n_jobs, parallel_backend): """Test solving LeastCoreProblems in parallel.""" n_problems = n_jobs @@ -22,7 +22,7 @@ def test_lc_solve_problems(test_game, n_jobs, parallel_config): test_game.u, algorithm="test_lc", n_jobs=n_jobs, - config=parallel_config, + parallel_backend=parallel_backend, ) assert len(solutions) == n_problems diff --git a/tests/value/loo/test_loo.py b/tests/value/loo/test_loo.py index 04922a7a0..6cc5cd99e 100644 --- a/tests/value/loo/test_loo.py +++ b/tests/value/loo/test_loo.py @@ -6,9 +6,9 @@ @pytest.mark.parametrize("num_samples", [10, 100]) -def test_loo(num_samples: int, n_jobs: int, parallel_config, analytic_loo): +def test_loo(num_samples: int, n_jobs: int, parallel_backend, analytic_loo): """Compares LOO with analytic values in a dummy model""" u, exact_values = analytic_loo - values = compute_loo(u, n_jobs=n_jobs, config=parallel_config, progress=False) + values = compute_loo(u, n_jobs=n_jobs, parallel_backend=parallel_backend) check_total_value(u, values, rtol=0.1) check_values(values, exact_values, rtol=0.1) diff --git a/tests/value/shapley/test_montecarlo.py b/tests/value/shapley/test_montecarlo.py index 157ebc40f..d59937bec 100644 --- a/tests/value/shapley/test_montecarlo.py +++ b/tests/value/shapley/test_montecarlo.py @@ -53,7 +53,7 @@ ) def test_games( test_game, - parallel_config, + parallel_backend, n_jobs, fun: ShapleyMode, rtol: float, @@ -79,7 +79,7 @@ def test_games( test_game.u, mode=fun, n_jobs=n_jobs, - config=parallel_config, + parallel_backend=parallel_backend, seed=seed, progress=True, **kwargs @@ -110,7 +110,7 @@ def test_games( ) def test_seed( test_game, - parallel_config: ParallelConfig, + parallel_backend: ParallelConfig, n_jobs: int, fun: ShapleyMode, kwargs: dict, @@ -122,7 +122,7 @@ def test_seed( test_game.u, mode=fun, n_jobs=n_jobs, - config=parallel_config, + parallel_backend=parallel_backend, seeds=(seed, seed, seed_alt), **deepcopy(kwargs) ) diff --git a/tests/value/shapley/test_truncated.py b/tests/value/shapley/test_truncated.py index 7d5977216..d725393ed 100644 --- a/tests/value/shapley/test_truncated.py +++ b/tests/value/shapley/test_truncated.py @@ -33,7 +33,7 @@ ) def test_games( test_game, - parallel_config, + parallel_backend, n_jobs, done, truncation_cls, @@ -52,7 +52,7 @@ def test_games( done=done, truncation=truncation, n_jobs=n_jobs, - config=parallel_config, + parallel_backend=parallel_backend, seed=seed, progress=True, ) diff --git a/tests/value/test_semivalues.py b/tests/value/test_semivalues.py index e33f92543..bea03bf27 100644 --- a/tests/value/test_semivalues.py +++ b/tests/value/test_semivalues.py @@ -104,7 +104,7 @@ def test_coefficients(n: int, coefficient: SVCoefficient): @pytest.mark.parametrize("coefficient", [shapley_coefficient, beta_coefficient(1, 1)]) def test_games_shapley_deterministic( test_game, - parallel_config, + parallel_backend, n_jobs, sampler: Type[PowersetSampler], coefficient: SVCoefficient, @@ -118,7 +118,7 @@ def test_games_shapley_deterministic( criterion, skip_converged=True, n_jobs=n_jobs, - config=parallel_config, + parallel_backend=parallel_backend, progress=True, ) exact_values = test_game.shapley_values() @@ -145,7 +145,7 @@ def test_games_shapley_deterministic( @pytest.mark.parametrize("coefficient", [shapley_coefficient, beta_coefficient(1, 1)]) def test_games_shapley( test_game, - parallel_config, + parallel_backend, n_jobs, sampler: Type[PowersetSampler], coefficient: SVCoefficient, @@ -159,7 +159,7 @@ def test_games_shapley( criterion, skip_converged=True, n_jobs=n_jobs, - config=parallel_config, + parallel_backend=parallel_backend, progress=True, ) @@ -196,7 +196,7 @@ def test_shapley_batch_size( coefficient: SVCoefficient, batch_size: int, n_jobs: int, - parallel_config: ParallelConfig, + parallel_backend: ParallelConfig, seed: Seed, ): timed_fn = timed(compute_generic_semivalues) @@ -208,7 +208,7 @@ def test_shapley_batch_size( skip_converged=True, n_jobs=n_jobs, batch_size=1, - config=parallel_config, + parallel_backend=parallel_backend, ) total_seconds_single_batch = timed_fn.execution_time @@ -220,7 +220,7 @@ def test_shapley_batch_size( skip_converged=True, n_jobs=n_jobs, batch_size=batch_size, - config=parallel_config, + parallel_backend=parallel_backend, ) total_seconds_multi_batch = timed_fn.execution_time assert total_seconds_multi_batch < total_seconds_single_batch * 1.1 @@ -246,7 +246,7 @@ def test_banzhaf( analytic_banzhaf, sampler: Type[PowersetSampler], n_jobs: int, - parallel_config: ParallelConfig, + parallel_backend: ParallelConfig, seed, ): u, exact_values = analytic_banzhaf @@ -258,6 +258,6 @@ def test_banzhaf( criterion, skip_converged=True, n_jobs=n_jobs, - config=parallel_config, + parallel_backend=parallel_backend, ) check_values(values, exact_values, rtol=0.2)