Skip to content

Commit

Permalink
Handle the large_pex case when running from an unzipped pex
Browse files Browse the repository at this point in the history
This happens when we run Python code from a large zipped pex and that code
itself tries to rebuild a large (zipped) pex (e.g. by launching a Spark job
that calls `cluster_pack.upload_env`).

The `detect_archive_names` function must be aware that it is currently running
from an unzipped pex in order to correctly retrieve the name of the original
zipped pex.
  • Loading branch information
jcuquemelle committed Mar 12, 2024
1 parent 6f2270f commit 631e783
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 18 deletions.
29 changes: 25 additions & 4 deletions cluster_pack/packaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import logging
import os
import glob
import pathlib
import shutil
import subprocess
Expand Down Expand Up @@ -368,19 +369,40 @@ def detect_archive_names(
pex_file = ""
env_name = packer.env_name()

def build_package_path(name: str = env_name,
extension: Optional[str] = packer.extension()) -> str:
path = (f"{get_default_fs()}/user/{getpass.getuser()}"
f"/envs/{name}")
if extension is None:
return path
return f"{path}.{extension}"

if not package_path:
package_path = (f"{get_default_fs()}/user/{getpass.getuser()}"
f"/envs/{env_name}.{packer.extension()}")
package_path = build_package_path()
else:
if "".join(os.path.splitext(package_path)[1]) != f".{packer.extension()}":
raise ValueError(f"{package_path} has the wrong extension"
f", .{packer.extension()} is expected")

# we are actually building or reusing a large pex and we have the information from the
# allow_large_pex flag
if (packer.extension() == PEX_PACKER.extension()
and allow_large_pex
and not package_path.endswith('.zip')):
package_path += '.zip'

# We are running from an unzipped large pex: we know this because `pex_file` is not empty
# and it is a directory rather than a zipapp

if (pex_file != ""
and os.path.isdir(pex_file)
and not package_path.endswith('.zip')):

pex_files = glob.glob(f"{os.path.dirname(pex_file)}/*.pex.zip")
assert len(pex_files) == 1, \
f"Expected to find single zipped PEX in same dir as {pex_file}, got {pex_files}"
package_path = build_package_path(os.path.basename(pex_files[0]), None)

return package_path, env_name, pex_file


Expand Down Expand Up @@ -430,7 +452,7 @@ def get_current_pex_filepath() -> str:

def get_editable_requirements(
executable: str = sys.executable,
editable_packages_dir: str = os.getcwd()
editable_packages_dir: str = os.getcwd() # only overridden for tests
) -> Dict[str, str]:
editable_requirements: Dict[str, str] = {}
if _running_from_pex():
Expand All @@ -457,7 +479,6 @@ def get_editable_requirements(


def get_pyenv_usage_from_archive(path_to_archive: str) -> PythonEnvDescription:

archive_filename = os.path.basename(path_to_archive)

if archive_filename.endswith('.pex.zip'):
Expand Down
72 changes: 58 additions & 14 deletions tests/test_packaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,24 @@ def test_get_current_pex_filepath():
)


def test_get_current_pex_filepath():
with tempfile.TemporaryDirectory() as tempdir:
path_to_pex = f"{tempdir}/out.pex"
packaging.pack_in_pex(
["numpy"],
path_to_pex,
# make isolated pex from current pytest virtual env
pex_inherit_path="false",
allow_large_pex=True)
assert os.path.exists(path_to_pex)
subprocess.check_output([
path_to_pex,
"-c",
("""import cluster_pack;"""
"""assert "PEX" in os.environ;""")]
)


def test_get_editable_requirements():
with mock.patch(f"{MODULE_TO_TEST}._running_from_pex") as mock_running_from_pex:
mock_running_from_pex.return_value = True
Expand Down Expand Up @@ -262,7 +280,15 @@ def test_pack_in_pex_with_include_tools():
))


def test_pack_in_pex_with_large_correctly_retrieves_zip_archive():
@pytest.mark.parametrize(
"is_large_pex,package_path",
[
(True, "hdfs://dummy/path/env.pex"),
(None, "hdfs://dummy/path/env.pex"),
(None, None)
]
)
def test_pack_in_pex_with_large_correctly_retrieves_zip_archive(is_large_pex, package_path):
with tempfile.TemporaryDirectory() as tempdir:
current_packages = packaging.get_non_editable_requirements(sys.executable)
reqs = uploader._build_reqs_from_venv({}, current_packages, [])
Expand All @@ -275,18 +301,27 @@ def test_pack_in_pex_with_large_correctly_retrieves_zip_archive():
shutil.unpack_archive(local_package_path, unzipped_pex_path)
st = os.stat(f"{unzipped_pex_path}/__main__.py")
os.chmod(f"{unzipped_pex_path}/__main__.py", st.st_mode | stat.S_IEXEC)
package_argument_as_string = "None" if package_path is None else f"'{package_path}'"
expected_package_path = (
f"hdfs:///user/{getpass.getuser()}/envs/{os.path.basename(unzipped_pex_path)}.zip"
if is_large_pex is None else f"{package_path}.zip"
)
with does_not_raise():
print(subprocess.check_output([
f"{unzipped_pex_path}/__main__.py",
"-c",
("""print("Start importing cluster-pack..");"""
"""from cluster_pack import packaging;"""
"""from unittest import mock;"""
"""packer = packaging.detect_packer_from_env();"""
"""package_path = "hdfs/dummy/path/env.pex";"""
"""allow_large_pex=True;"""
"""packaging.get_default_fs = mock.Mock(return_value='hdfs://');"""
f"""package_path={package_argument_as_string};"""
f"""allow_large_pex={is_large_pex};"""
"""package_path, env_name, pex_file = \
packaging.detect_archive_names(packer, package_path, allow_large_pex);"""
"""assert(package_path == "hdfs/dummy/path/env.pex.zip");"""
"""print(f'package_path: {package_path}');"""
"""print(f'pex_file: {pex_file}');"""
f"""assert(package_path == "{expected_package_path}");"""
"""assert(pex_file.endswith('.pex'));"""
)]
))
Expand Down Expand Up @@ -399,20 +434,23 @@ def test_gen_pyenvs_from_unknown_format():


archive_test_data = [
(False, "dummy/path/exe.pex", False, "dummy/path/exe.pex"),
(False, "dummy/path/exe.pex", True, "dummy/path/exe.pex.zip"),
(True, "dummy/path/exe.pex", False, "dummy/path/exe.pex"),
(True, "dummy/path/exe.pex", True, "dummy/path/exe.pex.zip"),
(False, None, False, f"hdfs:///user/{getpass.getuser()}/envs/venv_exe.pex"),
(False, None, True, f"hdfs:///user/{getpass.getuser()}/envs/venv_exe.pex.zip"),
(True, None, False, f"hdfs:///user/{getpass.getuser()}/envs/pex_exe.pex"),
(True, None, True, f"hdfs:///user/{getpass.getuser()}/envs/pex_exe.pex.zip"),
(False, "dummy/path/exe.pex", False, False, "dummy/path/exe.pex"),
(False, "dummy/path/exe.pex", True, False, "dummy/path/exe.pex.zip"),
(True, "dummy/path/exe.pex", False, False, "dummy/path/exe.pex"),
(True, "dummy/path/exe.pex", True, False, "dummy/path/exe.pex.zip"),
(False, None, False, False, f"hdfs:///user/{getpass.getuser()}/envs/venv_exe.pex"),
(False, None, None, False, f"hdfs:///user/{getpass.getuser()}/envs/venv_exe.pex"),
(False, None, True, False, f"hdfs:///user/{getpass.getuser()}/envs/venv_exe.pex.zip"),
(True, None, False, False, f"hdfs:///user/{getpass.getuser()}/envs/pex_exe.pex"),
(True, None, True, False, f"hdfs:///user/{getpass.getuser()}/envs/pex_exe.pex.zip"),
(True, None, None, False, f"hdfs:///user/{getpass.getuser()}/envs/pex_exe.pex"),
(True, None, None, True, f"hdfs:///user/{getpass.getuser()}/envs/pex_exe.pex.zip"),
]


@pytest.mark.parametrize(
"running_from_pex, package_path, allow_large_pex, expected", archive_test_data)
def test_detect_archive_names(running_from_pex, package_path, allow_large_pex, expected):
"running_from_pex, package_path, allow_large_pex, is_dir, expected", archive_test_data)
def test_detect_archive_names(running_from_pex, package_path, allow_large_pex, is_dir, expected):
with contextlib.ExitStack() as stack:
mock_running_from_pex = stack.enter_context(
mock.patch(f"{MODULE_TO_TEST}._running_from_pex"))
Expand All @@ -422,11 +460,17 @@ def test_detect_archive_names(running_from_pex, package_path, allow_large_pex, e
mock.patch(f"{MODULE_TO_TEST}.get_default_fs"))
mock_venv = stack.enter_context(
mock.patch(f"{MODULE_TO_TEST}.get_env_name"))
mock_is_dir = stack.enter_context(
mock.patch(f"os.path.isdir"))
mock_glob = stack.enter_context(
mock.patch(f"glob.glob"))

mock_running_from_pex.return_value = running_from_pex
mock_current_filepath.return_value = "pex_exe.pex"
mock_fs.return_value = "hdfs://"
mock_venv.return_value = "venv_exe"
mock_is_dir.return_value = is_dir
mock_glob.return_value = "pex_exe.pex.zip"
actual, _, _ = packaging.detect_archive_names(
packaging.PEX_PACKER, package_path, allow_large_pex)
assert actual == expected

0 comments on commit 631e783

Please sign in to comment.