Consuming
The previous chapter showed one way of receiving messages in AMQP: the basic.get command. Whilst this is convenient, it's not particularly efficient, because the broker has to wait for the client to request each message, one at a time. A more efficient solution is for the broker to deliver messages as soon as they arrive, and this is what consuming is. A client starts consuming by sending a basic.consume method, which tells the broker that it can deliver messages to the client at any time. The client then listens for incoming messages and responds to them with basic.ack and basic.reject, as outlined in the previous chapter.
Because the consumer has to listen for incoming messages, Amqphp has a couple of different event loop implementations. An event loop uses the system select function to wait for incoming messages. As soon as one arrives, the select function returns and Amqphp can deliver the incoming message. Here's some pseudo-code for a very simple event loop:
1. fp = socket_open()
2. while (true) {
3. if (select(fp)) {
4. // A message was received.
5. }
6. }
The select call happens on line 3. The program pauses at this point, indefinitely if need be, and only continues when an incoming message is received. In many cases you don't want to enter an infinite loop (for example on a web server), so Amqphp provides a variety of "exit strategies" that let you control how long the select loop waits.
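The pseudo-code above can be turned into a small runnable sketch using PHP's stream_select(). This is not Amqphp's own loop: a local socket pair stands in for the broker connection, the variable names are illustrative, and STREAM_PF_UNIX assumes a Unix-like system.

```php
<?php
// Sketch only: a connected socket pair stands in for the broker connection.
list($client, $broker) = stream_socket_pair(
    STREAM_PF_UNIX,
    STREAM_SOCK_STREAM,
    STREAM_IPPROTO_IP
);

// Simulate the broker pushing two messages to the client.
fwrite($broker, "first\n");
fwrite($broker, "second\n");

$received = array();
while (count($received) < 2) {
    // stream_select() modifies its arguments, so rebuild them each time.
    $read = array($client);
    $write = null;
    $except = null;

    // A null timeout means "block until something is readable".
    $ready = stream_select($read, $write, $except, null);
    if ($ready === false) {
        break; // select failed, for example because it was interrupted
    }
    if ($ready > 0) {
        // A message was received.
        $received[] = rtrim(fgets($client));
    }
}

fclose($client);
fclose($broker);
print_r($received);
```

The loop here stops after two messages only so that the script terminates. A real consumer would rely on one of the exit strategies instead.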
In order to consume, you need a class which acts as an end point for deliveries. This must be an instance of \amqphp\Consumer. You add your consumer to a channel (a channel can have many consumers) using the \amqphp\Channel->addConsumer($c) method, or by embedding a <consumer> tag inside a <channel> tag in your Factory XML configuration file.
Next, you need to create an \amqphp\EventLoop object to manage the event loop which will listen for incoming messages. The EventLoop class supports listening to multiple connections, so you can call \amqphp\EventLoop->addConnection($c) for as many broker connections as you need. To enter the event loop, you call select() on the EventLoop. Doing so triggers Amqphp to start the consume session on the broker. One of the \amqphp\Consumer functions is getConsumeMethod(), which must return an assoc array of basic.consume properties. Amqphp calls getConsumeMethod() during the consume setup sequence and passes these properties to the broker, so you can customise the basic.consume parameters on a per-consumer basis.
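As a rough illustration of the kind of array getConsumeMethod() might return, here is a standalone sketch. The helper exampleConsumeParams() and the queue name are hypothetical, and the key names are taken from the basic.consume fields in the AMQP 0-9-1 specification; the exact spelling Amqphp expects is an assumption, so check it against the demos.

```php
<?php
// Hypothetical helper, not part of Amqphp. Key names follow the AMQP 0-9-1
// basic.consume fields; that Amqphp uses this spelling is an assumption.
function exampleConsumeParams($queue)
{
    return array(
        'queue'     => $queue, // queue to consume from
        'no-local'  => false,  // protocol flag; not honoured by every broker
        'no-ack'    => false,  // false: the broker expects explicit acks
        'exclusive' => false,  // allow other consumers on the same queue
        'no-wait'   => false,  // ask the broker to confirm with consume-ok
    );
}

$params = exampleConsumeParams('demo.queue');
print_r($params);
```

Setting no-ack to false is what makes the ack and reject signals described later in this chapter meaningful.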
It's worth noting at this point that the API usage pattern for consuming is different from the one for sending simpler AMQP commands. This is because the Amqphp API sends the low-level consume messages for you.
Once the consumer is connected and the event loop has started, any incoming messages will be delivered to the appropriate consumer object's handleDelivery() method. The incoming message is passed in as the first parameter, which is a standard \amqphp\wire\Method object, and the channel that received the message is the second parameter. If you chose in your basic.consume params (returned from Consumer->getConsumeMethod()) to ack the incoming messages, you can return one or more of the following 'signals':
- \amqphp\CONSUMER_ACK - this const is a signal to the Channel to respond with basic.ack (multiple=false)
- \amqphp\CONSUMER_REJECT - this const is a signal to the Channel to respond with basic.reject (requeue=true)
- \amqphp\CONSUMER_DROP - this const is a signal to the Channel to respond with basic.reject (requeue=false)
- \amqphp\CONSUMER_CANCEL - this const is a signal to the Channel to respond with basic.cancel (no-wait=false)
The CONSUMER_CANCEL const tells Amqphp to send the basic.cancel message, which stops a single consumer. Note that this only cancels the consume session for that one consumer; other consumers will continue receiving messages. Once again, this behaviour is not the same as the standard Amqphp API behaviour. You can return multiple signals as an array. For example, you could return array(CONSUMER_ACK, CONSUMER_CANCEL) to ack the current message and close the consume session.
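To make the signal mapping concrete, the sketch below models how a single signal or an array of signals translates into broker responses. It is a stand-in, not Amqphp's implementation: the constants are redefined locally with made-up values, and responsesFor() is a hypothetical helper.

```php
<?php
// Stand-in constants for illustration only. The real ones live in the
// \amqphp namespace, and their actual values are not assumed here.
const CONSUMER_ACK = 'ack';
const CONSUMER_REJECT = 'reject';
const CONSUMER_DROP = 'drop';
const CONSUMER_CANCEL = 'cancel';

// Hypothetical helper: list the responses a channel would send for the
// value returned from handleDelivery().
function responsesFor($signals)
{
    $map = array(
        CONSUMER_ACK    => 'basic.ack (multiple=false)',
        CONSUMER_REJECT => 'basic.reject (requeue=true)',
        CONSUMER_DROP   => 'basic.reject (requeue=false)',
        CONSUMER_CANCEL => 'basic.cancel (no-wait=false)',
    );
    $out = array();
    // A handler may return one signal or an array of them.
    foreach ((array) $signals as $sig) {
        if (isset($map[$sig])) {
            $out[] = $map[$sig];
        }
    }
    return $out;
}

$single = responsesFor(CONSUMER_ACK);
$both = responsesFor(array(CONSUMER_ACK, CONSUMER_CANCEL));
print_r($both);
```

Returning both CONSUMER_ACK and CONSUMER_CANCEL is a convenient way to consume exactly one message and then stop.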
There's a complete example of how to consume messages in the Amqphp demos folder.
Exit strategies are Amqphp's way of making it easier to work with event loops. They have nothing to do with the AMQP protocol; they're purely a part of the Amqphp API. The basic idea is that, in many cases, you don't want to sit in an event loop indefinitely. You often want to exit the loop, either under a certain set of circumstances or after a given period has elapsed.
You set the exit strategy on a per-connection basis using the \amqphp\Connection->setSelectMode() method. You specify the exit strategy as the first parameter and any other strategy-dependent parameters after it. The available strategies are represented as consts, and are as follows:
- \amqphp\SELECT_TIMEOUT_ABS - Absolute timeout. The second and third parameters to setSelectMode() are $epoch, $usecs, which specify an absolute end time for the select loop. All messages which are received before this cut-off point are delivered to their respective consumers. Example: $conn->setSelectMode(\amqphp\SELECT_TIMEOUT_ABS, 1317213963, 250000)
- \amqphp\SELECT_TIMEOUT_REL - Relative timeout. The same as absolute timeout, except the end point is calculated relative to the current time just before the event loop starts. Example: $conn->setSelectMode(\amqphp\SELECT_TIMEOUT_REL, 5, 250000) means the loop will exit after 5.25 seconds.
- \amqphp\SELECT_MAXLOOPS - Return after a set number of event loops. The second arg is the number of loops (integer). This method blocks for up to N iterations of the event loop, however long they take. If you only use a single connection, this usually means the same as "exit after N messages have been received". Note that if you use an AMQP heartbeat, receiving the heartbeat is counted as an incoming message.
- \amqphp\SELECT_CALLBACK - The second argument to setSelectMode() is a callable function which returns a Boolean. If it returns true, the loop exits. An optional third parameter specifies additional arguments to the callback, which are passed to the callback using PHP's call_user_func_array. The loop blocks indefinitely, and the callback is polled after each message is received.
- \amqphp\SELECT_COND - Infinite loop with conditional exit. No timeout is specified, but after each message is received, the Connection object polls all connected channels to see if they are able to consume more messages. If none of the channels indicate they expect more messages, the loop exits. A channel is considered to "expect messages" if it has at least one connected consumer, or if there are pending acknowledgements for RabbitMQ "publisher confirms". Recall that a consumer can return the \amqphp\CONSUMER_CANCEL signal from the handleDelivery() method. When all consumers on a particular channel remove themselves in this way, the channel goes from "expecting" messages to "not expecting" messages. See Chapter 6 for more information on "publisher confirms".
- \amqphp\SELECT_INFINITE - Infinite loop, the loop never exits. The event loop will only end if the underlying TCP connection breaks, or if the script is interrupted using OS signals (e.g. SIGINT). Make sure you set the Connection->signalDispatch flag to true, and set up a standard signal handler function using the PHP pcntl functions.
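The sketch below shows, under stated assumptions, how a relative timeout and a maximum loop count can bound a select loop. It does not use Amqphp: runLoop() is a hypothetical helper, a local socket pair stands in for the broker connection, and STREAM_PF_UNIX assumes a Unix-like system.

```php
<?php
// Hypothetical helper, not part of Amqphp: read newline-terminated messages
// until either $maxLoops iterations have run or the relative timeout expires.
function runLoop($sock, $maxLoops, $relSecs, $relUsecs)
{
    // Relative timeout: convert "now + N" into an absolute deadline up front.
    $deadline = microtime(true) + $relSecs + $relUsecs / 1000000;
    $messages = array();
    $reason = 'maxloops';

    for ($loops = 0; $loops < $maxLoops; $loops++) {
        $remaining = $deadline - microtime(true);
        if ($remaining <= 0) {
            $reason = 'timeout';
            break;
        }
        $read = array($sock);
        $write = null;
        $except = null;
        // Split the remaining time into whole seconds and microseconds.
        $secs = (int) floor($remaining);
        $usecs = (int) (($remaining - $secs) * 1000000);

        $ready = stream_select($read, $write, $except, $secs, $usecs);
        if ($ready === false) {
            $reason = 'error';
            break;
        }
        if ($ready === 0) {
            $reason = 'timeout'; // nothing arrived before the deadline
            break;
        }
        $messages[] = rtrim(fgets($sock));
    }
    return array($messages, $reason);
}

list($client, $broker) = stream_socket_pair(
    STREAM_PF_UNIX,
    STREAM_SOCK_STREAM,
    STREAM_IPPROTO_IP
);
fwrite($broker, "a\nb\nc\n");

// Stops after two iterations even though a third message is waiting.
$first = runLoop($client, 2, 5, 250000);
// Reads the remaining message, then gives up after roughly 0.2 seconds.
$second = runLoop($client, 10, 0, 200000);

fclose($client);
fclose($broker);
print_r(array($first, $second));
```

The first call mirrors the idea behind SELECT_MAXLOOPS and the second mirrors SELECT_TIMEOUT_REL. Combining both limits in one helper is a simplification made for this sketch, since Amqphp selects a single strategy per connection.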
Because exit strategies are set on a per-connection basis, an EventLoop which contains multiple connections can have a different exit strategy for each connection. If a single connection exits the loop, the EventLoop will stop receiving messages for that connection, but continue receiving for the other connections.
Previous Chapter : A simple message reader