Memory leak caused by publish_to_topic(...) operator #13
Comments
Thanks for reporting. @henrik7264: could you take a look? |
Hi Gijs,
Looks fine.
/Henrik
|
Sorry,
I mean of course that I shall take a look at it 😊
/Henrik
|
@bheijden - first of all, thanks a lot for taking an interest in this project. It's much appreciated. I have taken a look at your issue, but I am not sure I understand you completely. Could you describe a use case where you need to dispose of the pipeline/stream, or, even better, give an example that demonstrates the need for disposing of it? I have to admit that solving your issue will require a lot of research on my side. But first, I am eager to hear your answer to the question above. |
I am building an elaborate pipeline for a reinforcement learning simulation framework based on rxpy and ROS. After an episode terminates, I need to re-initialize parts of the pipeline to clear the state of stateful rx operators (e.g. combine latest, zip, ReplaySubjects, etc.). Perhaps a routine could be created that flushes this state, but that seems a lot more complicated than simply disposing of the old pipeline and initializing a new one with "fresh" rx operators. In addition to the memory leak, the current implementation also creates a new publisher every time you re-initialize a pipeline that contains the operator. I've re-implemented the operator myself in ROS 1 (it can easily be adapted to ROS 2) to address both problems. First, this implementation avoids a subscription inside the operator, so there is no inner subscription to dispose of, which resolves the memory leak. Second, the operator takes an already initialized publisher as an argument, so we avoid creating a new publisher every time.
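The idea described above (pass in an existing publisher, and forward the subscription instead of subscribing inside the operator) can be sketched as follows. This is not the code posted in the thread and it uses neither RxPY nor ROS: `Observable`, `Subject`, `FakePublisher` and `publish` are hypothetical stand-ins written only to illustrate the structure.

```python
# Simplified stand-ins; NOT the RxPY, rospy or RxROS API.
class Observable:
    """Wraps a subscribe function; nothing runs until subscribe() is called."""

    def __init__(self, subscribe_fn):
        self._subscribe_fn = subscribe_fn

    def subscribe(self, on_next):
        return self._subscribe_fn(on_next)


class Subject:
    """Hot source that remembers its observers until they are disposed."""

    def __init__(self):
        self.observers = []

    def subscribe(self, on_next):
        self.observers.append(on_next)
        # Return a "dispose" callable that detaches this observer.
        return lambda: self.observers.remove(on_next)

    def on_next(self, value):
        for observer in list(self.observers):
            observer(value)


class FakePublisher:
    """Hypothetical stand-in for an already initialized ROS publisher."""

    def __init__(self):
        self.sent = []

    def publish(self, msg):
        self.sent.append(msg)


def publish(publisher):
    """Operator that publishes each value as a side effect of the chain."""
    def operator(source):
        def subscribe(on_next):
            def forward(value):
                publisher.publish(value)
                on_next(value)
            # The only subscription is the one the caller owns.
            return source.subscribe(forward)
        return Observable(subscribe)
    return operator


source = Subject()
publisher = FakePublisher()
pipeline = publish(publisher)(source)

dispose = pipeline.subscribe(lambda value: None)
source.on_next("a")
dispose()            # detaches the whole chain from the source
source.on_next("b")  # nothing is listening any more

print(publisher.sent)    # ['a']
print(source.observers)  # []
```

Because the publisher is created once outside the operator, rebuilding the pipeline after each episode reuses it instead of creating a new one.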
|
@bheijden - thanks a lot for sharing this information with us, and thanks a lot for the code example. I will try to explain my view of the issue in the following text. It is unfortunately not an easy issue to solve, even though it appears to be a small local problem. Consider the code below (taken from https://github.com/rosin-project/rxros2/blob/master/rxros2_py_examples/rxros2_py_examples/talker_old_style.py and slightly modified).
This is a good example of an RxROS Python program; you may even see it as a template for how we envision an RxROS program should look. We spent considerable time designing it this way. It consists of some initialization and the creation of a node, then the handling of several observable streams, and finally a spinner and code to shut down the node/program.
There is a third observation to make, although not really from the example, and that is
So, back to your issue, and why it is so hard to solve. I could argue (see the above observations) that RxROS is simply not designed to handle the situation you describe. That situation was never considered during the design of RxROS, and fixing the issue will require changes to the design, the code, and the documentation. This is a major undertaking. I could also argue that you are a bit naughty 😊 to use the dispose command. It sounds like a crude way to solve your issue. But as mentioned, I unfortunately do not have the knowledge or expertise to guide you in the right direction here. Well, the good thing is that you have already found a workaround for the problem. My mind is a bit split. On the one hand, we have some design ideas about how RxROS should be used; on the other hand, we have a user who is not able to use the provided functionality without running into memory leaks. I do like your code example very much. There is sound reasoning in separating the ROS functionality from the RxPY operators. But the fix requires a new interface to the RxROS operators. @bheijden, in case we choose to expand the RxROS library/language with a new interface, will you allow us to use your example/code? |
Anyone, please feel free to comment on this issue/discussion. |
Thanks for your elaborate answer. And you are free to use the example/code I've posted here. Regarding the discussion, perhaps it is a matter of which users you wish to target? If you wish to cater to ROS users by extending ROS with simple but effective reactive programming routines, supporting more advanced functionality like disposing of old pipelines and concurrency might only make the package less accessible to them. On the other hand, if the idea is to create a bridge between ROS and the rxpy package, so that all the benefits of rxpy can be exploited, I do think it is important to support properly disposing of inner subscriptions when a pipeline is disposed (or at least to document the limitation). Otherwise, users accustomed to rxpy might run into operator behavior they did not expect. The disposable returned by the Subscribe extension methods is returned solely to allow you to manually unsubscribe from the observable before the observable naturally ends. I can see that using the dispose command seems crude, as using the If it is a strict requirement that the interface of the operator must remain unchanged, you can keep the publisher creation inside the operator by modifying the code as shown below. This operator follows the same structure as most rxpy standard operators. This structure is also presented in the official documentation here.
Another issue with the current implementation has to do with cold observables (but perhaps it is documented somewhere that this operator should only be used with hot observables?). See the example below where I have defined both the current and alternative implementation of the operator:
Using the current implementation of the operator, this leads to the following output:
while the alternative implementation gives:
In the current implementation, the events of the cold observable are sent immediately when the pipeline is created, while I would have expected the events of the source observable to be sent only at the time of subscription (to an observer). Sending them immediately has two consequences: 1) it does not leave enough time for the ROS publisher to initialize properly, so the events of the observable stream are never sent; 2) when we actually subscribe later on, no messages are published, because the subscription inside the operator has already completed. I can imagine that ROS users might not run into these issues, as they will probably use (infinite) hot observable sources, like in your example where |
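The difference between the two behaviours with a cold source can be sketched as below. This is a simplified model, not the example from the thread: `Observable`, `cold`, `eager_publish` and `lazy_publish` are hypothetical names, and `eager_publish` only assumes the shape of the current operator (subscribe internally, return the source) from the description above.

```python
# Simplified stand-ins; NOT the RxPY or RxROS API.
class Observable:
    """Wraps a subscribe function; nothing runs until subscribe() is called."""

    def __init__(self, subscribe_fn):
        self._subscribe_fn = subscribe_fn

    def subscribe(self, on_next):
        return self._subscribe_fn(on_next)


def cold(values):
    """Cold source: replays all values to every new subscriber."""
    def subscribe(on_next):
        for value in values:
            on_next(value)
        return lambda: None  # nothing left to dispose
    return Observable(subscribe)


def eager_publish(publish):
    """Assumed shape of the current operator: subscribes when applied."""
    def operator(source):
        source.subscribe(publish)  # runs the cold source right now
        return source
    return operator


def lazy_publish(publish):
    """Alternative: publishing happens only once an observer subscribes."""
    def operator(source):
        def subscribe(on_next):
            def forward(value):
                publish(value)
                on_next(value)
            return source.subscribe(forward)
        return Observable(subscribe)
    return operator


def run(make_operator):
    log = []
    operator = make_operator(lambda v: log.append(f"publish {v}"))
    pipeline = operator(cold([1, 2]))
    log.append("pipeline built")
    pipeline.subscribe(lambda v: log.append(f"observe {v}"))
    return log


eager_log = run(eager_publish)
lazy_log = run(lazy_publish)

print(eager_log)
# ['publish 1', 'publish 2', 'pipeline built', 'observe 1', 'observe 2']
print(lazy_log)
# ['pipeline built', 'publish 1', 'observe 1', 'publish 2', 'observe 2']
```

In the eager variant all publishing happens before the pipeline is even returned, and the later subscription publishes nothing; in the lazy variant each value is published at the moment the observer receives it.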
@bheijden - thanks a lot for all the information - I am overwhelmed. It will take me some time to process all this information, but I will return. |
@bheijden It looks like you've implemented a Python version of RXROS. Since the original author only provided a C++ version here, are you willing to open source your implementation? Thank you very much!
|
Hi,
I'm observing that the publish_to_topic(...) operator can cause a memory leak, because the subscription inside publish_to_topic(...) is not disposed when the subscription to a pipeline in which publish_to_topic(...) is used is disposed. To clarify what I mean:
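A minimal sketch of the reported behaviour follows. It does not use RxPY or ROS: `Subject` and `leaky_publish` are hypothetical stand-ins, and the operator's shape (subscribe internally, discard the disposable, return the source) is an assumption based on the description above, not a copy of the RxROS source.

```python
# Simplified stand-ins; NOT the RxPY or RxROS API.
class Subject:
    """Hot source that remembers its observers until they are disposed."""

    def __init__(self):
        self.observers = []

    def subscribe(self, on_next):
        self.observers.append(on_next)
        # Return a "dispose" callable that detaches this observer.
        return lambda: self.observers.remove(on_next)

    def on_next(self, value):
        for observer in list(self.observers):
            observer(value)


def leaky_publish(publish):
    """Assumed shape of the problematic operator: subscribes internally."""
    def operator(source):
        source.subscribe(publish)  # the dispose handle is thrown away
        return source
    return operator


source = Subject()
published = []

pipeline = leaky_publish(published.append)(source)
dispose = pipeline.subscribe(lambda value: None)

source.on_next(1)
dispose()          # the caller disposes "the pipeline"
source.on_next(2)  # the inner subscription is still attached

print(published)              # [1, 2]
print(len(source.observers))  # 1
```

Every time such a pipeline is rebuilt and disposed, one more inner observer stays attached to the source, which is the leak in question.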