From 81a910db9af72db0c7d12c33bb186cb8b117322e Mon Sep 17 00:00:00 2001 From: Pierre Jeambrun Date: Wed, 20 Nov 2024 02:01:25 +0800 Subject: [PATCH] AIP-84 Fix session handling (#44187) ## Problem Tests are green, CI is green, manual API testing looks good, but when we use the UI, we see random 500 errors from time to time, cf screenshot. All that hint to a bug in the session management. Between the DB dependency from fastapi, the way fastapi uses multiple thread to serve `sync` route requests, how we create session in airflow utility code and when / how are sessions rolled back / commited. ## TLDR: The default `Session` factory uses sessionmaker that will make a session thread local, on the other hand FastAPI can re-use the same thread to serve multiple requests but because of the dependency we use, the session is closed after each request making successive requests on the same thread to fail. Session lifecycle that we want is not `per thread` but `per HTTTP request`. To achieve that we delegate the `session` handling to FastAPI dependency system and use a normal session factory not a threadlocal one. ![Screenshot 2024-11-19 at 16 06 47](https://github.com/user-attachments/assets/d5f1d5b5-9d10-4d34-9c96-4c01583c3a19) --- airflow/api_fastapi/common/db/common.py | 4 ++-- airflow/settings.py | 9 ++++++++- airflow/utils/session.py | 7 +++++-- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/airflow/api_fastapi/common/db/common.py b/airflow/api_fastapi/common/db/common.py index 01e1fe532bf60..b4462bda420c9 100644 --- a/airflow/api_fastapi/common/db/common.py +++ b/airflow/api_fastapi/common/db/common.py @@ -29,7 +29,7 @@ from airflow.api_fastapi.common.parameters import BaseParam -async def get_session() -> Session: +def get_session() -> Session: """ Dependency for providing a session. @@ -43,7 +43,7 @@ async def get_session() -> Session: def your_route(session: Annotated[Session, Depends(get_session)]): pass """ - with create_session() as session: + with create_session(scoped=False) as session: yield session diff --git a/airflow/settings.py b/airflow/settings.py index db8ee4f411179..8041c8f29424e 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -105,6 +105,11 @@ engine: Engine Session: Callable[..., SASession] +# NonScopedSession creates global sessions and is not safe to use in multi-threaded environment without +# additional precautions. The only use case is when the session lifecycle needs +# custom handling. Most of the time we only want one unique thread local session object, +# this is achieved by the Session factory above. +NonScopedSession: Callable[..., SASession] async_engine: AsyncEngine create_async_session: Callable[..., AsyncSession] @@ -465,6 +470,7 @@ def configure_orm(disable_connection_pool=False, pool_class=None): global engine global async_engine global create_async_session + global NonScopedSession if os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true": # Skip DB initialization in unit tests, if DB tests are skipped @@ -515,7 +521,8 @@ def _session_maker(_engine): expire_on_commit=False, ) - Session = scoped_session(_session_maker(engine)) + NonScopedSession = _session_maker(engine) + Session = scoped_session(NonScopedSession) def force_traceback_session_for_untrusted_components(allow_tests_to_use_db=False): diff --git a/airflow/utils/session.py b/airflow/utils/session.py index fc10cab64ad26..a63d3f3f937a8 100644 --- a/airflow/utils/session.py +++ b/airflow/utils/session.py @@ -31,7 +31,7 @@ @contextlib.contextmanager -def create_session() -> Generator[SASession, None, None]: +def create_session(scoped: bool = True) -> Generator[SASession, None, None]: """Contextmanager that will create and teardown a session.""" if InternalApiConfig.get_use_internal_api(): if os.environ.get("RUN_TESTS_WITH_DATABASE_ISOLATION", "false").lower() == "true": @@ -48,7 +48,10 @@ def create_session() -> Generator[SASession, None, None]: else: yield TracebackSession() return - Session = getattr(settings, "Session", None) + if scoped: + Session = getattr(settings, "Session", None) + else: + Session = getattr(settings, "NonScopedSession", None) if Session is None: raise RuntimeError("Session must be set before!") session = Session()