feat: support LocalExecutor (#138)
1. Adds support for LocalExecutor when `MWAA__SQS__CREATE_QUEUE` is set
to false. If we are not creating an SQS queue or using Celery, we should
not attempt to configure the CeleryExecutor. This is useful for
environments where a discrete worker is unnecessary or too expensive.
2. Adds a config variable `MWAA__CORE__DISABLE_CORS` (default false) which,
when set to true, sets all CORS-related headers to accept all
connections. This is required to make MWAA work in embedded environments
(e.g. within a portal or webpage). Both settings are illustrated in the
configuration sketch below.
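
As a rough, illustrative sketch (not part of this commit), a LocalExecutor-only container might be configured with environment variables along these lines; `MWAA__CORE__EXECUTOR_TYPE` is the variable read by the entrypoint change below, and the exact wiring in a real deployment may differ:

```python
import os

# Hypothetical configuration sketch: variable names are taken from this
# commit's description and diff; a real deployment may set them differently.
os.environ.update(
    {
        # Select the LocalExecutor path added by this change.
        "MWAA__CORE__EXECUTOR_TYPE": "LocalExecutor",
        # Do not create an SQS queue, since Celery is not used.
        "MWAA__SQS__CREATE_QUEUE": "false",
        # Relax CORS headers so the webserver can be embedded in a portal or webpage.
        "MWAA__CORE__DISABLE_CORS": "true",
    }
)
```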
agupta01 authored Sep 17, 2024
1 parent 6f834b5 commit faedb17
Showing 3 changed files with 37 additions and 23 deletions.
45 changes: 28 additions & 17 deletions images/airflow/2.9.2/python/mwaa/config/airflow.py
@@ -18,27 +18,36 @@
logger = logging.getLogger(__name__)


-def _get_essential_airflow_celery_config() -> Dict[str, str]:
+def _get_essential_airflow_executor_config(executor_type: str) -> Dict[str, str]:
    """
-    Retrieve the environment variables required for Celery executor.
+    Retrieve the environment variables required for executor. Currently, two executors
+    are supported:
+    - LocalExecutor: All tasks are run on a local process
+    - CeleryExecutor (Default): All tasks are run on Celery worker processes

    The required environment variables are mostly under the "celery" section, but
    other sections as well.

+    :param executor_type A string indicating the type of executor to use.
    :returns A dictionary containing the environment variables.
    """
-    celery_config_module_path = "mwaa.config.celery.MWAA_CELERY_CONFIG"
-
-    return {
-        "AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT": "43200",
-        "AIRFLOW__CELERY__BROKER_URL": get_sqs_endpoint(),
-        "AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS": celery_config_module_path,
-        "AIRFLOW__CELERY__RESULT_BACKEND": f"db+{get_db_connection_string()}",
-        "AIRFLOW__CELERY__WORKER_ENABLE_REMOTE_CONTROL": "False",
-        # These two are not Celery configs per-se, but are used by the Celery executor.
-        "AIRFLOW__CORE__EXECUTOR": "CeleryExecutor",
-        "AIRFLOW__OPERATORS__DEFAULT_QUEUE": get_sqs_queue_name(),
-    }
+    match executor_type.lower():
+        case 'localexecutor':
+            return {
+                "AIRFLOW__CORE__EXECUTOR": "LocalExecutor",
+            }
+        case 'celeryexecutor':
+            celery_config_module_path = "mwaa.config.celery.MWAA_CELERY_CONFIG"
+            return {
+                "AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT": "43200",
+                "AIRFLOW__CELERY__BROKER_URL": get_sqs_endpoint(),
+                "AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS": celery_config_module_path,
+                "AIRFLOW__CELERY__RESULT_BACKEND": f"db+{get_db_connection_string()}",
+                "AIRFLOW__CELERY__WORKER_ENABLE_REMOTE_CONTROL": "False",
+                "AIRFLOW__CORE__EXECUTOR": "CeleryExecutor",
+                "AIRFLOW__OPERATORS__DEFAULT_QUEUE": get_sqs_queue_name(),
+            }
+        case _:
+            raise ValueError(f"Executor type {executor_type} is not supported.")


def _get_essential_airflow_core_config() -> Dict[str, str]:
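
For context, a short usage sketch (not part of the commit) of how the new helper behaves. The CeleryExecutor branch additionally requires the SQS endpoint, queue name, and database connection string to be resolvable, so only the LocalExecutor and error paths are shown:

```python
from mwaa.config.airflow import _get_essential_airflow_executor_config

# LocalExecutor needs no Celery or SQS settings, only the executor itself.
assert _get_essential_airflow_executor_config("LocalExecutor") == {
    "AIRFLOW__CORE__EXECUTOR": "LocalExecutor",
}

# Matching is case-insensitive; anything other than the two supported
# executors raises a ValueError.
try:
    _get_essential_airflow_executor_config("KubernetesExecutor")
except ValueError as error:
    print(error)  # Executor type KubernetesExecutor is not supported.
```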
@@ -275,16 +284,18 @@ def _get_essential_airflow_api_config() -> Dict[str, str]:
    return api_config


-def get_essential_airflow_config() -> Dict[str, str]:
+def get_essential_airflow_config(executor_type: str) -> Dict[str, str]:
    """
    Retrieve the environment variables required to set Airflow configurations.

    These environment variables are essential and cannot be overridden by the customer.

+    :param executor_type A string indicating the type of executor to use.
    :returns A dictionary containing the environment variables.
    """
    return {
-        **_get_essential_airflow_celery_config(),
+        **_get_essential_airflow_executor_config(executor_type),
        **_get_essential_airflow_core_config(),
        **_get_essential_airflow_db_config(),
        **_get_essential_airflow_logging_config(),
3 changes: 1 addition & 2 deletions images/airflow/2.9.2/python/mwaa/config/sqs.py
@@ -124,13 +124,12 @@ def get_sqs_queue_name() -> str:

def should_create_queue() -> bool:
    """
-    Determine whether the SQS queue should be created or not.
+    Determine whether the SQS queue should be created or not. Only used with CeleryExecutor.

    :return: True or False.
    """
    return os.environ.get("MWAA__SQS__CREATE_QUEUE", "false").lower() == "true"


def should_use_ssl() -> bool:
    """
    Determine whether to use SSL when communicating with SQS or not.
12 changes: 8 additions & 4 deletions images/airflow/2.9.2/python/mwaa/entrypoint.py
@@ -565,7 +565,7 @@ def on_sigterm() -> None:
def _create_airflow_scheduler_subprocesses(environ: Dict[str, str], conditions: List):
    """
    Get the scheduler subprocesses: scheduler, dag-processor, and triggerer.

    :param environ: A dictionary containing the environment variables.
    :param conditions: A list of subprocess conditions.
    :returns: Scheduler subprocesses.
@@ -589,7 +589,7 @@ def _create_airflow_scheduler_subprocesses(environ: Dict[str, str], conditions:
def _create_airflow_process_conditions(airflow_cmd: str):
    """
    Get conditions for the given Airflow command.

    :param airflow_cmd: The command to get conditions for, e.g. "scheduler"
    :returns: A list of conditions for the given Airflow command.
@@ -672,8 +672,11 @@ async def main() -> None:

    logger.info(f"Warming a Docker container for an Airflow {command}.")

+    # Get executor type
+    executor_type = os.environ.get("MWAA__CORE__EXECUTOR_TYPE", "CeleryExecutor")
+
    # Add the necessary environment variables.
-    mwaa_essential_airflow_config = get_essential_airflow_config()
+    mwaa_essential_airflow_config = get_essential_airflow_config(executor_type)
    mwaa_opinionated_airflow_config = get_opinionated_airflow_config()
    mwaa_essential_airflow_environ = get_essential_environ(command)
    mwaa_opinionated_airflow_environ = get_opinionated_environ()
@@ -729,7 +732,8 @@ async def main() -> None:
    # having to create a user manually. Needless to say, this shouldn't be used in
    # production environments.
    await create_airflow_user(environ)
-    create_queue()
+    if executor_type.lower() == "celeryexecutor":
+        create_queue()

    # Export the environment variables to .bashrc and .bash_profile to enable
    # users to run a shell on the container and have the necessary environment
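
Putting the entrypoint changes together, the control flow this commit introduces can be summarized in the following condensed sketch. This is not the actual entrypoint code, and the import location of `create_queue` is an assumption (the diff excerpt does not show it):

```python
import os

from mwaa.config.airflow import get_essential_airflow_config
# Assumption: create_queue lives next to the other SQS helpers; its real
# module is not visible in this excerpt.
from mwaa.config.sqs import create_queue

# The executor type now gates both the Airflow configuration and the
# SQS queue creation.
executor_type = os.environ.get("MWAA__CORE__EXECUTOR_TYPE", "CeleryExecutor")

environ = {
    **get_essential_airflow_config(executor_type),
    # ... opinionated config, user config, and the rest of the environment ...
}

if executor_type.lower() == "celeryexecutor":
    create_queue()  # Only the Celery path needs the SQS broker queue.
```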
