Skip to content

Latest commit

 

History

History
133 lines (104 loc) · 3.4 KB

message_processor.md

File metadata and controls

133 lines (104 loc) · 3.4 KB

Message processor

The message processor is an object that actually process the message and must return a result status. Here's example:

<?php
use Interop\Queue\PsrProcessor;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrContext;

class SendMailProcessor implements PsrProcessor
{
    public function process(PsrMessage $message, PsrContext $context) 
    {
        $this->mailer->send('[email protected]', $message->getBody());
        
        return self::ACK;
    }
}

Usually there is no need to catch exceptions. The message broker can detect consumer has failed and redeliver the message. Sometimes you have to reject messages explicitly.

<?php
use Interop\Queue\PsrProcessor;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrContext;
use Enqueue\Util\JSON;

class SendMailProcessor implements PsrProcessor
{
    public function process(PsrMessage $message, PsrContext $context) 
    {
        $data = JSON::decode($message->getBody());
        if ($user  = $this->userRepository->find($data['userId'])) {
            return self::REJECT;
        }
        
        $this->mailer->send($user->getEmail(), $data['text']);
        
        return self::ACK;
    }
}

It is possible to find out whether the message failed previously or not. There is isRedelivered method for that. If it returns true than there was attempt to process message.

<?php
use Interop\Queue\PsrProcessor;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrContext;

class SendMailProcessor implements PsrProcessor
{
    public function process(PsrMessage $message, PsrContext $context) 
    {
        if ($message->isRedelivered()) {
            return self::REQUEUE;
        }
        
        $this->mailer->send('[email protected]', $message->getBody());
        
        return self::ACK;
    }
}

The second argument is your context. You can use it to send messages to other queues\topics.

<?php
use Interop\Queue\PsrProcessor;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrContext;

class SendMailProcessor implements PsrProcessor
{
    public function process(PsrMessage $message, PsrContext $context) 
    {
        $this->mailer->send('[email protected]', $message->getBody());
        
        $queue = $context->createQueue('anotherQueue');
        $message = $context->createMessage('Message has been sent');
        $context->createProducer()->send($queue, $message);
        
        return self::ACK;
    }
}

The consumption component provide some useful extensions, for example there is an extension that makes RPC processing simplier.

<?php
use Interop\Queue\PsrProcessor;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrContext;
use Enqueue\Consumption\ChainExtension;
use Enqueue\Consumption\QueueConsumer;
use Enqueue\Consumption\Extension\ReplyExtension;
use Enqueue\Consumption\Result;

class SendMailProcessor implements PsrProcessor
{
    public function process(PsrMessage $message, PsrContext $context) 
    {
        $this->mailer->send('[email protected]', $message->getBody());
        
        $replyMessage = $context->createMessage('Message has been sent');
        
        return Result::reply($replyMessage);
    }
}

/** @var \Interop\Queue\PsrContext $psrContext */

$queueConsumer = new QueueConsumer($psrContext, new ChainExtension([
    new ReplyExtension()
]));

$queueConsumer->bind('foo', new SendMailProcessor());

$queueConsumer->consume();

back to index