AIP-84 Fix session handling (apache#44187)
## Problem
Tests are green, CI is green, and manual API testing looks good, but when we use the UI we see random 500 errors from time to time (see screenshot).

All of that hints at a bug in session management, somewhere in the interaction between the DB dependency from FastAPI, the way FastAPI uses multiple threads to serve `sync` route requests, how we create sessions in Airflow utility code, and when and how sessions are rolled back or committed.

## TLDR:
The default `Session` factory wraps a sessionmaker in `scoped_session`, which makes the session thread-local. FastAPI, on the other hand, can reuse the same thread to serve multiple requests, but because of the dependency we use, the session is closed after each request, which makes successive requests on the same thread fail.

The session lifecycle we want is not per thread but per HTTP request. To achieve that, we delegate `session` handling to the FastAPI dependency system and use a normal session factory instead of a thread-local one.
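
The difference between the two kinds of factory can be illustrated with a minimal, standard-library-only sketch. `FakeSession` and `ThreadLocalRegistry` are hypothetical stand-ins, not SQLAlchemy classes; the registry only imitates the thread-local behaviour described above and is not how the library implements it.

```python
import threading


class FakeSession:
    """Hypothetical stand-in for a database session."""


class ThreadLocalRegistry:
    """Simplified imitation of a thread-local (scoped) session factory."""

    def __init__(self, factory):
        self._factory = factory
        self._local = threading.local()

    def __call__(self):
        # Create the session once per thread, then keep returning it.
        if not hasattr(self._local, "session"):
            self._local.session = self._factory()
        return self._local.session


scoped = ThreadLocalRegistry(FakeSession)

# Two "requests" served by the same thread receive the same session object.
first = scoped()
second = scoped()
same_thread_shared = first is second

# A plain factory hands out an independent session on every call.
plain_a = FakeSession()
plain_b = FakeSession()
plain_factory_shared = plain_a is plain_b

# Another thread gets its own object from the registry.
seen = []
worker = threading.Thread(target=lambda: seen.append(scoped()))
worker.start()
worker.join()
other_thread_shared = seen[0] is first
```

With a thread-local registry, anything one request does to the session (such as closing it) is visible to the next request that lands on the same thread, which is why a per-request factory is the safer fit here.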

![Screenshot 2024-11-19 at 16 06 47](https://github.com/user-attachments/assets/d5f1d5b5-9d10-4d34-9c96-4c01583c3a19)
pierrejeambrun authored Nov 19, 2024
1 parent c8c5756 commit 81a910d
Showing 3 changed files with 15 additions and 5 deletions.
4 changes: 2 additions & 2 deletions airflow/api_fastapi/common/db/common.py
@@ -29,7 +29,7 @@
from airflow.api_fastapi.common.parameters import BaseParam


-async def get_session() -> Session:
+def get_session() -> Session:
"""
Dependency for providing a session.
@@ -43,7 +43,7 @@ async def get_session() -> Session:
def your_route(session: Annotated[Session, Depends(get_session)]):
pass
"""
-with create_session() as session:
+with create_session(scoped=False) as session:
yield session
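
As a rough illustration of the per-request lifecycle that this dependency delegates to `create_session`, here is a standard-library sketch. `FakeSession`, `request_session` and `handle_request` are hypothetical names, and the commit-on-success, rollback-on-error, always-close ordering is an assumption, since the full body of `create_session` is not shown in this diff.

```python
import contextlib


class FakeSession:
    """Hypothetical stand-in that records the calls made on it."""

    def __init__(self):
        self.events = []

    def commit(self):
        self.events.append("commit")

    def rollback(self):
        self.events.append("rollback")

    def close(self):
        self.events.append("close")


@contextlib.contextmanager
def request_session(factory):
    """Open one session for one request and always tear it down."""
    session = factory()  # a fresh object per call, not a thread-local one
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


def handle_request(session, fail=False):
    """Pretend route handler; raises to simulate a failing route."""
    if fail:
        raise ValueError("route failed")
    return "ok"


# A successful request commits, then closes.
with request_session(FakeSession) as ok_session:
    handle_request(ok_session)

# A failing request rolls back, then closes, and the error still propagates.
try:
    with request_session(FakeSession) as failed_session:
        handle_request(failed_session, fail=True)
except ValueError:
    pass
```

Because each request gets its own object, the teardown of one request cannot affect the session used by another.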


9 changes: 8 additions & 1 deletion airflow/settings.py
@@ -105,6 +105,11 @@

engine: Engine
Session: Callable[..., SASession]
+# NonScopedSession creates global sessions and is not safe to use in multi-threaded environment without
+# additional precautions. The only use case is when the session lifecycle needs
+# custom handling. Most of the time we only want one unique thread local session object,
+# this is achieved by the Session factory above.
+NonScopedSession: Callable[..., SASession]
async_engine: AsyncEngine
create_async_session: Callable[..., AsyncSession]

@@ -465,6 +470,7 @@ def configure_orm(disable_connection_pool=False, pool_class=None):
global engine
global async_engine
global create_async_session
+global NonScopedSession

if os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true":
# Skip DB initialization in unit tests, if DB tests are skipped
@@ -515,7 +521,8 @@ def _session_maker(_engine):
expire_on_commit=False,
)

-Session = scoped_session(_session_maker(engine))
+NonScopedSession = _session_maker(engine)
+Session = scoped_session(NonScopedSession)


def force_traceback_session_for_untrusted_components(allow_tests_to_use_db=False):
7 changes: 5 additions & 2 deletions airflow/utils/session.py
@@ -31,7 +31,7 @@


@contextlib.contextmanager
-def create_session() -> Generator[SASession, None, None]:
+def create_session(scoped: bool = True) -> Generator[SASession, None, None]:
"""Contextmanager that will create and teardown a session."""
if InternalApiConfig.get_use_internal_api():
if os.environ.get("RUN_TESTS_WITH_DATABASE_ISOLATION", "false").lower() == "true":
@@ -48,7 +48,10 @@ def create_session() -> Generator[SASession, None, None]:
else:
yield TracebackSession()
return
-Session = getattr(settings, "Session", None)
+if scoped:
+Session = getattr(settings, "Session", None)
+else:
+Session = getattr(settings, "NonScopedSession", None)
if Session is None:
raise RuntimeError("Session must be set before!")
session = Session()
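
The factory selection in this hunk can be sketched in isolation. In this illustration `pick_factory` is a hypothetical helper, and `SimpleNamespace` objects stand in for the `settings` module; only the attribute names `Session` and `NonScopedSession` and the error message come from the diff.

```python
from types import SimpleNamespace


def pick_factory(settings, scoped=True):
    """Return the thread-local factory by default, the plain one on request."""
    name = "Session" if scoped else "NonScopedSession"
    factory = getattr(settings, name, None)
    if factory is None:
        # Mirrors the guard in the diff: the ORM has not been configured yet.
        raise RuntimeError("Session must be set before!")
    return factory


# Stand-ins for a configured and an unconfigured settings module.
configured = SimpleNamespace(
    Session=lambda: "thread-local session",
    NonScopedSession=lambda: "per-call session",
)
unconfigured = SimpleNamespace()

default_choice = pick_factory(configured)()
request_choice = pick_factory(configured, scoped=False)()

try:
    pick_factory(unconfigured)
    unconfigured_error = None
except RuntimeError as exc:
    unconfigured_error = str(exc)
```

Keeping `scoped=True` as the default means existing callers are unaffected; only code that opts in, such as the FastAPI dependency, receives the non-thread-local factory.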
