Robin Harvey edited this page Jan 28, 2012 · 23 revisions

Chapter 5 - Consuming messages

Introduction

The previous chapter showed one way of receiving messages in AMQP: the basic.get command. Whilst this is programmatically convenient, it's not particularly efficient, because the broker has to wait for the client to ask for each message, one at a time. A more efficient solution is for the broker to deliver messages as soon as they arrive, and this is what consuming is. There are 2 adjacent concepts in play here: the consume session and the event loop. A consume session is an agreement between the AMQP client and the broker about delivering messages; it is started by the client sending basic.consume and stopped by the client sending basic.cancel. An event loop, on the other hand, is a programming construct which allows you to receive messages, so it's easy to see that these 2 concepts are closely related. Nevertheless, it is possible to start a consume session without going into an event loop (and vice versa, although you'd never want to do this), and it's possible to exit an event loop without closing a consume session.

Event loops are not specific to Amqphp; they have been used for a variety of purposes for many years.

Basic consumer life-cycle

In order to consume, you need an object to act as an end point for deliveries; this must be an object which implements the \amqphp\Consumer interface. You add your consumer to a channel (a channel can have many consumers) using the \amqphp\Channel->addConsumer($c) method, or by embedding a <consumer> tag inside a <channel> tag in your Factory XML configuration file.

Next, you need to create an \amqphp\EventLoop object to manage the event loop which will listen for incoming messages. The EventLoop class supports listening to multiple connections, so you can call \amqphp\EventLoop->addConnection($c) with as many broker connections as you need.

To start consuming you call \amqphp\EventLoop->select(), which triggers the AMQP consumer setup sequence and enters the event loop. Performing the AMQP setup means sending a basic.consume method to the broker for each consumer. This is done for you by the EventLoop, which triggers each connected Channel to call getConsumeMethod() on each \amqphp\Consumer object; the resulting methods are collected for all consumers and forwarded to the broker en masse. If you prefer, you can begin the broker consume session manually by calling Channel->startAllConsumers() and then enter the event loop at a later time; the basic.consume messages won't be re-sent. What happens next depends on the exit strategies that are set on each connection. By default no exit strategies are set, which means the loop waits indefinitely for incoming messages and delivers them as they arrive.

Once the consumer is connected and the event loop has started, any incoming messages will be delivered to the appropriate consumer object's handleDelivery() method. The incoming message is passed in as the first parameter (a standard \amqphp\wire\Method object), and the channel that received the message is the second parameter. If you chose in your basic.consume params (returned from Consumer->getConsumeMethod()) to ack the incoming messages, you can return one or more of the following 'signals':

  • \amqphp\CONSUMER_ACK - this const is a signal to the Channel to respond with basic.ack (multiple=false)
  • \amqphp\CONSUMER_REJECT - this const is a signal to the Channel to respond with basic.reject (requeue=true)
  • \amqphp\CONSUMER_DROP - this const is a signal to the Channel to respond with basic.reject (requeue=false)
  • \amqphp\CONSUMER_CANCEL - this const is a signal to the Channel to respond with basic.cancel (no-wait=false)

The CONSUMER_CANCEL signal tells Amqphp to send the basic.cancel message to the broker, which stops a single consume session. Subsequent incoming messages for this consumer are rejected automatically. Note that this only cancels the consume session for that one consumer; other consumers will continue receiving messages. You can return multiple signals as an array: for example, you could return array(CONSUMER_ACK, CONSUMER_CANCEL) to ack the current message and close the consume session.
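As a rough illustration of the signal logic, here is a minimal sketch of a handler that acks every message and cancels itself after a fixed number of deliveries. It runs without Amqphp: DEMO_ACK and DEMO_CANCEL are stand-ins for \amqphp\CONSUMER_ACK and \amqphp\CONSUMER_CANCEL, and CountingHandler is a hypothetical class that only mirrors the handleDelivery() parameter order described above; a real consumer would implement the full \amqphp\Consumer interface.

```php
<?php
// Stand-ins for \amqphp\CONSUMER_ACK and \amqphp\CONSUMER_CANCEL so the sketch
// runs without the library; the real constants' values are not assumed here.
const DEMO_ACK = 'ack';
const DEMO_CANCEL = 'cancel';

// Hypothetical handler: acks every message and cancels after $limit deliveries.
class CountingHandler
{
    private $limit;
    private $seen = 0;

    public function __construct($limit)
    {
        $this->limit = $limit;
    }

    // Same parameter order as described above: message first, channel second.
    public function handleDelivery($message, $channel)
    {
        $this->seen++;
        if ($this->seen >= $this->limit) {
            // Ack this message and end the consume session for this consumer.
            return array(DEMO_ACK, DEMO_CANCEL);
        }
        return DEMO_ACK;
    }
}

// Feed three fake deliveries through the handler and record what it returns.
$handler = new CountingHandler(3);
$results = array();
foreach (array('m1', 'm2', 'm3') as $msg) {
    $results[] = $handler->handleDelivery($msg, null);
}
```

The first two deliveries return a single ack signal; the third returns both signals in one array, which is the pattern that pairs naturally with the STRAT_COND exit strategy.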

Event loop exit strategies

In Amqphp, the default behaviour of a Connection in an event loop is to wait indefinitely for incoming messages, and to never return. Clearly, this behaviour is highly undesirable in many circumstances. Amqphp allows you to add exit strategies to each Connection that is involved in the event loop; these do one of two things:

  1. Set a timeout on the event loop so that it exits after a set period
  2. Trigger the event loop to exit under a particular set of circumstances, once a set of messages have been delivered.

Strategies are added to a connection with the \amqphp\Connection->pushExitStrategy() method. The first argument is a const flag which represents the type of strategy, and subsequent arguments depend on the first. Here's a list of strategies and their additional arguments:

  • \amqphp\STRAT_TIMEOUT_ABS - Absolute timeout. The second and third parameters to pushExitStrategy() are $epoch, $usecs, these specify an absolute end time for the select loop. All messages which are received before this cut-off point are delivered to their respective consumers. Example: $conn->pushExitStrategy(\amqphp\STRAT_TIMEOUT_ABS, 1317213963, 250000)
  • \amqphp\STRAT_TIMEOUT_REL - Relative timeout. The same as absolute timeout, except the end point is calculated relative to the current time just before the event loop starts. Example: $conn->pushExitStrategy(\amqphp\STRAT_TIMEOUT_REL, 5, 250000) - this means the loop will exit after 5.25 seconds.
  • \amqphp\STRAT_MAXLOOPS - Return after a set number of event loops, second arg is number of loops (integer). This strategy will block indefinitely for up to N iterations of the event loop. Note that this isn't necessarily the same as "N messages received", particularly if you use a high prefetch-count value.
  • \amqphp\STRAT_CALLBACK - The second argument to pushExitStrategy() is a callback function which dictates whether the loop will exit. An optional third parameter specifies additional arguments to the callback, these will be passed to the callback using PHP's call_user_func_array. This strategy will block indefinitely until the callback function returns true, the callback is invoked once during each iteration of the event loop - note that this isn't necessarily the same as "called back after each message has been delivered", particularly if you use a high prefetch-count value.
  • \amqphp\STRAT_COND - Infinite loop with conditional exit. No timeout is specified, but after each iteration of the event loop the Connection object polls all connected channels to see if they are able to consume more messages; if none of the channels indicate they expect more messages, the strategy exits. A channel is considered to "expect messages" if it has at least one connected consumer, or if there are pending acknowledgements for RabbitMQ "publisher confirms". Recall that a consumer can return the \amqphp\CONSUMER_CANCEL signal from the handleDelivery() method; when all consumers on a particular channel remove themselves in this way, the channel goes from "expecting" messages to "not expecting" messages. See Chapter 6 for more information on "publisher confirms".
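To make the relationship between the two timeout strategies concrete, the following sketch converts a relative (seconds, microseconds) pair into the absolute (epoch, microseconds) pair that STRAT_TIMEOUT_ABS expects. The relativeToAbsolute() helper is hypothetical and is not part of Amqphp; how the library performs this calculation internally is not shown here.

```php
<?php
// Hypothetical helper, not part of Amqphp: turns a relative (seconds,
// microseconds) timeout into an absolute (epoch, microseconds) end time,
// given the time at which the event loop starts.
function relativeToAbsolute($nowEpoch, $nowUsecs, $secs, $usecs)
{
    $totalUsecs = $nowUsecs + $usecs;
    // Carry whole seconds out of the microsecond part.
    $carry = (int) floor($totalUsecs / 1000000);
    return array($nowEpoch + $secs + $carry, $totalUsecs % 1000000);
}

// Loop starts at 1317213958.900000; a 5.25 second relative timeout is applied.
$deadline = relativeToAbsolute(1317213958, 900000, 5, 250000);
```

Here $deadline is array(1317213964, 150000): the microsecond parts add up to more than one second, so one second is carried into the epoch value.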

You can add more than one exit strategy to a Connection, these then form a "chain of responsibility". After each loop, the strategies are consulted in turn (in the same order they were added to the connection) such that each strategy is passed the result of the previous one, and can either override or accept that result. A typical use-case here is to allow a loop to exit before a specified timeout depending on messages received. Suppose you set up a consume session to receive 5 messages, but you don't want to wait more than 2 seconds to receive all 5. If you added STRAT_TIMEOUT_REL then STRAT_COND you could achieve this - once all 5 messages are received your Consumer would return CONSUMER_CANCEL, and this would be picked up by the STRAT_COND strategy which would then trigger the loop to exit. On the other hand, if all 5 messages were not received before the 2 second timeout, the STRAT_TIMEOUT_REL strategy would trigger the loop to exit.
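The chain described above can be modelled in a few lines. This is a simplified sketch that does not use Amqphp: runChain(), the two closures and the $state array are all hypothetical, and it assumes each strategy's verdict is a plain boolean (true meaning "exit the loop"), which may not match the library's internal protocol.

```php
<?php
// Simplified model of an exit-strategy chain; it does not use Amqphp itself.
// Each strategy receives the previous verdict (true = exit the loop) and
// either accepts it or overrides it.
function runChain(array $strategies, array $state)
{
    $exit = false;
    foreach ($strategies as $strategy) {
        $exit = $strategy($exit, $state);
    }
    return $exit;
}

// Stand-in for STRAT_TIMEOUT_REL: exit once 2 seconds have elapsed.
$timeout = function ($prev, array $state) {
    return $state['elapsed'] >= 2.0 ? true : $prev;
};

// Stand-in for STRAT_COND: exit once no consumer is left on any channel.
$cond = function ($prev, array $state) {
    return $state['consumers'] === 0 ? true : $prev;
};

// Strategies are consulted in the order they were added.
$chain = array($timeout, $cond);

$stillWaiting = runChain($chain, array('elapsed' => 0.5, 'consumers' => 1));
$allReceived  = runChain($chain, array('elapsed' => 0.8, 'consumers' => 0));
$timedOut     = runChain($chain, array('elapsed' => 2.0, 'consumers' => 1));
```

In this model the loop keeps running while the consumer is attached and time remains, exits early once the consumer has cancelled, and exits on the timeout even if the consumer is still waiting.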

Exit strategies are set on a per-connection basis, so an EventLoop which contains multiple connections can have different exit strategies for each connection. If a single connection exits the loop, the EventLoop will stop receiving messages for that connection but continue receiving for the others.

Ways to consume, reasons to consume

There are 2 ways to consume messages: using an \amqphp\EventLoop object, or directly on a connection using the select() method:

// (1) Listen on a single connection
$conn->pushExitStrategy(\amqphp\STRAT_COND);
$conn->select(); // Enter a select loop which contains only $conn

// (2) Listen on multiple connections
$ev = new \amqphp\EventLoop;
$ev->addConnection($conn1);
$ev->addConnection($conn2);
$ev->select(); // Enter a select loop which contains 2 connections

These 2 methods share the same implementation; the first approach exists as a convenience helper.

By far the most common reason for using the select() methods is to receive incoming messages whilst consuming, but this is not the only use case. If you're publishing messages with the mandatory or immediate flags set, then there's a possibility these may be returned to you via the ChannelEventHandler object, as outlined in Chapter 3. In order to receive these messages you have to call select() (either of the above methods will work), and in this case you need to make sure you've got an appropriate exit strategy set. If you're listening out for returned messages, then I'd recommend using one of the timeout strategies; if you're waiting for RabbitMQ "publisher confirms" then you could use either a timeout or the STRAT_COND strategy.

Channel Ack buffering

A feature that was introduced in version 0.9.4 is Channel ack buffering. The basic idea here is that the \amqphp\Channel class can now make use of the AMQP basic.ack multiple flag to acknowledge received messages en masse, rather than having to send a single basic.ack for every received message. The motivation for this feature is to reduce network overhead for busy consumers: by setting a high buffer value you send ack messages less frequently. By default, this feature is disabled.

To use the buffer feature, simply set the ackBuffer field of the \amqphp\Channel object to a suitable value:

$chan = $conn->openChannel();
$chan->ackBuffer = 200; // Send acks after every 200 messages
$chan->addConsumer(...);
$conn->select(); //etc..

Your Consumer will behave as normal, returning one (or more) of the \amqphp\CONSUMER_* consts, but behind the scenes the Channel will save up the corresponding responses to be sent at a later time.

Although this feature doesn't take any extra effort to use, it changes the behaviour of consuming channels in some subtle ways, and there are some other considerations to bear in mind:

  1. The current implementation will always flush (n)acks as soon as the response changes from an ack to a reject. For this reason, you can consider the $ackBuffer value to mean "buffer up to $ackBuffer acks"
  2. If your consumer is in an infinite select loop, there may be outstanding acks when the event loop blocks, hence from the point of view of the broker the buffered acks will delay the broker from dequeueing messages.
  3. Setting this value to a number greater than 1 constrains your ability to manage acking manually. Let's say you want to selectively delay acking particular messages for some application-specific reason. Normally you would simply not return a value from your Consumer->handleDelivery() handler, and the channel would not bother sending the ack. However, if the $ackBuffer is greater than one, subsequent message acks will implicitly ack the delayed message as a result of using the multiple=true flag. If you want to manage acking yourself, then you should set the $ackBuffer to 1; this way the Channel will send ack messages with multiple=false, meaning the delayed messages will remain on the broker.

Point 2 here should be of particular interest. For this reason I would suggest only using this feature for consumers that have predictably high volumes of incoming messages, where the event loop is unlikely to block for very long.
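Points 1 and 2 can be seen in a small simulation. The sketch below does not use Amqphp: simulateAckBuffer() is a hypothetical function that models the buffering behaviour as described above, with $bufferSize playing the role of $chan->ackBuffer. As a simplification it sends every reject immediately, which may differ from how the library handles runs of rejects.

```php
<?php
// Simplified model of ack buffering; it does not use Amqphp itself.
// $signals is the sequence of responses a consumer returns ('ack' or
// 'reject'); $bufferSize plays the role of $chan->ackBuffer.
function simulateAckBuffer(array $signals, $bufferSize)
{
    $sent = array(); // responses that would actually go to the broker
    $pending = 0;    // acks buffered but not yet sent

    foreach ($signals as $signal) {
        if ($signal === 'ack') {
            $pending++;
            if ($pending >= $bufferSize) {
                // Buffer is full: one response covers all pending acks.
                $sent[] = "ack x$pending";
                $pending = 0;
            }
        } else {
            // A change from ack to reject flushes the buffered acks first.
            if ($pending > 0) {
                $sent[] = "ack x$pending";
                $pending = 0;
            }
            $sent[] = 'reject';
        }
    }
    return array($sent, $pending);
}

// Seven acks with a buffer of three: two flushes, one ack left waiting.
list($sent, $pending) = simulateAckBuffer(array_fill(0, 7, 'ack'), 3);

// Two acks followed by a reject: the acks are flushed early.
list($sentMixed, $pendingMixed) = simulateAckBuffer(array('ack', 'ack', 'reject'), 3);
```

In the first run one ack is still sitting in the buffer when the input ends; if the event loop blocked at that point, the broker would keep that message un-dequeued until more traffic arrived, which is the situation point 2 warns about.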

Previous Chapter : A simple message reader

Next Chapter: RabbitMQ Extensions

Documentation home