Graceful Shutdown #162
Comments
I'll dig a bit deeper later. Two things I can think of right now:
- I think the upcoming Middlewares #155 could help with that.
- Is your app running by itself on a worker, or as part of fastapi?
Our app is running as a worker, but through fastapi. We use uvicorn + starlette's lifespan to start and stop the stream engine.
The middleware is a nice improvement for building a way to keep track of streams, including when they shut down. There is one exception that I am not able to catch, which is thrown on this line: https://github.com/kpn/kstreams/blob/0.17.1/kstreams/streams.py#L240 I want to close the consumers and let our "Stream Manager" know that a stream was closed (so we know we can gracefully shut down). However, when closing the consumer it will raise a
…rocessed before Streams are stopped. Related to kpn#162
Now you can establish error_policies per
Is your feature request related to a problem? Please describe.
In our codebase we've put some effort into building our own shutdown mechanisms, which, upon further investigation, still do not accomplish exactly what we need.
The current way to stop the consumers is using stream_engine.stop(). This, however, has the downside that coroutines that are still running are not run to completion. For example, a stream function that sleeps and then prints "Complete" will never print "Complete" if stream_engine.stop() is called during the sleep.
This means that if autocommit is enabled, the event is committed on Kafka but not actually processed. (Note: This could be overcome with manual commit, but a graceful shutdown would then still reduce unnecessary double processing.)
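The difference between an abrupt stop and a graceful one can be sketched with plain asyncio, without Kafka or kstreams. This is only an illustration: it assumes that stopping amounts to cancelling the running task, which this thread does not confirm, and the names `worker`, `hard_stop` and `graceful_stop` are hypothetical and not part of kstreams.

```python
import asyncio


async def worker(log, stopping):
    # Stand-in for a stream's consume loop (hypothetical, not kstreams code).
    while not stopping.is_set():
        log.append("start")
        await asyncio.sleep(0.05)  # simulated processing of one event
        log.append("Complete")


async def hard_stop():
    # Assumed model of an abrupt stop: the task is cancelled mid-sleep.
    log, stopping = [], asyncio.Event()
    task = asyncio.create_task(worker(log, stopping))
    await asyncio.sleep(0.01)  # the stop request arrives during the sleep
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return log


async def graceful_stop():
    # Graceful variant: signal the loop and wait for the in-flight event.
    log, stopping = [], asyncio.Event()
    task = asyncio.create_task(worker(log, stopping))
    await asyncio.sleep(0.01)  # the stop request arrives during the sleep
    stopping.set()
    await task  # returns once the current event has been fully handled
    return log
```

Running `asyncio.run(hard_stop())` yields a log without "Complete", while `asyncio.run(graceful_stop())` yields a log that ends with it. With autocommit, the first case corresponds to an event that was committed but never fully processed.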
Describe the solution you'd like
There are multiple reasons for shutdown, but the two primary ones we see are:
kstreams, if one of my streams/consumers crashes, it does not automatically recover, so I would like my application to exit so kubernetes can spin up a new container. (Note: an alternative to this use case is to have streams recover after crash)
Ideally when stream_engine.stop is awaited I would prefer it to:
getone method)
Second to that there is no way to trigger a callback on stream crash. A mechanism to trigger an application stop after a stream/consumer has crashed is also required to accomplish a shutdown on crash.
Currently we do this by wrapping our stream functions with an exception handler, which then triggers an application shutdown.
(Note: This might be worth a separate issue, leave that up to you)
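The wrapper approach mentioned above could look roughly like the following. This is a minimal sketch rather than our actual code: `shutdown_on_crash` and the `shutdown_event` it sets are hypothetical names, and it assumes that some other part of the application waits on that event and then stops the stream engine and exits.

```python
import asyncio
import functools


def shutdown_on_crash(shutdown_event):
    """Decorator factory: flag an application shutdown if the wrapped coroutine crashes."""

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except asyncio.CancelledError:
                # Cancellation is a normal stop, not a crash: pass it through.
                raise
            except Exception:
                # Any other error counts as a crash: request a shutdown,
                # then re-raise so the failure stays visible to the caller.
                shutdown_event.set()
                raise

        return wrapper

    return decorator
```

A stream function would be decorated with `@shutdown_on_crash(event)`, and the application's main task would `await event.wait()` before starting its own shutdown sequence. A callback hook in kstreams itself would make this wrapper unnecessary.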
I'm curious to hear your insights on this challenge and what kstreams could provide to assist us.