From f6db6dbe98c0afb11b8b7f6d20f5f67612616969 Mon Sep 17 00:00:00 2001 From: aynsix Date: Fri, 3 Feb 2023 17:14:33 +0300 Subject: [PATCH 1/2] add timeout on waiting message from Q --- .../Command/WorkerExecuteCommand.php | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/lib/Alchemy/Phrasea/WorkerManager/Command/WorkerExecuteCommand.php b/lib/Alchemy/Phrasea/WorkerManager/Command/WorkerExecuteCommand.php index f6374ae58a..c4b3f3de63 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Command/WorkerExecuteCommand.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Command/WorkerExecuteCommand.php @@ -8,6 +8,7 @@ use Alchemy\Phrasea\WorkerManager\Worker\WorkerInvoker; use Doctrine\DBAL\Connection; use PhpAmqpLib\Channel\AMQPChannel; +use PhpAmqpLib\Exception\AMQPTimeoutException; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; @@ -25,6 +26,7 @@ public function __construct() ->addOption('preserve-payload', 'p', InputOption::VALUE_NONE, 'Preserve temporary payload file') ->addOption('queue-name', '', InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'The name of queues to be consuming') ->addOption('max-processes', 'm', InputOption::VALUE_REQUIRED, 'The max number of process allow to run (default 1) ') + ->addOption('wait-message-timeout', 't', InputOption::VALUE_REQUIRED, 'specify number in second to wait message from Q (default infini =0)') // ->addOption('MWG', '', InputOption::VALUE_NONE, 'Enable MWG metadata compatibility (use only for write metadata service)') // ->addOption('clear-metadatas', '', InputOption::VALUE_NONE, 'Remove metadatas from documents if not compliant with Database structure (use only for write metadata service)') ->setHelp(''); @@ -74,6 +76,11 @@ protected function doExecute(InputInterface $input, OutputInterface $output) $workerInvoker->preservePayloads(); } + $waitTimeout = 0; // infini + if 
($input->getOption('wait-message-timeout') != null) { + $waitTimeout = $input->getOption('wait-message-timeout'); + } + /** @var MessageHandler $messageHandler */ $messageHandler = $this->container['alchemy_worker.message.handler']; $messageHandler->consume($channel, $serverConnection, $workerInvoker, $argQueueName, $maxProcesses); @@ -97,7 +104,14 @@ protected function doExecute(InputInterface $input, OutputInterface $output) return 1; } } - $channel->wait(); + + try { + $channel->wait(null, false, $waitTimeout); + } catch (AMQPTimeoutException $e) { + $serverConnection->connectionClose(); + + return 1; + } } $serverConnection->connectionClose(); From 74d2728027800768c739149a9c0bb988ff26be73 Mon Sep 17 00:00:00 2001 From: aynsix Date: Mon, 6 Feb 2023 14:03:28 +0300 Subject: [PATCH 2/2] return 1 when all job generated finished --- .env | 8 ++++++++ docker-compose.yml | 18 ++++++++++++++++++ docker/phraseanet/worker/entrypoint.sh | 2 +- .../Command/WorkerExecuteCommand.php | 11 +++++++++++ .../WorkerManager/Worker/ProcessPool.php | 16 ++++++++++++++++ 5 files changed, 54 insertions(+), 1 deletion(-) diff --git a/.env b/.env index 62a49ebd7c..1b30e36a9e 100644 --- a/.env +++ b/.env @@ -602,6 +602,14 @@ PHRASEANET_EXPLODE_WORKER=1 # @run PHRASEANET_WORKERS_LAUNCH_METHOD= +# timeout in seconds to wait for a message from the rabbitmq queue +# default 0 = unlimited +# +# NB: if set to a non-zero value the worker will exit with code 1 on timeout, so use only one worker (bin/console worker:execute) per container +# +# @run +PHRASEANET_WAIT_MESSAGE_TIMEOUT=0 + # @run PHRASEANET_WORKER_assetsIngest=1 diff --git a/docker-compose.yml b/docker-compose.yml index ea40891ffe..c357f7106d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -331,6 +331,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_assetsIngest - PHRASEANET_WORKER_createRecord - PHRASEANET_WORKER_deleteRecord @@ -400,6 +401,7 @@ services: - 
LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_mainQueue - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -454,6 +456,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_assetsIngest - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -507,6 +510,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_createRecord - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -561,6 +565,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_deleteRecord - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -614,6 +619,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_editRecord - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -667,6 +673,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_exportMail - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -720,6 +727,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_exposeUpload - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -773,6 +781,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_ftp - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -826,6 +835,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - 
PHRASEANET_WORKER_populateIndex - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -879,6 +889,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_pullAssets - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -932,6 +943,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_recordsActions - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -988,6 +1000,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_shareBasket - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -1041,6 +1054,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_subdefCreation - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -1094,6 +1108,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_subtitle - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -1147,6 +1162,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_validationReminder - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -1200,6 +1216,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_webhook - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH @@ -1253,6 +1270,7 @@ services: - LC_NAME=C.UTF-8 - PHRASEANET_EXPLODE_WORKER - PHRASEANET_WORKERS_LAUNCH_METHOD + - PHRASEANET_WAIT_MESSAGE_TIMEOUT - PHRASEANET_WORKER_writeMetadatas - IMAGEMAGICK_POLICY_VERSION - IMAGEMAGICK_POLICY_WIDTH diff --git 
a/docker/phraseanet/worker/entrypoint.sh b/docker/phraseanet/worker/entrypoint.sh index 773ef5bbc1..7645a456db 100755 --- a/docker/phraseanet/worker/entrypoint.sh +++ b/docker/phraseanet/worker/entrypoint.sh @@ -112,7 +112,7 @@ else queue_name="$(echo $i | cut -d'_' -f3)" m=$i if [ ${!m} -gt "0" ] ; then - command="bin/console worker:execute --queue-name=$queue_name -m ${!m} &" + command="bin/console worker:execute --queue-name=$queue_name -m ${!m} -t ${PHRASEANET_WAIT_MESSAGE_TIMEOUT} &" echo $command >> bin/run-worker.sh echo "Worker " $queue_name " defined with parallelism " ${!m} NBR_WORKERS=$(expr $NBR_WORKERS + 1) diff --git a/lib/Alchemy/Phrasea/WorkerManager/Command/WorkerExecuteCommand.php b/lib/Alchemy/Phrasea/WorkerManager/Command/WorkerExecuteCommand.php index c4b3f3de63..c952861a32 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Command/WorkerExecuteCommand.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Command/WorkerExecuteCommand.php @@ -5,6 +5,7 @@ use Alchemy\Phrasea\Command\Command; use Alchemy\Phrasea\WorkerManager\Queue\AMQPConnection; use Alchemy\Phrasea\WorkerManager\Queue\MessageHandler; +use Alchemy\Phrasea\WorkerManager\Worker\ProcessPool; use Alchemy\Phrasea\WorkerManager\Worker\WorkerInvoker; use Doctrine\DBAL\Connection; use PhpAmqpLib\Channel\AMQPChannel; @@ -108,8 +109,18 @@ protected function doExecute(InputInterface $input, OutputInterface $output) try { $channel->wait(null, false, $waitTimeout); } catch (AMQPTimeoutException $e) { + // we are in wait timeout, + // immediately close the rabbit connection to avoid Missed server heartbeat exception after this timeout $serverConnection->connectionClose(); + /** @var ProcessPool $processPool */ + $processPool = $this->container['alchemy_worker.process_pool']; + $processPool->setLogger($this->container['alchemy_worker.logger']); + + // and wait until all process generated are finished + $processPool->waitForAllJobProcessFinished(); + + // exit with 1 return 1; } } diff --git 
a/lib/Alchemy/Phrasea/WorkerManager/Worker/ProcessPool.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/ProcessPool.php index 8460a3fdb9..a3df6701ea 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/ProcessPool.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/ProcessPool.php @@ -67,6 +67,22 @@ public function getWorkerProcess(array $processArguments, AMQPChannel $channel, return ($this->processes[] = $builder->getProcess()); } + public function waitForAllJobProcessFinished() + { + $interval = 1; + while (count($this->processes) > 0) { + if (count($this->processes) == 1) { + $process = $this->processes[0]; + $this->logger->info("only 1 process remaining before exit 1 : " . $process->getCommandLine() . "\n"); + } + + sleep($interval); + + $this->detachFinishedProcesses(); + $interval = min(10, $interval + 1); + } + } + private function detachFinishedProcesses() { $runningProcesses = [];