diff --git a/flytekit/core/base_task.py b/flytekit/core/base_task.py index 85b57d6f1b..190ade6cd7 100644 --- a/flytekit/core/base_task.py +++ b/flytekit/core/base_task.py @@ -802,21 +802,19 @@ def execute(self, **kwargs) -> Any: """ pass - def remote(self, version: Optional[str] = None, options: Optional[Options] = None, **kwargs) -> FlyteFuture: + def remote(self, options: Optional[Options] = None, **kwargs) -> FlyteFuture: """ This method will be invoked to execute the task remotely. This will return a FlyteFuture object that can be used to track the progress of the task execution. This method should be executed after specifying the remote configuration via `flytekit.remote.init_remote()`. - :param version: an optional version string to fetch or register the task. If not specified, it will randomly - generate a version string. :param options: an optional options that can be used to override the default options of the task. If not specified, the default options provided by `init_remote()` will be used. :param kwargs: Dict[str, Any] the inputs to the task. The inputs should match the signature of the task. :return: FlyteFuture """ - return FlyteFuture(self, version=version, options=options, **kwargs) + return FlyteFuture(self, options=options, **kwargs) def post_execute(self, user_params: Optional[ExecutionParameters], rval: Any) -> Any: """ diff --git a/flytekit/core/future.py b/flytekit/core/future.py index 7e1693573d..5793ae214d 100644 --- a/flytekit/core/future.py +++ b/flytekit/core/future.py @@ -9,10 +9,14 @@ from datetime import timedelta from typing import TYPE_CHECKING +import click import cloudpickle if TYPE_CHECKING: + from IPython.display import IFrame + from flytekit.core.base_task import PythonTask + from flytekit.core.type_engine import LiteralsResolver from flytekit.core.workflow import WorkflowBase from flytekit.remote.executions import FlyteWorkflowExecution from flytekit.tools.translator import Options @@ -22,7 +26,6 @@ class FlyteFuture: def __init__( self, entity: typing.Union[PythonTask, WorkflowBase], - version: typing.Optional[str] = None, options: typing.Optional[Options] = None, **kwargs, ): @@ -31,6 +34,7 @@ def __init__( This object requires the FlyteRemote client to be initialized before it can be used. The FlyteRemote client can be initialized by calling `flytekit.remote.init_remote()`. """ + from flytekit.core.base_task import PythonTask from flytekit.remote.init_remote import REMOTE_DEFAULT_OPTIONS, REMOTE_ENTRY from flytekit.tools.script_mode import hash_file @@ -40,23 +44,30 @@ def __init__( ) self._remote_entry = REMOTE_ENTRY - if version is None: - with tempfile.TemporaryDirectory() as tmp_dir: - dest = pathlib.Path(tmp_dir, "pkl.gz") - with gzip.GzipFile(filename=dest, mode="wb", mtime=0) as gzipped: - cloudpickle.dump(entity, gzipped) - md5_bytes, _, _ = hash_file(dest) + with tempfile.TemporaryDirectory() as tmp_dir: + dest = pathlib.Path(tmp_dir, "pkl.gz") + with gzip.GzipFile(filename=dest, mode="wb", mtime=0) as gzipped: + cloudpickle.dump(entity, gzipped) + md5_bytes, _, _ = hash_file(dest) - h = hashlib.md5(md5_bytes) - version = base64.urlsafe_b64encode(h.digest()).decode("ascii").rstrip("=") + h = hashlib.md5(md5_bytes) + version = base64.urlsafe_b64encode(h.digest()).decode("ascii").rstrip("=") if options is None: options = REMOTE_DEFAULT_OPTIONS - self._version = version self._exe = self._remote_entry.execute(entity, version=version, inputs=kwargs, options=options) + self._is_task = isinstance(entity, PythonTask) + console_url = self._remote_entry.generate_console_url(self._exe) + s = ( + click.style("\n[✔] ", fg="green") + + "Go to " + + click.style(console_url, fg="cyan") + + " to see execution in the console." + ) + click.echo(s) - def wait( + def __wait( self, timeout: typing.Optional[timedelta] = None, poll_interval: typing.Optional[timedelta] = None, @@ -75,12 +86,42 @@ def wait( sync_nodes=sync_nodes, ) - @property - def version(self) -> str: - """The version of the task or workflow being executed.""" - return self._version + def get( + self, + timeout: typing.Optional[timedelta] = None, + ) -> typing.Optional[LiteralsResolver]: + """Wait for the execution to complete and return the output. + + :param timeout: maximum amount of time to wait + """ + out = self.__wait( + timeout=timeout, + poll_interval=timedelta(seconds=5), + ) + return out.outputs + + def get_deck( + self, + timeout: typing.Optional[timedelta] = None, + ) -> typing.Optional[IFrame]: + """Wait for the execution to complete and return the deck for the task. + + :param timeout: maximum amount of time to wait + """ + if not self._is_task: + raise ValueError("Deck can only be retrieved for task executions.") + from IPython.display import IFrame + + self.__wait( + timeout=timeout, + poll_interval=timedelta(seconds=5), + ) + for node_execution in self._exe.node_executions.values(): + uri = node_execution._closure.deck_uri + if uri: + break + if uri == "": + raise ValueError("Deck not found for task execution") - @property - def exe(self) -> FlyteWorkflowExecution: - """The executing FlyteWorkflowExecution object.""" - return self._exe + deck_uri = self._remote_entry.client.get_download_signed_url(uri) + return IFrame(src=deck_uri.signed_url, width=800, height=600) diff --git a/flytekit/core/workflow.py b/flytekit/core/workflow.py index 9656ead3c1..b90dba0a48 100644 --- a/flytekit/core/workflow.py +++ b/flytekit/core/workflow.py @@ -306,21 +306,19 @@ def __call__(self, *args, **kwargs) -> Union[Tuple[Promise], Promise, VoidPromis def execute(self, **kwargs): raise Exception("Should not be called") - def remote(self, version: Optional[str] = None, options: Optional[Options] = None, **kwargs) -> FlyteFuture: + def remote(self, options: Optional[Options] = None, **kwargs) -> FlyteFuture: """ This method will be invoked to execute the workflow remotely. This will return a FlyteFuture object that can be used to track the progress of the workflow execution. This method should be executed after specifying the remote configuration via `flytekit.remote.init_remote()`. - :param version: an optional version string to fetch or register the workflow. If not specified, it will randomly - generate a version string. :param options: an optional options that can be used to override the default options of the workflow. If not specified, the default options provided by `init_remote()` will be used. :param kwargs: Dict[str, Any] the inputs to the workflow. The inputs should match the signature of the workflow. :return: FlyteFuture """ - return FlyteFuture(self, version=version, options=options, **kwargs) + return FlyteFuture(self, options=options, **kwargs) def compile(self, **kwargs): pass diff --git a/flytekit/remote/init_remote.py b/flytekit/remote/init_remote.py index 47c8dba479..713d9a4132 100644 --- a/flytekit/remote/init_remote.py +++ b/flytekit/remote/init_remote.py @@ -37,19 +37,16 @@ def init_remote( """ global REMOTE_ENTRY, REMOTE_DEFAULT_OPTIONS with REMOTE_ENTRY_LOCK: - if REMOTE_ENTRY is None: - REMOTE_ENTRY = FlyteRemote( - config=config, - default_project=default_project, - default_domain=default_domain, - data_upload_location=data_upload_location, - interactive_mode_enabled=interactive_mode_enabled, - **kwargs, - ) - # TODO: This should be merged into the FlyteRemote in the future - REMOTE_DEFAULT_OPTIONS = default_options - else: - raise AssertionError("Remote client already initialized") + REMOTE_ENTRY = FlyteRemote( + config=config, + default_project=default_project, + default_domain=default_domain, + data_upload_location=data_upload_location, + interactive_mode_enabled=interactive_mode_enabled, + **kwargs, + ) + # TODO: This should be merged into the FlyteRemote in the future + REMOTE_DEFAULT_OPTIONS = default_options # Set the log level log_level = get_level_from_cli_verbosity(verbosity) diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index d7a1f51194..d6611883cb 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -210,7 +210,7 @@ def __init__( :param default_domain: default domain to use when fetching or executing flyte entities. :param data_upload_location: this is where all the default data will be uploaded when providing inputs. The default location - `s3://my-s3-bucket/data` works for sandbox/demo environment. Please override this for non-sandbox cases. - :param interactive_mode_enabled: If set to True, the FlyteRemote will pickle the task/workflow objects if not found + :param interactive_mode_enabled: If set to True, the FlyteRemote will pickle the task/workflow. """ if config is None or config.platform is None or config.platform.endpoint is None: raise user_exceptions.FlyteAssertion("Flyte endpoint should be provided.") @@ -270,7 +270,7 @@ def file_access(self) -> FileAccessProvider: @property def interactive_mode_enabled(self) -> bool: - """If set to True, the FlyteRemote will pickle the task/workflow objects if not found.""" + """If set to True, the FlyteRemote will pickle the task/workflow.""" return self._interactive_mode_enabled def get( diff --git a/flytekit/tools/translator.py b/flytekit/tools/translator.py index af70554d4f..fdb2ee74f5 100644 --- a/flytekit/tools/translator.py +++ b/flytekit/tools/translator.py @@ -163,16 +163,13 @@ def get_command_prefix_for_fast_execute(settings: SerializationSettings) -> List if settings.fast_serialization_settings and settings.fast_serialization_settings.distribution_location else "{{ .remote_package_path }}" ), - "--dest-dir", - ( - settings.fast_serialization_settings.destination_dir - if settings.fast_serialization_settings and settings.fast_serialization_settings.destination_dir - else "{{ .dest_dir }}" - ), ] - # If pickling is enabled, we will add a pickled bit if settings.fast_serialization_settings and settings.fast_serialization_settings.pickled: prefix = prefix + ["--pickled"] + elif settings.fast_serialization_settings and settings.fast_serialization_settings.destination_dir: + prefix = prefix + ["--dest-dir", settings.fast_serialization_settings.destination_dir] + else: + prefix = prefix + ["--dest-dir", "{{ .dest_dir }}"] return prefix + ["--"] diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index 810f1fc514..760d1df154 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -586,54 +586,19 @@ def test_flyteremote_uploads_large_file(gigabytes): def test_workflow_remote_func(mock_ipython_check): """Test the logic of the remote execution of workflows and tasks.""" mock_ipython_check.return_value = True - with pytest.raises(AssertionError): - init_remote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN, interactive_mode_enabled=True) from .workflows.basic.child_workflow import parent_wf, double # child_workflow.parent_wf asynchronously register a parent wf1 with child lp from another wf2. future0 = double.remote(a=3) future1 = parent_wf.remote(a=3) future2 = parent_wf.remote(a=2) - assert future0.version != VERSION - assert future1.version != VERSION - assert future2.version != VERSION - # It should generate a new version for each execution - assert future1.version != future2.version - out0 = future0.wait() - assert out0.outputs["o0"] == 6 - out1 = future1.wait() - assert out1.outputs["o0"] == 18 - out2 = future2.wait() - assert out2.outputs["o0"] == 12 - - -def test_fetch_python_task_remote_func(register): - """Test remote execution of a @task-decorated python function that is already registered.""" - with patch("flytekit.tools.interactive.ipython_check") as mock_ipython_check: - mock_ipython_check.return_value = True - - from .workflows.basic.basic_workflow import t1 - - future = t1.remote(a=10, version=VERSION) - out = future.wait() - assert future.version == VERSION - - assert out.outputs["t1_int_output"] == 12 - assert out.outputs["c"] == "world" - - -@pytest.mark.skip(reason="Waiting for supporting the `name` parameter in the remote function") -def test_fetch_python_workflow_remote_func(register): - """Test remote execution of a @workflow-decorated python function that is already registered.""" - with patch("flytekit.tools.interactive.ipython_check") as mock_ipython_check: - mock_ipython_check.return_value = True - from .workflows.basic.basic_workflow import my_basic_wf - - future = my_basic_wf.remote(a=10, b="xyz", version=VERSION) - out = future.wait() - assert out.outputs["o0"] == 12 - assert out.outputs["o1"] == "xyzworld" + out0 = future0.get() + assert out0["o0"] == 6 + out1 = future1.get() + assert out1["o0"] == 18 + out2 = future2.get() + assert out2["o0"] == 12 @mock.patch("flytekit.tools.interactive.ipython_check") @@ -644,8 +609,8 @@ def test_execute_task_remote_func_list_of_floats(mock_ipython_check): xs: typing.List[float] = [0.1, 0.2, 0.3, 0.4, -99999.7] future = concat_list.remote(xs=xs) - out = future.wait() - assert out.outputs["o0"] == "[0.1, 0.2, 0.3, 0.4, -99999.7]" + out = future.get() + assert out["o0"] == "[0.1, 0.2, 0.3, 0.4, -99999.7]" @mock.patch("flytekit.tools.interactive.ipython_check") @@ -656,8 +621,8 @@ def test_execute_task_remote_func_convert_dict(mock_ipython_check): d: typing.Dict[str, str] = {"key1": "value1", "key2": "value2"} future = convert_to_string.remote(d=d) - out = future.wait() - assert json.loads(out.outputs["o0"]) == {"key1": "value1", "key2": "value2"} + out = future.get() + assert json.loads(out["o0"]) == {"key1": "value1", "key2": "value2"} @mock.patch("flytekit.tools.interactive.ipython_check") @@ -668,8 +633,8 @@ def test_execute_python_workflow_remote_func_dict_of_string_to_string(mock_ipyth d: typing.Dict[str, str] = {"k1": "v1", "k2": "v2"} future = my_dict_str_wf.remote(d=d) - out = future.wait() - assert json.loads(out.outputs["o0"]) == {"k1": "v1", "k2": "v2"} + out = future.get() + assert json.loads(out["o0"]) == {"k1": "v1", "k2": "v2"} @mock.patch("flytekit.tools.interactive.ipython_check") @@ -681,8 +646,8 @@ def test_execute_python_workflow_remote_func_list_of_floats(mock_ipython_check): xs: typing.List[float] = [42.24, 999.1, 0.0001] future = my_list_float_wf.remote(xs=xs) - out = future.wait() - assert out.outputs["o0"] == "[42.24, 999.1, 0.0001]" + out = future.get() + assert out["o0"] == "[42.24, 999.1, 0.0001]" @mock.patch("flytekit.tools.interactive.ipython_check") def test_execute_workflow_remote_fn_with_maptask(mock_ipython_check): @@ -692,5 +657,5 @@ def test_execute_workflow_remote_fn_with_maptask(mock_ipython_check): d: typing.List[int] = [1, 2, 3] future = workflow_with_maptask.remote(data=d, y=3) - out = future.wait() - assert out.outputs["o0"] == [4, 5, 6] + out = future.get() + assert out["o0"] == [4, 5, 6]