Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/mao3267/flytekit into add…
Browse files Browse the repository at this point in the history
…-copy-command
  • Loading branch information
mao3267 committed Sep 4, 2024
2 parents 76520a0 + a5c44cd commit 9a9707d
Show file tree
Hide file tree
Showing 89 changed files with 3,207 additions and 1,127 deletions.
3 changes: 2 additions & 1 deletion Dockerfile.dev
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ RUN SETUPTOOLS_SCM_PRETEND_VERSION_FOR_FLYTEKIT=$PSEUDO_VERSION \
&& chown flytekit: /home \
&& :

ENV PYTHONPATH="/flytekit:"

ENV PYTHONPATH="/flytekit:/flytekit/tests/flytekit/integration/remote"

# Switch to the 'flytekit' user for better security.
USER flytekit
20 changes: 20 additions & 0 deletions docs/source/design/clis.rst
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,23 @@ Both the commands have their own place in a production Flyte setting.
.. note ::
Neither ``pyflyte register`` nor ``pyflyte run`` commands work on Python namespace packages since both the tools traverse the filesystem to find the first folder that doesn't have an __init__.py file, which is interpreted as the root of the project. Both the commands use this root as the basis to name the Flyte entities.
How to move away from the ``pyflyte serialize`` command?
========================================================

The ``serialize`` command is slated for deprecation around the end of Q3 2024. Users should move to the ``package`` command instead, as the two commands provide nearly identical functionality.

Migrate
-------
To use the ``package`` command, make the following changes:
* The ``--local-source-root`` option should be changed to ``--source``
* If the ``--in-container-virtualenv-root`` option was already specified, then move to the ``--python-interpreter`` option in ``package``. The default Python interpreter for ``serialize`` was based on this deprecated flag, falling back to ``sys.executable`` if the flag was not specified. The default for ``package`` is ``/opt/venv/bin/python3``. If that is not where the Python interpreter is located in the task container, then you'll now need to specify ``--python-interpreter``. Note that this was only used for Spark tasks.
* The ``--in-container-config-path`` option should be removed as this was not actually being used by the ``serialize`` command.


Functional Changes
------------------
Beyond the options, the ``package`` command differs in the following ways:
* Whether or not to use fast register should be specified by the ``--copy auto`` or ``--copy all`` flags, rather than ``fast`` being a subcommand.
* By default, the serialized output is a single .tgz file rather than separate files. This means that any subsequent ``flytectl register`` command will need to be updated with the ``--archive`` flag.
77 changes: 42 additions & 35 deletions flytekit/bin/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@
from flytekit.core.data_persistence import FileAccessProvider
from flytekit.core.promise import VoidPromise
from flytekit.deck.deck import _output_deck
from flytekit.exceptions import scopes as _scoped_exceptions
from flytekit.exceptions import scopes as _scopes
from flytekit.exceptions.user import FlyteRecoverableException, FlyteUserRuntimeException
from flytekit.interfaces.stats.taggable import get_stats as _get_stats
from flytekit.loggers import logger, user_space_logger
from flytekit.models import dynamic_job as _dynamic_job
Expand All @@ -55,7 +54,6 @@ def get_version_message():


def _compute_array_job_index():
# type () -> int
"""
Computes the absolute index of the current array job. This is determined by summing the compute-environment-specific
environment variable and the offset (if one's set). The offset will be set and used when the user request that the
Expand Down Expand Up @@ -94,7 +92,7 @@ def _dispatch_execute(
except Exception as e:
# If the task can not be loaded, then it's most likely a user error. For example,
# a dependency is not installed during execution.
raise _scoped_exceptions.FlyteScopedUserException(*sys.exc_info()) from e
raise FlyteUserRuntimeException(e) from e

logger.debug(f"Starting _dispatch_execute for {task_def.name}")
# Step1
Expand All @@ -104,9 +102,8 @@ def _dispatch_execute(
idl_input_literals = _literal_models.LiteralMap.from_flyte_idl(input_proto)

# Step2
# Decorate the dispatch execute function before calling it, this wraps all exceptions into one
# of the FlyteScopedExceptions
outputs = _scoped_exceptions.system_entry_point(task_def.dispatch_execute)(ctx, idl_input_literals)
# Invoke task - dispatch_execute
outputs = task_def.dispatch_execute(ctx, idl_input_literals)
if inspect.iscoroutine(outputs):
# Handle eager-mode (async) tasks
logger.info("Output is a coroutine")
Expand All @@ -132,50 +129,46 @@ def _dispatch_execute(
)

# Handle user-scoped errors
except _scoped_exceptions.FlyteScopedUserException as e:
except FlyteUserRuntimeException as e:
# Step3b
if isinstance(e.value, IgnoreOutputs):
logger.warning(f"User-scoped IgnoreOutputs received! Outputs.pb will not be uploaded. reason {e}!!")
return
output_file_dict[_constants.ERROR_FILE_NAME] = _error_models.ErrorDocument(
_error_models.ContainerError(
e.error_code, e.verbose_message, e.kind, _execution_models.ExecutionError.ErrorKind.USER
)
)
logger.error("!! Begin User Error Captured by Flyte !!")
logger.error(e.verbose_message)
logger.error("!! End Error Captured by Flyte !!")

# Handle system-scoped errors
except _scoped_exceptions.FlyteScopedSystemException as e:
if isinstance(e.value, IgnoreOutputs):
logger.warning(f"System-scoped IgnoreOutputs received! Outputs.pb will not be uploaded. reason {e}!!")
return
# Step3c
if isinstance(e.value, FlyteRecoverableException):
kind = _error_models.ContainerError.Kind.RECOVERABLE
else:
kind = _error_models.ContainerError.Kind.NON_RECOVERABLE

exc_str = get_traceback_str(e)
output_file_dict[_constants.ERROR_FILE_NAME] = _error_models.ErrorDocument(
_error_models.ContainerError(
e.error_code, e.verbose_message, e.kind, _execution_models.ExecutionError.ErrorKind.SYSTEM
"USER",
exc_str,
kind,
_execution_models.ExecutionError.ErrorKind.USER,
)
)
logger.error("!! Begin System Error Captured by Flyte !!")
logger.error(e.verbose_message)
if task_def is not None:
logger.error(f"Exception when executing task {task_def.name or task_def.id.name}, reason {str(e)}")
else:
logger.error(f"Exception when loading_task, reason {str(e)}")
logger.error("!! Begin User Error Captured by Flyte !!")
logger.error(exc_str)
logger.error("!! End Error Captured by Flyte !!")

# Interpret all other exceptions (some of which may be caused by the code in the try block outside of
# dispatch_execute) as recoverable system exceptions.
# All the Non-user errors are captured here, and are considered system errors
except Exception as e:
# Step 3c
exc_str = traceback.format_exc()
exc_str = get_traceback_str(e)
output_file_dict[_constants.ERROR_FILE_NAME] = _error_models.ErrorDocument(
_error_models.ContainerError(
"SYSTEM:Unknown",
"SYSTEM",
exc_str,
_error_models.ContainerError.Kind.RECOVERABLE,
_execution_models.ExecutionError.ErrorKind.SYSTEM,
)
)
if task_def is not None:
logger.error(f"Exception when executing task {task_def.name or task_def.id.name}, reason {str(e)}")
else:
logger.error(f"Exception when loading_task, reason {str(e)}")

logger.error("!! Begin Unknown System Error Captured by Flyte !!")
logger.error(exc_str)
Expand All @@ -199,6 +192,22 @@ def _dispatch_execute(
exit(1)


def get_traceback_str(e: Exception) -> str:
    """
    Render an exception as a traceback section followed by a message section.

    For a FlyteUserRuntimeException the frames are taken from the chained ``__cause__`` when one is set
    (otherwise from the exception itself), and the message describes the wrapped ``e.value``. Any other
    exception is reported using its own traceback and its own type/message.

    :param e: The exception to render.
    :return: A multi-line string with the formatted frames and a ``<TypeName>: <value>`` message.
    """
    is_user_error = isinstance(e, FlyteUserRuntimeException)

    # For user errors, prefer the frames of the exception raised by user code over the Flyte wrapper's frames.
    if is_user_error and e.__cause__:
        tb = e.__cause__.__traceback__
    else:
        tb = e.__traceback__

    frames = "\n ".join(entry.rstrip() for entry in traceback.format_tb(tb))

    template = (
        "Traceback (most recent call last):\n"
        "\n {traceback}\n"
        "\n"
        "Message:\n"
        "\n"
        " {message}"
    )

    # The message names the wrapped value for user errors, and the exception itself otherwise.
    reported = e.value if is_user_error else e
    return template.format(traceback=frames, message=f"{type(reported).__name__}: {reported}")


def get_one_of(*args) -> str:
"""
Helper function to iterate through a series of different environment variables. This function exists because for
Expand Down Expand Up @@ -331,7 +340,6 @@ def setup_execution(
yield ctx


@_scopes.system_entry_point
def _execute_task(
inputs: str,
output_prefix: str,
Expand Down Expand Up @@ -395,7 +403,6 @@ def load_task():
_dispatch_execute(ctx, load_task, inputs, output_prefix)


@_scopes.system_entry_point
def _execute_map_task(
inputs,
output_prefix,
Expand Down
15 changes: 15 additions & 0 deletions flytekit/clis/sdk_in_container/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from flytekit.configuration import ImageConfig
from flytekit.configuration.plugin import get_plugin
from flytekit.remote.remote import FlyteRemote
from flytekit.tools.fast_registration import CopyFileDetection

FLYTE_REMOTE_INSTANCE_KEY = "flyte_remote"

Expand Down Expand Up @@ -61,3 +62,17 @@ def patch_image_config(config_file: Optional[str], image_config: ImageConfig) ->
if addl.name not in additional_image_names:
new_additional_images.append(addl)
return replace(image_config, default_image=new_default, images=new_additional_images)


def parse_copy(ctx, param, value) -> Optional[CopyFileDetection]:
    """
    Click option callback that maps the ``--copy`` choice string onto a CopyFileDetection member.

    :param ctx: Click context supplied to the callback (not used here).
    :param param: Click parameter supplied to the callback (not used here).
    :param value: The raw option value; ``"auto"``, ``"all"`` and ``"none"`` are recognized.
    :return: The matching CopyFileDetection member, or None for any other value (including None).
    """
    styles_by_choice = (
        ("auto", CopyFileDetection.LOADED_MODULES),
        ("all", CopyFileDetection.ALL),
        ("none", CopyFileDetection.NO_COPY),
    )
    # First matching choice wins; anything unrecognized falls through to None.
    return next((style for choice, style in styles_by_choice if value == choice), None)
32 changes: 29 additions & 3 deletions flytekit/clis/sdk_in_container/package.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
import os
import typing

import rich_click as click

from flytekit.clis.helpers import display_help_with_error
from flytekit.clis.sdk_in_container import constants
from flytekit.clis.sdk_in_container.helpers import parse_copy
from flytekit.configuration import (
DEFAULT_RUNTIME_PYTHON_INTERPRETER,
FastSerializationSettings,
ImageConfig,
SerializationSettings,
)
from flytekit.interaction.click_types import key_value_callback
from flytekit.tools.fast_registration import CopyFileDetection, FastPackageOptions
from flytekit.tools.repo import NoSerializableEntitiesError, serialize_and_package


Expand Down Expand Up @@ -50,8 +53,18 @@
is_flag=True,
default=False,
required=False,
help="This flag enables fast packaging, that allows `no container build` deploys of flyte workflows and tasks. "
"Note this needs additional configuration, refer to the docs.",
help="[Will be deprecated, see --copy] This flag enables fast packaging, that allows `no container build`"
" deploys of flyte workflows and tasks. You can specify --copy all/auto instead"
" Note this needs additional configuration, refer to the docs.",
)
@click.option(
"--copy",
required=False,
type=click.Choice(["all", "auto", "none"], case_sensitive=False),
default=None, # this will be changed to "none" after removing fast option
callback=parse_copy,
help="[Beta] Specify whether local files should be copied and uploaded so task containers have up-to-date code"
" 'all' will behave as the current 'fast' flag, copying all files, 'auto' copies only loaded Python modules",
)
@click.option(
"-f",
Expand Down Expand Up @@ -100,6 +113,7 @@ def package(
source,
output,
force,
copy: typing.Optional[CopyFileDetection],
fast,
in_container_source_path,
python_interpreter,
Expand All @@ -113,6 +127,12 @@ def package(
object contains the WorkflowTemplate, along with the relevant tasks for that workflow.
This serialization step will set the name of the tasks to the fully qualified name of the task function.
"""
if copy is not None and fast:
raise ValueError("--fast and --copy cannot be used together. Please use --copy all instead.")
elif copy == CopyFileDetection.ALL or copy == CopyFileDetection.LOADED_MODULES:
# for those migrating, who only set --copy all/auto but don't have --fast set.
fast = True

if os.path.exists(output) and not force:
raise click.BadParameter(
click.style(
Expand All @@ -136,6 +156,12 @@ def package(
display_help_with_error(ctx, "No packages to scan for flyte entities. Aborting!")

try:
serialize_and_package(pkgs, serialization_settings, source, output, fast, deref_symlinks)
# verbosity greater than 0 means to print the files
show_files = ctx.obj[constants.CTX_VERBOSE] > 0

fast_options = FastPackageOptions([], copy_style=copy, show_files=show_files)
serialize_and_package(
pkgs, serialization_settings, source, output, fast, deref_symlinks, fast_options=fast_options
)
except NoSerializableEntitiesError:
click.secho(f"No flyte objects found in packages {pkgs}", fg="yellow")
34 changes: 31 additions & 3 deletions flytekit/clis/sdk_in_container/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,18 @@

from flytekit.clis.helpers import display_help_with_error
from flytekit.clis.sdk_in_container import constants
from flytekit.clis.sdk_in_container.helpers import get_and_save_remote_with_click_context, patch_image_config
from flytekit.clis.sdk_in_container.helpers import (
get_and_save_remote_with_click_context,
parse_copy,
patch_image_config,
)
from flytekit.clis.sdk_in_container.utils import domain_option_dec, project_option_dec
from flytekit.configuration import ImageConfig
from flytekit.configuration.default_images import DefaultImages
from flytekit.interaction.click_types import key_value_callback
from flytekit.loggers import logger
from flytekit.tools import repo
from flytekit.tools.fast_registration import CopyFileDetection

_register_help = """
This command is similar to ``package`` but instead of producing a zip file, all your Flyte entities are compiled,
Expand Down Expand Up @@ -93,7 +98,17 @@
"--non-fast",
default=False,
is_flag=True,
help="Skip zipping and uploading the package",
help="[Will be deprecated, see --copy] Skip zipping and uploading the package. You can specify --copy none instead",
)
@click.option(
"--copy",
required=False,
type=click.Choice(["all", "auto", "none"], case_sensitive=False),
default=None, # this will be changed to "all" after removing non-fast option
callback=parse_copy,
help="[Beta] Specify how and whether to use fast register"
" 'all' is the current behavior copying all files from root, 'auto' copies only loaded Python modules"
" 'none' means no files are copied, i.e. don't use fast register",
)
@click.option(
"--dry-run",
Expand Down Expand Up @@ -139,6 +154,7 @@ def register(
version: typing.Optional[str],
deref_symlinks: bool,
non_fast: bool,
copy: typing.Optional[CopyFileDetection],
package_or_module: typing.Tuple[str],
dry_run: bool,
activate_launchplans: bool,
Expand All @@ -148,14 +164,24 @@ def register(
"""
see help
"""
if copy is not None and non_fast:
raise ValueError("--non-fast and --copy cannot be used together. Use --copy none instead.")

# Handle the new case where the copy flag is used instead of non-fast
if copy == CopyFileDetection.NO_COPY:
non_fast = True
# Set this to None because downstream logic currently detects None to mean old logic.
copy = None
show_files = ctx.obj[constants.CTX_VERBOSE] > 0

pkgs = ctx.obj[constants.CTX_PACKAGES]
if not pkgs:
logger.debug("No pkgs")
if pkgs:
raise ValueError("Unimplemented, just specify pkgs like folder/files as args at the end of the command")

if non_fast and not version:
raise ValueError("Version is a required parameter in case --non-fast is specified.")
raise ValueError("Version is a required parameter in case --non-fast/--copy none is specified.")

if len(package_or_module) == 0:
display_help_with_error(
Expand Down Expand Up @@ -190,10 +216,12 @@ def register(
version,
deref_symlinks,
fast=not non_fast,
copy_style=copy,
package_or_module=package_or_module,
remote=remote,
env=env,
dry_run=dry_run,
activate_launchplans=activate_launchplans,
skip_errors=skip_errors,
show_files=show_files,
)
Loading

0 comments on commit 9a9707d

Please sign in to comment.