
[WIP] First implementation of a new consumer API #118

Open
lumasepa wants to merge 3 commits into master from feature/new-consumer-api


Conversation

lumasepa

First implementation of a new consumer API, as I proposed in #87

  • Added a Consumer class to handle message consumption
  • Added consumer queues in channels to wire the Consumer to the Channel
  • Fixed all tests
  • Added a test for stopping consumption
  • Fixed the documentation
  • Updated the examples
  • Bumped the version to 0.9.0, since the API breaks compatibility with 0.8.2

I would like a review from polyconseil and some feedback; I think this API can be a good one.
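For readers following the thread, here is a minimal, self-contained sketch of the consumer flow being proposed. The `Consumer`, `fetch_message()` and `get_message()` names come from this PR's discussion; the `feed()` helper and the `None` sentinel are stand-ins for the real Channel wiring, and modern `async`/`await` syntax is used for brevity (the PR itself targets older Pythons).

```python
import asyncio

class Consumer:
    """Sketch of the proposed Consumer, backed by an asyncio.Queue."""

    def __init__(self):
        self._queue = asyncio.Queue()
        self._current = None

    def feed(self, message):
        # Stand-in for the Channel pushing a delivery into the consumer.
        self._queue.put_nowait(message)

    async def fetch_message(self):
        # Wait for the next delivery; return False once consumption stops.
        self._current = await self._queue.get()
        return self._current is not None

    def get_message(self):
        # Return the message obtained by the last fetch_message() call.
        return self._current

async def demo():
    consumer = Consumer()
    consumer.feed(("chan", b"hello", "envelope", "props"))
    consumer.feed(None)  # sentinel: consumption stopped
    bodies = []
    while await consumer.fetch_message():
        channel, body, envelope, properties = consumer.get_message()
        bodies.append(body)
    return bodies

print(asyncio.run(demo()))  # prints [b'hello']
```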

Sergio Medina Toledo added 2 commits October 27, 2016 07:52
- Added a Consumer class to handle message consumption
- Added consumer queues in channels to wire the Consumer to the Channel
- Fixed all tests
- Still needs a test for stopping consumption, documentation, and updated examples
@dzen
Contributor

dzen commented Nov 14, 2016

Hello @lumasepa

Thank you for this huge contribution. Using an asyncio.Queue was the first design of aioamqp, and I removed it.

The first issue is that under high load, RabbitMQ will fill your entire RAM with messages. How can we correctly avoid that?

@lumasepa
Author

I know the problem, you are right. What about using a limited-size Queue (the size can be a parameter of the basic_consume method) and, if it fills up, using AMQP basic.reject with requeue set to true? The standard describes basic.reject as follows, and I think it can solve this problem.

  • The server SHOULD be capable of accepting and process the Reject method while sending message content with a Deliver or Get-Ok method. I.e. the server should read and process incoming methods while sending output frames. To cancel a partially-send content, the server sends a content body frame of size 1 (i.e. with no data except the frame-end octet).
  • The server SHOULD interpret this method as meaning that the client is unable to process the message at this time.
  • The client MUST NOT use this method as a means of selecting messages to process.
  • The server MUST NOT deliver the message to the same client within the context of the current channel. The recommended strategy is to attempt to deliver the message to an alternative consumer, and if that is not possible, to move the message to a dead-letter queue. The server MAY use more sophisticated tracking to hold the message on the queue and redeliver it to the same client at a later stage.

What do you think about it, @dzen?
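The bounded-queue-plus-reject idea can be sketched as follows. The `deliver()` function and the `reject` callback are hypothetical names, not aioamqp API; `reject` stands in for the channel's basic.reject with requeue=True.

```python
import asyncio

def deliver(queue, message, reject):
    """Try to hand a delivery to the consumer's bounded queue; if the
    queue is full, reject the message with requeue=True so the broker
    can redeliver it to another consumer or dead-letter it."""
    try:
        queue.put_nowait(message)
        return True
    except asyncio.QueueFull:
        reject(message, requeue=True)
        return False

# A queue of size 1: the first delivery fits, the second is requeued.
rejected = []
q = asyncio.Queue(maxsize=1)
print(deliver(q, "m1", lambda m, requeue: rejected.append((m, requeue))))  # True
print(deliver(q, "m2", lambda m, requeue: rejected.append((m, requeue))))  # False
print(rejected)  # [('m2', True)]
```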

@lumasepa
Author

Another option is a limited-size Queue that raises an exception when it fills up. That lets the user choose what to do in this situation, for example stop consuming or stop the flow of the channel.

@RemiCardona
Contributor

Hello @lumasepa, I'm sorry for the late reply. @dzen and I talked about your PR and we have a few comments:

  • the API you propose is definitely the kind we want to see landing in aioamqp. The current callback mechanism was a mistake.
    Though, to be honest, we're not really fond of the fetch_message() and get_message() calls. Why not wait and return the message directly? Here's what that would look like:
    for channel, body, envelope, properties in (yield from consumer.fetch_message()):
        print(body)

Better yet, maybe it's possible to yield from the consumer object directly. That would be even more user friendly:

    for channel, body, envelope, properties in (yield from consumer):
        print(body)
  • the PR uses syntax only available in Python 3.5 and up, which leads to a SyntaxError: https://travis-ci.org/Polyconseil/aioamqp/jobs/175646689#L231. For the foreseeable future, aioamqp must remain compatible with 3.4.
  • about the asyncio Queue: it should be configured with the qos prefetch value, to restrict memory consumption when connected to a misbehaving server. It will require some tests, of course. Though the whole queue idea is an independent improvement to properly support prefetch (which is not supported ATM, since we read messages one by one). Maybe it should be brought in through a separate patch? Food for thoughts…
  • if there's any way to keep compatibility on the basic_consume() method, that'd be great. Though if it makes things way too complex, we could forget about it as aioamqp is still in the pre-1.0 phase.

@lumasepa
Author

Hello, I'm going to answer your points in order.

  • I think the API you propose is cleaner and better, but AFAIK it is not implementable: the following code is equivalent to your snippet
message = yield from consumer.fetch_message()
for channel, body, envelope, properties in message:
    print(body)

As you can see, the asynchronous operation is performed only once (not on each iteration of the for loop), so it does not behave the same as the while/fetch_message/get_message API. The __aiter__ and __anext__ protocol, in conjunction with async for, was created in Python 3.5 precisely for this pattern.
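That protocol can be sketched with a hypothetical minimal Consumer (the `feed()` helper and the `None` sentinel stand in for the Channel wiring; `async for` requires Python 3.5+, which is exactly the compatibility constraint raised above):

```python
import asyncio

class Consumer:
    """Sketch of iterating over the consumer directly via the
    async-iterator protocol: __anext__ awaits once per message,
    unlike a single `yield from` in front of a plain for loop."""

    def __init__(self):
        self._queue = asyncio.Queue()

    def feed(self, message):
        self._queue.put_nowait(message)

    def __aiter__(self):
        return self

    async def __anext__(self):
        message = await self._queue.get()
        if message is None:  # sentinel: consumption stopped
            raise StopAsyncIteration
        return message

async def demo():
    consumer = Consumer()
    for body in (b"one", b"two"):
        consumer.feed(("chan", body, "envelope", "props"))
    consumer.feed(None)
    bodies = []
    async for channel, body, envelope, properties in consumer:
        bodies.append(body)
    return bodies

print(asyncio.run(demo()))  # prints [b'one', b'two']
```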

  • I'm setting up tox to test the code locally with Python 3.3, 3.4 and 3.5; for now I have only tested it with 3.5. Once that is done I will fix the 3.5-dependent code. The problem is easy to solve anyway: it is only a couple of async and await keywords in the Consumer class.
  • I don't understand this point. AFAIK, qos_prefetch is the number of messages the broker can send you without acknowledgement: if I set qos_prefetch to 10, the broker can send the consumer 10 messages, and once the consumer has 10 unacknowledged messages the broker stops sending; when the consumer acks one of them, the broker can send one more. So if basic.consume is declared with the no-ack flag, this does not apply, and if we limit the queue to the prefetch value, a no-ack consumer could receive more messages than the queue supports, which is a problem. But I agree with you that we should restrict the size of the Queue; the question is what to do when the queue is full. Maybe I'm missing something here; if so, can you explain it better?
  • We can try our best to keep basic_consume() compatible, but if we maintain the callback API we are not fixing the problem completely. If you really want to keep the callback API, we can add a new method create_consumer() that has the new API and returns a Consumer object.
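The prefetch semantics described in the second point can be modelled with a toy broker (all names here are illustrative, not aioamqp or AMQP API):

```python
class ToyBroker:
    """Toy model of qos_prefetch: at most `prefetch` messages may be
    outstanding (delivered but not yet acknowledged) at any time."""

    def __init__(self, prefetch, messages):
        self.prefetch = prefetch
        self.pending = list(messages)  # messages still queued on the broker
        self.unacked = set()           # delivered, awaiting acknowledgement

    def try_deliver(self):
        """Deliver as many messages as the prefetch window allows."""
        delivered = []
        while self.pending and len(self.unacked) < self.prefetch:
            msg = self.pending.pop(0)
            self.unacked.add(msg)
            delivered.append(msg)
        return delivered

    def ack(self, msg):
        """Acknowledging a message frees one slot in the prefetch window."""
        self.unacked.discard(msg)

broker = ToyBroker(prefetch=3, messages=range(5))
print(broker.try_deliver())  # [0, 1, 2] -- window is now full
print(broker.try_deliver())  # []        -- nothing more until an ack arrives
broker.ack(1)
print(broker.try_deliver())  # [3]       -- one ack, one more delivery
```

With the no-ack flag this window never fills (every delivery counts as acknowledged), which is the case where a prefetch-sized client queue could overflow, as noted above.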

@lumasepa lumasepa force-pushed the feature/new-consumer-api branch from 60977df to cda3b3a Compare November 16, 2016 15:23
@allan-simon

Hello, what is the status of this PR?

@hellysmile

Hello there! I am working on https://github.com/aio-libs/aioamqp_consumer as a separate project, please take a look at it!

Maybe we can cooperate on more feature rich project together!

5 participants