Add retry on specified httpx network errors #7593

Merged 15 commits on Dec 1, 2022
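In short, this change routes every `PrefectHttpxClient.send` call through a new `_send_with_retry` helper that retries on selected status codes (429, 503) and on a set of transient `httpx` network errors. The end-to-end effect can be sketched with httpx's stock `MockTransport` (the handler, URL, and assertions below are illustrative and not part of this PR; note the real client also sleeps 2s and 4s between these attempts):

```python
import asyncio

import httpx

from prefect.client.base import PrefectHttpxClient  # module path as of this PR

attempts = 0


def fail_twice(request: httpx.Request) -> httpx.Response:
    """Fail the first two attempts with a retryable network error."""
    global attempts
    attempts += 1
    if attempts < 3:
        # Simulates a connection reset mid-read; ReadError is in retry_exceptions
        raise httpx.ReadError("connection reset", request=request)
    return httpx.Response(200, json={"ok": True})


async def main() -> None:
    # MockTransport is httpx's documented test transport; extra kwargs are
    # forwarded to the underlying httpx.AsyncClient constructor.
    async with PrefectHttpxClient(
        transport=httpx.MockTransport(fail_twice)
    ) as client:
        response = await client.get("https://example.invalid/api/health")
        assert response.status_code == 200
        assert attempts == 3  # two transparent retries before success


asyncio.run(main())
```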
src/prefect/client/base.py (108 changes: 88 additions & 20 deletions)
@@ -3,7 +3,8 @@
import threading
from collections import defaultdict
from contextlib import asynccontextmanager
-from typing import ContextManager, Dict, Tuple, Type
+from functools import partial
+from typing import Callable, ContextManager, Dict, Set, Tuple, Type

import anyio
import httpx
@@ -13,6 +14,7 @@
from typing_extensions import Self

from prefect.exceptions import PrefectHTTPStatusError
+from prefect.logging import get_logger

# Datastores for lifespan management, keys should be a tuple of thread and app identities.
APP_LIFESPANS: Dict[Tuple[int, int], LifespanManager] = {}
@@ -21,6 +23,9 @@
APP_LIFESPANS_LOCKS: Dict[int, anyio.Lock] = defaultdict(anyio.Lock)


+logger = get_logger("client")
+
+
@asynccontextmanager
async def app_lifespan_context(app: FastAPI) -> ContextManager[None]:
"""
@@ -154,27 +159,90 @@ class PrefectHttpxClient(httpx.AsyncClient):

    RETRY_MAX = 5

-    async def send(self, *args, **kwargs) -> Response:
-        retry_count = 0
-        response = PrefectResponse.from_httpx_response(
-            await super().send(*args, **kwargs)
-        )
-        while (
-            response.status_code
-            in {status.HTTP_429_TOO_MANY_REQUESTS, status.HTTP_503_SERVICE_UNAVAILABLE}
-            and retry_count < self.RETRY_MAX
-        ):
-            retry_count += 1
-
-            # Respect the "Retry-After" header, falling back to an exponential back-off
-            retry_after = response.headers.get("Retry-After")
-            if retry_after:
-                retry_seconds = float(retry_after)
-            else:
-                retry_seconds = 2**retry_count
+    async def _send_with_retry(
+        self,
+        request: Callable,
+        retry_codes: Set[int] = set(),
+        retry_exceptions: Tuple[Exception, ...] = tuple(),
+    ):
+        """
+        Send a request and retry it if it fails.
+
+        Sends the provided request and retries it up to self.RETRY_MAX times if
+        the request either raises an exception listed in `retry_exceptions` or receives
+        a response with a status code listed in `retry_codes`.
+
+        Retries will be delayed based on either the retry header (preferred) or
+        exponential backoff if a retry header is not provided.
+        """
+        try_count = 0
+        response = None
+
+        while try_count <= self.RETRY_MAX:
+            try_count += 1
+            retry_seconds = None
+            exc_info = None
+
+            try:
+                response = await request()
+            except retry_exceptions:
+                if try_count > self.RETRY_MAX:
+                    raise
+                # Otherwise, we will ignore this error but capture the info for logging
+                exc_info = sys.exc_info()
+            else:
+                # We got a response; return immediately if it is not retryable
+                if response.status_code not in retry_codes:
+                    return response
+
+                if "Retry-After" in response.headers:
+                    retry_seconds = float(response.headers["Retry-After"])
+
+            # Use an exponential back-off if not set in a header
+            if retry_seconds is None:
+                retry_seconds = 2**try_count

+            logger.debug(
+                (
+                    "Encountered retryable exception during request. "
+                    if exc_info
+                    else "Received response with retryable status code. "
+                )
+                + (
+                    f"Another attempt will be made in {retry_seconds}s. "
+                    f"This is attempt {try_count}/{self.RETRY_MAX + 1}."
+                ),
+                exc_info=exc_info,
+            )
            await anyio.sleep(retry_seconds)
-            response = await super().send(*args, **kwargs)
+
+        assert (
+            response is not None
+        ), "Retry handling ended without response or exception"
+
+        # We ran out of retries, return the failed response
+        return response
+
+    async def send(self, *args, **kwargs) -> Response:
+        api_request = partial(super().send, *args, **kwargs)
+
+        response = await self._send_with_retry(
+            request=api_request,
+            retry_codes={
+                status.HTTP_429_TOO_MANY_REQUESTS,
+                status.HTTP_503_SERVICE_UNAVAILABLE,
+            },
+            retry_exceptions=(
+                httpx.ReadTimeout,
+                httpx.PoolTimeout,
+                # `ConnectionResetError` when reading socket raises as a `ReadError`
+                httpx.ReadError,
+                # Uvicorn bug, see https://github.com/PrefectHQ/prefect/issues/7512
+                httpx.RemoteProtocolError,
+                # HTTP2 bug, see https://github.com/PrefectHQ/prefect/issues/7442
+                httpx.LocalProtocolError,
+            ),
+        )

        # Always raise bad responses
        # NOTE: We may want to remove this and handle responses per route in the
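The delay selection in `_send_with_retry` is easy to check by hand: a server-supplied `Retry-After` header wins, otherwise the sleep grows as `2**try_count` up to `RETRY_MAX = 5`. A standalone sketch of that schedule (illustration only, mirroring the logic above):

```python
from typing import Optional

RETRY_MAX = 5  # mirrors PrefectHttpxClient.RETRY_MAX


def delay(try_count: int, retry_after: Optional[str] = None) -> float:
    # A server-supplied Retry-After header always wins...
    if retry_after is not None:
        return float(retry_after)
    # ...otherwise fall back to exponential backoff, as in _send_with_retry.
    return float(2**try_count)


for try_count in range(1, RETRY_MAX + 1):
    print(f"attempt {try_count}: sleep {delay(try_count):g}s")
# attempt 1: sleep 2s
# attempt 2: sleep 4s
# attempt 3: sleep 8s
# attempt 4: sleep 16s
# attempt 5: sleep 32s

print(delay(3, retry_after="10"))  # 10.0: the header overrides backoff
```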
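One design note on the new `send`: wrapping `super().send(*args, **kwargs)` in `functools.partial` freezes the request into a zero-argument callable, so `_send_with_retry` can replay the identical request on each attempt without knowing its arguments. In miniature (the `fetch` coroutine and URL are hypothetical):

```python
import asyncio
from functools import partial
from typing import Awaitable, Callable


async def fetch(url: str, timeout: float) -> str:
    # Stand-in for super().send(*args, **kwargs)
    return f"GET {url} (timeout={timeout})"


async def retry_twice(request: Callable[[], Awaitable[str]]) -> None:
    # Like _send_with_retry, this only sees a no-argument callable,
    # so it can replay the exact same call on every attempt.
    print(await request())
    print(await request())


asyncio.run(retry_twice(partial(fetch, "https://example.invalid", timeout=10.0)))
```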