Skip to content

Commit

Permalink
Expose yarn_launcher at top module level
Browse files Browse the repository at this point in the history
  • Loading branch information
jcuquemelle committed Nov 21, 2024
1 parent 919e4f5 commit 8e94e4b
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 11 deletions.
2 changes: 2 additions & 0 deletions cluster_pack/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@
PEX_PACKER,
get_pyenv_usage_from_archive
)

from cluster_pack.skein import skein_launcher as yarn_launcher
7 changes: 4 additions & 3 deletions examples/skein-project/skein_project/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import tempfile

import cluster_pack
from cluster_pack.skein import skein_config_builder, skein_launcher
from cluster_pack import yarn_launcher
from cluster_pack.skein import skein_config_builder


if __name__ == "__main__":
Expand All @@ -28,8 +29,8 @@
spec = skein.ApplicationSpec(services={"service": service})
app_id = client.submit(spec)

skein_launcher.wait_for_finished(client, app_id)
logs = skein_launcher.get_application_logs(client, app_id, 2)
yarn_launcher.wait_for_finished(client, app_id)
logs = yarn_launcher.get_application_logs(client, app_id, 2)
if logs:
for key, value in logs.items():
print(f"skein logs:{key} {value}")
14 changes: 6 additions & 8 deletions tests/integration/test_skein_launcher.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import getpass
import logging
import functools
import os
import pytest
import skein
import uuid
import tempfile

from cluster_pack.skein import skein_launcher
from cluster_pack import yarn_launcher
from cluster_pack import filesystem

pytestmark = pytest.mark.hadoop
Expand Down Expand Up @@ -36,13 +34,13 @@ def path_to_hdfs():
def _submit_and_await_app_master(func, assert_result_status=True, assert_log_content=None):
with skein.Client() as client:
log_output_path = f"hdfs:///tmp/{uuid.uuid4()}.log"
app_id = skein_launcher.submit_func(
app_id = yarn_launcher.submit_func(
client,
func=func,
args=[],
memory="2 GiB",
process_logs=functools.partial(skein_launcher.upload_logs_to_hdfs, log_output_path))
result = skein_launcher.wait_for_finished(client, app_id)
process_logs=functools.partial(yarn_launcher.upload_logs_to_hdfs, log_output_path))
result = yarn_launcher.wait_for_finished(client, app_id)

fs, _ = filesystem.resolve_filesystem_and_path(log_output_path)
with fs.open(log_output_path, "rb") as f:
Expand Down Expand Up @@ -78,8 +76,8 @@ def launch_skein():
)
spec = skein.ApplicationSpec(services={"service": service})
app_id = client.submit(spec)
skein_launcher.wait_for_finished(client, app_id)
logs = skein_launcher.get_application_logs(client, app_id, 2)
yarn_launcher.wait_for_finished(client, app_id)
logs = yarn_launcher.get_application_logs(client, app_id, 2)
for key, value in logs.items():
print(f"skein logs:{key} {value}")

Expand Down

0 comments on commit 8e94e4b

Please sign in to comment.