diff --git a/app/services/circuit_breaker.py b/app/services/circuit_breaker.py index 077db14f..d5b1e342 100644 --- a/app/services/circuit_breaker.py +++ b/app/services/circuit_breaker.py @@ -1,6 +1,9 @@ import logging from enum import Enum from app.services.cache import cache_store +from threading import Lock + +health_locks = {} log = logging.getLogger(__name__) @@ -109,23 +112,28 @@ def get_status(cls, checkhealth_func, cache_key: str): Returns: Status of the cache_key. Circuit is CLOSED if cache_key is up and running, and OPEN otherwise. """ - status_key = f"{cache_key}:status" - status = cache_store.get(status_key) - - if status: - return cls.Status(status) - - is_up = False - try: - is_up = checkhealth_func() - except Exception as e: - log.error(e) - - if is_up: - cache_store.set( - status_key, cls.Status.CLOSED, ex=cls.CLOSED_TTL_SECONDS - ) - return cls.Status.CLOSED - - cache_store.set(status_key, cls.Status.OPEN, ex=cls.OPEN_TTL_SECONDS) - return cls.Status.OPEN + if cache_key not in health_locks: + health_locks[cache_key] = Lock() + + # Acquire the lock + with health_locks[cache_key]: + status_key = f"{cache_key}:status" + status = cache_store.get(status_key) + + if status: + return cls.Status(status) + + is_up = False + try: + is_up = checkhealth_func() + except Exception as e: + log.error(e) + + if is_up: + cache_store.set( + status_key, cls.Status.CLOSED, ex=cls.CLOSED_TTL_SECONDS + ) + return cls.Status.CLOSED + + cache_store.set(status_key, cls.Status.OPEN, ex=cls.OPEN_TTL_SECONDS) + return cls.Status.OPEN