Robin Harvey edited this page Nov 3, 2011 · 23 revisions

Chapter 5 - Consuming messages

Warning: Changes in progress! (03/11)

Introduction

The previous chapter showed one way of receiving messages in Amqp, by using the basic.get command. Whilst this is programmatically convenient it's not particularly efficient, because the server has to wait for clients to instruct it to deliver messages, one at a time. A more efficient solution is for the broker to deliver messages at any time, as soon as they arrive at the broker, and this is what consuming is. There are 2 adjacent concepts in play here: the consume session and the event loop. A consume session is an agreement between the Amqp client and the broker about delivering messages; it is started by the client sending basic.consume and stopped by the client sending basic.cancel. An event loop, on the other hand, is a programming construct which allows you to receive messages, so it's easy to see that these 2 concepts are closely related. Nevertheless, it is possible to start a consume session without going into an event loop (and vice versa, although you'd never want to do this), and it's possible to exit an event loop without closing a consume session.

Event loops are not specific to Amqphp; they have been used for a variety of purposes for many years.

Basic consumer life-cycle

In order to consume, you need an object to act as an end point for deliveries; this must be an object which implements the \amqphp\Consumer interface. You add your consumer to a channel (a channel can have many consumers) using the \amqphp\Channel->addConsumer($c) method, or by embedding a <consumer> tag inside a <channel> tag in your Factory XML configuration file.

Next, you need to create an \amqphp\EventLoop object to manage the event loop which will listen for incoming messages. The EventLoop class supports listening to multiple connections, so you can call \amqphp\EventLoop->addConnection($c) with as many broker connections as you need.

To start consuming you call \amqphp\EventLoop->select(); this triggers the Amqp consumer setup sequence and enters the event loop. Performing the Amqp setup means sending a basic.consume method to the broker for each consumer. This is done for you by the EventLoop, which triggers each connected Channel to call getConsumeMethod() on each \amqphp\Consumer object; the resulting methods are collected for all consumers and forwarded to the broker en masse. What happens next depends on the exit strategies that are set on each connection. By default no exit strategies are set, which means the loop waits indefinitely for incoming messages and delivers them as they arrive. (As an aside, you can begin the broker consume session manually by calling Channel->startAllConsumers() and then enter the event loop at a later time; the basic.consume messages won't be re-sent.)
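The set-up steps described so far can be sketched as a short helper. This is only an illustration of the call order, using the method names mentioned in this chapter. The function name consumeUntilLoopExits() is made up for the example, and the four arguments are assumed to be ready-made Amqphp objects (creating connections and channels is not shown here).

```php
<?php
// Sketch only: wires a consumer into a channel and runs the event loop.
// $loop, $conn, $chan and $consumer are assumed to be existing Amqphp
// objects: an \amqphp\EventLoop, a Connection, a Channel opened on that
// connection, and an object implementing \amqphp\Consumer.
function consumeUntilLoopExits($loop, $conn, $chan, $consumer)
{
    // 1. Register the delivery end point on its channel.
    $chan->addConsumer($consumer);

    // 2. Tell the event loop which connection to listen on.
    $loop->addConnection($conn);

    // 3. Start the consume session(s) and enter the event loop. With no
    //    exit strategies set on $conn this call is not expected to return.
    $loop->select();
}
```

In practice you would normally push at least one exit strategy on to the connection before calling this, otherwise the call blocks for good.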

Once the consumer is connected and the event loop has started, any incoming messages will be delivered to the appropriate consumer object's handleDelivery() method. The incoming message is passed in as the first parameter (a standard \amqphp\wire\Method object) and the channel that received the message is the second parameter. If you chose in your basic.consume params (returned from Consumer->getConsumeMethod()) to ack the incoming messages, you can return one or more of the following 'signals':

  • \amqphp\CONSUMER_ACK - this const is a signal to the Channel to respond with basic.ack (multiple=false)
  • \amqphp\CONSUMER_REJECT - this const is a signal to the Channel to respond with basic.reject (requeue=true)
  • \amqphp\CONSUMER_DROP - this const is a signal to the Channel to respond with basic.reject (requeue=false)
  • \amqphp\CONSUMER_CANCEL - this const is a signal to the Channel to respond with basic.cancel (no-wait=false)

The CONSUMER_CANCEL signal tells Amqphp to send the basic.cancel message to the broker, which stops a single consume session. Subsequent incoming messages for this consumer are rejected automatically. Note that this only cancels the consume session for that one consumer; other consumers will continue receiving messages. You can return multiple signals as an array. For example, you could return array(CONSUMER_ACK, CONSUMER_CANCEL) to ack the current message and close the consume session.
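As a rough sketch of how signals might be returned, here is a delivery handler that acks every message and cancels its own consume session after a fixed number of deliveries. The class name StopAfterN is made up, and the two define() calls are stand-ins with invented values so that the snippet runs without the library. A real consumer would implement the \amqphp\Consumer interface, whose other methods are left out here.

```php
<?php
// Stand-ins so the sketch runs without Amqphp loaded. The real constants
// come from the library and their actual values will differ.
defined('amqphp\CONSUMER_ACK')    || define('amqphp\CONSUMER_ACK', 'ack');
defined('amqphp\CONSUMER_CANCEL') || define('amqphp\CONSUMER_CANCEL', 'cancel');

// Hypothetical handler: only the delivery logic is shown.
class StopAfterN
{
    private $seen = 0;
    private $limit;

    public function __construct($limit)
    {
        $this->limit = $limit;
    }

    // Same parameter order as described above: message first, channel second.
    public function handleDelivery($meth, $chan)
    {
        $this->seen++;
        if ($this->seen >= $this->limit) {
            // Last wanted message: ack it and close this consume session.
            return array(\amqphp\CONSUMER_ACK, \amqphp\CONSUMER_CANCEL);
        }
        // Otherwise just ack and keep consuming.
        return \amqphp\CONSUMER_ACK;
    }
}
```

Keeping the counter inside the consumer object means each consumer decides for itself when to stop, which fits the point that cancelling only affects that one consumer.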

Event loop exit strategies

In Amqphp, the default behaviour of a Connection in an event loop is to wait indefinitely for incoming messages, and to never return. Clearly, this behaviour is highly undesirable in many circumstances: you need to be able to get out of an event loop either after a set time period has elapsed or after a certain set of messages has been received. Amqphp achieves this by allowing you to add exit strategies to each Connection that is involved in the event loop. Each strategy does one or both of the following:

  1. Set a timeout on the system select call so that this exits after a set period
  2. After a particular set of incoming messages have been delivered, the strategy is able to trigger the event loop to exit.

Strategies are added to a connection with the \amqphp\Connection->pushExitStrategy() method. The first argument is a const flag which represents the type of strategy, and subsequent arguments depend on the first. Here's a list of strategies and their additional arguments:

  • \amqphp\STRAT_TIMEOUT_ABS - Absolute timeout. The second and third parameters to pushExitStrategy() are $epoch and $usecs, which specify an absolute end time for the select loop. All messages which are received before this cut-off point are delivered to their respective consumers. Example: $conn->pushExitStrategy(\amqphp\STRAT_TIMEOUT_ABS, 1317213963, 250000)
  • \amqphp\STRAT_TIMEOUT_REL - Relative timeout. The same as absolute timeout, except the end point is calculated relative to the current time just before the event loop starts. Example: $conn->pushExitStrategy(\amqphp\STRAT_TIMEOUT_REL, 5, 250000) - this means the loop will exit after 5.25 seconds.
  • \amqphp\SELECT_MAXLOOPS - Return after a set number of event loops; the second argument is the number of loops (integer). The loop blocks without a timeout and exits after at most N iterations. Note that this isn't necessarily the same as "N messages received", particularly if you use a high prefetch-count value.
  • \amqphp\SELECT_CALLBACK - The second argument to pushExitStrategy() is a callable function which returns a Boolean; if this returns true then the loop will exit. An optional third parameter specifies additional arguments to the callback, which will be passed to it using PHP's call_user_func_array. The loop blocks indefinitely, and the callback function is polled after each message is received.
  • \amqphp\SELECT_COND - Infinite loop with conditional exit. No timeout is specified, but after each event loop the Connection object polls all connected channels to see if they are able to consume more messages; if none of the channels indicate they expect more messages, the loop exits. A channel is considered to "expect messages" if it has at least one connected consumer, or if there are pending acknowledgements for RabbitMQ "publisher confirms". Recall that a consumer can return the \amqphp\CONSUMER_CANCEL signal from the handleDelivery() method; when all consumers on a particular channel remove themselves in this way, the channel goes from "expecting" messages to "not expecting" messages. See Chapter 6 for more information on "publisher confirms".

You can add more than one exit strategy to a Connection, these then form a "chain of responsibility". After each loop, the strategies are consulted in turn (in the same order they were added to the connection) such that each strategy is passed the result of the previous one, and can either override or accept that result. A useful application here is to allow a loop to exit before a specific timeout depending on messages received. Suppose you set up a consume session to receive 5 messages, but you don't want to wait more than 2 seconds to receive all 5. If you added STRAT_TIMEOUT_REL then SELECT_COND you could achieve this - once all 5 messages are received your Consumer would return CONSUMER_CANCEL, and this would be picked up by the SELECT_COND strategy which would then trigger the loop to exit. On the other hand, if all 5 messages were not received before the 2 second timeout, the STRAT_TIMEOUT_REL strategy would trigger the loop to exit.
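The chain idea can be modelled in a few lines of plain PHP. This is a toy model written for this page and not Amqphp's internal code: consultChain(), the two closures and the $state array are all invented for the illustration. It follows the 5 messages / 2 seconds scenario above, with a verdict of true meaning "keep looping".

```php
<?php
// Toy model of an exit-strategy chain; not Amqphp's internals.
// Each strategy receives the previous verdict (true = keep looping) and
// returns its own verdict, either accepting or overriding what it was given.
function consultChain(array $strategies, array $state)
{
    $keepLooping = true;
    foreach ($strategies as $strategy) {
        $keepLooping = $strategy($keepLooping, $state);
    }
    return $keepLooping;
}

// Timeout: overrides to "stop" once the deadline has passed.
$timeout = function ($prev, array $state) {
    return $state['now'] >= $state['deadline'] ? false : $prev;
};

// Conditional: overrides to "stop" when no consumer is left.
$cond = function ($prev, array $state) {
    return $state['consumers'] === 0 ? false : $prev;
};

$chain = array($timeout, $cond);   // consulted in the order they were added

// Still inside the 2 seconds and still consuming: keep looping.
$waiting  = consultChain($chain, array('now' => 1.0, 'deadline' => 2.0, 'consumers' => 1));
// The consumer cancelled itself early: exit before the timeout.
$finished = consultChain($chain, array('now' => 1.0, 'deadline' => 2.0, 'consumers' => 0));
// Deadline passed with the consumer still attached: exit anyway.
$timedOut = consultChain($chain, array('now' => 2.5, 'deadline' => 2.0, 'consumers' => 1));
```

Here $waiting is true, while $finished and $timedOut are both false.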

Exit strategies are set on a per-connection basis, so an EventLoop which contains multiple connections can have different exit strategies for each connection. If a single connection exits the loop, the EventLoop will stop receiving messages for that connection but continue receiving for the others.
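A possible arrangement with two connections is sketched below. The helper name listenWithMixedStrategies() is made up, the arguments are assumed to be an existing \amqphp\EventLoop and two existing connections, and the define() calls are stand-ins with invented values so that the snippet runs without the library.

```php
<?php
// Stand-ins so the sketch runs without Amqphp loaded. The real constants
// come from the library and their actual values will differ.
defined('amqphp\STRAT_TIMEOUT_REL') || define('amqphp\STRAT_TIMEOUT_REL', 'timeout_rel');
defined('amqphp\SELECT_COND')       || define('amqphp\SELECT_COND', 'cond');

// $loop is assumed to be an \amqphp\EventLoop, and $shortLived and
// $longLived two already connected \amqphp\Connection objects.
function listenWithMixedStrategies($loop, $shortLived, $longLived)
{
    // Stop listening on this connection 2 seconds after the loop starts.
    $shortLived->pushExitStrategy(\amqphp\STRAT_TIMEOUT_REL, 2, 0);

    // Keep this one going until its channels stop expecting messages.
    $longLived->pushExitStrategy(\amqphp\SELECT_COND);

    $loop->addConnection($shortLived);
    $loop->addConnection($longLived);
    $loop->select();
}
```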

Ways to consume, reasons to consume

There are 2 ways to consume messages, either directly on a connection using the select() method or using an \amqphp\EventLoop object:

// (1) Listen on a single connection
$conn->pushExitStrategy(\amqphp\SELECT_COND);
$conn->select(); // Enter a select loop which contains only $conn

// (2) Listen on multiple connections
$ev = new \amqphp\EventLoop;
$ev->addConnection($conn1);
$ev->addConnection($conn2);
$ev->select(); // Enter a select loop which contains 2 connections

These 2 methods share the same implementation; calling select() directly on a connection exists as a convenience helper method.

By far the most common reason for using the select() methods is to receive incoming messages whilst consuming, but this is not the only use case. If you're publishing messages with the mandatory or immediate flags set, then there's a possibility these may be returned to you via the ChannelEventHandler object, as outlined in Chapter 3. In order to receive these messages you have to call select() (either of the above methods will work), and in this case you need to make sure you've got an appropriate exit strategy set. If you're listening out for rejected messages, then I'd recommend using one of the timeout strategies. If you're waiting for RabbitMQ "publisher confirms" then you could use either a timeout or the SELECT_COND strategy.

Previous Chapter: A simple message reader

Next Chapter: RabbitMQ Extensions

Documentation home
