Terminate function does not work when a stream is cancelled prematurely when in an anyio task group #1156
Comments
What happens is that
@elprans Thanks for the insight! I'll experiment with Thanks for looking into this issue!
@elprans Hi! I am hoping for a small update on this, as it's still an ongoing issue and is preventing me from updating the version of SQLAlchemy I'm using. Thanks!
I think I hit the same issue as this and put together a self-contained reproducer in https://github.com/brittlesoft/repro-starlette-sa-conn-leak. There's also a similar issue that seems to only happen with I'm not sure if it is the same issue or not, so I wanted to post here before opening a possibly duplicate issue.
@elprans have you given shielding Protocol.close() any more thought?
I don't believe the idle_in_transaction_session_timeout helps if a connection isn't in the middle of a transaction.
Monkey patching the close function appears to work in the example copied from sqlalchemy/sqlalchemy#11128 (reply in thread):

import asyncio

import anyio
import asyncpg
from asyncpg import Connection


def monkey_patch():
    original_close = Connection.close

    # shield the close method from being cancelled
    async def close(self, *, timeout=None):
        await asyncio.shield(original_close(self, timeout=timeout))

    Connection.close = close


# Comment out this call to reproduce the bug
monkey_patch()


async def partitions(conn, cursor, size):
    try:
        while True:
            partition = await cursor.fetch(size)
            if partition:
                yield partition
            else:
                break
    except BaseException as e:
        print(f"Got exception {e}; attempting to close")
        try:
            await conn.close(timeout=2)
        except (
            asyncio.CancelledError,
            asyncio.TimeoutError,
            OSError,
        ) as e2:
            print(
                f"Got a second exception {e2} while trying to close, trying to terminate"
            )
            conn.terminate()
            print(
                "Conn terminate succeeded! is the connection closed? no? then that's a bug in asyncpg"
            )


async def run():
    async def go_raw_anyio():
        conn = await asyncpg.connect(
            user="doadmin",
            password="bookem",
            host="localhost",
            database="defaultdb",
            server_settings={"application_name": "bug"},
        )
        _transaction = conn.transaction()
        await _transaction.start()
        cursor = await conn.cursor(
            "SELECT anon_1 FROM "
            "generate_series(1::INTEGER, 500::INTEGER) AS anon_1"
        )
        # Cancel the enclosing task group as soon as the first partition arrives
        async for _ in partitions(conn, cursor, 10):
            tg.cancel_scope.cancel()

    for _ in range(5):
        async with anyio.create_task_group() as tg:
            tg.start_soon(go_raw_anyio)

    await asyncio.sleep(5)
    print("Check the number of connections in the database:")
    print(
        "SELECT * FROM pg_stat_activity WHERE datname = 'defaultdb' and application_name = 'bug';"
    )
    input("Press Enter to continue...")


if __name__ == "__main__":
    asyncio.run(run())
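For anyone who wants to see why the shield makes a difference without needing a database, here is a minimal sketch using only asyncio. It is not asyncpg code: `FakeConnection`, `worker` and `cancel_twice` are hypothetical names made up for illustration, and the second `task.cancel()` call is only an approximation (an assumption on my part) of cancellation being delivered again while `close()` is still awaiting, which is what seems to happen inside a cancelled anyio scope.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a connection; this is NOT asyncpg's API."""

    def __init__(self):
        self.closed = False

    async def close(self):
        # Pretend a graceful close needs one round trip to the server.
        await asyncio.sleep(0.05)
        self.closed = True


async def worker(conn, shielded, started):
    try:
        started.set()
        await asyncio.sleep(10)  # stands in for streaming rows
    finally:
        if shielded:
            # close() runs in its own task, so cancelling the worker
            # again does not interrupt it.
            await asyncio.shield(conn.close())
        else:
            await conn.close()


async def cancel_twice(shielded):
    conn = FakeConnection()
    started = asyncio.Event()
    task = asyncio.create_task(worker(conn, shielded, started))
    await started.wait()
    task.cancel()  # first cancellation interrupts the "streaming" sleep
    await asyncio.sleep(0.01)  # let the worker reach its cleanup
    task.cancel()  # second cancellation arrives while close() is running
    try:
        await task
    except asyncio.CancelledError:
        pass
    await asyncio.sleep(0.1)  # give a shielded close() time to finish
    return conn.closed


if __name__ == "__main__":
    print("closed without shield:", asyncio.run(cancel_twice(shielded=False)))
    print("closed with shield:", asyncio.run(cancel_twice(shielded=True)))
```

Without the shield, the second cancellation lands inside `close()` and the fake connection never gets marked closed. With the shield, the worker still sees the cancellation, but the close itself runs to completion in the background.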
the issue with a local PostgreSQL install?: Yes, I can reproduce it locally
uvloop?: No, the issue is not reproducible with plain asyncio, only with anyio
Hi, I have a FastAPI application that uses SQLAlchemy and asyncpg. I opened a discussion on the SQLAlchemy page here (full discussion) reporting a major problem that affected my application after an update they released.

To summarize the problem and what we concluded in that discussion: I asynchronously stream data from my API to a client using the starlette StreamingResponse class, which uses anyio under the hood. For some reason, when the anyio task group gets cancelled before the stream is finished, the asyncpg terminate function does not terminate the connection if the close function was used first, leaving it stuck in an idle in transaction state. This causes connections to pile up, eventually stopping other applications from creating new connections.

With some help from the SQLAlchemy folks, after some time working through the problem we were able to reproduce a small example of my problem using only the asyncpg and anyio libraries here (Example of problem).

Any help resolving this issue would be greatly appreciated! If you need any more info from me, please do not hesitate to message back.
Thanks in advance!