From 4f83f0880aebf3d49070e286c8770d0307f2e058 Mon Sep 17 00:00:00 2001 From: kuyperse Date: Wed, 8 Jan 2025 18:03:13 +0000 Subject: [PATCH 1/3] Publish hybrid scheduler/worker requirements install and startup script logs to worker Cloudwatch log group. --- .../airflow/2.10.1/python/mwaa/entrypoint.py | 16 +++++++++++----- .../airflow/2.10.3/python/mwaa/entrypoint.py | 17 +++++++++++------ images/airflow/2.9.2/python/mwaa/entrypoint.py | 18 ++++++++++++------ 3 files changed, 34 insertions(+), 17 deletions(-) diff --git a/images/airflow/2.10.1/python/mwaa/entrypoint.py b/images/airflow/2.10.1/python/mwaa/entrypoint.py index aa91d49..ec22044 100644 --- a/images/airflow/2.10.1/python/mwaa/entrypoint.py +++ b/images/airflow/2.10.1/python/mwaa/entrypoint.py @@ -228,6 +228,9 @@ async def install_user_requirements(cmd: str, environ: dict[str, str]): """ requirements_file = environ.get("MWAA__CORE__REQUIREMENTS_PATH") logger.info(f"MWAA__CORE__REQUIREMENTS_PATH = {requirements_file}") + # For hybrid worker/scheduler containers we publish the requirement install logs + # to the worker CloudWatch log group. + logger_prefix = "worker" if cmd == "hybrid" else cmd; if requirements_file and os.path.isfile(requirements_file): logger.info(f"Installing user requirements from {requirements_file}...") @@ -237,7 +240,7 @@ async def install_user_requirements(cmd: str, environ: dict[str, str]): # use CloudWatch for logging. *set( [ - logging.getLogger(MWAA_LOGGERS.get(f"{cmd}_requirements")), + logging.getLogger(MWAA_LOGGERS.get(f"{logger_prefix}_requirements")), logger, ] ), @@ -266,7 +269,7 @@ async def install_user_requirements(cmd: str, environ: dict[str, str]): conditions=[ TimeoutCondition(USER_REQUIREMENTS_MAX_INSTALL_TIME), ], - friendly_name=f"{cmd}_requirements", + friendly_name=f"{logger_prefix}_requirements", ) pip_process.start() if pip_process.process and pip_process.process.returncode != 0: @@ -296,7 +299,10 @@ def execute_startup_script(cmd: str, environ: Dict[str, str]) -> Dict[str, str]: EXECUTE_USER_STARTUP_SCRIPT_PATH = "execute-user-startup-script" POST_STARTUP_SCRIPT_VERIFICATION_PATH = "post-startup-script-verification" - PROCESS_LOGGER = logging.getLogger(MWAA_LOGGERS.get(f"{cmd}_startup")) + # For hybrid worker/scheduler containers we publish the startup script logs + # to the worker CloudWatch log group. + PROCESS_LOGGER_PREFIX = "worker" if cmd == "hybrid" else cmd; + PROCESS_LOGGER = logging.getLogger(MWAA_LOGGERS.get(f"{PROCESS_LOGGER_PREFIX}_startup")) if os.path.isfile(startup_script_path): logger.info("Executing customer startup script.") @@ -309,7 +315,7 @@ def execute_startup_script(cmd: str, environ: Dict[str, str]) -> Dict[str, str]: conditions=[ TimeoutCondition(STARTUP_SCRIPT_MAX_EXECUTION_TIME), ], - friendly_name=f"{cmd}_startup", + friendly_name=f"{PROCESS_LOGGER_PREFIX}_startup", sigterm_patience_interval=STARTUP_SCRIPT_SIGTERM_PATIENCE_INTERVAL, ) startup_script_process.start() @@ -325,7 +331,7 @@ def execute_startup_script(cmd: str, environ: Dict[str, str]) -> Dict[str, str]: conditions=[ TimeoutCondition(STARTUP_SCRIPT_MAX_EXECUTION_TIME), ], - friendly_name=f"{cmd}_startup", + friendly_name=f"{PROCESS_LOGGER_PREFIX}_startup", ) verification_process.start() diff --git a/images/airflow/2.10.3/python/mwaa/entrypoint.py b/images/airflow/2.10.3/python/mwaa/entrypoint.py index aa91d49..535ed3a 100644 --- a/images/airflow/2.10.3/python/mwaa/entrypoint.py +++ b/images/airflow/2.10.3/python/mwaa/entrypoint.py @@ -230,14 +230,16 @@ async def install_user_requirements(cmd: str, environ: dict[str, str]): logger.info(f"MWAA__CORE__REQUIREMENTS_PATH = {requirements_file}") if requirements_file and os.path.isfile(requirements_file): logger.info(f"Installing user requirements from {requirements_file}...") - + # For hybrid worker/scheduler containers we publish the requirement install logs + # to the worker CloudWatch log group. + logger_prefix = "worker" if cmd == "hybrid" else cmd; subprocess_logger = CompositeLogger( "requirements_composite_logging", # name can be anything unused. # We use a set to avoid double logging to console if the user doesn't # use CloudWatch for logging. *set( [ - logging.getLogger(MWAA_LOGGERS.get(f"{cmd}_requirements")), + logging.getLogger(MWAA_LOGGERS.get(f"{logger_prefix}_requirements")), logger, ] ), @@ -266,7 +268,7 @@ async def install_user_requirements(cmd: str, environ: dict[str, str]): conditions=[ TimeoutCondition(USER_REQUIREMENTS_MAX_INSTALL_TIME), ], - friendly_name=f"{cmd}_requirements", + friendly_name=f"{logger_prefix}_requirements", ) pip_process.start() if pip_process.process and pip_process.process.returncode != 0: @@ -296,7 +298,10 @@ def execute_startup_script(cmd: str, environ: Dict[str, str]) -> Dict[str, str]: EXECUTE_USER_STARTUP_SCRIPT_PATH = "execute-user-startup-script" POST_STARTUP_SCRIPT_VERIFICATION_PATH = "post-startup-script-verification" - PROCESS_LOGGER = logging.getLogger(MWAA_LOGGERS.get(f"{cmd}_startup")) + # For hybrid worker/scheduler containers we publish the startup script logs + # to the worker CloudWatch log group. + PROCESS_LOGGER_PREFIX = "worker" if cmd == "hybrid" else cmd; + PROCESS_LOGGER = logging.getLogger(MWAA_LOGGERS.get(f"{PROCESS_LOGGER_PREFIX}_startup")) if os.path.isfile(startup_script_path): logger.info("Executing customer startup script.") @@ -309,7 +314,7 @@ def execute_startup_script(cmd: str, environ: Dict[str, str]) -> Dict[str, str]: conditions=[ TimeoutCondition(STARTUP_SCRIPT_MAX_EXECUTION_TIME), ], - friendly_name=f"{cmd}_startup", + friendly_name=f"{PROCESS_LOGGER_PREFIX}_startup", sigterm_patience_interval=STARTUP_SCRIPT_SIGTERM_PATIENCE_INTERVAL, ) startup_script_process.start() @@ -325,7 +330,7 @@ def execute_startup_script(cmd: str, environ: Dict[str, str]) -> Dict[str, str]: conditions=[ TimeoutCondition(STARTUP_SCRIPT_MAX_EXECUTION_TIME), ], - friendly_name=f"{cmd}_startup", + friendly_name=f"{PROCESS_LOGGER_PREFIX}_startup", ) verification_process.start() diff --git a/images/airflow/2.9.2/python/mwaa/entrypoint.py b/images/airflow/2.9.2/python/mwaa/entrypoint.py index f3a09c9..d4a40eb 100644 --- a/images/airflow/2.9.2/python/mwaa/entrypoint.py +++ b/images/airflow/2.9.2/python/mwaa/entrypoint.py @@ -264,6 +264,9 @@ async def install_user_requirements(cmd: str, environ: dict[str, str]): """ requirements_file = environ.get("MWAA__CORE__REQUIREMENTS_PATH") logger.info(f"MWAA__CORE__REQUIREMENTS_PATH = {requirements_file}") + # For hybrid worker/scheduler containers we publish the requirements logs to the worker + # CloudWatch log group. + logger_prefix = "worker" if cmd == "hybrid" else cmd; if requirements_file and os.path.isfile(requirements_file): logger.info(f"Installing user requirements from {requirements_file}...") @@ -273,7 +276,7 @@ async def install_user_requirements(cmd: str, environ: dict[str, str]): # use CloudWatch for logging. *set( [ - logging.getLogger(MWAA_LOGGERS.get(f"{cmd}_requirements")), + logging.getLogger(MWAA_LOGGERS.get(f"{logger_prefix}_requirements")), logger, ] ), @@ -302,7 +305,7 @@ async def install_user_requirements(cmd: str, environ: dict[str, str]): conditions=[ TimeoutCondition(USER_REQUIREMENTS_MAX_INSTALL_TIME), ], - friendly_name=f"{cmd}_requirements", + friendly_name=f"{logger_prefix}_requirements", ) pip_process.start() if pip_process.process and pip_process.process.returncode != 0: @@ -332,7 +335,10 @@ def execute_startup_script(cmd: str, environ: Dict[str, str]) -> Dict[str, str]: EXECUTE_USER_STARTUP_SCRIPT_PATH = "execute-user-startup-script" POST_STARTUP_SCRIPT_VERIFICATION_PATH = "post-startup-script-verification" - PROCESS_LOGGER = logging.getLogger(MWAA_LOGGERS.get(f"{cmd}_startup")) + # For hybrid worker/scheduler containers we publish the startup script logs + # to the worker CloudWatch log group. + PROCESS_LOGGER_PREFIX = "worker" if cmd == "hybrid" else cmd; + PROCESS_LOGGER = logging.getLogger(MWAA_LOGGERS.get(f"{PROCESS_LOGGER_PREFIX}_startup")) if os.path.isfile(startup_script_path): logger.info("Executing customer startup script.") @@ -345,7 +351,7 @@ def execute_startup_script(cmd: str, environ: Dict[str, str]) -> Dict[str, str]: conditions=[ TimeoutCondition(STARTUP_SCRIPT_MAX_EXECUTION_TIME), ], - friendly_name=f"{cmd}_startup", + friendly_name=f"{PROCESS_LOGGER_PREFIX}_startup", sigterm_patience_interval=STARTUP_SCRIPT_SIGTERM_PATIENCE_INTERVAL, ) startup_script_process.start() @@ -361,7 +367,7 @@ def execute_startup_script(cmd: str, environ: Dict[str, str]) -> Dict[str, str]: conditions=[ TimeoutCondition(STARTUP_SCRIPT_MAX_EXECUTION_TIME), ], - friendly_name=f"{cmd}_startup", + friendly_name=f"{PROCESS_LOGGER_PREFIX}_startup", ) verification_process.start() @@ -717,6 +723,7 @@ async def main() -> None: ) logger.info(f"Warming a Docker container for an Airflow {command}.") + logger.info(f"Hello world.") # Get executor type executor_type = os.environ.get("MWAA__CORE__EXECUTOR_TYPE", "CeleryExecutor") @@ -772,7 +779,6 @@ async def main() -> None: await install_user_requirements(command, environ) await airflow_db_init(environ) - await increase_pool_size_if_default_size(environ) if os.environ.get("MWAA__CORE__AUTH_TYPE", "").lower() == "testing": # In "simple" auth mode, we create an admin user "airflow" with password # "airflow". We use this to make the Docker Compose setup easy to use without From c7806124ec7028e010a4a044e3b618ae880e5e6b Mon Sep 17 00:00:00 2001 From: kuyperse Date: Wed, 8 Jan 2025 18:14:33 +0000 Subject: [PATCH 2/3] Remove debugging log line in 2.9.2 entrypoint --- images/airflow/2.9.2/python/mwaa/entrypoint.py | 1 - 1 file changed, 1 deletion(-) diff --git a/images/airflow/2.9.2/python/mwaa/entrypoint.py b/images/airflow/2.9.2/python/mwaa/entrypoint.py index d4a40eb..094f32f 100644 --- a/images/airflow/2.9.2/python/mwaa/entrypoint.py +++ b/images/airflow/2.9.2/python/mwaa/entrypoint.py @@ -723,7 +723,6 @@ async def main() -> None: ) logger.info(f"Warming a Docker container for an Airflow {command}.") - logger.info(f"Hello world.") # Get executor type executor_type = os.environ.get("MWAA__CORE__EXECUTOR_TYPE", "CeleryExecutor") From 0bfca952f44e86cf90c59615ee955fc598ebf170 Mon Sep 17 00:00:00 2001 From: kuyperse Date: Wed, 8 Jan 2025 18:47:48 +0000 Subject: [PATCH 3/3] Add back removed increase pool size line from entrypoint --- images/airflow/2.9.2/python/mwaa/entrypoint.py | 1 + 1 file changed, 1 insertion(+) diff --git a/images/airflow/2.9.2/python/mwaa/entrypoint.py b/images/airflow/2.9.2/python/mwaa/entrypoint.py index 094f32f..af17ebb 100644 --- a/images/airflow/2.9.2/python/mwaa/entrypoint.py +++ b/images/airflow/2.9.2/python/mwaa/entrypoint.py @@ -778,6 +778,7 @@ async def main() -> None: await install_user_requirements(command, environ) await airflow_db_init(environ) + await increase_pool_size_if_default_size(environ) if os.environ.get("MWAA__CORE__AUTH_TYPE", "").lower() == "testing": # In "simple" auth mode, we create an admin user "airflow" with password # "airflow". We use this to make the Docker Compose setup easy to use without