Terminate function does not work when a stream is cancelled prematurely when in an anyio task group #1156

Open
JordanZimmitti opened this issue May 23, 2024 · 7 comments

Comments

@JordanZimmitti

JordanZimmitti commented May 23, 2024

  • asyncpg version: 0.29.0
  • PostgreSQL version: 14.10
  • Do you use a PostgreSQL SaaS? If so, which? Can you reproduce
    the issue with a local PostgreSQL install?: Yes, I can reproduce it locally
  • Python version: 3.12
  • Platform: Linux and macOS
  • Do you use pgbouncer?: No
  • Did you install asyncpg with pip?: Yes
  • If you built asyncpg locally, which version of Cython did you use?: n/a
  • Can the issue be reproduced under both asyncio and
    uvloop?: No, the issue is not reproducible with plain asyncio, only with anyio

Hi, I have a FastAPI application that uses SQLAlchemy and asyncpg. I opened a discussion on the SQLAlchemy page here (full discussion) reporting a major problem that affected my application after an update they released.

To summarize the problem and what we concluded in that discussion:

I asynchronously stream data from my API to a client using the starlette StreamingResponse class, which uses anyio under the hood. When the anyio task group is cancelled before the stream finishes, the asyncpg terminate function does not terminate the connection if the close function was called first, leaving the connection stuck in an idle in transaction state. These connections accumulate and eventually stop other applications from creating new connections.

With some help from the SQLAlchemy folks, after some time working through the problem, we were able to put together a small reproduction of the problem using only the asyncpg and anyio libraries here (Example of problem).

Any help resolving this issue would be greatly appreciated! If you need any more info from me, please do not hesitate to message back.

Thanks in advance!

@elprans
Member

elprans commented Jul 17, 2024

What happens is that anyio cancels await conn.close() before it has a chance to send the termination message to Postgres. The connection does get closed from the client side, but Postgres never receives the memo. I need to think about whether shielding Protocol.close() makes sense, but for now you can use the idle_in_transaction_session_timeout server setting as a workaround to kill inactive sessions.
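
To illustrate the mechanism described above, here is a minimal simulation that uses only asyncio. FakeConnection is a hypothetical stand-in, not asyncpg's API, and a plain task.cancel() stands in for the anyio cancel scope, so treat this as a sketch of the idea rather than a reproduction of the actual bug:

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a database connection (not asyncpg)."""

    def __init__(self):
        self.terminate_sent = False

    async def close(self):
        # Simulate yielding to the event loop before the goodbye
        # message is written to the socket.
        await asyncio.sleep(0.01)
        self.terminate_sent = True


async def cancelled_close(shielded: bool) -> bool:
    """Cancel a task while it is closing; report whether close completed."""
    conn = FakeConnection()

    async def worker():
        if shielded:
            # The inner close() keeps running even if worker is cancelled.
            await asyncio.shield(conn.close())
        else:
            await conn.close()

    task = asyncio.create_task(worker())
    await asyncio.sleep(0)  # let the worker start and suspend inside close()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    await asyncio.sleep(0.05)  # give a shielded close() time to finish
    return conn.terminate_sent


if __name__ == "__main__":
    print("unshielded:", asyncio.run(cancelled_close(shielded=False)))
    print("shielded:  ", asyncio.run(cancelled_close(shielded=True)))
```

In the unshielded case the cancellation lands while close() is suspended, so the simulated termination message is never sent. In the shielded case the caller is still cancelled, but the inner close() runs to completion.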

@JordanZimmitti
Author

@elprans Thanks for the insight! I'll experiment with idle_in_transaction_session_timeout in the meantime, though I will add that the connections can run up quickly on us. Keep me posted on any updates, or let me know if I can provide any more info/context on the situation.

Thanks for looking into this issue!

@JordanZimmitti
Author

@elprans Hi!

I am hoping for a small update on this, as it's still an ongoing issue and is preventing me from updating the version of SQLAlchemy I'm using.

Thanks!

@jraby

jraby commented Nov 14, 2024

I think I hit the same issue as this and put together a self-contained reproducer in https://github.com/brittlesoft/repro-starlette-sa-conn-leak

There's also a similar issue that seems to happen only with direct_tls=true connections, as shown in the direct_tls_leak.py script from the repo above.

I'm not sure whether it is the same issue, so I wanted to post here before opening a possibly duplicate issue.

@circlingthesun

@elprans have you given shielding Protocol.close() any more thought?

@circlingthesun

I don't believe idle_in_transaction_session_timeout helps if a connection isn't in the middle of a transaction.
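
For sessions that are idle outside a transaction, PostgreSQL 14 and later also has an idle_session_timeout setting. Below is a minimal sketch of passing both timeouts through asyncpg's server_settings argument. The helper names, the DSN parameter, and the 30 second default are assumptions made for illustration, and I have not verified that either timeout reaps the leaked connections from this issue:

```python
def build_connect_kwargs(idle_ms: int = 30_000) -> dict:
    """Build keyword arguments for asyncpg.connect() (hypothetical helper).

    server_settings values must be strings; PostgreSQL reads a bare
    number for these two settings as milliseconds.
    """
    return {
        "server_settings": {
            "application_name": "bug",
            # Ends sessions that sit idle inside an open transaction.
            "idle_in_transaction_session_timeout": str(idle_ms),
            # Ends sessions that sit idle outside a transaction
            # (PostgreSQL 14+ only).
            "idle_session_timeout": str(idle_ms),
        }
    }


async def connect_with_timeouts(dsn: str, idle_ms: int = 30_000):
    """Open a connection with both idle timeouts applied (hypothetical helper)."""
    import asyncpg  # imported lazily so the helper above works without it

    return await asyncpg.connect(dsn, **build_connect_kwargs(idle_ms))
```

Note that idle_session_timeout also ends healthy idle sessions, so it may interact badly with connection pools that keep idle connections open.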

@circlingthesun

Monkey patching the close function appears to work in the example copied from sqlalchemy/sqlalchemy#11128 (reply in thread)

import asyncio

import anyio
import asyncpg
from asyncpg import Connection


def monkey_patch():
    original_close = Connection.close

    # shield the close method from being cancelled
    async def close(self, *, timeout=None):
        await asyncio.shield(original_close(self, timeout=timeout))

    Connection.close = close


# Comment out the next line to reproduce the bug
monkey_patch()

async def partitions(conn, cursor, size):
    try:
        while True:
            partition = await cursor.fetch(size)
            if partition:
                yield partition
            else:
                break
    except BaseException as e:
        print(f"Got exception {e}; attempting to close")
        try:
            await conn.close(timeout=2)
        except (
            asyncio.CancelledError,
            asyncio.TimeoutError,
            OSError,
        ) as e2:
            print(
                f"Got a second exception {e2} while trying to close, trying to terminate"
            )
            conn.terminate()
            print(
                "Conn terminate succeeded!   is the connection closed?  no?  then that's a bug in asyncpg"
            )


async def run():
    async def go_raw_anyio():
        conn = await asyncpg.connect(
            user="doadmin",
            password="bookem",
            host="localhost",
            database="defaultdb",
            server_settings={"application_name": "bug"},
        )
        _transaction = conn.transaction()
        await _transaction.start()

        cursor = await conn.cursor(
            "SELECT anon_1 FROM "
            "generate_series(1::INTEGER, 500::INTEGER) AS anon_1"
        )

        async for _ in partitions(conn, cursor, 10):
            tg.cancel_scope.cancel()

    for _ in range(5):
        async with anyio.create_task_group() as tg:
            tg.start_soon(go_raw_anyio)

    await asyncio.sleep(5)

    print("Check the number of connections in the database:")
    print(
        "SELECT * FROM pg_stat_activity WHERE datname = 'defaultdb' and application_name = 'bug';"
    )
    input("Press Enter to continue...")


if __name__ == "__main__":
    asyncio.run(run())
