Skip to content

Commit

Permalink
Add an in-process executor for console commands; fix a fatal error wi…
Browse files Browse the repository at this point in the history
…th invalid message body received; echoback command uses the given output for echoing
  • Loading branch information
gggeek committed Nov 6, 2015
1 parent ef04a93 commit 313eee0
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 18 deletions.
2 changes: 1 addition & 1 deletion Command/EchoBackCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
" and process with pid " . getmypid() . " on host " . gethostname() . " says: " .
$input->getArgument('input') . "\n";

echo $msg;
$output->write($msg);

$fileName = $input->getOption('file');
if ($fileName != '') {
Expand Down
13 changes: 13 additions & 0 deletions Event/MessageConsumptionFailedEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,19 @@ public function __construct($body, \Exception $exception, MessageConsumerInterfa
$this->consumer = $consumer;
}

/**
* The raw data received from the queueing driver
* @return \Kaliop\QueueingBundle\Queue\MessageInterface
*/
public function getMessage()
{
return $this->consumer->getCurrentMessage();
}

/**
* The decoded data received from the queueing driver
* @return mixed
*/
public function getBody()
{
return $this->body;
Expand Down
16 changes: 16 additions & 0 deletions Resources/config/services.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ parameters:
kaliop_queueing.worker_manager.class: Kaliop\QueueingBundle\Service\WorkerManager
kaliop_queueing.watchdog.class: Kaliop\QueueingBundle\Helper\Watchdog
kaliop_queueing.event_dispatcher.class: Symfony\Component\EventDispatcher\ContainerAwareEventDispatcher
kaliop_queueing.console_event_listener.class: Kaliop\QueueingBundle\Service\ConsoleEventListener

kaliop_queueing.message_producer.class: Kaliop\QueueingBundle\Service\MessageProducer
kaliop_queueing.message_consumer.class: Kaliop\QueueingBundle\Service\MessageConsumer
Expand All @@ -14,6 +15,8 @@ parameters:
kaliop_queueing.message_consumer.console_command.class: Kaliop\QueueingBundle\Service\MessageConsumer\ConsoleCommand
kaliop_queueing.message_consumer.console_command.filter.class: Kaliop\QueueingBundle\Service\MessageConsumer\EventListener\ConsoleCommandFilter

kaliop_queueing.message_consumer.inprocess_console_command.class: Kaliop\QueueingBundle\Service\MessageConsumer\InProcessConsoleCommand

kaliop_queueing.message_producer.symfony_service.class: Kaliop\QueueingBundle\Service\MessageProducer\SymfonyService
kaliop_queueing.message_consumer.symfony_service.class: Kaliop\QueueingBundle\Service\MessageConsumer\SymfonyService
kaliop_queueing.message_consumer.symfony_service.filter.class: Kaliop\QueueingBundle\Service\MessageConsumer\EventListener\SymfonyServiceFilter
Expand Down Expand Up @@ -66,6 +69,12 @@ services:
calls:
- [ setContainer, [ @service_container ] ]

# An event listener used to get a hook into the currently running Application
kaliop_queueing.console_event_listener:
class: %kaliop_queueing.console_event_listener.class%
tags:
- { name: kernel.event_listener, event: console.command, method: onConsoleCommand }

### Producers ###

# The base service - mapped to an abstract class
Expand Down Expand Up @@ -120,6 +129,13 @@ services:
arguments: [ @kaliop_queueing.worker_manager ]
parent: kaliop_queueing.message_consumer

# Executes Sf console commands without spawning new processes
kaliop_queueing.message_consumer.inprocess_console_command:
class: %kaliop_queueing.message_consumer.inprocess_console_command.class%
calls:
- [ setEventListener, [ @kaliop_queueing.console_event_listener ] ]
parent: kaliop_queueing.message_consumer.console_command

# Executes Sf services methods
kaliop_queueing.message_consumer.symfony_service:
class: %kaliop_queueing.message_consumer.symfony_service.class%
Expand Down
25 changes: 25 additions & 0 deletions Service/ConsoleEventListener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

namespace Kaliop\QueueingBundle\Service;

use Symfony\Component\Console\Event\ConsoleCommandEvent;

/**
* Keeps a pointer to the executing Application. NB: even if recursive apps are used, it only keeps a ref to the original one
*/
class ConsoleEventListener
{
protected $application;

public function onConsoleCommand(ConsoleCommandEvent $event)
{
if ($this->application == null) {
$this->application = $event->getCommand()->getApplication();
}
}

public function getCurrentApplication()
{
return $this->application;
}
}
1 change: 1 addition & 0 deletions Service/MessageConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ protected function decodeAndConsume(MessageInterface $msg)
// save the message, in case child class needs it for whacky stuff
$this->currentMessage = $msg;

$body = null;
try {
$body = $this->decodeMessageBody($msg);

Expand Down
97 changes: 80 additions & 17 deletions Service/MessageConsumer/ConsoleCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class ConsoleCommand extends MessageConsumer
{
protected $consoleCommand;
protected $application;
protected $eventListener;
/// these will not be used for the executed commands, even if present in received message
protected $filteredOptions = array(
// default symfony console options which make no sense when commands are executed headless
Expand All @@ -33,11 +34,16 @@ public function __construct($consoleManager)
$this->consoleCommand = $consoleManager->getConsoleCommand();
}

public function setApplication(Application $application)
public function setEventListener($listener)
{
$this->application = $application;
$this->eventListener = $listener;
}

/*public function setApplication(Application $application)
{
$this->application = $application;
}*/

/**
* @param array $body
* @return array (positional) retcode, stdout, stderr
Expand All @@ -55,14 +61,18 @@ public function consume($body)
throw new \UnexpectedValueException("Message format unsupported: missing 'command'. Received: " . json_encode($body));
}

if ($this->eventListener) {
$this->application = $this->eventListener->getCurrentApplication();
}

// for a speed/resource gain, we test: if command is not registered, do not try to run it
$this->validateCommand($body['command'], @$body['arguments'], @$body['options']);

return $this->runCommand($body['command'], @$body['arguments'], @$body['options']);
}

/**
* Does some preliminary checks before attempting to run command. , throws if command is blatantly non-runnable.
* Does some preliminary checks before attempting to run command, throws if command is blatantly non-runnable.
* (split as a separate method to better accommodate subclasses)
*
* @param string $consoleCommand
Expand Down Expand Up @@ -95,6 +105,35 @@ protected function runCommand($consoleCommand, $arguments = array(), $options =
{
$command = $this->consoleCommand;

$command .= $this->buildCommandString($consoleCommand, $arguments, $options);

$label = trim(ConsumerCommand::getLabel());
if ($label != '') {
$label = " '$label'";
}

if ($this->logger) {
$this->logger->debug("Console command will be executed from MessageConsumer{$label}: " . $command);
}

$process = new Process($command);
$retCode = $process->run();

$results = array($retCode, $process->getOutput(), $process->getErrorOutput());

if ($retCode != 0 && $this->logger) {
$this->logger->error(
"Console command executed from MessageConsumer{$label} failed. Retcode: $retCode, Error message: '" . trim($results[2]) . "'",
array());
}

return $results;
}

protected function buildCommandString($consoleCommand, $arguments = array(), $options = array())
{
$command = '';

// forced options come before the command proper
foreach ($this->getForcedOptions() as $opt => $value) {
$command .= (strlen($opt) == 1 ? ' -' : ' --') . $opt;
Expand All @@ -108,6 +147,7 @@ protected function runCommand($consoleCommand, $arguments = array(), $options =
$command .= ' ' . escapeshellarg($arg);
}

/// @todo !important check if we can trim down this code by usage of \Symfony\Component\Console\Input\ArrayInput
// options come after arguments to allow legacy scripts to be queued
foreach ($options as $opt => $value) {
// We allow callers to tell us how many dashes they want
Expand Down Expand Up @@ -137,27 +177,50 @@ protected function runCommand($consoleCommand, $arguments = array(), $options =
}
}

$label = trim(ConsumerCommand::getLabel());
if ($label != '') {
$label = " '$label'";
}
return $command;
}

if ($this->logger) {
$this->logger->debug("Console command will be executed from MessageConsumer{$label}: " . $command);
protected function buildCommandArray($consoleCommand, $arguments = array(), $options = array())
{
$command = array();

// forced options come before the command proper
foreach ($this->getForcedOptions() as $opt => $value) {
$realOpt = (strlen($opt) == 1 ? '-' : '--') . $opt;
$command[$realOpt] = $value;
}

$process = new Process($command);
$retCode = $process->run();
$command['command'] = $consoleCommand;

$results = array($retCode, $process->getOutput(), $process->getErrorOutput());
foreach ($arguments as $arg) {
$command[] = $arg;
}

if ($retCode != 0 && $this->logger) {
$this->logger->error(
"Console command executed from MessageConsumer{$label} failed. Retcode: $retCode, Error message: '" . trim($results[2]) . "'",
array());
/// @todo !important check if we can trim down this code by usage of \Symfony\Component\Console\Input\ArrayInput
// options come after arguments to allow legacy scripts to be queued
foreach ($options as $opt => $value) {
// We allow callers to tell us how many dashes they want
// If no dash is given, we use 1 for single letter options,
$optName = ltrim($opt, '-');
$dashes = strlen($opt) - strlen($optName);
if ($dashes == 0) {
$dashes = (strlen($optName) == 1) ? 1 : 2;
}

// silently drop undesirable options
if (preg_match($this->validOptionsRegexp, $optName) &&
!in_array($optName, $this->filteredOptions)
) {
$opt = str_repeat('-', $dashes) . $optName;
$command[$opt] = $value;
} else {
if ($this->logger) {
$this->logger->notice("Dropped option: '$opt'");
}
}
}

return $results;
return $command;
}

protected function getForcedOptions()
Expand Down
54 changes: 54 additions & 0 deletions Service/MessageConsumer/InProcessConsoleCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
<?php

namespace Kaliop\QueueingBundle\Service\MessageConsumer;

use Symfony\Component\Console\Input\StringInput;
use Symfony\Component\Console\Output\BufferedOutput;
use Kaliop\QueueingBundle\Command\ConsumerCommand;

/**
* Instead of forking a php process to run a Symfony console command, runs it directly in-process.
* This is expected to be:
* - faster
* - unsafe, as there is no guarantee against memory leaks, stale database connections etc...
*
* NB: the the Application HAS to be injected into this message consumer or a fatal error will be thrown
*/
class InProcessConsoleCommand extends ConsoleCommand
{
protected function runCommand($consoleCommand, $arguments = array(), $options = array())
{
$input = new StringInput($this->buildCommandString($consoleCommand, $arguments, $options));

$label = trim(ConsumerCommand::getLabel());
if ($label != '') {
$label = " '$label'";
}

if ($this->logger) {
$this->logger->debug("console command will be executed in-process from MessageConsumer{$label}: " . (string)$input);
}

$kernel = $this->application->getKernel();
// q: is this helpful / needed ?
//$kernel->shutdown();
//$kernel->boot();

$applicationClass = get_class($this->application);
$app = new $applicationClass($kernel);
$app->setAutoExit(false);

$output = new BufferedOutput();
$retCode = $app->run($input, $output);

$results = array($retCode, $output->fetch(), '');

if ($retCode != 0 && $this->logger) {
$this->logger->error(
"Console command executed in-process from MessageConsumer{$label} failed. Retcode: $retCode, Output: '" . trim($results[1]) . "'",
array());
}

return $results;
}
}
14 changes: 14 additions & 0 deletions news.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,21 @@
# Ver 0.3

* FIXED: do not crash the consumer if the received message is invalid json (or other expected encoding)

* NEW: introduced a new Event: MessageConsumptionFailed (triggered when message processing raises an Exception)

* NEW: the ConsoleCommand producer gained a batchPublish() method

* NEW: a new *EXPERIMENTAL* consumer is available for executing Console-Command messages. It is registered as service
kaliop_queueing.message_consumer.inprocess_console_command
The difference with the standard kaliop_queueing.message_consumer.console_command consumer is that this one does
not fork a new php process to execute the received commands.
This has the effect of making it:
- fast
- prone to memory leaks
- prone to resource leaks
- sensitive to problems with long-lived database connections
- prone to problems with fatal errors (unless you are on php 7 and you catch them all as exceptions)

# Ver 0.2

Expand Down

0 comments on commit 313eee0

Please sign in to comment.