Skip to content

Commit

Permalink
update integration-test & fix deck
Browse files Browse the repository at this point in the history
Signed-off-by: Mecoli1219 <[email protected]>
  • Loading branch information
Mecoli1219 committed Aug 14, 2024
1 parent 904350d commit ee69ca5
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 62 deletions.
18 changes: 16 additions & 2 deletions flytekit/core/future.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from datetime import timedelta
from typing import TYPE_CHECKING

import click
import cloudpickle

if TYPE_CHECKING:
Expand Down Expand Up @@ -57,7 +58,14 @@ def __init__(

self._exe = self._remote_entry.execute(entity, version=version, inputs=kwargs, options=options)
self._is_task = isinstance(entity, PythonTask)
self._entity = entity
console_url = self._remote_entry.generate_console_url(self._exe)
s = (
click.style("\n[✔] ", fg="green")
+ "Go to "
+ click.style(console_url, fg="cyan")
+ " to see execution in the console."
)
click.echo(s)

def __wait(
self,
Expand Down Expand Up @@ -108,6 +116,12 @@ def get_deck(
timeout=timeout,
poll_interval=timedelta(seconds=5),
)
uri = self._exe._node_executions["pcaplot"]._closure.deck_uri
for node_execution in self._exe.node_executions.values():
uri = node_execution._closure.deck_uri
if uri:
break
if uri == "":
raise ValueError("Deck not found for task execution")

deck_uri = self._remote_entry.client.get_download_signed_url(uri)
return IFrame(src=deck_uri.signed_url, width=800, height=600)
4 changes: 2 additions & 2 deletions flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ def __init__(
:param default_domain: default domain to use when fetching or executing flyte entities.
:param data_upload_location: this is where all the default data will be uploaded when providing inputs.
The default location - `s3://my-s3-bucket/data` works for sandbox/demo environment. Please override this for non-sandbox cases.
:param interactive_mode_enabled: If set to True, the FlyteRemote will pickle the task/workflow objects if not found
:param interactive_mode_enabled: If set to True, the FlyteRemote will pickle the task/workflow.
"""
if config is None or config.platform is None or config.platform.endpoint is None:
raise user_exceptions.FlyteAssertion("Flyte endpoint should be provided.")
Expand Down Expand Up @@ -270,7 +270,7 @@ def file_access(self) -> FileAccessProvider:

@property
def interactive_mode_enabled(self) -> bool:
"""If set to True, the FlyteRemote will pickle the task/workflow objects if not found."""
"""If set to True, the FlyteRemote will pickle the task/workflow."""
return self._interactive_mode_enabled

def get(
Expand Down
11 changes: 4 additions & 7 deletions flytekit/tools/translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,16 +163,13 @@ def get_command_prefix_for_fast_execute(settings: SerializationSettings) -> List
if settings.fast_serialization_settings and settings.fast_serialization_settings.distribution_location
else "{{ .remote_package_path }}"
),
"--dest-dir",
(
settings.fast_serialization_settings.destination_dir
if settings.fast_serialization_settings and settings.fast_serialization_settings.destination_dir
else "{{ .dest_dir }}"
),
]
# If pickling is enabled, we will add a pickled bit
if settings.fast_serialization_settings and settings.fast_serialization_settings.pickled:
prefix = prefix + ["--pickled"]
elif settings.fast_serialization_settings and settings.fast_serialization_settings.destination_dir:
prefix = prefix + ["--dest-dir", settings.fast_serialization_settings.destination_dir]
else:
prefix = prefix + ["--dest-dir", "{{ .dest_dir }}"]

return prefix + ["--"]

Expand Down
67 changes: 16 additions & 51 deletions tests/flytekit/integration/remote/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,54 +586,19 @@ def test_flyteremote_uploads_large_file(gigabytes):
def test_workflow_remote_func(mock_ipython_check):
"""Test the logic of the remote execution of workflows and tasks."""
mock_ipython_check.return_value = True
with pytest.raises(AssertionError):
init_remote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN, interactive_mode_enabled=True)
from .workflows.basic.child_workflow import parent_wf, double

# child_workflow.parent_wf asynchronously register a parent wf1 with child lp from another wf2.
future0 = double.remote(a=3)
future1 = parent_wf.remote(a=3)
future2 = parent_wf.remote(a=2)
assert future0.version != VERSION
assert future1.version != VERSION
assert future2.version != VERSION
# It should generate a new version for each execution
assert future1.version != future2.version

out0 = future0.wait()
assert out0.outputs["o0"] == 6
out1 = future1.wait()
assert out1.outputs["o0"] == 18
out2 = future2.wait()
assert out2.outputs["o0"] == 12


def test_fetch_python_task_remote_func(register):
"""Test remote execution of a @task-decorated python function that is already registered."""
with patch("flytekit.tools.interactive.ipython_check") as mock_ipython_check:
mock_ipython_check.return_value = True

from .workflows.basic.basic_workflow import t1

future = t1.remote(a=10, version=VERSION)
out = future.wait()
assert future.version == VERSION

assert out.outputs["t1_int_output"] == 12
assert out.outputs["c"] == "world"


@pytest.mark.skip(reason="Waiting for supporting the `name` parameter in the remote function")
def test_fetch_python_workflow_remote_func(register):
"""Test remote execution of a @workflow-decorated python function that is already registered."""
with patch("flytekit.tools.interactive.ipython_check") as mock_ipython_check:
mock_ipython_check.return_value = True
from .workflows.basic.basic_workflow import my_basic_wf

future = my_basic_wf.remote(a=10, b="xyz", version=VERSION)
out = future.wait()
assert out.outputs["o0"] == 12
assert out.outputs["o1"] == "xyzworld"
out0 = future0.get()
assert out0["o0"] == 6
out1 = future1.get()
assert out1["o0"] == 18
out2 = future2.get()
assert out2["o0"] == 12


@mock.patch("flytekit.tools.interactive.ipython_check")
Expand All @@ -644,8 +609,8 @@ def test_execute_task_remote_func_list_of_floats(mock_ipython_check):

xs: typing.List[float] = [0.1, 0.2, 0.3, 0.4, -99999.7]
future = concat_list.remote(xs=xs)
out = future.wait()
assert out.outputs["o0"] == "[0.1, 0.2, 0.3, 0.4, -99999.7]"
out = future.get()
assert out["o0"] == "[0.1, 0.2, 0.3, 0.4, -99999.7]"


@mock.patch("flytekit.tools.interactive.ipython_check")
Expand All @@ -656,8 +621,8 @@ def test_execute_task_remote_func_convert_dict(mock_ipython_check):

d: typing.Dict[str, str] = {"key1": "value1", "key2": "value2"}
future = convert_to_string.remote(d=d)
out = future.wait()
assert json.loads(out.outputs["o0"]) == {"key1": "value1", "key2": "value2"}
out = future.get()
assert json.loads(out["o0"]) == {"key1": "value1", "key2": "value2"}


@mock.patch("flytekit.tools.interactive.ipython_check")
Expand All @@ -668,8 +633,8 @@ def test_execute_python_workflow_remote_func_dict_of_string_to_string(mock_ipyth

d: typing.Dict[str, str] = {"k1": "v1", "k2": "v2"}
future = my_dict_str_wf.remote(d=d)
out = future.wait()
assert json.loads(out.outputs["o0"]) == {"k1": "v1", "k2": "v2"}
out = future.get()
assert json.loads(out["o0"]) == {"k1": "v1", "k2": "v2"}


@mock.patch("flytekit.tools.interactive.ipython_check")
Expand All @@ -681,8 +646,8 @@ def test_execute_python_workflow_remote_func_list_of_floats(mock_ipython_check):

xs: typing.List[float] = [42.24, 999.1, 0.0001]
future = my_list_float_wf.remote(xs=xs)
out = future.wait()
assert out.outputs["o0"] == "[42.24, 999.1, 0.0001]"
out = future.get()
assert out["o0"] == "[42.24, 999.1, 0.0001]"

@mock.patch("flytekit.tools.interactive.ipython_check")
def test_execute_workflow_remote_fn_with_maptask(mock_ipython_check):
Expand All @@ -692,5 +657,5 @@ def test_execute_workflow_remote_fn_with_maptask(mock_ipython_check):

d: typing.List[int] = [1, 2, 3]
future = workflow_with_maptask.remote(data=d, y=3)
out = future.wait()
assert out.outputs["o0"] == [4, 5, 6]
out = future.get()
assert out["o0"] == [4, 5, 6]

0 comments on commit ee69ca5

Please sign in to comment.