Skip to content

Commit

Permalink
Improve the handling of routing keys when sending batch messages
Browse files Browse the repository at this point in the history
  • Loading branch information
gggeek committed Nov 3, 2015
1 parent 4fb5fa3 commit ef04a93
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 5 deletions.
12 changes: 10 additions & 2 deletions Service/MessageProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,23 @@ protected function doPublish($data, $routingKey = '', $extras = array())
$producer->publish($this->encodeMessageBody($data), $routingKey, $extras);
}

/**
* @param array $data
* @param string|array $routingKey if an array, it must have the same keys as $data
* @param array $extras
*/
protected function doBatchPublish(array $data, $routingKey = '', $extras = array())
{
if (is_string($routingKey)) {
$routingKey = array_fill_keys(array_keys($data), $routingKey);
}
$producer = $this->getProducerService();
$producer->setContentType($this->getContentType());
$messages = array();
foreach($data as $element) {
foreach($data as $key => $element) {
$messages[] = array(
'msgBody' => $this->encodeMessageBody($element),
'routingKey' => $routingKey,
'routingKey' => $routingKey[$key],
'additionalProperties' => $extras
);
}
Expand Down
15 changes: 12 additions & 3 deletions Service/MessageProducer/ConsoleCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,23 @@ public function publish($command, $arguments = array(), $options = array(), $rou
$this->doPublish($msg, $routingKey, $extras);
}


public function batchPublish($messages, $routingKey = null, $ttl = null)
/**
* @param array $messages for each item: command, arguments, options
* @param null $routingKey
* @param null $ttl
*/
public function batchPublish(array $messages, $routingKey = null, $ttl = null)
{
$extras = array();
if ($ttl) {
$extras = array('expiration' => $ttl * 1000);
}

if ($routingKey === null) {
$routingKey = array();
foreach($messages as $key => $message) {
$routingKey[$key] = $this->getRoutingKey($message['command'], @$message['arguments'], @$message['options']);
}
}
$this->doBatchPublish($messages, $routingKey, $extras);

}
Expand Down

0 comments on commit ef04a93

Please sign in to comment.