Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add venv support #1782

Draft
wants to merge 1 commit into
base: uv-install
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/api/python/image.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ ImageSetupStepType
.. autoattribute:: CONDA_INSTALL
.. autoattribute:: UV_INSTALL
.. autoattribute:: SYNC_PACKAGE
.. autoattribute:: SET_VENV

ImageSetupStep
~~~~~~~~~~~~~~
Expand Down
52 changes: 46 additions & 6 deletions runhouse/resources/hardware/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
run_setup_command,
thread_coroutine,
ThreadWithException,
venv_cmd,
)

# Filter out DeprecationWarnings
Expand Down Expand Up @@ -219,10 +220,16 @@ def docker_user(self) -> Optional[str]:

@property
def conda_env_name(self) -> Optional[str]:
if self.image and self.image.conda_env_name:
if self.image:
return self.image.conda_env_name
return None

# Path of the venv configured on this cluster's image.
# Returns ``self.image.venv_path`` when an image is set, otherwise ``None``.
@property
def venv_path(self) -> Optional[str]:
if self.image:
return self.image.venv_path
return None

def _save_config_to_cluster(
self,
node: str = None,
Expand Down Expand Up @@ -691,6 +698,7 @@ def _sync_image_to_cluster(self, parallel: bool = True):
step_type=ImageSetupStepType.PIP_INSTALL,
reqs=["uv"],
conda_env_name=self.conda_env_name,
venv_path=self.venv_path,
),
parallel=parallel,
)
Expand All @@ -716,6 +724,7 @@ def _sync_runhouse_to_cluster(
step_type=ImageSetupStepType.PIP_INSTALL,
reqs=["ray"],
conda_env_name=self.conda_env_name,
venv_path=self.venv_path,
),
parallel=parallel,
)
Expand All @@ -735,6 +744,7 @@ def _sync_runhouse_to_cluster(
step_type=ImageSetupStepType.PIP_INSTALL,
reqs=["runhouse[server]"],
conda_env_name=self.conda_env_name,
venv_path=self.venv_path,
),
parallel=parallel,
)
Expand All @@ -744,6 +754,7 @@ def install_packages(
reqs: List[Union["Package", str]],
node: Optional[str] = None,
conda_env_name: Optional[str] = None,
venv_path: Optional[str] = None,
force_sync_local: bool = False,
):
"""Install the given packages on the cluster.
Expand All @@ -754,6 +765,7 @@ def install_packages(
package. (Default: ``None``)
conda_env_name (str, optional): Name of conda env to install the package in, if relevant. If left empty,
defaults to base environment. (Default: ``None``)
venv_path (str, optional): Path of venv to install the package in, if relevant. (Default: ``None``)
force_sync_local (bool, optional): If the package exists both locally and remotely, whether to override
the remote version with the local version. By default, the local version will be installed only if
the package does not already exist on the cluster. (Default: ``False``)
Expand All @@ -767,13 +779,15 @@ def install_packages(
self.install_package(
req,
conda_env_name=conda_env_name,
venv_path=venv_path,
force_sync_local=force_sync_local,
)
else:
self.install_package_over_ssh(
req,
node=node,
conda_env_name=conda_env_name,
venv_path=venv_path,
force_sync_local=force_sync_local,
)

Expand All @@ -782,6 +796,7 @@ def pip_install(
reqs: List[Union["Package", str]],
node: Optional[str] = None,
conda_env_name: Optional[str] = None,
venv_path: Optional[str] = None,
force_sync_local: bool = False,
):
from runhouse.resources.packages.package import Package
Expand All @@ -794,6 +809,7 @@ def pip_install(
reqs=pip_packages,
node=node,
conda_env_name=conda_env_name,
venv_path=venv_path,
force_sync_local=force_sync_local,
)

Expand All @@ -802,6 +818,7 @@ def uv_install(
reqs: List[Union["Package", str]],
node: Optional[str] = None,
conda_env_name: Optional[str] = None,
venv_path: Optional[str] = None,
force_sync_local: bool = False,
):
from runhouse.resources.packages.package import Package
Expand All @@ -814,6 +831,7 @@ def uv_install(
reqs=uv_packages,
node=node,
conda_env_name=conda_env_name,
venv_path=venv_path,
force_sync_local=force_sync_local,
)

Expand Down Expand Up @@ -841,7 +859,6 @@ def sync_package(
self,
package: Union["Package", str],
node: Optional[str] = None,
conda_env_name: Optional[str] = None,
):
from runhouse.resources.packages.package import Package

Expand All @@ -853,7 +870,6 @@ def sync_package(
self.install_packages(
reqs=[package],
node=node,
conda_env_name=conda_env_name,
)

def get(self, key: str, default: Any = None, remote=False):
Expand Down Expand Up @@ -1156,6 +1172,7 @@ def _start_ray_workers(self, ray_port, env_vars):
cluster=self,
env_vars=env_vars,
conda_env_name=self.conda_env_name,
venv_path=self.venv_path,
node=host,
stream_logs=True,
)
Expand Down Expand Up @@ -1287,6 +1304,7 @@ def _start_or_restart_helper(
cluster=self,
env_vars=image_env_vars,
conda_env_name=self.conda_env_name,
venv_path=self.venv_path,
stream_logs=True,
node=self.head_ip,
)
Expand Down Expand Up @@ -1382,6 +1400,7 @@ def stop_server(
stop_ray: bool = False,
cleanup_actors: bool = True,
conda_env_name: Optional[str] = None,
venv_path: Optional[str] = None,
):
"""Stop the RPC server.

Expand All @@ -1402,6 +1421,7 @@ def stop_server(
node=self.head_ip,
require_outputs=False,
conda_env_name=conda_env_name,
venv_path=venv_path,
)
assert status_codes[0] == 0

Expand Down Expand Up @@ -1969,6 +1989,7 @@ def run_bash_over_ssh(
require_outputs: bool = True,
_ssh_mode: str = "interactive", # Note, this only applies for non-password SSH
conda_env_name: Optional[str] = None,
venv_path: Optional[str] = None,
):
"""Run bash commands on the cluster over SSH. Will not work directly on the cluster, works strictly over
ssh.
Expand All @@ -1981,6 +2002,7 @@ def run_bash_over_ssh(
stream_logs (bool): Whether to stream logs. (Default: ``True``)
require_outputs (bool): Whether to return stdout/stderr in addition to status code. (Default: ``True``)
conda_env_name (str or None): Name of conda env to run the command in, if applicable. (Default: ``None``)
venv_path (str or None): Path of venv to run the command in, if applicable. (Default: ``None``)
"""
if self.on_this_cluster():
raise ValueError("Run bash over SSH is not supported on the local cluster.")
Expand All @@ -2004,8 +2026,12 @@ def run_bash_over_ssh(
res_list.append(res)
return res_list

# Fall back to the image's venv only when the caller did not pass one.
# The parentheses are required: a bare `a or b if c else d` parses as
# `(a or b) if c else d`, which would discard an explicitly passed
# venv_path whenever the cluster has no image.
venv_path = venv_path or (self.image.venv_path if self.image else None)

if conda_env_name:
commands = [conda_env_cmd(cmd, conda_env_name) for cmd in commands]
if venv_path:
commands = [venv_cmd(cmd, venv_path) for cmd in commands]

return_codes = self._run_commands_with_runner(
commands,
Expand Down Expand Up @@ -2157,14 +2183,16 @@ def run_python(
self,
commands: List[str],
conda_env_name: Optional[str] = None,
venv_path: Optional[str] = None,
stream_logs: bool = True,
node: str = None,
):
"""Run a list of python commands on the cluster, or a specific cluster node if its IP is provided.

Args:
commands (List[str]): List of commands to run.
process (str, optional): Process to run the commands in. (Default: ``None``)
conda_env_name (str or None): Name of conda env to run the command in, if applicable. (Default: ``None``)
venv_path (str or None): Path of venv to run the command in, if applicable. (Default: ``None``)
stream_logs (bool, optional): Whether to stream logs. (Default: ``True``)
node (str, optional): Node to run commands on. If not specified, runs on head node. (Default: ``None``)

Expand Down Expand Up @@ -2193,6 +2221,7 @@ def run_python(
stream_logs=stream_logs,
node=node,
conda_env_name=conda_env_name,
venv_path=venv_path,
)

return return_codes
Expand Down Expand Up @@ -2817,6 +2846,7 @@ def install_package(
self,
package: Union["Package", str],
conda_env_name: Optional[str] = None,
venv_path: Optional[str] = None,
force_sync_local: bool = False,
):
from runhouse.resources.packages.package import Package
Expand All @@ -2826,17 +2856,26 @@ def install_package(

if self.on_this_cluster():
obj_store.ainstall_package_in_all_nodes_and_processes(
package, conda_env_name, force_sync_local
package=package,
conda_env_name=conda_env_name,
venv_path=venv_path,
force_sync_local=force_sync_local,
)
else:
package = package.to(self)
self.client.install_package(package, conda_env_name, force_sync_local)
self.client.install_package(
package=package,
conda_env_name=conda_env_name,
venv_path=venv_path,
force_sync_local=force_sync_local,
)

def install_package_over_ssh(
self,
package: Union["Package", str],
node: str,
conda_env_name: str,
venv_path: str,
force_sync_local: bool = False,
):
from runhouse.resources.packages import InstallTarget, Package
Expand All @@ -2853,5 +2892,6 @@ def install_package_over_ssh(
cluster=self,
node=node,
conda_env_name=conda_env_name,
venv_path=venv_path,
force_sync_local=force_sync_local,
)
23 changes: 5 additions & 18 deletions runhouse/resources/hardware/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,30 +188,33 @@ def _do_setup_step_for_node(cluster, setup_step, node, env_vars):
cluster.install_packages(
setup_step.kwargs.get("reqs"),
conda_env_name=setup_step.kwargs.get("conda_env_name"),
venv_path=setup_step.kwargs.get("venv_path"),
node=node,
)
elif setup_step.step_type == ImageSetupStepType.PIP_INSTALL:
cluster.pip_install(
setup_step.kwargs.get("reqs"),
conda_env_name=setup_step.kwargs.get("conda_env_name"),
venv_path=setup_step.kwargs.get("venv_path"),
node=node,
)
elif setup_step.step_type == ImageSetupStepType.UV_INSTALL:
cluster.uv_install(
setup_step.kwargs.get("reqs"),
conda_env_name=setup_step.kwargs.get("conda_env_name"),
venv_path=setup_step.kwargs.get("venv_path"),
node=node,
)
elif setup_step.step_type == ImageSetupStepType.CONDA_INSTALL:
cluster.conda_install(
setup_step.kwargs.get("reqs"),
conda_env_name=setup_step.kwargs.get("conda_env_name"),
venv_path=setup_step.kwargs.get("venv_path"),
node=node,
)
elif setup_step.step_type == ImageSetupStepType.SYNC_PACKAGE:
cluster.sync_package(
setup_step.kwargs.get("package"),
conda_env_name=setup_step.kwargs.get("conda_env_name"),
node=node,
)
elif setup_step.step_type == ImageSetupStepType.CMD_RUN:
Expand All @@ -220,6 +223,7 @@ def _do_setup_step_for_node(cluster, setup_step, node, env_vars):
cluster=cluster,
env_vars=env_vars,
conda_env_name=setup_step.kwargs.get("conda_env_name"),
venv_path=setup_step.kwargs.get("venv_path"),
stream_logs=True,
node=node,
)
Expand All @@ -232,23 +236,6 @@ def _do_setup_step_for_node(cluster, setup_step, node, env_vars):
contents=setup_step.kwargs.get("contents"),
filter_options=setup_step.kwargs.get("filter_options"),
)
elif setup_step.step_type == ImageSetupStepType.PIP_INSTALL:
cluster.pip_install(
setup_step.kwargs.get("reqs"),
conda_env_name=setup_step.kwargs.get("conda_env_name"),
node=node,
)
elif setup_step.step_type == ImageSetupStepType.CONDA_INSTALL:
cluster.conda_install(
setup_step.kwargs.get("reqs"),
conda_env_name=setup_step.kwargs.get("conda_env_name"),
node=node,
)
elif setup_step.step_type == ImageSetupStepType.SYNC_PACKAGE:
cluster.sync_package(
setup_step.kwargs.get("package"),
node=node,
)


def _setup_creds_from_dict(ssh_creds: Dict, cluster_name: str):
Expand Down
Loading
Loading