From 4800ebe5835f941ab62a01c41cfda07b6f506db8 Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Thu, 16 Jan 2025 15:27:13 -0500 Subject: [PATCH] Slowly exit with a crisp message when a needed service is unavailable --- awx/main/dispatch/worker/base.py | 7 ++++++- awx/main/management/commands/run_dispatcher.py | 18 ++++++++++++++++++ awx/main/models/ha.py | 3 ++- awx/main/tasks/receptor.py | 3 ++- 4 files changed, 28 insertions(+), 3 deletions(-) diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py index 9547edf15fd2..58dda363cc86 100644 --- a/awx/main/dispatch/worker/base.py +++ b/awx/main/dispatch/worker/base.py @@ -252,7 +252,12 @@ def run(self, *args, **kwargs): except psycopg.InterfaceError: logger.warning("Stale Postgres message bus connection, reconnecting") continue - except (db.DatabaseError, psycopg.OperationalError): + except (db.DatabaseError, psycopg.OperationalError) as exc: + # If we never connected to begin with, then be brief, no risk of losing work + if init is False: + logger.info(f'Could not create listener connection: {exc}') + time.sleep(1) # Patience to avoid log spam + sys.exit(1) # If we have attained stady state operation, tolerate short-term database hickups if not self.pg_is_down: logger.exception(f"Error consuming new events from postgres, will retry for {self.pg_max_wait} s") diff --git a/awx/main/management/commands/run_dispatcher.py b/awx/main/management/commands/run_dispatcher.py index 28d954abffcd..c8292a87ef25 100644 --- a/awx/main/management/commands/run_dispatcher.py +++ b/awx/main/management/commands/run_dispatcher.py @@ -2,15 +2,21 @@ # All Rights Reserved. 
import logging import yaml +import os +import sys +import time from django.conf import settings from django.core.management.base import BaseCommand +from django.db import connection from awx.main.dispatch import get_task_queuename from awx.main.dispatch.control import Control from awx.main.dispatch.pool import AutoscalePool from awx.main.dispatch.worker import AWXConsumerPG, TaskWorker from awx.main.analytics.subsystem_metrics import DispatcherMetricsServer +from awx.main.utils.redis import exit_if_redis_down +from awx.main.tasks.receptor import RECEPTOR_SOCK_FILE logger = logging.getLogger('awx.main.dispatch') @@ -63,8 +69,20 @@ def handle(self, *arg, **options): consumer = None + exit_if_redis_down(logger) DispatcherMetricsServer().start() + # TODO: move to a common database checker in DAB + try: + connection.ensure_connection() + except Exception as e: + print(type(e)) + + if not os.path.exists(RECEPTOR_SOCK_FILE): + logger.info(f'Receptor sock file does not exist at {RECEPTOR_SOCK_FILE}') + time.sleep(1) # Patience to avoid log spam + sys.exit(1) + try: queues = ['tower_broadcast_all', 'tower_settings_change', get_task_queuename()] consumer = AWXConsumerPG('dispatcher', TaskWorker(), queues, AutoscalePool(min_workers=4), schedule=settings.CELERYBEAT_SCHEDULE) diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index a061a638f692..d046fa24ecb2 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -33,6 +33,7 @@ ) from awx.main.models.unified_jobs import UnifiedJob from awx.main.utils.common import get_corrected_cpu, get_cpu_effective_capacity, get_corrected_memory, get_mem_effective_capacity +from awx.main.utils.redis import ping_redis from awx.main.models.mixins import RelatedJobsMixin, ResourceMixin from awx.main.models.receptor_address import ReceptorAddress @@ -397,7 +398,7 @@ def local_health_check(self): try: # if redis is down for some reason, that means we can't persist # playbook event data; we should consider this a zero capacity 
event - redis.Redis.from_url(settings.BROKER_URL).ping() + ping_redis() except redis.ConnectionError: errors = _('Failed to connect to Redis') diff --git a/awx/main/tasks/receptor.py b/awx/main/tasks/receptor.py index 2fbf6791ed4c..e8df8324f78b 100644 --- a/awx/main/tasks/receptor.py +++ b/awx/main/tasks/receptor.py @@ -41,6 +41,7 @@ logger = logging.getLogger('awx.main.tasks.receptor') __RECEPTOR_CONF = '/etc/receptor/receptor.conf' +RECEPTOR_SOCK_FILE = '/var/run/receptor/receptor.sock' __RECEPTOR_CONF_LOCKFILE = f'{__RECEPTOR_CONF}.lock' RECEPTOR_ACTIVE_STATES = ('Pending', 'Running') @@ -758,7 +759,7 @@ def kube_config(self): {'local-only': None}, {'log-level': settings.RECEPTOR_LOG_LEVEL}, {'node': {'firewallrules': [{'action': 'reject', 'tonode': settings.CLUSTER_HOST_ID, 'toservice': 'control'}]}}, - {'control-service': {'service': 'control', 'filename': '/var/run/receptor/receptor.sock', 'permissions': '0660'}}, + {'control-service': {'service': 'control', 'filename': RECEPTOR_SOCK_FILE, 'permissions': '0660'}}, {'work-command': {'worktype': 'local', 'command': 'ansible-runner', 'params': 'worker', 'allowruntimeparams': True}}, {'work-signing': {'privatekey': '/etc/receptor/work_private_key.pem', 'tokenexpiration': '1m'}}, {