Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optional timeout to subscribe() #631

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

jbaumann
Copy link

@jbaumann jbaumann commented Jan 6, 2022

Support an optional timeout for the function subscribe() by adding a lock to communicate between the functions callback() that acquires the lock with a timeout. It thus blocks, either until the function on_message_simple() releases the lock when the requested number of messages has been received, or until the timeout has been reached.

The functionality has been tested and is in use in the project https://github.com/jbaumann/system_watchdog

Copy link
Contributor

@PierreF PierreF left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank for your contribution. I've asked for some modificaation to avoid breaking change.

Also you will need to sign ECA for your contribution to be accepted: https://api.eclipse.org/git/eca/status/gh/eclipse/paho.mqtt.python/631


timeout: the timeout value after which the client disconnects from the
broker. If no timeout is given, the client disconnects only
after "msg_count" messages have been received.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no msg_count in the subscribe.callback()

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, I got this from the simple() function. I will remove this.

else:
lock.acquire()
client.loop_start()
lock.acquire(timeout=timeout)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
lock.acquire(timeout=timeout)
event.wait(timeout=timeout)

And using a threading.Event() rather than threading.Lock(). An Event seems a better fit than an Lock

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a one-on-one communication I believe that a Lock is actually modelling the relationship better. Do you have any additional use in mind e.g., an observer/debugger watching these events? Otherwise I would tend to see Lock as better.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel that an Event is even better at modelling this: the finishing processing is an event and we wait for that event.

With the lock, we have two consecutive call to acquire within the same thread isn't usual way to work with a lock.

def _on_connect(client, userdata, flags, rc):
"""Internal v5 callback"""
_on_connect_v5(client, userdata, flags, rc, None)


def _on_message_callback(client, userdata, message):
"""Internal callback"""
userdata['callback'](client, userdata['userdata'], message)
userdata['callback'](client, userdata['userdata'],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking change. Existing user of subscribe.callback will be required to change the signature of their callback function to accept an additional parameter "lock".

The idea I see to avoid any breaking change and still support this feature (while simple() still just call callback()) would be to add support for a special exception "_StopSubscriber" that the user callback could raise to disconnect. The callback "_on_message_simple" could then raise that exception, this would continue to make simple() a normal user of callback().
So here we could do:

try:
  userdata['callback'](client, userdata['userdata'], message)
except _StopSubscriber:
  lock.release()

I suggest to name it "_StopSubscriber" because I would like to kept the existence of this special exception undocumented/unsupported for now, because I think the true solution would be to call "client.disconnect()" instead of this exception. But doing the disconnect will not work because currently the received message is acknowledged after the on_message callback so disconnecting from inside the callback will cause the acknowledge to be dropped.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I thought that since it is an internal function I could change the signature.

I could very simply determine whether userdata['lock'] is set, and if not call with the original signature instead. This would keep it compatible and would simply add the new functionality.

Using an exception here is not something I'd like to do. An exception always models something that is not part of the normal execution path, and in addition, is normally quite a bit slower. I think the approach with checking the value of 'lock' would be better. Or is there another motivation for using the exception?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not only an internal function or I did I missunderstood the code ? It's the callback passed to subscribe.callback() which is user provided.

In subscribe.simple() I agree it's internal, but we added timeout & the lock also on subscribe.callback().

I just realize we recently added a user_data_get on client. It's a bit broken in that use-case, since it will return the "internal" userdata instead of the user-userdata (userdata["userdata"]). But that probably solve our issue. We can access to userdata["lock"] by using client.user_data_get()["lock"]. This avoid any change in callback signature.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants