Robin Harvey edited this page Sep 28, 2011 · 23 revisions

Chapter 5 - Consuming messages

Introduction

The previous chapter showed one way of receiving messages in Amqp: the basic.get command. Whilst this is convenient, it's not particularly efficient, because the broker has to wait for the client to request each message, one at a time. A more efficient approach is for the broker to deliver messages as soon as they arrive, and this is what consuming is. A client starts consuming by sending a basic.consume method, which tells the broker that it can deliver messages to the client at any time. The client then listens for incoming messages and responds to them with basic.ack and basic.reject, as outlined in the previous chapter.

Because the consumer has to listen for incoming messages, Amqphp provides a couple of different event loop implementations. An event loop uses the system select function to wait for incoming messages; as soon as one arrives, the select function returns and Amqphp can deliver the incoming message. Here's some pseudo-code for a very simple event loop:

1. fp = socket_open()
2. while (true) {
3.     if (select(fp)) {
4.         // A message was received.
5.     }
6. }

The select call happens on line 3. The program pauses indefinitely at this point and only continues when an incoming message is received. In many cases you don't want to enter an infinite loop (for example on a web server), so Amqphp provides a variety of "exit strategies" that let you control how long the select loop waits.
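To make the pseudo-code concrete, here is a minimal sketch of a select loop in plain PHP, using the built-in stream_select() function. It does not use Amqphp at all: a local socket pair stands in for the broker connection, the newline-terminated "messages" are invented for the demonstration, and the half-second deadline is a hand-rolled stand-in for an exit strategy. It assumes a Unix-like system, where STREAM_PF_UNIX socket pairs are available.

```php
<?php
// Stand-in for a broker connection: a local socket pair, so the sketch runs
// without a network. $client plays the role of the Amqp socket.
list($client, $broker) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

// Simulate the broker pushing two newline-terminated messages.
fwrite($broker, "message-1\nmessage-2\n");

$received = array();
$deadline = microtime(true) + 0.5; // hand-rolled exit condition: stop after 0.5 seconds

while (count($received) < 2) {
    $remaining = $deadline - microtime(true);
    if ($remaining <= 0) {
        break; // the deadline passed before both messages arrived
    }

    // stream_select() modifies its array arguments, so rebuild them each time.
    $read = array($client);
    $write = null;
    $except = null;
    $secs = (int) floor($remaining);
    $usecs = (int) (($remaining - $secs) * 1000000);

    // Block until $client is readable or the remaining time runs out.
    $ready = stream_select($read, $write, $except, $secs, $usecs);
    if ($ready === false) {
        break; // select failed, e.g. it was interrupted
    }
    if ($ready > 0) {
        // A message was received.
        $received[] = rtrim(fgets($client), "\n");
    }
}

fclose($client);
fclose($broker);
```

The loop leaves as soon as both messages have been read, or when the deadline passes, whichever comes first. Amqphp's exit strategies, described later in this chapter, play the role of the deadline check here.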

Basic consumer life-cycle

In order to consume, you need a class which acts as an end point for deliveries; this must be an instance of \amqphp\Consumer. You add your consumer to a channel (a channel can have many consumers) using the \amqphp\Channel->addConsumer($c) method, or by embedding a <consumer> tag inside a <channel> tag in your Factory XML configuration file.

Next, you need to create an \amqphp\EventLoop object to manage the event loop which will listen for incoming messages. The EventLoop class supports listening to multiple connections, so you can call \amqphp\EventLoop->addConnection($c) for as many broker connections as you need. To enter the event loop, you call select() on the EventLoop; doing so triggers Amqphp to start the consume session on the broker. One of the \amqphp\Consumer functions is getConsumeMethod(), which must return an associative array of basic.consume properties. Amqphp calls getConsumeMethod() during the consume setup sequence and passes these properties to the broker, so you can customise the basic.consume parameters on a per-consumer basis.
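As a rough illustration of what such a properties array might contain, the sketch below uses the field names that the Amqp 0-9-1 specification defines for basic.consume. The class is a stand-in written for this example and does not extend the real \amqphp\Consumer, the queue name is hypothetical, and the exact keys and return type Amqphp expects should be checked against the demos for your version.

```php
<?php
// Stand-in class for illustration only: a real consumer would be an instance
// of \amqphp\Consumer. The queue name 'my-queue' is hypothetical.
class SketchConsumer
{
    // Returns the basic.consume properties for this consumer. The keys are
    // the basic.consume field names from the Amqp 0-9-1 specification; that
    // Amqphp accepts exactly these keys is an assumption.
    public function getConsumeMethod()
    {
        return array(
            'queue'     => 'my-queue',
            'no-local'  => false,
            'no-ack'    => false, // false: the broker expects an ack or reject per message
            'exclusive' => false,
            'no-wait'   => false,
        );
    }
}

$consumer = new SketchConsumer();
$params = $consumer->getConsumeMethod();
```

Setting no-ack to false is what makes the acknowledgement signals described below relevant, because the broker then waits for each delivery to be acked or rejected.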

It's worth noting at this point that the API usage pattern for consuming is different from the one for sending simpler Amqp commands, because the Amqphp API sends the low-level consume messages for you.

Once the consumer is connected and the event loop has started, any incoming messages are delivered to the appropriate consumer object's handleDelivery() method. The incoming message is passed in as the first parameter (a standard \amqphp\wire\Method object), and the channel that received the message is the second parameter. If you chose in your basic.consume params (returned from Consumer->getConsumeMethod()) to ack the incoming messages, you can return one or more of the following 'signals':

  • \amqphp\CONSUMER_ACK - this const is a signal to the Channel to respond with basic.ack (multiple=false)
  • \amqphp\CONSUMER_REJECT - this const is a signal to the Channel to respond with basic.reject (requeue=true)
  • \amqphp\CONSUMER_DROP - this const is a signal to the Channel to respond with basic.reject (requeue=false)
  • \amqphp\CONSUMER_CANCEL - this const is a signal to the Channel to respond with basic.cancel (no-wait=false)

The CONSUMER_CANCEL const tells Amqphp to send the basic.cancel message, which stops a single consumer. Note that this only cancels the consume session for that one consumer; other consumers will continue receiving messages. Once again, this behaviour is not the same as the standard Amqphp API behaviour. You can return multiple signals as an array; for example, you could return array(CONSUMER_ACK, CONSUMER_CANCEL) to ack the current message and close the consume session.
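The mapping from signals to broker responses can be sketched in plain PHP without the library. Everything in this example is a stand-in: the SKETCH_* constants take the place of the \amqphp\CONSUMER_* consts (their values here are arbitrary), sketchResponsesFor() is a hypothetical helper that mimics what the Channel is described as doing, and CountingConsumer only imitates the shape of handleDelivery().

```php
<?php
// Hypothetical stand-ins for the \amqphp\CONSUMER_* consts. The values are
// arbitrary and are NOT the library's values.
const SKETCH_ACK = 'ack';
const SKETCH_REJECT = 'reject';
const SKETCH_DROP = 'drop';
const SKETCH_CANCEL = 'cancel';

// Translate the return value of handleDelivery() (a single signal or an
// array of signals) into the list of Amqp methods the channel would send.
function sketchResponsesFor($signals)
{
    $map = array(
        SKETCH_ACK    => 'basic.ack (multiple=false)',
        SKETCH_REJECT => 'basic.reject (requeue=true)',
        SKETCH_DROP   => 'basic.reject (requeue=false)',
        SKETCH_CANCEL => 'basic.cancel (no-wait=false)',
    );
    $responses = array();
    foreach ((array) $signals as $signal) {
        if (isset($map[$signal])) {
            $responses[] = $map[$signal];
        }
    }
    return $responses;
}

// A consumer that acks every message and cancels itself on the third one.
class CountingConsumer
{
    private $seen = 0;

    public function handleDelivery($message, $channel)
    {
        $this->seen++;
        if ($this->seen >= 3) {
            return array(SKETCH_ACK, SKETCH_CANCEL);
        }
        return SKETCH_ACK;
    }
}

$consumer = new CountingConsumer();
$log = array();
foreach (array('m1', 'm2', 'm3') as $message) {
    // null stands in for the channel object, which this sketch does not need.
    $log[] = sketchResponsesFor($consumer->handleDelivery($message, null));
}
```

For the first two messages the log holds a single basic.ack entry; for the third it holds basic.ack followed by basic.cancel, which corresponds to the "ack the current message and close the consume session" case.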

There's a complete example of how to consume messages in the Amqphp demos folder.

Event loops and exit strategies

Exit strategies are Amqphp's way of making it easier to work with event loops. They have nothing to do with the Amqp protocol; they're purely a part of the Amqphp API. The basic idea is that in many cases you don't want to sit in an event loop indefinitely: you often want to exit the loop, either under a certain set of circumstances or after a given period has elapsed.

You set the exit strategy on a per-connection basis using the \amqphp\Connection->setSelectMode() method. You specify the exit strategy as the first parameter, followed by any strategy-dependent parameters. The available strategies are represented as consts, and are as follows:

  • \amqphp\SELECT_TIMEOUT_ABS - Absolute timeout. The second and third parameters to setSelectMode() are $epoch and $usecs, which together specify an absolute end time for the select loop. All messages which are received before this cut-off point are delivered to their respective consumers. Example: $conn->setSelectMode(\amqphp\SELECT_TIMEOUT_ABS, 1317213963, 250000)
  • \amqphp\SELECT_TIMEOUT_REL - Relative timeout. The same as absolute timeout, except the end point is calculated relative to the current time just before the event loop starts. Example: $conn->setSelectMode(\amqphp\SELECT_TIMEOUT_REL, 5, 250000) - this means the loop will exit after 5.25 seconds.
  • \amqphp\SELECT_MAXLOOPS - Return after a set number of event loop iterations; the second argument is the number of iterations (integer). There is no timeout: each iteration blocks until something is received, and the loop exits after N iterations. If you only use a single connection, this usually means the same as "exit after N messages have been received". Note that if you use an Amqp heartbeat then receiving the heartbeat will be counted as an incoming message.
  • \amqphp\SELECT_CALLBACK - The second argument to setSelectMode() is a callable function which returns a Boolean; if it returns true, the loop exits. An optional third parameter specifies additional arguments to the callback, which are passed to it using PHP's call_user_func_array. The loop blocks indefinitely, and the callback function is polled after each message is received.
  • \amqphp\SELECT_COND - Infinite loop with conditional exit. No timeout is specified, but after each message is received, the Connection object polls all connected channels to see if they are able to consume more messages. If none of the channels indicate that they expect more messages, the loop exits. A channel is considered to "expect messages" if it has at least one connected consumer, or if there are pending acknowledgements for RabbitMQ "publisher confirms". Recall that a consumer can return the \amqphp\CONSUMER_CANCEL signal from the handleDelivery() method; when all consumers on a particular channel remove themselves in this way, the channel goes from "expecting" messages to "not expecting" messages. See Chapter 6 for more information on "publisher confirms".
  • \amqphp\SELECT_INFINITE - Infinite loop; the loop never exits by itself. The event loop will only end if the underlying TCP connection breaks, or if the script is interrupted using OS signals (e.g. SIGINT). Make sure you set the Connection->signalDispatch flag to true, and set up a standard signal handler function using the PHP pcntl functions.
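The arithmetic behind the timeout strategies, and the way a callback or a loop counter can end the loop, can be sketched in plain PHP. This is not Amqphp's implementation: sketchDeadline() and sketchLoop() are hypothetical helpers written for this example, and the array of strings stands in for the messages that a real select loop would receive one at a time.

```php
<?php
// Convert a relative timeout ($secs, $usecs) into an absolute deadline,
// expressed as float seconds since the Unix epoch.
function sketchDeadline($now, $secs, $usecs)
{
    return $now + $secs + ($usecs / 1000000);
}

// Simulated event loop: each element of $events stands in for one message
// handed over by select(). $shouldExit is polled after every message, in the
// way the callback strategy is described above.
function sketchLoop(array $events, $shouldExit)
{
    $handled = array();
    foreach ($events as $event) {
        $handled[] = $event;
        if (call_user_func($shouldExit, count($handled))) {
            break;
        }
    }
    return $handled;
}

// A relative timeout of 5 seconds plus 250000 microseconds, starting from
// the epoch used in the absolute timeout example, ends 5.25 seconds later.
$deadline = sketchDeadline(1317213963.0, 5, 250000);

// "Max loops" expressed as an exit callback: stop after 2 iterations.
$maxLoops = 2;
$handled = sketchLoop(
    array('m1', 'm2', 'm3'),
    function ($count) use ($maxLoops) {
        return $count >= $maxLoops;
    }
);
```

Here the third message is never handled, because the exit condition becomes true after the second iteration. A conditional exit in the style of SELECT_COND could be modelled the same way, with a callback that asks whether any consumer is still attached.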

Although exit strategies are set on a per-connection basis, an EventLoop which contains multiple connections can have a different exit strategy for each connection. If a single connection exits the loop, the EventLoop stops receiving messages for that connection, but continues receiving for the other connections.

Previous Chapter : A simple message reader

Next Chapter: RabbitMQ Extensions

Documentation home
