Fix too many bad inits handling during session refreshing (#214)
Gallaecio authored Aug 20, 2024
1 parent 106b099 commit a1d81d1
Showing 2 changed files with 205 additions and 229 deletions.
232 changes: 119 additions & 113 deletions scrapy_zyte_api/_session.py
@@ -506,6 +506,32 @@ def session_config(
)


class FatalErrorHandler:

def __init__(self, crawler):
self.crawler = crawler

def __enter__(self):
return None

def __exit__(self, exc_type, exc, tb):
if exc_type is None:
return
from twisted.internet import reactor
from twisted.internet.interfaces import IReactorCore

reactor = cast(IReactorCore, reactor)
close = partial(
reactor.callLater, 0, self.crawler.engine.close_spider, self.crawler.spider
)
if issubclass(exc_type, TooManyBadSessionInits):
close("bad_session_inits")
elif issubclass(exc_type, PoolError):
close("pool_error")
elif issubclass(exc_type, CloseSpider):
close(exc.reason)


session_config_registry = SessionConfigRulesRegistry()
session_config = session_config_registry.session_config

@@ -592,6 +618,8 @@ def __init__(self, crawler: Crawler):

self._setting_params = settings.getdict("ZYTE_API_SESSION_PARAMS")

self._fatal_error_handler = FatalErrorHandler(crawler)

def _get_session_config(self, request: Request) -> SessionConfig:
try:
return self._session_config_cache[request]
@@ -686,18 +714,21 @@ async def _init_session(self, session_id: str, request: Request, pool: str) -> b
return result

async def _create_session(self, request: Request, pool: str) -> str:
while True:
session_id = str(uuid4())
session_init_succeeded = await self._init_session(session_id, request, pool)
if session_init_succeeded:
self._pools[pool].add(session_id)
self._bad_inits[pool] = 0
break
self._bad_inits[pool] += 1
if self._bad_inits[pool] >= self._max_bad_inits[pool]:
raise TooManyBadSessionInits
self._queues[pool].append(session_id)
return session_id
with self._fatal_error_handler:
while True:
session_id = str(uuid4())
session_init_succeeded = await self._init_session(
session_id, request, pool
)
if session_init_succeeded:
self._pools[pool].add(session_id)
self._bad_inits[pool] = 0
break
self._bad_inits[pool] += 1
if self._bad_inits[pool] >= self._max_bad_inits[pool]:
raise TooManyBadSessionInits
self._queues[pool].append(session_id)
return session_id

async def _next_from_queue(self, request: Request, pool: str) -> str:
session_id = None
@@ -794,111 +825,91 @@ async def check(self, response: Response, request: Request) -> bool:
"""Check the response for signs of session expiration, update the
internal session pool accordingly, and return ``False`` if the session
has expired or ``True`` if the session passed validation."""
if self.is_init_request(request):
return True
session_config = self._get_session_config(request)
if not session_config.enabled(request):
return True
pool = self._get_pool(request)
try:
passed = session_config.check(response, request)
except CloseSpider:
raise
except Exception:
self._crawler.stats.inc_value(
f"scrapy-zyte-api/sessions/pools/{pool}/use/check-error"
)
logger.exception(
f"Unexpected exception raised while checking session "
f"validity on response {response}."
)
else:
outcome = "passed" if passed else "failed"
self._crawler.stats.inc_value(
f"scrapy-zyte-api/sessions/pools/{pool}/use/check-{outcome}"
)
if passed:
with self._fatal_error_handler:
if self.is_init_request(request):
return True
session_config = self._get_session_config(request)
if not session_config.enabled(request):
return True
self._start_request_session_refresh(request, pool)
pool = self._get_pool(request)
try:
passed = session_config.check(response, request)
except CloseSpider:
raise
except Exception:
self._crawler.stats.inc_value(
f"scrapy-zyte-api/sessions/pools/{pool}/use/check-error"
)
logger.exception(
f"Unexpected exception raised while checking session "
f"validity on response {response}."
)
else:
outcome = "passed" if passed else "failed"
self._crawler.stats.inc_value(
f"scrapy-zyte-api/sessions/pools/{pool}/use/check-{outcome}"
)
if passed:
return True
self._start_request_session_refresh(request, pool)
return False

async def assign(self, request: Request):
"""Assign a working session to *request*."""
if self.is_init_request(request):
return
session_config = self._get_session_config(request)
if not session_config.enabled(request):
self._crawler.stats.inc_value("scrapy-zyte-api/sessions/use/disabled")
return
session_id = await self._next(request)
# Note: If there is a session set already (e.g. a request being
# retried), it is overridden.
request.meta.setdefault("zyte_api_provider", {})["session"] = {"id": session_id}
if (
"zyte_api" in request.meta
or request.meta.get("zyte_api_automap", None) is False
or (
"zyte_api_automap" not in request.meta
and self._transparent_mode is False
)
):
meta_key = "zyte_api"
else:
meta_key = "zyte_api_automap"
request.meta.setdefault(meta_key, {})
if not isinstance(request.meta[meta_key], dict):
request.meta[meta_key] = {}
request.meta[meta_key]["session"] = {"id": session_id}
request.meta.setdefault("dont_merge_cookies", True)
with self._fatal_error_handler:
if self.is_init_request(request):
return
session_config = self._get_session_config(request)
if not session_config.enabled(request):
self._crawler.stats.inc_value("scrapy-zyte-api/sessions/use/disabled")
return
session_id = await self._next(request)
# Note: If there is a session set already (e.g. a request being
# retried), it is overridden.
request.meta.setdefault("zyte_api_provider", {})["session"] = {
"id": session_id
}
if (
"zyte_api" in request.meta
or request.meta.get("zyte_api_automap", None) is False
or (
"zyte_api_automap" not in request.meta
and self._transparent_mode is False
)
):
meta_key = "zyte_api"
else:
meta_key = "zyte_api_automap"
request.meta.setdefault(meta_key, {})
if not isinstance(request.meta[meta_key], dict):
request.meta[meta_key] = {}
request.meta[meta_key]["session"] = {"id": session_id}
request.meta.setdefault("dont_merge_cookies", True)

def is_enabled(self, request: Request) -> bool:
session_config = self._get_session_config(request)
return session_config.enabled(request)

def handle_error(self, request: Request):
pool = self._get_pool(request)
self._crawler.stats.inc_value(
f"scrapy-zyte-api/sessions/pools/{pool}/use/failed"
)
session_id = self._get_request_session_id(request)
if session_id is not None:
self._errors[session_id] += 1
if self._errors[session_id] < self._max_errors:
return
self._start_request_session_refresh(request, pool)
with self._fatal_error_handler:
pool = self._get_pool(request)
self._crawler.stats.inc_value(
f"scrapy-zyte-api/sessions/pools/{pool}/use/failed"
)
session_id = self._get_request_session_id(request)
if session_id is not None:
self._errors[session_id] += 1
if self._errors[session_id] < self._max_errors:
return
self._start_request_session_refresh(request, pool)

def handle_expiration(self, request: Request):
pool = self._get_pool(request)
self._crawler.stats.inc_value(
f"scrapy-zyte-api/sessions/pools/{pool}/use/expired"
)
self._start_request_session_refresh(request, pool)


class FatalErrorHandler:

def __init__(self, crawler):
self.crawler = crawler

async def __aenter__(self):
return None

async def __aexit__(self, exc_type, exc, tb):
if exc_type is None:
return
from twisted.internet import reactor
from twisted.internet.interfaces import IReactorCore

reactor = cast(IReactorCore, reactor)
close = partial(
reactor.callLater, 0, self.crawler.engine.close_spider, self.crawler.spider
)
if issubclass(exc_type, TooManyBadSessionInits):
close("bad_session_inits")
elif issubclass(exc_type, PoolError):
close("pool_error")
elif issubclass(exc_type, CloseSpider):
close(exc.reason)
with self._fatal_error_handler:
pool = self._get_pool(request)
self._crawler.stats.inc_value(
f"scrapy-zyte-api/sessions/pools/{pool}/use/expired"
)
self._start_request_session_refresh(request, pool)


class ScrapyZyteAPISessionDownloaderMiddleware:
@@ -910,19 +921,16 @@ def from_crawler(cls, crawler: Crawler):
def __init__(self, crawler: Crawler):
self._crawler = crawler
self._sessions = _SessionManager(crawler)
self._fatal_error_handler = FatalErrorHandler(crawler)

async def process_request(self, request: Request, spider: Spider) -> None:
async with self._fatal_error_handler:
await self._sessions.assign(request)
await self._sessions.assign(request)

async def process_response(
self, request: Request, response: Response, spider: Spider
) -> Union[Request, Response, None]:
if isinstance(response, DummyResponse):
return response
async with self._fatal_error_handler:
passed = await self._sessions.check(response, request)
passed = await self._sessions.check(response, request)
if not passed:
new_request_or_none = get_retry_request(
request,
Expand All @@ -945,12 +953,10 @@ async def process_exception(
return None

if exception.parsed.type == "/problem/session-expired":
async with self._fatal_error_handler:
self._sessions.handle_expiration(request)
self._sessions.handle_expiration(request)
reason = "session_expired"
elif exception.status in {520, 521}:
async with self._fatal_error_handler:
self._sessions.handle_error(request)
self._sessions.handle_error(request)
reason = "download_error"
else:
return None
(Diff of the second changed file not shown.)
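The gist of the change in ``_session.py``: ``FatalErrorHandler`` is now a synchronous context manager defined above ``_SessionManager`` and applied with ``with self._fatal_error_handler:`` inside the session-manager methods themselves (``_create_session``, ``check``, ``assign``, ``handle_error``, ``handle_expiration``), instead of being an async context manager applied around those calls in the downloader middleware. That way, a ``TooManyBadSessionInits`` raised while a session is being refreshed still closes the spider. The sketch below is a simplified, self-contained rendering of that pattern; ``SessionManager``, ``init_session`` and the ``max_bad_inits`` value are illustrative stand-ins, not the real scrapy-zyte-api internals.

from functools import partial
from typing import cast

from scrapy.exceptions import CloseSpider


class TooManyBadSessionInits(RuntimeError):
    """Stand-in for the scrapy-zyte-api exception of the same name."""


class FatalErrorHandler:
    """Schedule a spider close when a fatal error escapes the wrapped block."""

    def __init__(self, crawler):
        self.crawler = crawler

    def __enter__(self):
        return None

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            return
        from twisted.internet import reactor
        from twisted.internet.interfaces import IReactorCore

        reactor = cast(IReactorCore, reactor)
        close = partial(
            reactor.callLater, 0, self.crawler.engine.close_spider, self.crawler.spider
        )
        if issubclass(exc_type, TooManyBadSessionInits):
            close("bad_session_inits")
        elif issubclass(exc_type, CloseSpider):
            close(exc.reason)
        # Returning None (falsy) lets the original exception keep propagating.


class SessionManager:
    """Illustrative, trimmed-down manager showing where the handler is applied."""

    def __init__(self, crawler, max_bad_inits=8):
        self._fatal_error_handler = FatalErrorHandler(crawler)
        self._bad_inits = 0
        self._max_bad_inits = max_bad_inits

    async def create_session(self, init_session) -> str:
        # The whole body runs inside the handler, so the spider is closed even
        # when the failure happens during a session refresh rather than inside
        # a middleware-wrapped call.
        with self._fatal_error_handler:
            while True:
                session_id, ok = await init_session()
                if ok:
                    self._bad_inits = 0
                    return session_id
                self._bad_inits += 1
                if self._bad_inits >= self._max_bad_inits:
                    raise TooManyBadSessionInits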
