Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PHRAS-3154 add timeout on waiting message from Q #4242

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,14 @@ PHRASEANET_EXPLODE_WORKER=1
# @run
PHRASEANET_WORKERS_LAUNCH_METHOD=

# timeout in second to wait message from rabbitmq Q
# default 0 unlimeted
#
# NB: if defined the worker will exit 1 when timeout, so use only one worker (bin/console worker:execute) per container
#
# @run
PHRASEANET_WAIT_MESSAGE_TIMEOUT=0

# @run
PHRASEANET_WORKER_assetsIngest=1

Expand Down
17 changes: 17 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ services:
- LC_NAME=C.UTF-8
- PHRASEANET_EXPLODE_WORKER
- PHRASEANET_WORKERS_LAUNCH_METHOD
- PHRASEANET_WAIT_MESSAGE_TIMEOUT
- PHRASEANET_WORKER_assetsIngest
- PHRASEANET_WORKER_createRecord
- PHRASEANET_WORKER_deleteRecord
Expand Down Expand Up @@ -382,6 +383,7 @@ services:
- LC_NAME=C.UTF-8
- PHRASEANET_EXPLODE_WORKER
- PHRASEANET_WORKERS_LAUNCH_METHOD
- PHRASEANET_WAIT_MESSAGE_TIMEOUT
- PHRASEANET_WORKER_mainQueue
- IMAGEMAGICK_POLICY_VERSION
- IMAGEMAGICK_POLICY_WIDTH
Expand Down Expand Up @@ -434,6 +436,7 @@ services:
- LC_NAME=C.UTF-8
- PHRASEANET_EXPLODE_WORKER
- PHRASEANET_WORKERS_LAUNCH_METHOD
- PHRASEANET_WAIT_MESSAGE_TIMEOUT
- PHRASEANET_WORKER_assetsIngest
- IMAGEMAGICK_POLICY_VERSION
- IMAGEMAGICK_POLICY_WIDTH
Expand Down Expand Up @@ -485,6 +488,7 @@ services:
- LC_NAME=C.UTF-8
- PHRASEANET_EXPLODE_WORKER
- PHRASEANET_WORKERS_LAUNCH_METHOD
- PHRASEANET_WAIT_MESSAGE_TIMEOUT
- PHRASEANET_WORKER_createRecord
- IMAGEMAGICK_POLICY_VERSION
- IMAGEMAGICK_POLICY_WIDTH
Expand Down Expand Up @@ -537,6 +541,7 @@ services:
- LC_NAME=C.UTF-8
- PHRASEANET_EXPLODE_WORKER
- PHRASEANET_WORKERS_LAUNCH_METHOD
- PHRASEANET_WAIT_MESSAGE_TIMEOUT
- PHRASEANET_WORKER_deleteRecord
- IMAGEMAGICK_POLICY_VERSION
- IMAGEMAGICK_POLICY_WIDTH
Expand Down Expand Up @@ -588,6 +593,7 @@ services:
- LC_NAME=C.UTF-8
- PHRASEANET_EXPLODE_WORKER
- PHRASEANET_WORKERS_LAUNCH_METHOD
- PHRASEANET_WAIT_MESSAGE_TIMEOUT
- PHRASEANET_WORKER_editRecord
- IMAGEMAGICK_POLICY_VERSION
- IMAGEMAGICK_POLICY_WIDTH
Expand Down Expand Up @@ -741,6 +747,7 @@ services:
- LC_NAME=C.UTF-8
- PHRASEANET_EXPLODE_WORKER
- PHRASEANET_WORKERS_LAUNCH_METHOD
- PHRASEANET_WAIT_MESSAGE_TIMEOUT
- PHRASEANET_WORKER_exposeUpload
- IMAGEMAGICK_POLICY_VERSION
- IMAGEMAGICK_POLICY_WIDTH
Expand Down Expand Up @@ -792,6 +799,7 @@ services:
- LC_NAME=C.UTF-8
- PHRASEANET_EXPLODE_WORKER
- PHRASEANET_WORKERS_LAUNCH_METHOD
- PHRASEANET_WAIT_MESSAGE_TIMEOUT
- PHRASEANET_WORKER_ftp
- IMAGEMAGICK_POLICY_VERSION
- IMAGEMAGICK_POLICY_WIDTH
Expand Down Expand Up @@ -843,6 +851,7 @@ services:
- LC_NAME=C.UTF-8
- PHRASEANET_EXPLODE_WORKER
- PHRASEANET_WORKERS_LAUNCH_METHOD
- PHRASEANET_WAIT_MESSAGE_TIMEOUT
- PHRASEANET_WORKER_populateIndex
- IMAGEMAGICK_POLICY_VERSION
- IMAGEMAGICK_POLICY_WIDTH
Expand Down Expand Up @@ -894,6 +903,7 @@ services:
- LC_NAME=C.UTF-8
- PHRASEANET_EXPLODE_WORKER
- PHRASEANET_WORKERS_LAUNCH_METHOD
- PHRASEANET_WAIT_MESSAGE_TIMEOUT
- PHRASEANET_WORKER_pullAssets
- IMAGEMAGICK_POLICY_VERSION
- IMAGEMAGICK_POLICY_WIDTH
Expand Down Expand Up @@ -945,6 +955,7 @@ services:
- LC_NAME=C.UTF-8
- PHRASEANET_EXPLODE_WORKER
- PHRASEANET_WORKERS_LAUNCH_METHOD
- PHRASEANET_WAIT_MESSAGE_TIMEOUT
- PHRASEANET_WORKER_recordsActions
- IMAGEMAGICK_POLICY_VERSION
- IMAGEMAGICK_POLICY_WIDTH
Expand Down Expand Up @@ -999,6 +1010,7 @@ services:
- LC_NAME=C.UTF-8
- PHRASEANET_EXPLODE_WORKER
- PHRASEANET_WORKERS_LAUNCH_METHOD
- PHRASEANET_WAIT_MESSAGE_TIMEOUT
- PHRASEANET_WORKER_shareBasket
- IMAGEMAGICK_POLICY_VERSION
- IMAGEMAGICK_POLICY_WIDTH
Expand Down Expand Up @@ -1050,6 +1062,7 @@ services:
- LC_NAME=C.UTF-8
- PHRASEANET_EXPLODE_WORKER
- PHRASEANET_WORKERS_LAUNCH_METHOD
- PHRASEANET_WAIT_MESSAGE_TIMEOUT
- PHRASEANET_WORKER_subdefCreation
- IMAGEMAGICK_POLICY_VERSION
- IMAGEMAGICK_POLICY_WIDTH
Expand Down Expand Up @@ -1101,6 +1114,7 @@ services:
- LC_NAME=C.UTF-8
- PHRASEANET_EXPLODE_WORKER
- PHRASEANET_WORKERS_LAUNCH_METHOD
- PHRASEANET_WAIT_MESSAGE_TIMEOUT
- PHRASEANET_WORKER_subtitle
- IMAGEMAGICK_POLICY_VERSION
- IMAGEMAGICK_POLICY_WIDTH
Expand Down Expand Up @@ -1152,6 +1166,7 @@ services:
- LC_NAME=C.UTF-8
- PHRASEANET_EXPLODE_WORKER
- PHRASEANET_WORKERS_LAUNCH_METHOD
- PHRASEANET_WAIT_MESSAGE_TIMEOUT
- PHRASEANET_WORKER_validationReminder
- IMAGEMAGICK_POLICY_VERSION
- IMAGEMAGICK_POLICY_WIDTH
Expand Down Expand Up @@ -1203,6 +1218,7 @@ services:
- LC_NAME=C.UTF-8
- PHRASEANET_EXPLODE_WORKER
- PHRASEANET_WORKERS_LAUNCH_METHOD
- PHRASEANET_WAIT_MESSAGE_TIMEOUT
- PHRASEANET_WORKER_webhook
- IMAGEMAGICK_POLICY_VERSION
- IMAGEMAGICK_POLICY_WIDTH
Expand Down Expand Up @@ -1254,6 +1270,7 @@ services:
- LC_NAME=C.UTF-8
- PHRASEANET_EXPLODE_WORKER
- PHRASEANET_WORKERS_LAUNCH_METHOD
- PHRASEANET_WAIT_MESSAGE_TIMEOUT
- PHRASEANET_WORKER_writeMetadatas
- IMAGEMAGICK_POLICY_VERSION
- IMAGEMAGICK_POLICY_WIDTH
Expand Down
2 changes: 1 addition & 1 deletion docker/phraseanet/worker/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ else
queue_name="$(echo $i | cut -d'_' -f3)"
m=$i
if [ ${!m} -gt "0" ] ; then
command="bin/console worker:execute --queue-name=$queue_name -m ${!m} &"
command="bin/console worker:execute --queue-name=$queue_name -m ${!m} -t ${PHRASEANET_WAIT_MESSAGE_TIMEOUT} &"
echo $command >> bin/run-worker.sh
echo "Worker " $queue_name " defined with parallelism " ${!m}
NBR_WORKERS=$(expr $NBR_WORKERS + 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
use Alchemy\Phrasea\Command\Command;
use Alchemy\Phrasea\WorkerManager\Queue\AMQPConnection;
use Alchemy\Phrasea\WorkerManager\Queue\MessageHandler;
use Alchemy\Phrasea\WorkerManager\Worker\ProcessPool;
use Alchemy\Phrasea\WorkerManager\Worker\WorkerInvoker;
use Doctrine\DBAL\Connection;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
Expand All @@ -25,6 +27,7 @@ public function __construct()
->addOption('preserve-payload', 'p', InputOption::VALUE_NONE, 'Preserve temporary payload file')
->addOption('queue-name', '', InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'The name of queues to be consuming')
->addOption('max-processes', 'm', InputOption::VALUE_REQUIRED, 'The max number of process allow to run (default 1) ')
->addOption('wait-message-timeout', 't', InputOption::VALUE_REQUIRED, 'specify number in second to wait message from Q (default infini =0)')
// ->addOption('MWG', '', InputOption::VALUE_NONE, 'Enable MWG metadata compatibility (use only for write metadata service)')
// ->addOption('clear-metadatas', '', InputOption::VALUE_NONE, 'Remove metadatas from documents if not compliant with Database structure (use only for write metadata service)')
->setHelp('');
Expand Down Expand Up @@ -74,6 +77,11 @@ protected function doExecute(InputInterface $input, OutputInterface $output)
$workerInvoker->preservePayloads();
}

$waitTimeout = 0; // infini
if ($input->getOption('wait-message-timeout') != null) {
$waitTimeout = $input->getOption('wait-message-timeout');
}

/** @var MessageHandler $messageHandler */
$messageHandler = $this->container['alchemy_worker.message.handler'];
$messageHandler->consume($channel, $serverConnection, $workerInvoker, $argQueueName, $maxProcesses);
Expand All @@ -97,7 +105,24 @@ protected function doExecute(InputInterface $input, OutputInterface $output)
return 1;
}
}
$channel->wait();

try {
$channel->wait(null, false, $waitTimeout);
} catch (AMQPTimeoutException $e) {
// we are in wait timeout,
// immediately close the rabbit connection to avoid Missed server heartbeat exception after this timeout
$serverConnection->connectionClose();

/** @var ProcessPool $processPool */
$processPool = $this->container['alchemy_worker.process_pool'];
$processPool->setLogger($this->container['alchemy_worker.logger']);

// and wait until all process generated are finished
$processPool->waitForAllJobProcessFinished();

// exit with 1
return 1;
}
}

$serverConnection->connectionClose();
Expand Down
16 changes: 16 additions & 0 deletions lib/Alchemy/Phrasea/WorkerManager/Worker/ProcessPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,22 @@ public function getWorkerProcess(array $processArguments, AMQPChannel $channel,
return ($this->processes[] = $builder->getProcess());
}

public function waitForAllJobProcessFinished()
{
$interval = 1;
while (count($this->processes) > 0) {
if (count($this->processes) == 1) {
$process = $this->processes[0];
$this->logger->info("only 1 process remaining before exit 1 : " . $process->getCommandLine() . "\n");
}

sleep($interval);

$this->detachFinishedProcesses();
$interval = min(10, $interval + 1);
}
}

private function detachFinishedProcesses()
{
$runningProcesses = [];
Expand Down
Loading