From 6b2fdeea9acc27dd8e3c53ba5658bc500847729d Mon Sep 17 00:00:00 2001 From: Timor Morrien Date: Mon, 25 Sep 2023 16:08:45 +0200 Subject: [PATCH] Improve error handling --- app/llms/strategy_llm.py | 34 +++++++++++----------- app/services/circuit_breaker.py | 50 +++++++++++++++++++++++++++++---- 2 files changed, 62 insertions(+), 22 deletions(-) diff --git a/app/llms/strategy_llm.py b/app/llms/strategy_llm.py index d12ab456..f57a7603 100644 --- a/app/llms/strategy_llm.py +++ b/app/llms/strategy_llm.py @@ -121,7 +121,11 @@ def set_correct_session( log.error( "No viable LLMs found! Using LLM with longest context length." ) - llm_configs = self.llm.llm_configs + llm_configs = { + llm_key: config for llm_key, config in self.llm.llm_configs.items() if llm_key not in exclude_llms + } + if llm_configs.__len__() == 0: + raise ValueError("All LLMs are excluded!") selected_llm = max( llm_configs, key=lambda llm_key: llm_configs[llm_key].spec.context_length, @@ -149,21 +153,19 @@ def call(): exclude_llms = [] - try: - response = await CircuitBreaker.protected_call( - func=call, cache_key=self.current_session_key - ) - except Exception as e: - log.error( - f"Exception {e} while making request! " - f"Trying again with a different LLM." - ) - exclude_llms.append(self.current_session_key) - self.set_correct_session(prompt, max_tokens, exclude_llms) - response = await CircuitBreaker.protected_call( - func=call, cache_key=self.current_session_key - ) - return response + while True: + try: + response = await CircuitBreaker.protected_call_async( + func=call, cache_key=self.current_session_key + ) + return response + except Exception as e: + log.error( + f"Exception '{e}' while making request! " + f"Trying again with a different LLM." + ) + exclude_llms.append(self.current_session_key) + self.set_correct_session(prompt, max_tokens, exclude_llms) def __exit__(self, exc_type, exc_value, traceback): return self.current_session.__exit__(exc_type, exc_value, traceback) diff --git a/app/services/circuit_breaker.py b/app/services/circuit_breaker.py index 2cd1fa1d..81ff5ba1 100644 --- a/app/services/circuit_breaker.py +++ b/app/services/circuit_breaker.py @@ -48,14 +48,52 @@ def protected_call( except accepted_exceptions as e: raise e except Exception as e: - num_failures = cache_store.incr(num_failures_key) - cache_store.expire(num_failures_key, cls.OPEN_TTL_SECONDS) - if num_failures >= cls.MAX_FAILURES: - cache_store.set( - status_key, cls.Status.OPEN, ex=cls.OPEN_TTL_SECONDS - ) + cls.handle_exception(e, num_failures_key, status_key) + @classmethod + async def protected_call_async( + cls, func, cache_key: str, accepted_exceptions: tuple = () + ): + """Wrap function call to avoid too many failures in a row. Async version. + + Params: + func: function to be called + cache_key: key to be used in the cache store + accepted_exceptions: exceptions that are not considered failures + + Raises: + ValueError: if within the last OPEN_TTL_SECONDS seconds, + the function throws an exception for the MAX_FAILURES-th time. + """ + + num_failures_key = f"{cache_key}:num_failures" + status_key = f"{cache_key}:status" + + status = cache_store.get(status_key) + if status == cls.Status.OPEN: + raise ValueError("Too many failures! Please try again later.") + + try: + response = await func() + cache_store.set( + status_key, cls.Status.CLOSED, ex=cls.CLOSED_TTL_SECONDS + ) + return response + except accepted_exceptions as e: + log.error("Accepted error in protected_call for " + cache_key) raise e + except Exception as e: + cls.handle_exception(e, num_failures_key, status_key) + + @classmethod + def handle_exception(cls, e, num_failures_key, status_key): + num_failures = cache_store.incr(num_failures_key) + cache_store.expire(num_failures_key, cls.OPEN_TTL_SECONDS) + if num_failures >= cls.MAX_FAILURES: + cache_store.set( + status_key, cls.Status.OPEN, ex=cls.OPEN_TTL_SECONDS + ) + raise e @classmethod def get_status(cls, checkhealth_func, cache_key: str):