diff --git a/images/airflow/2.9.2/python/mwaa/celery/task_monitor.py b/images/airflow/2.9.2/python/mwaa/celery/task_monitor.py
index 6ec92dc..0f89ccd 100644
--- a/images/airflow/2.9.2/python/mwaa/celery/task_monitor.py
+++ b/images/airflow/2.9.2/python/mwaa/celery/task_monitor.py
@@ -46,7 +46,7 @@
 # blocks without running out of space.
 BUFFER_SIZE_PER_TASK = 2500
 CELERY_WORKER_TASK_LIMIT = int(
-    os.environ.get("AIRFLOW__CELERY__WORKER_AUTOSCALE", "20,20").split(",")[0]
+    os.environ.get("AIRFLOW__CELERY__WORKER_AUTOSCALE", "80,80").split(",")[0]
 )
 CELERY_TASKS_BUFFER_SIZE = CELERY_WORKER_TASK_LIMIT * BUFFER_SIZE_PER_TASK
@@ -622,6 +622,7 @@ def close(self):
             return
         logger.info("Closing task monitor...")
+        self.pause_task_consumption()
         # Report a metric about the number of current task, and a warning in case this is greater than zero. If the worker was
         # marked for killing or was marked for termination and the allowed time limit for termination has been breached, then we do

diff --git a/images/airflow/2.9.2/python/mwaa/subprocess/conditions.py b/images/airflow/2.9.2/python/mwaa/subprocess/conditions.py
index 0000be1..3470e7a 100644
--- a/images/airflow/2.9.2/python/mwaa/subprocess/conditions.py
+++ b/images/airflow/2.9.2/python/mwaa/subprocess/conditions.py
@@ -30,6 +30,7 @@
 # Our imports
 from mwaa.celery.task_monitor import WorkerTaskMonitor
 from mwaa.config.database import get_db_connection_string
+from mwaa.logging.utils import throttle
 from mwaa.subprocess import ProcessStatus
 from mwaa.utils.plogs import generate_plog
@@ -233,6 +234,7 @@ def _generate_autorestart_plog(self):
             )
         )

+    @throttle(seconds=60, instance_level_throttling=True)  # avoid excessive calls to process conditions
     def _check(self, process_status: ProcessStatus) -> ProcessConditionResponse:
         """
         Execute the condition and return the response.
@@ -337,6 +339,7 @@ def prepare(self):
         """
         self.start_time = time.time()

+    @throttle(seconds=60, instance_level_throttling=True)  # avoid excessive calls to process conditions
     def _check(self, process_status: ProcessStatus) -> ProcessConditionResponse:
         """
         Execute the condition and return the response.
@@ -413,6 +416,7 @@ def _generate_health_plog(self, healthy: bool, health_changed: bool):
             )
         )

+    @throttle(seconds=60, instance_level_throttling=True)  # avoid excessive calls to process conditions
     def _check(self, process_status: ProcessStatus) -> ProcessConditionResponse:
         """
         Execute the condition and return the response.
@@ -507,6 +511,7 @@ def _get_failed_condition_response(self, message: str) -> ProcessConditionRespon
             message=message,
         )

+    @throttle(seconds=10, instance_level_throttling=True)  # avoid excessive calls to process conditions
     def _check(self, process_status: ProcessStatus) -> ProcessConditionResponse:
         """
         Execute the condition and return the response.
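Note on the @throttle usage above: the decorator is imported from mwaa.logging.utils, and the behavior that matters for this change is that a call made inside the throttle window is skipped rather than executed. Below is a minimal sketch of such a decorator, assuming the skipped call returns None; the parameter names (seconds, instance_level_throttling) match the call sites in this diff, but the body is an illustration, not the actual MWAA implementation.

import functools
import time


def throttle(seconds: float, instance_level_throttling: bool = False):
    """Skip calls to the wrapped function made within `seconds` of the last run (sketch)."""

    def decorator(func):
        last_call_times = {}

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Key by the bound instance (args[0] is `self`) when instance-level
            # throttling is requested; otherwise throttle the function globally.
            key = id(args[0]) if instance_level_throttling and args else None
            now = time.monotonic()
            last = last_call_times.get(key)
            if last is not None and now - last < seconds:
                # Inside the throttle window: skip the call and return None.
                return None
            last_call_times[key] = now
            return func(*args, **kwargs)

        return wrapper

    return decorator

Moving the decorator onto each condition's _check (instead of the single _check_process_conditions caller, removed in the next file) lets each condition be throttled independently, including the shorter 10-second interval used by the last condition above.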
diff --git a/images/airflow/2.9.2/python/mwaa/subprocess/subprocess.py b/images/airflow/2.9.2/python/mwaa/subprocess/subprocess.py
index 5dd0204..abd1d4b 100644
--- a/images/airflow/2.9.2/python/mwaa/subprocess/subprocess.py
+++ b/images/airflow/2.9.2/python/mwaa/subprocess/subprocess.py
@@ -24,7 +24,6 @@
 # Our imports
 from mwaa.logging.loggers import CompositeLogger
-from mwaa.logging.utils import throttle
 from mwaa.subprocess import ProcessStatus
 from mwaa.subprocess.conditions import ProcessCondition, ProcessConditionResponse
@@ -188,13 +187,12 @@ def start(
                 exc_info=sys.exc_info(),
             )

-    @throttle(seconds=60, instance_level_throttling=True)  # avoid excessive calls to process conditions
     def _check_process_conditions(self) -> List[ProcessConditionResponse]:
         # Evaluate all conditions
         checked_conditions = [c.check(self.process_status) for c in self.conditions]
         # Filter out the unsuccessful conditions
-        failed_conditions = [c for c in checked_conditions if not c.successful]
+        failed_conditions = [c for c in checked_conditions if c and not c.successful]
         return failed_conditions
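Note on the added `if c` guard: with throttling moved into each condition's own _check method, a throttled invocation presumably produces no response (i.e. None) instead of a ProcessConditionResponse, so _check_process_conditions must skip those entries rather than dereference them. A self-contained sketch of the filter behavior, using a hypothetical stand-in class rather than the real ProcessConditionResponse:

from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FakeResponse:
    # Hypothetical stand-in for ProcessConditionResponse, illustration only.
    successful: bool


# Assumed behavior: a throttled condition check yields None; an executed one yields a response.
checked: List[Optional[FakeResponse]] = [FakeResponse(True), None, FakeResponse(False)]

# The old filter (`not c.successful`) would raise AttributeError on the None entry;
# the new filter skips throttled (None) results and keeps only genuine failures.
failed = [c for c in checked if c and not c.successful]
assert failed == [FakeResponse(False)]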