diff --git a/executorlib/__init__.py b/executorlib/__init__.py
index 3337653a..4adfad8c 100644
--- a/executorlib/__init__.py
+++ b/executorlib/__init__.py
@@ -179,7 +179,7 @@ def __new__(
         resource_dict.update(
             {k: v for k, v in default_resource_dict.items() if k not in resource_dict}
         )
-        if "pysqa_" in backend and not plot_dependency_graph:
+        if "_submission" in backend and not plot_dependency_graph:
             from executorlib.cache.executor import create_file_executor

             return create_file_executor(
@@ -197,7 +197,7 @@ def __new__(
                 init_function=init_function,
                 disable_dependencies=disable_dependencies,
             )
-        elif not disable_dependencies:
+        elif not disable_dependencies and (plot_dependency_graph or "_allocation" in backend):
             _check_pysqa_config_directory(pysqa_config_directory=pysqa_config_directory)
             return ExecutorWithDependencies(
                 max_workers=max_workers,
diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py
index a2de5bf1..ad817915 100644
--- a/executorlib/cache/executor.py
+++ b/executorlib/cache/executor.py
@@ -81,7 +81,7 @@ def __init__(

 def create_file_executor(
     max_workers: int = 1,
-    backend: str = "pysqa_flux",
+    backend: str = "flux_submission",
     max_cores: int = 1,
     cache_directory: Optional[str] = None,
     resource_dict: Optional[dict] = None,
@@ -113,6 +113,6 @@
         cache_directory=cache_directory,
         resource_dict=resource_dict,
         pysqa_config_directory=pysqa_config_directory,
-        backend=backend.split("pysqa_")[-1],
+        backend=backend.split("_submission")[0],
         disable_dependencies=disable_dependencies,
     )
diff --git a/executorlib/interactive/executor.py b/executorlib/interactive/executor.py
index 067dec50..fb5dbf71 100644
--- a/executorlib/interactive/executor.py
+++ b/executorlib/interactive/executor.py
@@ -198,13 +198,13 @@
         init_function (None): optional function to preset arguments for functions which are submitted later
     """
     check_init_function(block_allocation=block_allocation, init_function=init_function)
-    if flux_executor is not None and backend != "flux":
-        backend = "flux"
+    if flux_executor is not None and backend != "flux_allocation":
+        backend = "flux_allocation"
     check_pmi(backend=backend, pmi=flux_executor_pmi_mode)
     cores_per_worker = resource_dict["cores"]
     resource_dict["cache_directory"] = cache_directory
     resource_dict["hostname_localhost"] = hostname_localhost
-    if backend == "flux":
+    if backend == "flux_allocation":
         check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"])
         check_command_line_argument_lst(
             command_line_argument_lst=resource_dict["slurm_cmd_args"]
         )
@@ -233,7 +233,7 @@
             executor_kwargs=resource_dict,
             spawner=FluxPythonSpawner,
         )
-    elif backend == "slurm":
+    elif backend == "slurm_allocation":
         check_executor(executor=flux_executor)
         check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
         if block_allocation:
@@ -255,7 +255,7 @@
             executor_kwargs=resource_dict,
             spawner=SrunSpawner,
         )
-    else:  # backend="local"
+    elif backend == "local":
         check_executor(executor=flux_executor)
         check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
         check_threads_per_core(threads_per_core=resource_dict["threads_per_core"])
@@ -285,3 +285,5 @@
             executor_kwargs=resource_dict,
             spawner=MpiExecSpawner,
         )
+    else:
+        raise ValueError("The supported backends are slurm_allocation, slurm_submission, flux_allocation, flux_submission and local.")
\ No newline at end of file
diff --git a/notebooks/examples.ipynb b/notebooks/examples.ipynb
index 70c90a86..7334e486 100644
--- a/notebooks/examples.ipynb
+++ b/notebooks/examples.ipynb
@@ -69,7 +69,7 @@
    "source": [
     "from executorlib import Executor\n",
     "\n",
-    "with Executor(max_cores=1, backend=\"flux\") as exe:\n",
+    "with Executor(max_cores=1, backend=\"flux_allocation\") as exe:\n",
     "    future = exe.submit(sum, [1, 1])\n",
     "    print(future.result())"
    ]
@@ -103,7 +103,7 @@
     "    return sum(*args)\n",
     "\n",
     "\n",
-    "with Executor(max_cores=2, backend=\"flux\") as exe:\n",
+    "with Executor(max_cores=2, backend=\"flux_allocation\") as exe:\n",
     "    fs_1 = exe.submit(calc, [2, 1])\n",
     "    fs_2 = exe.submit(calc, [2, 2])\n",
     "    fs_3 = exe.submit(calc, [2, 3])\n",
@@ -159,7 +159,7 @@
     "    return sum(*args)\n",
     "\n",
     "\n",
-    "with Executor(max_cores=2, backend=\"flux\") as exe:\n",
+    "with Executor(max_cores=2, backend=\"flux_allocation\") as exe:\n",
     "    print(list(exe.map(calc, [[2, 1], [2, 2], [2, 3], [2, 4]])))"
    ]
   },
@@ -277,7 +277,7 @@
     "    # Resource definition on the executor level\n",
     "    max_cores=2,  # total number of cores available to the Executor\n",
     "    block_allocation=True,  # reuse python processes\n",
-    "    backend=\"flux\",\n",
+    "    backend=\"flux_allocation\",\n",
     ") as exe:\n",
     "    future_obj = exe.submit(\n",
     "        calc_function,\n",
@@ -332,7 +332,7 @@
     "with Executor(\n",
     "    max_cores=1,\n",
     "    init_function=init_function,\n",
-    "    backend=\"flux\",\n",
+    "    backend=\"flux_allocation\",\n",
     "    block_allocation=True,\n",
     ") as exe:\n",
     "    fs = exe.submit(calc, 2, j=5)\n",
@@ -462,7 +462,7 @@
     "with Executor(\n",
     "    max_cores=2,\n",
     "    resource_dict={\"cores\": 2},\n",
-    "    backend=\"flux\",\n",
+    "    backend=\"flux_allocation\",\n",
     "    flux_executor_pmi_mode=\"pmix\",\n",
     ") as exe:\n",
     "    fs = exe.submit(calc, 3)\n",
@@ -519,7 +519,7 @@
     "with Executor(\n",
     "    max_workers=2, \n",
     "    gpus_per_worker=1,\n",
-    "    backend=\"flux\",\n",
+    "    backend=\"flux_allocation\",\n",
     ") as exe:\n",
     "    fs_1 = exe.submit(get_available_gpus)\n",
     "    fs_2 = exe.submit(get_available_gpus)\n",
@@ -683,7 +683,7 @@
    "id": "ae8dd860-f90f-47b4-b3e5-664f5c949350",
    "metadata": {},
    "source": [
-    "The `backend=\"slurm\"` parameter is optional as `executorlib` automatically recognizes if [flux framework](https://flux-framework.org)\n",
+    "The `backend=\"slurm_allocation\"` parameter is optional as `executorlib` automatically recognizes if [flux framework](https://flux-framework.org)\n",
     "or SLURM are available. \n",
     "\n",
     "In addition, the SLURM backend introduces the `command_line_argument_lst=[]` parameter, which allows the user to provide\n",
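
Usage sketch (illustration only, not part of the patch): assuming the renamed backends behave as in the notebook examples above, selecting a backend after this change looks as follows. The old-to-new name mapping is taken from the hunks above; any other backend string now hits the ValueError added in executorlib/interactive/executor.py.

from executorlib import Executor

# Backend naming after this change (old names in parentheses where the patch shows them):
#   "flux_allocation"  (was "flux")       - interactive Flux allocation
#   "slurm_allocation" (was "slurm")      - interactive SLURM allocation
#   "flux_submission"  (was "pysqa_flux") - file-based submission via pysqa
#   "slurm_submission"                    - file-based submission via pysqa
#   "local"                               - run on the local machine
with Executor(max_cores=1, backend="flux_allocation") as exe:
    future = exe.submit(sum, [1, 1])
    print(future.result())  # prints 2

# Unsupported strings now raise:
# ValueError: The supported backends are slurm_allocation, slurm_submission,
#             flux_allocation, flux_submission and local.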