Skip to content

Commit

Permalink
Lay the groundwork for migrating Airflow CLI to Rich+Click (#24590)
Browse files Browse the repository at this point in the history
This is the first PR of what will be a series of PRs breaking up #22613 into smaller, more reviewable chunks. The end result will be rewriting the existing `airflow` CLI to use Click instead of argparse. For motivation, please see #22708.

This PR installs Click, adds constraints to Rich_Click so we can rely on some nice features in recent versions of that, adds a new barebones `airflow-ng` console script, and tweaks some CLI internals to be more flexible between argparse and Click.

To see how this initial groundwork will be used by future PRs, see #22613, and to see how some of this will be used please see #24591.
  • Loading branch information
blag authored Jun 23, 2022
1 parent 09f38ad commit afae297
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 27 deletions.
105 changes: 105 additions & 0 deletions airflow/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,108 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os

import rich_click as click

from airflow import settings
from airflow.utils.cli import ColorMode
from airflow.utils.timezone import parse as parsedate

BUILD_DOCS = "BUILDING_AIRFLOW_DOCS" in os.environ

click_color = click.option(
'--color',
type=click.Choice([ColorMode.ON, ColorMode.OFF, ColorMode.AUTO]),
default=ColorMode.AUTO,
help="Do emit colored output (default: auto)",
)
click_conf = click.option(
'-c', '--conf', help="JSON string that gets pickled into the DagRun's conf attribute"
)
click_daemon = click.option(
"-D", "--daemon", 'daemon_', is_flag=True, help="Daemonize instead of running in the foreground"
)
click_dag_id = click.argument("dag_id", help="The id of the dag")
click_dag_id_opt = click.option("-d", "--dag-id", help="The id of the dag")
click_debug = click.option(
"-d", "--debug", is_flag=True, help="Use the server that ships with Flask in debug mode"
)
click_dry_run = click.option(
'-n',
'--dry-run',
is_flag=True,
default=False,
help="Perform a dry run for each task. Only renders Template Fields for each task, nothing else",
)
click_end_date = click.option(
"-e",
"--end-date",
type=parsedate,
help="Override end_date YYYY-MM-DD",
)
click_execution_date = click.argument("execution_date", help="The execution date of the DAG", type=parsedate)
click_execution_date_or_run_id = click.argument(
"execution_date_or_run_id", help="The execution_date of the DAG or run_id of the DAGRun"
)
click_log_file = click.option(
"-l",
"--log-file",
metavar="LOG_FILE",
type=click.Path(exists=False, dir_okay=False, writable=True),
help="Location of the log file",
)
click_output = click.option(
"-o",
"--output",
type=click.Choice(["table", "json", "yaml", "plain"]),
default="table",
help="Output format.",
)
click_pid = click.option("--pid", metavar="PID", type=click.Path(exists=False), help="PID file location")
click_start_date = click.option(
"-s",
"--start-date",
type=parsedate,
help="Override start_date YYYY-MM-DD",
)
click_stderr = click.option(
"--stderr",
metavar="STDERR",
type=click.Path(exists=False, dir_okay=False, writable=True),
help="Redirect stderr to this file",
)
click_stdout = click.option(
"--stdout",
metavar="STDOUT",
type=click.Path(exists=False, dir_okay=False, writable=True),
help="Redirect stdout to this file",
)
click_subdir = click.option(
"-S",
"--subdir",
default='[AIRFLOW_HOME]/dags' if BUILD_DOCS else settings.DAGS_FOLDER,
type=click.Path(),
help=(
"File location or directory from which to look for the dag. "
"Defaults to '[AIRFLOW_HOME]/dags' where [AIRFLOW_HOME] is the "
"value you set for 'AIRFLOW_HOME' config you set in 'airflow.cfg' "
),
)
click_task_id = click.argument("task_id", help="The id of the task")
click_task_regex = click.option(
"-t", "--task-regex", help="The regex to filter specific task_ids to backfill (optional)"
)
click_verbose = click.option(
'-v', '--verbose', is_flag=True, default=False, help="Make logging output more verbose"
)
click_yes = click.option(
'-y', '--yes', is_flag=True, default=False, help="Do not prompt to confirm. Use with care!"
)


# https://click.palletsprojects.com/en/8.1.x/documentation/#help-parameter-customization
@click.group(context_settings={'help_option_names': ['-h', '--help']})
@click.pass_context
def airflow_cmd(ctx):
pass
23 changes: 23 additions & 0 deletions airflow/cli/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from airflow.cli import airflow_cmd

if __name__ == '__main__':
airflow_cmd(obj={})
71 changes: 46 additions & 25 deletions airflow/utils/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,10 @@

def _check_cli_args(args):
if not args:
raise ValueError("Args should be set")
if not isinstance(args[0], Namespace):
raise ValueError(
f"1st positional argument should be argparse.Namespace instance, but is {type(args[0])}"
)
raise ValueError(f"Args should be set: {args} [{type(args)}]")


def action_cli(func=None, check_db=True):
def action_cli(func=None, check_db=True, check_cli_args=True):
def action_logging(f: T) -> T:
"""
Decorates function to execute function at the same time submitting action_logging
Expand All @@ -79,15 +75,14 @@ def action_logging(f: T) -> T:
@functools.wraps(f)
def wrapper(*args, **kwargs):
"""
An wrapper for cli functions. It assumes to have Namespace instance
at 1st positional argument
An wrapper for cli functions.
:param args: Positional argument. It assumes to have Namespace instance
at 1st positional argument
:param args: Positional argument.
:param kwargs: A passthrough keyword argument
"""
_check_cli_args(args)
metrics = _build_metrics(f.__name__, args[0])
if check_cli_args:
_check_cli_args(args)
metrics = _build_metrics(f.__name__, args, kwargs)
cli_action_loggers.on_pre_execution(**metrics)
try:
# Check and run migrations if necessary
Expand All @@ -111,15 +106,16 @@ def wrapper(*args, **kwargs):
return action_logging


def _build_metrics(func_name, namespace):
def _build_metrics(func_name, args, kwargs):
"""
Builds metrics dict from function args
It assumes that function arguments is from airflow.bin.cli module's function
and has Namespace instance where it optionally contains "dag_id", "task_id",
and "execution_date".
If the first item in args is a Namespace instance, it assumes that it
optionally contains "dag_id", "task_id", and "execution_date".
:param func_name: name of function
:param namespace: Namespace instance from argparse
:param args: Arguments from wrapped function, possibly including the Namespace instance from
argparse as the first argument
:param kwargs: Keyword arguments from wrapped function
:return: dict with metrics
"""
from airflow.models import Log
Expand All @@ -146,11 +142,7 @@ def _build_metrics(func_name, namespace):
'user': getuser(),
}

if not isinstance(namespace, Namespace):
raise ValueError(
f"namespace argument should be argparse.Namespace instance, but is {type(namespace)}"
)
tmp_dic = vars(namespace)
tmp_dic = vars(args[0]) if (args and isinstance(args[0], Namespace)) else kwargs
metrics['dag_id'] = tmp_dic.get('dag_id')
metrics['task_id'] = tmp_dic.get('task_id')
metrics['execution_date'] = tmp_dic.get('execution_date')
Expand Down Expand Up @@ -306,11 +298,13 @@ class ColorMode:
AUTO = "auto"


def should_use_colors(args) -> bool:
def should_use_colors(args_or_color):
"""Processes arguments and decides whether to enable color in output"""
if args.color == ColorMode.ON:
# args.color is from argparse, Click CLI will pass in the color directly
color = args_or_color.color if hasattr(args_or_color, 'color') else args_or_color
if color == ColorMode.ON:
return True
if args.color == ColorMode.OFF:
if color == ColorMode.OFF:
return False
return is_terminal_support_colors()

Expand Down Expand Up @@ -338,3 +332,30 @@ def _wrapper(*args, **kwargs):
logging.disable(logging.NOTSET)

return cast(T, _wrapper)


def suppress_logs_and_warning_click_compatible(f: T) -> T:
"""
Click compatible version of suppress_logs_and_warning.
Place after click_verbose decorator.
Decorator to suppress logging and warning messages
in cli functions.
"""

@functools.wraps(f)
def _wrapper(*args, **kwargs):
if kwargs.get("verbose"):
f(*args, **kwargs)
else:
with warnings.catch_warnings():
warnings.simplefilter("ignore")
logging.disable(logging.CRITICAL)
try:
f(*args, **kwargs)
finally:
# logging output again depends on the effective
# levels of individual loggers
logging.disable(logging.NOTSET)

return cast(T, _wrapper)
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ install_requires =
python-nvd3>=0.15.0
python-slugify>=5.0
rich>=12.4.4
rich-click>=1.3.1
setproctitle>=1.1.8
# SQL Alchemy 1.4.10 introduces a bug where for PyODBC driver UTCDateTime fields get wrongly converted
# as string and fail to be converted back to datetime. It was supposed to be fixed in
Expand Down Expand Up @@ -174,6 +175,7 @@ airflow.utils=
[options.entry_points]
console_scripts=
airflow=airflow.__main__:main
airflow-ng=airflow.cli.__main__:airflow_cmd

[bdist_wheel]
python-tag=py3
Expand Down
4 changes: 2 additions & 2 deletions tests/utils/test_cli_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def test_metrics_build(self):
func_name = 'test'
exec_date = datetime.utcnow()
namespace = Namespace(dag_id='foo', task_id='bar', subcommand='test', execution_date=exec_date)
metrics = cli._build_metrics(func_name, namespace)
metrics = cli._build_metrics(func_name, [namespace], {})

expected = {
'user': os.environ.get('USER'),
Expand Down Expand Up @@ -132,7 +132,7 @@ def test_cli_create_user_supplied_password_is_masked(self, given_command, expect
exec_date = datetime.utcnow()
namespace = Namespace(dag_id='foo', task_id='bar', subcommand='test', execution_date=exec_date)
with mock.patch.object(sys, "argv", args):
metrics = cli._build_metrics(args[1], namespace)
metrics = cli._build_metrics(args[1], [namespace], {})

assert metrics.get('start_datetime') <= datetime.utcnow()

Expand Down

0 comments on commit afae297

Please sign in to comment.