Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Miscellaneous improvements #129

Merged
merged 4 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cluster_pack/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@
PEX_PACKER,
get_pyenv_usage_from_archive
)

from cluster_pack.skein import skein_launcher as yarn_launcher
8 changes: 6 additions & 2 deletions cluster_pack/skein/skein_config_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ def build_with_func(
additional_files: Optional[List[str]] = None,
tmp_dir: str = packaging._get_tmp_dir(),
log_level: str = "INFO",
process_logs: Callable[[str], Any] = None
process_logs: Callable[[str], Any] = None,
allow_large_pex: bool = False
) -> SkeinConfig:
"""Build the skein config from provided a function

Expand All @@ -35,6 +36,9 @@ def build_with_func(
:param log_level: default remote log level
:param process_logs: hook with the local log path as a parameter,
can be used to upload the logs somewhere
:param allow_large_pex: Creates a non-executable pex that will need to be unzipped to circumvent
python's limitation with zips > 2Gb. The file will need to be unzipped
and the entry point will be <output>/__main__.py
:return: SkeinConfig
"""
function_name = f"function_{uuid.uuid4()}.dat"
Expand All @@ -57,7 +61,7 @@ def build_with_func(
package_path,
additional_files,
tmp_dir,
process_logs)
process_logs, allow_large_pex=allow_large_pex)


def build(
Expand Down
16 changes: 13 additions & 3 deletions cluster_pack/skein/skein_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ def submit_func(skein_client: skein.Client,
acquire_map_reduce_delegation_token: bool = False,
pre_script_hook: Optional[str] = None,
max_attempts: int = 1, max_restarts: int = 0,
process_logs: Callable[[str], Any] = None) -> str:
process_logs: Callable[[str], Any] = None,
allow_large_pex: bool = False) -> str:
"""Submit a function in a skein container

:param skein_client: skein.Client to use
Expand All @@ -111,6 +112,9 @@ def submit_func(skein_client: skein.Client,
:param max_restarts: maximum number of restarts allowed for the service
:param process_logs: hook with the local log path as a parameter,
can be used to upload the logs somewhere
:param allow_large_pex: Creates a non-executable pex that will need to be unzipped to circumvent
python's limitation with zips > 2Gb. The file will need to be unzipped
and the entry point will be <output>/__main__.py
:return: SkeinConfig
"""

Expand All @@ -121,7 +125,8 @@ def submit_func(skein_client: skein.Client,
package_path=package_path,
additional_files=additional_files,
tmp_dir=tmp_dir,
process_logs=process_logs)
process_logs=process_logs,
allow_large_pex=allow_large_pex)

return _submit(
skein_client, skein_config,
Expand Down Expand Up @@ -218,17 +223,22 @@ def get_application_logs(
wait_for_nb_logs: Optional[int] = None,
log_tries: int = 15
) -> Optional[skein.model.ApplicationLogs]:
nb_keys = 0
for ind in range(log_tries):
try:
logs = client.application_logs(app_id)
nb_keys = len(logs.keys())
logger.info(f"Got {nb_keys}/{wait_for_nb_logs} log files")
if not wait_for_nb_logs or nb_keys == wait_for_nb_logs:
if not wait_for_nb_logs or nb_keys >= wait_for_nb_logs:
return logs
except Exception:
logger.warning(
f"Cannot collect logs (attempt {ind+1}/{log_tries})")
time.sleep(3)
if nb_keys >= 1:
logger.warning(
f"Only {nb_keys} logs retrieved instead of {wait_for_nb_logs} requested")
return logs
return None


Expand Down
7 changes: 4 additions & 3 deletions examples/skein-project/skein_project/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import tempfile

import cluster_pack
from cluster_pack.skein import skein_config_builder, skein_launcher
from cluster_pack import yarn_launcher
from cluster_pack.skein import skein_config_builder


if __name__ == "__main__":
Expand All @@ -28,8 +29,8 @@
spec = skein.ApplicationSpec(services={"service": service})
app_id = client.submit(spec)

skein_launcher.wait_for_finished(client, app_id)
logs = skein_launcher.get_application_logs(client, app_id, 2)
yarn_launcher.wait_for_finished(client, app_id)
logs = yarn_launcher.get_application_logs(client, app_id, 2)
if logs:
for key, value in logs.items():
print(f"skein logs:{key} {value}")
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ fire
types-setuptools
wheel-filename
fsspec
skein
1 change: 0 additions & 1 deletion tests-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
pypandoc<1.8
pyspark
skein
pytest
pyflakes==2.4.0
pylama
Expand Down
14 changes: 6 additions & 8 deletions tests/integration/test_skein_launcher.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import getpass
import logging
import functools
import os
import pytest
import skein
import uuid
import tempfile

from cluster_pack.skein import skein_launcher
from cluster_pack import yarn_launcher
from cluster_pack import filesystem

pytestmark = pytest.mark.hadoop
Expand Down Expand Up @@ -36,13 +34,13 @@ def path_to_hdfs():
def _submit_and_await_app_master(func, assert_result_status=True, assert_log_content=None):
with skein.Client() as client:
log_output_path = f"hdfs:///tmp/{uuid.uuid4()}.log"
app_id = skein_launcher.submit_func(
app_id = yarn_launcher.submit_func(
client,
func=func,
args=[],
memory="2 GiB",
process_logs=functools.partial(skein_launcher.upload_logs_to_hdfs, log_output_path))
result = skein_launcher.wait_for_finished(client, app_id)
process_logs=functools.partial(yarn_launcher.upload_logs_to_hdfs, log_output_path))
result = yarn_launcher.wait_for_finished(client, app_id)

fs, _ = filesystem.resolve_filesystem_and_path(log_output_path)
with fs.open(log_output_path, "rb") as f:
Expand Down Expand Up @@ -78,8 +76,8 @@ def launch_skein():
)
spec = skein.ApplicationSpec(services={"service": service})
app_id = client.submit(spec)
skein_launcher.wait_for_finished(client, app_id)
logs = skein_launcher.get_application_logs(client, app_id, 2)
yarn_launcher.wait_for_finished(client, app_id)
logs = yarn_launcher.get_application_logs(client, app_id, 2)
for key, value in logs.items():
print(f"skein logs:{key} {value}")

Expand Down
Loading