Skip to content

Commit

Permalink
Letting each condition control its frequency (#150)
Browse files Browse the repository at this point in the history
Using throttling, each condition was being checked once per minute. But
a condition may need to perform its check at a different pace like for
example the TaskMonitor condition. This change applies throttling at
individual condition instead of applying it generically at the
subprocess level. The TaskMonitor condition will not be executed once
per 10 seconds instead of one minute. This will allow for faster
evaluation of worker idleness which would help in scaling down the
worker fleet a lot more effectively.

By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice.
  • Loading branch information
ashishgo-aws authored Oct 3, 2024
1 parent 2ebece3 commit bd9c236
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 4 deletions.
3 changes: 2 additions & 1 deletion images/airflow/2.9.2/python/mwaa/celery/task_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
# blocks without running out of space.
BUFFER_SIZE_PER_TASK = 2500
CELERY_WORKER_TASK_LIMIT = int(
os.environ.get("AIRFLOW__CELERY__WORKER_AUTOSCALE", "20,20").split(",")[0]
os.environ.get("AIRFLOW__CELERY__WORKER_AUTOSCALE", "80,80").split(",")[0]
)
CELERY_TASKS_BUFFER_SIZE = CELERY_WORKER_TASK_LIMIT * BUFFER_SIZE_PER_TASK

Expand Down Expand Up @@ -622,6 +622,7 @@ def close(self):
return

logger.info("Closing task monitor...")
self.pause_task_consumption()

# Report a metric about the number of current task, and a warning in case this is greater than zero. If the worker was
# marked for killing or was marked for termination and the allowed time limit for termination has been breached, then we do
Expand Down
5 changes: 5 additions & 0 deletions images/airflow/2.9.2/python/mwaa/subprocess/conditions.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
# Our imports
from mwaa.celery.task_monitor import WorkerTaskMonitor
from mwaa.config.database import get_db_connection_string
from mwaa.logging.utils import throttle
from mwaa.subprocess import ProcessStatus
from mwaa.utils.plogs import generate_plog

Expand Down Expand Up @@ -233,6 +234,7 @@ def _generate_autorestart_plog(self):
)
)

@throttle(seconds=60, instance_level_throttling=True) # avoid excessive calls to process conditions
def _check(self, process_status: ProcessStatus) -> ProcessConditionResponse:
"""
Execute the condition and return the response.
Expand Down Expand Up @@ -337,6 +339,7 @@ def prepare(self):
"""
self.start_time = time.time()

@throttle(seconds=60, instance_level_throttling=True) # avoid excessive calls to process conditions
def _check(self, process_status: ProcessStatus) -> ProcessConditionResponse:
"""
Execute the condition and return the response.
Expand Down Expand Up @@ -413,6 +416,7 @@ def _generate_health_plog(self, healthy: bool, health_changed: bool):
)
)

@throttle(seconds=60, instance_level_throttling=True) # avoid excessive calls to process conditions
def _check(self, process_status: ProcessStatus) -> ProcessConditionResponse:
"""
Execute the condition and return the response.
Expand Down Expand Up @@ -507,6 +511,7 @@ def _get_failed_condition_response(self, message: str) -> ProcessConditionRespon
message=message,
)

@throttle(seconds=10, instance_level_throttling=True) # avoid excessive calls to process conditions
def _check(self, process_status: ProcessStatus) -> ProcessConditionResponse:
"""
Execute the condition and return the response.
Expand Down
4 changes: 1 addition & 3 deletions images/airflow/2.9.2/python/mwaa/subprocess/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

# Our imports
from mwaa.logging.loggers import CompositeLogger
from mwaa.logging.utils import throttle
from mwaa.subprocess import ProcessStatus
from mwaa.subprocess.conditions import ProcessCondition, ProcessConditionResponse

Expand Down Expand Up @@ -188,13 +187,12 @@ def start(
exc_info=sys.exc_info(),
)

@throttle(seconds=60, instance_level_throttling=True) # avoid excessive calls to process conditions
def _check_process_conditions(self) -> List[ProcessConditionResponse]:
# Evaluate all conditions
checked_conditions = [c.check(self.process_status) for c in self.conditions]

# Filter out the unsuccessful conditions
failed_conditions = [c for c in checked_conditions if not c.successful]
failed_conditions = [c for c in checked_conditions if c and not c.successful]

return failed_conditions

Expand Down

0 comments on commit bd9c236

Please sign in to comment.