diff --git a/cluster_pack/__init__.py b/cluster_pack/__init__.py
index f73211c..5dda644 100644
--- a/cluster_pack/__init__.py
+++ b/cluster_pack/__init__.py
@@ -15,3 +15,5 @@
     PEX_PACKER,
     get_pyenv_usage_from_archive
 )
+
+from cluster_pack.skein import skein_launcher as yarn_launcher
diff --git a/cluster_pack/skein/skein_config_builder.py b/cluster_pack/skein/skein_config_builder.py
index 5ed7c7d..5c56a6e 100644
--- a/cluster_pack/skein/skein_config_builder.py
+++ b/cluster_pack/skein/skein_config_builder.py
@@ -20,7 +20,8 @@ def build_with_func(
         additional_files: Optional[List[str]] = None,
         tmp_dir: str = packaging._get_tmp_dir(),
         log_level: str = "INFO",
-        process_logs: Callable[[str], Any] = None
+        process_logs: Callable[[str], Any] = None,
+        allow_large_pex: bool = False
 ) -> SkeinConfig:
     """Build the skein config from provided a function

@@ -35,6 +36,9 @@
     :param log_level: default remote log level
     :param process_logs: hook with the local log path as a parameter,
                          can be used to uplaod the logs somewhere
+    :param allow_large_pex: Creates a non-executable pex that will need to be unzipped to circumvent
+                            python's limitation with zips > 2Gb. The file will need to be unzipped
+                            and the entry point will be /__main__.py
     :return: SkeinConfig
     """
     function_name = f"function_{uuid.uuid4()}.dat"
@@ -57,7 +61,7 @@
         package_path,
         additional_files,
         tmp_dir,
-        process_logs)
+        process_logs, allow_large_pex=allow_large_pex)


 def build(
diff --git a/cluster_pack/skein/skein_launcher.py b/cluster_pack/skein/skein_launcher.py
index b198734..5bfa151 100644
--- a/cluster_pack/skein/skein_launcher.py
+++ b/cluster_pack/skein/skein_launcher.py
@@ -86,7 +86,8 @@ def submit_func(skein_client: skein.Client,
                 acquire_map_reduce_delegation_token: bool = False,
                 pre_script_hook: Optional[str] = None,
                 max_attempts: int = 1, max_restarts: int = 0,
-                process_logs: Callable[[str], Any] = None) -> str:
+                process_logs: Callable[[str], Any] = None,
+                allow_large_pex: bool = False) -> str:
     """Submit a function in a skein container

     :param skein_client: skein.Client to use
@@ -111,6 +112,9 @@
     :param max_restarts: maximum number of restarts allowed for the service
     :param process_logs: hook with the local log path as a parameter,
                          can be used to uplaod the logs somewhere
+    :param allow_large_pex: Creates a non-executable pex that will need to be unzipped to circumvent
+                            python's limitation with zips > 2Gb. The file will need to be unzipped
+                            and the entry point will be /__main__.py
     :return: SkeinConfig
     """

@@ -121,7 +125,8 @@
         package_path=package_path,
         additional_files=additional_files,
         tmp_dir=tmp_dir,
-        process_logs=process_logs)
+        process_logs=process_logs,
+        allow_large_pex=allow_large_pex)

     return _submit(
         skein_client, skein_config,
@@ -218,17 +223,22 @@
         wait_for_nb_logs: Optional[int] = None,
         log_tries: int = 15
 ) -> Optional[skein.model.ApplicationLogs]:
+    nb_keys = 0
     for ind in range(log_tries):
         try:
             logs = client.application_logs(app_id)
             nb_keys = len(logs.keys())
             logger.info(f"Got {nb_keys}/{wait_for_nb_logs} log files")
-            if not wait_for_nb_logs or nb_keys == wait_for_nb_logs:
+            if not wait_for_nb_logs or nb_keys >= wait_for_nb_logs:
                 return logs
         except Exception:
             logger.warning(
                 f"Cannot collect logs (attempt {ind+1}/{log_tries})")
             time.sleep(3)
+    if nb_keys >= 1:
+        logger.warning(
+            f"Only {nb_keys} logs retrieved instead of {wait_for_nb_logs} requested")
+        return logs
     return None


diff --git a/examples/skein-project/skein_project/client.py b/examples/skein-project/skein_project/client.py
index d737df2..8fead61 100644
--- a/examples/skein-project/skein_project/client.py
+++ b/examples/skein-project/skein_project/client.py
@@ -3,7 +3,8 @@
 import tempfile

 import cluster_pack
-from cluster_pack.skein import skein_config_builder, skein_launcher
+from cluster_pack import yarn_launcher
+from cluster_pack.skein import skein_config_builder


 if __name__ == "__main__":
@@ -28,8 +29,8 @@
         spec = skein.ApplicationSpec(services={"service": service})
         app_id = client.submit(spec)

-        skein_launcher.wait_for_finished(client, app_id)
-        logs = skein_launcher.get_application_logs(client, app_id, 2)
+        yarn_launcher.wait_for_finished(client, app_id)
+        logs = yarn_launcher.get_application_logs(client, app_id, 2)
         if logs:
             for key, value in logs.items():
                 print(f"skein logs:{key} {value}")
diff --git a/requirements.txt b/requirements.txt
index a7a641a..c77bbe6 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -7,3 +7,4 @@ fire
 types-setuptools
 wheel-filename
 fsspec
+skein
\ No newline at end of file
diff --git a/tests-requirements.txt b/tests-requirements.txt
index 3390413..75cd3e0 100644
--- a/tests-requirements.txt
+++ b/tests-requirements.txt
@@ -1,6 +1,5 @@
 pypandoc<1.8
 pyspark
-skein
 pytest
 pyflakes==2.4.0
 pylama
diff --git a/tests/integration/test_skein_launcher.py b/tests/integration/test_skein_launcher.py
index 8dcf12f..eb539c1 100644
--- a/tests/integration/test_skein_launcher.py
+++ b/tests/integration/test_skein_launcher.py
@@ -1,13 +1,11 @@
-import getpass
 import logging
 import functools
-import os
 import pytest
 import skein
 import uuid
 import tempfile

-from cluster_pack.skein import skein_launcher
+from cluster_pack import yarn_launcher
 from cluster_pack import filesystem

 pytestmark = pytest.mark.hadoop
@@ -36,13 +34,13 @@ def path_to_hdfs():
 def _submit_and_await_app_master(func, assert_result_status=True, assert_log_content=None):
     with skein.Client() as client:
         log_output_path = f"hdfs:///tmp/{uuid.uuid4()}.log"
-        app_id = skein_launcher.submit_func(
+        app_id = yarn_launcher.submit_func(
             client,
             func=func,
             args=[],
             memory="2 GiB",
-            process_logs=functools.partial(skein_launcher.upload_logs_to_hdfs, log_output_path))
-        result = skein_launcher.wait_for_finished(client, app_id)
+            process_logs=functools.partial(yarn_launcher.upload_logs_to_hdfs, log_output_path))
+        result = yarn_launcher.wait_for_finished(client, app_id)

         fs, _ = filesystem.resolve_filesystem_and_path(log_output_path)
         with fs.open(log_output_path, "rb") as f:
@@ -78,8 +76,8 @@ def launch_skein():
         )
         spec = skein.ApplicationSpec(services={"service": service})
         app_id = client.submit(spec)
-        skein_launcher.wait_for_finished(client, app_id)
-        logs = skein_launcher.get_application_logs(client, app_id, 2)
+        yarn_launcher.wait_for_finished(client, app_id)
+        logs = yarn_launcher.get_application_logs(client, app_id, 2)
         for key, value in logs.items():
             print(f"skein logs:{key} {value}")
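For reference, a minimal usage sketch of the surface this change introduces: the cluster_pack.yarn_launcher alias, the allow_large_pex flag on submit_func, and the more lenient get_application_logs that now returns partial logs instead of None. It assumes a working skein/Hadoop environment; the submitted function and the HDFS log path below are placeholders, not part of the patch.

# Illustrative sketch only; compute() and the hdfs:// path are placeholders.
import functools

import skein

from cluster_pack import yarn_launcher


def compute():
    return sum(range(10))


if __name__ == "__main__":
    # Placeholder destination used by the upload_logs_to_hdfs hook.
    log_path = "hdfs:///tmp/example.log"

    with skein.Client() as client:
        app_id = yarn_launcher.submit_func(
            client,
            func=compute,
            args=[],
            memory="2 GiB",
            # Ship a non-executable pex to work around the size limit on executable zips;
            # the archive is unzipped on the worker before the function runs.
            allow_large_pex=True,
            process_logs=functools.partial(yarn_launcher.upload_logs_to_hdfs, log_path))

        yarn_launcher.wait_for_finished(client, app_id)

        # With the patched get_application_logs, fewer log files than requested
        # yields a warning plus the partial logs rather than None.
        logs = yarn_launcher.get_application_logs(client, app_id, wait_for_nb_logs=2)
        if logs:
            for key, value in logs.items():
                print(f"skein logs:{key} {value}")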