Skip to content

Latest commit

 

History

History
157 lines (119 loc) · 4.47 KB

message_processor.md

File metadata and controls

157 lines (119 loc) · 4.47 KB

Message processor

Message processors and usage examples described in consumption/message_processor Here we just show how to register a message processor service to enqueue. Let's say we have app bundle and a message processor there

Container tag

# src/AppBundle/Resources/services.yml

services:
  app.async.say_hello_processor:
    class: 'AppBundle\Async\SayHelloProcessor'
    tags:
        - { name: 'enqueue.client.processor', topicName: 'aTopic' }
        

The tag has some additional options:

  • topicName [Req]: Tells what topic to consume messages from.
  • queueName: By default message processor does not require an extra queue on broker side. It reuse a default one. Setting the option you can define a custom queue to be used.
  • processorName: By default the service id is used as message processor name. Using the option you can define a custom name.

Topic subscriber

There is a TopicSubscriberInterface interface (like EventSubscriberInterface). It is handy to subscribe on event messages. It allows to keep subscription login and process logic closer to each other.

<?php
namespace AppBundle\Async;

use Enqueue\Client\TopicSubscriberInterface;
use Interop\Queue\PsrProcessor;

class SayHelloProcessor implements PsrProcessor, TopicSubscriberInterface
{
    public static function getSubscribedTopics()
    {
        return ['aTopic', 'anotherTopic'];
    }
}

On the topic subscriber you can also define queue and processor name:

<?php
use Enqueue\Client\TopicSubscriberInterface;
use Interop\Queue\PsrProcessor;

class SayHelloProcessor implements PsrProcessor, TopicSubscriberInterface
{
    public static function getSubscribedTopics()
    {
        return [
            'aTopic' => ['queueName' => 'fooQueue', 'processorName' => 'foo'], 
            'anotherTopic' => ['queueName' => 'barQueue', 'processorName' => 'bar'], 
        ];
    }
}

In the container you can just add the tag enqueue.client.message_processor and omit any other options:

# src/AppBundle/Resources/services.yml

services:
  app.async.say_hello_processor:
    class: 'AppBundle\Async\SayHelloProcessor'
    tags:
        - { name: 'enqueue.client.processor'}

Command subscriber

There is a CommandSubscriberInterface interface which allows to register a command handlers. If you send a message using ProducerV2::sendCommand('aCommandName') method it will come to this processor.

<?php
namespace AppBundle\Async;

use Enqueue\Client\CommandSubscriberInterface;
use Interop\Queue\PsrProcessor;

class SayHelloProcessor implements PsrProcessor, CommandSubscriberInterface
{
    public static function getSubscribedCommand()
    {
        return 'aCommandName';
    }
}

On the command subscriber you can also define additional settings such as queue and processor name:

<?php
use Enqueue\Client\CommandSubscriberInterface;
use Interop\Queue\PsrProcessor;

class SayHelloProcessor implements PsrProcessor, CommandSubscriberInterface
{
    public static function getSubscribedCommand()
    {
        return ['queueName' => 'fooQueue', 'processorName' => 'aCommandName'];
    }
}

There is a possibility to register a command processor which works exclusively on the queue (no other processors bound to it). In this case you can send messages without setting any message properties at all. Here's an example of such a processor:

In the container you can just add the tag enqueue.client.message_processor and omit any other options:

<?php
use Enqueue\Client\CommandSubscriberInterface;
use Interop\Queue\PsrProcessor;

class SayHelloProcessor implements PsrProcessor, CommandSubscriberInterface
{
    public static function getSubscribedCommand()
    {
        return [
           'processorName' => 'the-exclusive-command-name',
           'queueName' => 'the-queue-name',
           'queueNameHardcoded' => true,
           'exclusive' => true,
       ];
    }
}

The same as a topic subscriber you have to tag a processor service (no need to add any options there):

# src/AppBundle/Resources/services.yml

services:
  app.async.say_hello_processor:
    class: 'AppBundle\Async\SayHelloProcessor'
    tags:
        - { name: 'enqueue.client.processor'}

back to index