From 95996671dd563f1366ae5f5b9c7e49fd874cac65 Mon Sep 17 00:00:00 2001
From: Mecoli1219
Date: Thu, 8 Aug 2024 15:08:46 +0800
Subject: [PATCH] enable verbosity & small update & update docstring

Signed-off-by: Mecoli1219
---
 flytekit/bin/entrypoint.py     |  3 +++
 flytekit/core/future.py        |  8 ++++++++
 flytekit/core/tracker.py       |  9 +++++----
 flytekit/remote/init_remote.py |  7 +++++++
 flytekit/remote/remote.py      | 11 +++--------
 flytekit/tools/translator.py   | 13 ++++++++++---
 6 files changed, 36 insertions(+), 15 deletions(-)

diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py
index 18613043da..8c3b3f8640 100644
--- a/flytekit/bin/entrypoint.py
+++ b/flytekit/bin/entrypoint.py
@@ -229,6 +229,7 @@ def setup_execution(
     :param dynamic_addl_distro: Works in concert with the other dynamic arg. If present, indicates that if a dynamic
       task were to run, it should set fast serialize to true and use these values in FastSerializationSettings
     :param dynamic_dest_dir: See above.
+    :param pickled: Whether the inputs are pickled.
     :return:
     """
     exe_project = get_one_of("FLYTE_INTERNAL_EXECUTION_PROJECT", "_F_PRJ")
@@ -446,6 +447,8 @@ def _execute_map_task(
     :param resolver: The task resolver to use. This needs to be loadable directly from importlib (and thus cannot be
       nested).
     :param resolver_args: Args that will be passed to the aforementioned resolver's load_task function
+    :param pickled: Whether the inputs are pickled.
+    :param pkl_file: The path to the pickled archive.
     :return:
     """
     if len(resolver_args) < 1:
diff --git a/flytekit/core/future.py b/flytekit/core/future.py
index a87d9fcfeb..7e1693573d 100644
--- a/flytekit/core/future.py
+++ b/flytekit/core/future.py
@@ -62,6 +62,12 @@ def wait(
         poll_interval: typing.Optional[timedelta] = None,
         sync_nodes: bool = True,
     ) -> FlyteWorkflowExecution:
+        """Wait for an execution of the task or workflow to complete.
+
+        :param timeout: maximum amount of time to wait
+        :param poll_interval: sync workflow execution at this interval
+        :param sync_nodes: passed along to the sync call for the workflow execution
+        """
         return self._remote_entry.wait(
             self._exe,
             timeout=timeout,
@@ -71,8 +77,10 @@ def wait(
 
     @property
     def version(self) -> str:
+        """The version of the task or workflow being executed."""
         return self._version
 
     @property
     def exe(self) -> FlyteWorkflowExecution:
+        """The underlying FlyteWorkflowExecution object."""
         return self._exe
diff --git a/flytekit/core/tracker.py b/flytekit/core/tracker.py
index 3fbdf7989e..1c043722c0 100644
--- a/flytekit/core/tracker.py
+++ b/flytekit/core/tracker.py
@@ -273,6 +273,11 @@ def _resolve_abs_module_name(self, path: str, package_root: typing.Optional[str]
         if dirname == package_root:
             return basename
 
+        # When executing in a Jupyter notebook, we cannot resolve the module path
+        if not os.path.exists(dirname):
+            logger.warning(f"Directory {dirname} does not exist. It is likely that we are in a Jupyter notebook.")
It is likely that we are in a Jupyter notebook.") + return basename + # If we have reached a directory with no __init__, ignore if "__init__.py" not in os.listdir(dirname): return basename @@ -337,10 +342,6 @@ def extract_task_module(f: Union[Callable, TrackedInstance]) -> Tuple[str, str, def get_full_module_path(mod: ModuleType, mod_name: str) -> str: - from flytekit.tools.interactive import ipython_check - - if ipython_check(): - return "" if FeatureFlags.FLYTE_PYTHON_PACKAGE_ROOT != ".": package_root = ( FeatureFlags.FLYTE_PYTHON_PACKAGE_ROOT if FeatureFlags.FLYTE_PYTHON_PACKAGE_ROOT != "auto" else None diff --git a/flytekit/remote/init_remote.py b/flytekit/remote/init_remote.py index 1032eac981..47c8dba479 100644 --- a/flytekit/remote/init_remote.py +++ b/flytekit/remote/init_remote.py @@ -2,6 +2,7 @@ import typing from flytekit.configuration import Config +from flytekit.loggers import get_level_from_cli_verbosity, logger from flytekit.remote.remote import FlyteRemote from flytekit.tools.interactive import ipython_check from flytekit.tools.translator import Options @@ -19,6 +20,7 @@ def init_remote( data_upload_location: str = "flyte://my-s3-bucket/", default_options: typing.Optional[Options] = None, interactive_mode_enabled: bool = ipython_check(), + verbosity: int = 0, **kwargs, ): """ @@ -30,6 +32,7 @@ def init_remote( The default location - `s3://my-s3-bucket/data` works for sandbox/demo environment. Please override this for non-sandbox cases. :param default_options: default options to use when executing tasks or workflows remotely. :param interactive_mode_enabled: If True, the client will be configured to work in interactive mode. + :param verbosity: Verbosity level for the logger. :return: """ global REMOTE_ENTRY, REMOTE_DEFAULT_OPTIONS @@ -47,3 +50,7 @@ def init_remote( REMOTE_DEFAULT_OPTIONS = default_options else: raise AssertionError("Remote client already initialized") + + # Set the log level + log_level = get_level_from_cli_verbosity(verbosity) + logger.setLevel(log_level) diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index ac7c277a28..d7a1f51194 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -765,10 +765,8 @@ async def _serialize_and_register( serialization_settings.version = version serialization_settings.interactive_mode_enabled = self.interactive_mode_enabled - if options is None: - options = Options() - if options.file_uploader is None: - options.file_uploader = self.upload_file + options = options or Options() + options.file_uploader = options.file_uploader or self.upload_file _ = get_serializable(m, settings=serialization_settings, entity=entity, options=options) # concurrent register @@ -1788,10 +1786,7 @@ def execute_local_task( domain=domain or self._default_domain, version=version, ) - try: - flyte_task: FlyteTask = self.register_task(entity, ss) - except Exception as e: - raise e + flyte_task: FlyteTask = self.register_task(entity, ss) return self.execute( flyte_task, diff --git a/flytekit/tools/translator.py b/flytekit/tools/translator.py index 5e74aff513..af70554d4f 100644 --- a/flytekit/tools/translator.py +++ b/flytekit/tools/translator.py @@ -4,6 +4,7 @@ import typing from collections import OrderedDict from dataclasses import dataclass +from functools import lru_cache from typing import Callable, Dict, List, Optional, Tuple, Union from flyteidl.admin import schedule_pb2 @@ -32,6 +33,7 @@ from flytekit.core.workflow import ReferenceWorkflow, WorkflowBase from flytekit.exceptions.user import FlyteAssertion from 
+from flytekit.loggers import logger
 from flytekit.models import common as _common_models
 from flytekit.models import common as common_models
 from flytekit.models import interface as interface_models
@@ -193,6 +195,12 @@ def fn(settings: SerializationSettings) -> List[str]:
     return fn
 
 
+@lru_cache
+def display_ipython_warning(input: str) -> None:
+    # With lru_cache, each unique message is only logged once per process
+    logger.debug(input)
+
+
 def _update_serialization_settings_for_ipython(
     entity: FlyteLocalEntity,
     serialization_settings: SerializationSettings,
@@ -223,9 +231,8 @@ def _update_serialization_settings_for_ipython(
     import gzip
 
     import cloudpickle
-    import rich
 
-    rich.get_console().print("[bold red]Jupyter notebook and interactive task support is still alpha.[/bold red]")
+    display_ipython_warning("Jupyter notebook and interactive task support is still alpha.")
 
     from flytekit.configuration import FastSerializationSettings
 
@@ -236,7 +243,7 @@ def _update_serialization_settings_for_ipython(
         dest = pathlib.Path(tmp_dir, "pkl.gz")
         with gzip.GzipFile(filename=dest, mode="wb", mtime=0) as gzipped:
             cloudpickle.dump(entity, gzipped)
-        rich.get_console().print("[yellow]Uploading Pickled representation of Task to remote storage...[/ yellow]")
+        display_ipython_warning("Uploading Pickled representation of Task to remote storage...")
         _, native_url = options.file_uploader(dest)
 
         serialization_settings.fast_serialization_settings = FastSerializationSettings(
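
Usage sketch (not part of the patch): a minimal illustration of the new `verbosity` argument, which `init_remote`
now converts with `get_level_from_cli_verbosity` and applies to the flytekit logger. The config helper and the
project/domain values below are placeholders, not part of this change.

    from flytekit.configuration import Config
    from flytekit.remote.init_remote import init_remote

    # verbosity follows the CLI-style scale (higher = more verbose); init_remote
    # converts it with get_level_from_cli_verbosity and sets the flytekit logger level.
    init_remote(
        Config.for_sandbox(),            # assumption: any Config instance works here
        default_project="flytesnacks",   # hypothetical project
        default_domain="development",    # hypothetical domain
        verbosity=2,
    )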