Consuming
Warning: Changes in progress! (03/11)
The previous chapter showed one way of receiving messages in AMQP, using the basic.get command. Whilst this is programmatically convenient, it's not particularly efficient, because the broker has to wait for the client to request each message, one at a time. A more efficient solution is for the broker to deliver messages as soon as they arrive, and this is what consuming is. There are 2 adjacent concepts in play here: the consume session and the event loop. A consume session is an agreement between the AMQP client and the broker about delivering messages; it is started by the client sending basic.consume and stopped by the client sending basic.cancel. An event loop, on the other hand, is a programming construct which allows you to receive messages, so the 2 concepts are closely related. Nevertheless, it is possible to start a consume session without going into an event loop (and vice versa, although you'd never want to do this), and it's possible to exit an event loop without closing a consume session.
Event loops are not specific to Amqphp; they have been used for a variety of purposes for many years.
In order to consume, you need an object to act as an end point for deliveries. This must be an object which implements the \amqphp\Consumer interface. You add your consumer to a channel (a channel can have many consumers) using the \amqphp\Channel->addConsumer($c) method, or by embedding a <consumer> tag inside a <channel> tag in your Factory XML configuration file.
Next, you need to create an \amqphp\EventLoop object to manage the event loop which will listen for incoming messages. The EventLoop class supports listening to multiple connections, so you can call \amqphp\EventLoop->addConnection($c) with as many broker connections as you need.
To start consuming you call \amqphp\EventLoop->select(). This triggers the AMQP consumer setup sequence and enters the event loop. Performing the AMQP setup means sending a basic.consume method to the broker for each consumer. This is done for you by the EventLoop, which triggers each connected Channel to call getConsumeMethod() on each \amqphp\Consumer object; the resulting methods are collected for all consumers and forwarded to the broker en masse. What happens next depends on the exit strategies that are set on each connection. By default no exit strategies are set, which means the loop waits indefinitely for incoming messages and delivers them as they arrive. (As an aside, you can begin the broker consume session manually by calling Channel->startAllConsumers() and then enter the event loop at a later time; the basic.consume messages won't be re-sent.)
Once the consumer is connected and the event loop has started, any incoming messages will be delivered to the appropriate consumer object's handleDelivery() method. The incoming message is passed in as the first parameter, as a standard \amqphp\wire\Method object, and the channel that received the message is the second parameter. If you chose in your basic.consume params (returned from Consumer->getConsumeMethod()) to ack the incoming messages, you can return one or more of the following 'signals':
- \amqphp\CONSUMER_ACK - this const is a signal to the Channel to respond with basic.ack (multiple=false)
- \amqphp\CONSUMER_REJECT - this const is a signal to the Channel to respond with basic.reject (requeue=true)
- \amqphp\CONSUMER_DROP - this const is a signal to the Channel to respond with basic.reject (requeue=false)
- \amqphp\CONSUMER_CANCEL - this const is a signal to the Channel to respond with basic.cancel (no-wait=false)
The CONSUMER_CANCEL signal tells Amqphp to send the basic.cancel message to the broker, which stops a single consume session. Subsequent incoming messages for this consumer are rejected automatically. Note that this only cancels the consume session for that one consumer; other consumers will continue receiving messages. You can return multiple signals as an array, for example you could return array(CONSUMER_ACK, CONSUMER_CANCEL) to ack the current message and close the consume session.
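As a rough illustration of the signal mechanism described above, here is a minimal sketch of a handleDelivery() method that acks every message and cancels its own consume session after a fixed number of deliveries. The class name CountingConsumer and the SIG_ACK / SIG_CANCEL constants are stand-ins invented so the sketch runs without the library. A real consumer would implement \amqphp\Consumer (including getConsumeMethod(), not shown here) and return \amqphp\CONSUMER_ACK and \amqphp\CONSUMER_CANCEL instead.

```php
<?php
// Stand-in signal values so this sketch runs on its own. With Amqphp loaded
// you would return \amqphp\CONSUMER_ACK and \amqphp\CONSUMER_CANCEL instead.
const SIG_ACK = 'ack';
const SIG_CANCEL = 'cancel';

// Hypothetical consumer: a real one would implement \amqphp\Consumer.
class CountingConsumer
{
    private $remaining;

    public function __construct($limit)
    {
        $this->remaining = $limit;
    }

    // First argument is the delivered message, second is the receiving channel.
    public function handleDelivery($message, $channel)
    {
        $this->remaining--;
        if ($this->remaining <= 0) {
            // Ack the final message and close this consumer's session.
            return array(SIG_ACK, SIG_CANCEL);
        }
        return SIG_ACK;
    }
}
```

With a limit of 2, the first call returns the ack signal on its own and the second returns both signals as an array.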
In Amqphp, the default behaviour of a Connection in an event loop is to wait indefinitely for incoming messages, and to never return. Clearly, this behaviour is undesirable in many circumstances: you need to be able to get out of an event loop either after a set time period has elapsed or after a certain set of messages has been received. Amqphp achieves this by allowing you to add exit strategies to each Connection that is involved in the event loop. These do one or both of two things:
- Set a timeout on the system select call so that it exits after a set period.
- Trigger the event loop to exit after a particular set of incoming messages has been delivered.
Strategies are added to a connection with the \amqphp\Connection->pushExitStrategy() method. The first argument is a const flag which represents the type of strategy, and subsequent arguments depend on the first. Here's a list of strategies and their additional arguments:
- \amqphp\STRAT_TIMEOUT_ABS - Absolute timeout. The second and third parameters to pushExitStrategy() are $epoch, $usecs; these specify an absolute end time for the select loop. All messages which are received before this cut-off point are delivered to their respective consumers. Example: $conn->pushExitStrategy(\amqphp\STRAT_TIMEOUT_ABS, 1317213963, 250000)
- \amqphp\STRAT_TIMEOUT_REL - Relative timeout. The same as absolute timeout, except the end point is calculated relative to the current time just before the event loop starts. Example: $conn->pushExitStrategy(\amqphp\STRAT_TIMEOUT_REL, 5, 250000) - this means the loop will exit after 5.25 seconds.
- \amqphp\SELECT_MAXLOOPS - Return after a set number of event loops; the second arg is the number of loops (integer). This strategy will block indefinitely for up to N iterations of the event loop. Note that this isn't necessarily the same as "N messages received", particularly if you use a high prefetch-count value.
- \amqphp\SELECT_CALLBACK - The second argument to pushExitStrategy() is a callable function which returns a Boolean; if this returns true then the loop will exit. An optional third parameter specifies additional arguments to the callback, and these will be passed to the callback using PHP's call_user_func_array. The loop will block indefinitely, and after each message is received the callback function is polled.
- \amqphp\SELECT_COND - Infinite loop with conditional exit. No timeout is specified, but after each event loop the Connection object polls all connected channels to see if they are able to consume more messages. If none of the channels indicate they expect more messages, the strategy exits. A channel is considered to "expect messages" if it has at least one connected consumer, or if there are pending acknowledgements for RabbitMQ "publisher confirms". Recall that a consumer can return the \amqphp\CONSUMER_CANCEL signal from the handleDelivery() method; when all consumers on a particular channel remove themselves in this way, the channel goes from "expecting" messages to "not expecting" messages. See Chapter 6 for more information on "publisher confirms".
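To make the SELECT_CALLBACK description more concrete, the following sketch shows an exit callback with additional arguments and simulates the poll that takes place after each message. The function name enoughMessages, the $counter object and the limit of 3 are hypothetical, and the commented-out pushExitStrategy() line assumes the extra arguments are supplied as an array, which is what call_user_func_array expects. The loop below only imitates the polling and is not Amqphp's own code.

```php
<?php
// Hypothetical exit callback: true once $counter->seen has reached $limit.
function enoughMessages($counter, $limit)
{
    return $counter->seen >= $limit;
}

$counter = new stdClass;
$counter->seen = 0;
$args = array($counter, 3); // extra arguments handed to the callback

// With Amqphp loaded, registration would presumably look like this:
// $conn->pushExitStrategy(\amqphp\SELECT_CALLBACK, 'enoughMessages', $args);

// Simulate the poll that follows each delivered message.
$polls = array();
for ($i = 0; $i < 3; $i++) {
    $counter->seen++; // a consumer would update this in handleDelivery()
    $polls[] = call_user_func_array('enoughMessages', $args);
}
```

Because objects are passed by handle in PHP, the callback sees the updated count on every poll, so only the third poll asks the loop to exit.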
You can add more than one exit strategy to a Connection, these then form a "chain of responsibility". After each loop, the strategies are consulted in turn (in the same order they were added to the connection) such that each strategy is passed the result of the previous one, and can either override or accept that result. A useful application here is to allow a loop to exit before a specific timeout depending on messages received. Suppose you set up a consume session to receive 5 messages, but you don't want to wait more than 2 seconds to receive all 5. If you added STRAT_TIMEOUT_REL then SELECT_COND you could achieve this - once all 5 messages are received your Consumer would return CONSUMER_CANCEL, and this would be picked up by the SELECT_COND strategy which would then trigger the loop to exit. On the other hand, if all 5 messages were not received before the 2 second timeout, the STRAT_TIMEOUT_REL strategy would trigger the loop to exit.
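The chain described above can be modelled in a few lines of plain PHP. This is only a sketch of the idea, not Amqphp's implementation: the function shouldExit(), the $state array and its keys are all invented for illustration, and "override or accept" is simplified here to "exit if the previous strategy or this one says so".

```php
<?php
// Hypothetical model of an exit-strategy chain. Each strategy receives the
// previous strategy's result and returns its own verdict.
function shouldExit(array $strategies, array $state)
{
    $result = false; // nothing has asked to exit yet
    foreach ($strategies as $strategy) {
        $result = call_user_func($strategy, $result, $state);
    }
    return $result;
}

// Stand-in for STRAT_TIMEOUT_REL with a 2 second limit.
$timeout = function ($previous, array $state) {
    return $previous || $state['elapsed'] >= 2.0;
};

// Stand-in for SELECT_COND: exit when no consumers remain.
$cond = function ($previous, array $state) {
    return $previous || $state['activeConsumers'] === 0;
};

// Strategies are consulted in the order they were added.
$chain = array($timeout, $cond);

$stillWaiting = shouldExit($chain, array('elapsed' => 0.5, 'activeConsumers' => 1));
$timedOut     = shouldExit($chain, array('elapsed' => 2.5, 'activeConsumers' => 1));
$allReceived  = shouldExit($chain, array('elapsed' => 0.5, 'activeConsumers' => 0));
```

In this model the loop keeps running while messages are outstanding and the deadline has not passed, and exits as soon as either condition changes.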
Because exit strategies are set on a per-connection basis, an EventLoop which contains multiple connections can have different exit strategies for each connection. If a single connection exits the loop, the EventLoop will stop receiving messages for that connection, but continue receiving for the others.
There are 2 ways to consume messages: using an \amqphp\EventLoop object, or directly on a connection using the select() method:
// (1) Listen on a single connection
$conn->pushExitStrategy(\amqphp\SELECT_COND);
$conn->select(); // Enter a select loop which contains only $conn
// (2) Listen on multiple connections
$ev = new \amqphp\EventLoop;
$ev->addConnection($conn1);
$ev->addConnection($conn2);
$ev->select(); // Enter a select loop which contains 2 connections
These 2 methods share the same implementation; the first approach exists as a convenience helper method.
By far the most common reason for using the select() methods is to receive incoming messages whilst consuming, but this is not the only use case. If you're publishing messages with the mandatory or immediate flags set, then there's a possibility these may be returned to you via the ChannelEventHandler object, as outlined in Chapter 3. In order to receive these messages you have to call select() (either of the above methods will work), and in this case you need to make sure you've got an appropriate exit strategy set. If you're listening out for rejected messages, then I'd recommend using one of the timeout strategies. If you're waiting for RabbitMQ "publisher confirms" then you could use either a timeout or the SELECT_COND strategy.
Previous Chapter : A simple message reader