Skip to content

Commit

Permalink
enable verbosity & small update & update docstring
Browse files Browse the repository at this point in the history
Signed-off-by: Mecoli1219 <[email protected]>
  • Loading branch information
Mecoli1219 committed Aug 8, 2024
1 parent bae94a3 commit 9599667
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 15 deletions.
3 changes: 3 additions & 0 deletions flytekit/bin/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ def setup_execution(
:param dynamic_addl_distro: Works in concert with the other dynamic arg. If present, indicates that if a dynamic
task were to run, it should set fast serialize to true and use these values in FastSerializationSettings
:param dynamic_dest_dir: See above.
:param pickled: If the inputs are pickled
:return:
"""
exe_project = get_one_of("FLYTE_INTERNAL_EXECUTION_PROJECT", "_F_PRJ")
Expand Down Expand Up @@ -446,6 +447,8 @@ def _execute_map_task(
:param resolver: The task resolver to use. This needs to be loadable directly from importlib (and thus cannot be
nested).
:param resolver_args: Args that will be passed to the aforementioned resolver's load_task function
:param pickled: If the inputs are pickled
:param pkl_file: The path to the pickled archive
:return:
"""
if len(resolver_args) < 1:
Expand Down
8 changes: 8 additions & 0 deletions flytekit/core/future.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ def wait(
poll_interval: typing.Optional[timedelta] = None,
sync_nodes: bool = True,
) -> FlyteWorkflowExecution:
"""Wait for an execution of the task or workflow to complete.
:param timeout: maximum amount of time to wait
:param poll_interval: sync workflow execution at this interval
:param sync_nodes: passed along to the sync call for the workflow execution
"""
return self._remote_entry.wait(
self._exe,
timeout=timeout,
Expand All @@ -71,8 +77,10 @@ def wait(

@property
def version(self) -> str:
"""The version of the task or workflow being executed."""
return self._version

@property
def exe(self) -> FlyteWorkflowExecution:
"""The executing FlyteWorkflowExecution object."""
return self._exe
9 changes: 5 additions & 4 deletions flytekit/core/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,11 @@ def _resolve_abs_module_name(self, path: str, package_root: typing.Optional[str]
if dirname == package_root:
return basename

# Execution in a Jupyter notebook, we cannot resolve the module path
if not os.path.exists(dirname):
logger.warning(f"Directory {dirname} does not exist. It is likely that we are in a Jupyter notebook.")
return basename

# If we have reached a directory with no __init__, ignore
if "__init__.py" not in os.listdir(dirname):
return basename
Expand Down Expand Up @@ -337,10 +342,6 @@ def extract_task_module(f: Union[Callable, TrackedInstance]) -> Tuple[str, str,


def get_full_module_path(mod: ModuleType, mod_name: str) -> str:
from flytekit.tools.interactive import ipython_check

if ipython_check():
return ""
if FeatureFlags.FLYTE_PYTHON_PACKAGE_ROOT != ".":
package_root = (
FeatureFlags.FLYTE_PYTHON_PACKAGE_ROOT if FeatureFlags.FLYTE_PYTHON_PACKAGE_ROOT != "auto" else None
Expand Down
7 changes: 7 additions & 0 deletions flytekit/remote/init_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import typing

from flytekit.configuration import Config
from flytekit.loggers import get_level_from_cli_verbosity, logger
from flytekit.remote.remote import FlyteRemote
from flytekit.tools.interactive import ipython_check
from flytekit.tools.translator import Options
Expand All @@ -19,6 +20,7 @@ def init_remote(
data_upload_location: str = "flyte://my-s3-bucket/",
default_options: typing.Optional[Options] = None,
interactive_mode_enabled: bool = ipython_check(),
verbosity: int = 0,
**kwargs,
):
"""
Expand All @@ -30,6 +32,7 @@ def init_remote(
The default location - `s3://my-s3-bucket/data` works for sandbox/demo environment. Please override this for non-sandbox cases.
:param default_options: default options to use when executing tasks or workflows remotely.
:param interactive_mode_enabled: If True, the client will be configured to work in interactive mode.
:param verbosity: Verbosity level for the logger.
:return:
"""
global REMOTE_ENTRY, REMOTE_DEFAULT_OPTIONS
Expand All @@ -47,3 +50,7 @@ def init_remote(
REMOTE_DEFAULT_OPTIONS = default_options
else:
raise AssertionError("Remote client already initialized")

# Set the log level
log_level = get_level_from_cli_verbosity(verbosity)
logger.setLevel(log_level)
11 changes: 3 additions & 8 deletions flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -765,10 +765,8 @@ async def _serialize_and_register(
serialization_settings.version = version
serialization_settings.interactive_mode_enabled = self.interactive_mode_enabled

if options is None:
options = Options()
if options.file_uploader is None:
options.file_uploader = self.upload_file
options = options or Options()
options.file_uploader = options.file_uploader or self.upload_file

_ = get_serializable(m, settings=serialization_settings, entity=entity, options=options)
# concurrent register
Expand Down Expand Up @@ -1788,10 +1786,7 @@ def execute_local_task(
domain=domain or self._default_domain,
version=version,
)
try:
flyte_task: FlyteTask = self.register_task(entity, ss)
except Exception as e:
raise e
flyte_task: FlyteTask = self.register_task(entity, ss)

return self.execute(
flyte_task,
Expand Down
13 changes: 10 additions & 3 deletions flytekit/tools/translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import typing
from collections import OrderedDict
from dataclasses import dataclass
from functools import lru_cache
from typing import Callable, Dict, List, Optional, Tuple, Union

from flyteidl.admin import schedule_pb2
Expand Down Expand Up @@ -32,6 +33,7 @@
from flytekit.core.workflow import ReferenceWorkflow, WorkflowBase
from flytekit.exceptions.user import FlyteAssertion
from flytekit.image_spec.image_spec import _calculate_deduped_hash_from_image_spec
from flytekit.loggers import logger
from flytekit.models import common as _common_models
from flytekit.models import common as common_models
from flytekit.models import interface as interface_models
Expand Down Expand Up @@ -193,6 +195,12 @@ def fn(settings: SerializationSettings) -> List[str]:
return fn


@lru_cache
def display_ipython_warning(input: str) -> None:
# This is a warning that is only displayed once per python type
logger.debug(input)


def _update_serialization_settings_for_ipython(
entity: FlyteLocalEntity,
serialization_settings: SerializationSettings,
Expand Down Expand Up @@ -223,9 +231,8 @@ def _update_serialization_settings_for_ipython(
import gzip

import cloudpickle
import rich

rich.get_console().print("[bold red]Jupyter notebook and interactive task support is still alpha.[/bold red]")
display_ipython_warning("Jupyter notebook and interactive task support is still alpha.")

from flytekit.configuration import FastSerializationSettings

Expand All @@ -236,7 +243,7 @@ def _update_serialization_settings_for_ipython(
dest = pathlib.Path(tmp_dir, "pkl.gz")
with gzip.GzipFile(filename=dest, mode="wb", mtime=0) as gzipped:
cloudpickle.dump(entity, gzipped)
rich.get_console().print("[yellow]Uploading Pickled representation of Task to remote storage...[/ yellow]")
display_ipython_warning("Uploading Pickled representation of Task to remote storage...")
_, native_url = options.file_uploader(dest)

serialization_settings.fast_serialization_settings = FastSerializationSettings(
Expand Down

0 comments on commit 9599667

Please sign in to comment.