Skip to content

Commit

Permalink
Add a new Event, on message sent
Browse files Browse the repository at this point in the history
  • Loading branch information
gggeek committed Nov 3, 2019
1 parent d0f0f38 commit d5f3e5a
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 3 deletions.
8 changes: 7 additions & 1 deletion Event/EventsList.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@

final class EventsList
{
/**
* The MESSAGE_RECEIVED event is thrown each time a message is received, before it is processed
* The event listener receives a MessageReceivedEvent instance.
*/
const MESSAGE_SENT = 'kaliop_queueing.message_sent';

/**
* The MESSAGE_RECEIVED event is thrown each time a message is received, before it is processed
* The event listener receives a MessageReceivedEvent instance.
Expand Down Expand Up @@ -36,4 +42,4 @@ final class EventsList
* The event listener receives a ProcessStoppedEvent instance.
*/
const PROCESS_STOPPED = 'kaliop_queueing.process_stopped';
}
}
36 changes: 36 additions & 0 deletions Event/MessageSentEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

namespace Kaliop\QueueingBundle\Event;

use Symfony\Component\EventDispatcher\Event;
use Kaliop\QueueingBundle\Queue\MessageConsumerInterface;
use Kaliop\QueueingBundle\Queue\MessageInterface;

class MessageSentEvent extends Event
{
protected $msgBody;
protected $routingKey;
protected $additionalProperties;

public function __construct($msgBody, $routingKey, $additionalProperties)
{
$this->msgBody = $msgBody;
$this->routingKey = $routingKey;
$this->additionalProperties = $additionalProperties;
}

public function getMessageBody()
{
return $this->msgBody;
}

public function getRoutingKey()
{
return $this->routingKey;
}

public function getAdditionalProperties()
{
return $this->additionalProperties;
}
}
2 changes: 2 additions & 0 deletions Resources/config/services.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ services:
#arguments: [ '@kaliop_queueing.driver.rabbitmq' ]
# nb: we assign by default no queue to this service, otherwise it would look like the following:
#calls: [ setQueueName, [ queue_name ] ]
calls:
- [ setDispatcher, [ '@kaliop_queueing.event_dispatcher' ] ]

# Produces 'generic' messages - message encoding is left to the application
kaliop_queueing.message_producer.generic_message:
Expand Down
28 changes: 26 additions & 2 deletions Service/MessageProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
namespace Kaliop\QueueingBundle\Service;

use Kaliop\QueueingBundle\Adapter\DriverInterface;
use Kaliop\QueueingBundle\Event\MessageSentEvent;
use Kaliop\QueueingBundle\Event\EventsList;
use Kaliop\QueueingBundle\Queue\MessageProducerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;

/**
* A helper class, exposed as service.
Expand All @@ -25,6 +28,7 @@ abstract class MessageProducer implements MessageProducerInterface
);
/** @var DriverInterface */
protected $driver;
protected $dispatcher;

/**
* @param DriverInterface $driver
Expand All @@ -45,6 +49,11 @@ public function setDriver(DriverInterface $driver)
return $this;
}

public function setDispatcher(EventDispatcherInterface $dispatcher = null)
{
$this->dispatcher = $dispatcher;
}

/**
* NB: when used for RabbitMQ, the queue name is the name of the producer as defined in old_sound_rabbit_mq.producers,
* it is not the name of the actual amqp queue
Expand Down Expand Up @@ -135,7 +144,13 @@ protected function doPublish($data, $routingKey = '', $extras = array())
{
$producer = $this->getProducerService();
$producer->setContentType($this->getContentType());
$producer->publish($this->encodeMessageBody($data), $routingKey, $extras);
$body = $this->encodeMessageBody($data);
$producer->publish($body, $routingKey, $extras);

if ($this->dispatcher) {
$event = new MessageSentEvent($body, $routingKey, $extras);
$this->dispatcher->dispatch(EventsList::MESSAGE_SENT, $event);
}
}

/**
Expand All @@ -151,14 +166,23 @@ protected function doBatchPublish(array $data, $routingKey = '', $extras = array
$producer = $this->getProducerService();
$producer->setContentType($this->getContentType());
$messages = array();
$events = array();
foreach($data as $key => $element) {
$body = $this->encodeMessageBody($element);
$messages[] = array(
'msgBody' => $this->encodeMessageBody($element),
'msgBody' => $body,
'routingKey' => $routingKey[$key],
'additionalProperties' => $extras
);
$events[] = new MessageSentEvent($body, $routingKey, $extras);
}

$producer->batchPublish($messages);

if ($this->dispatcher) {
foreach ($events as $event) {
$this->dispatcher->dispatch(EventsList::MESSAGE_RECEIVED, $event);
}
}
}
}
5 changes: 5 additions & 0 deletions news.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# Ver 0.10

* NEW: the MessageProducer now emits an event whenever a message is sent. Useful f.e. for logging / troubleshooting


# Ver 0.9

* NEW: allow forcing `enhanceSigchildCompatibility` to the Symfony Process component.
Expand Down
2 changes: 2 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ Support for other messaging systems is available in separate bundles:

* An event: kaliop_queueing.message_consumption_failed, which your services can listen to by usage of tag kaliop_queueing.event_listener

* An event: kaliop_queueing.message_sent, which your services can listen to by usage of tag kaliop_queueing.event_listener

* A console command used to consume messages, similar to the rabbitmq:consumer command but with more options, such as
support for multiple driver and timeouts

Expand Down

0 comments on commit d5f3e5a

Please sign in to comment.