From cd8ddea6ce46addb282c6755beef6558fd6b929a Mon Sep 17 00:00:00 2001 From: aynsix Date: Thu, 25 Apr 2024 18:57:35 +0300 Subject: [PATCH] PHRAS-4058 auto cancelling job --- .../Provider/AlchemyWorkerServiceProvider.php | 5 ++- .../WorkerManager/Worker/MainQueueWorker.php | 32 ++++++++++++++++++- 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/lib/Alchemy/Phrasea/WorkerManager/Provider/AlchemyWorkerServiceProvider.php b/lib/Alchemy/Phrasea/WorkerManager/Provider/AlchemyWorkerServiceProvider.php index 9c5060e38b..81e78ab9cd 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Provider/AlchemyWorkerServiceProvider.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Provider/AlchemyWorkerServiceProvider.php @@ -165,7 +165,10 @@ public function register(Application $app) })); $app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::MAIN_QUEUE_TYPE, new CallableWorkerFactory(function () use ($app) { - return new MainQueueWorker($app['alchemy_worker.message.publisher'], $app['repo.worker-job']); + return (new MainQueueWorker($app['alchemy_worker.message.publisher'], $app['repo.worker-job'], $app['repo.worker-running-job'])) + ->setDataboxLoggerLocator($app['phraseanet.logger']) + ->setApplicationBox($app['phraseanet.appbox']) + ; })); $app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::FTP_TYPE, new CallableWorkerFactory(function () use ($app) { diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/MainQueueWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/MainQueueWorker.php index 498c902118..c1d02bb228 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/MainQueueWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/MainQueueWorker.php @@ -2,24 +2,34 @@ namespace Alchemy\Phrasea\WorkerManager\Worker; +use Alchemy\Phrasea\Application\Helper\ApplicationBoxAware; +use Alchemy\Phrasea\Application\Helper\DataboxLoggerAware; use Alchemy\Phrasea\Model\Entities\WorkerJob; +use Alchemy\Phrasea\Model\Entities\WorkerRunningJob; use Alchemy\Phrasea\Model\Repositories\WorkerJobRepository; +use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository; use Alchemy\Phrasea\WorkerManager\Event\RecordEditInWorkerEvent; use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher; class MainQueueWorker implements WorkerInterface { + use ApplicationBoxAware; + use DataboxLoggerAware; + private $messagePublisher; private $repoWorkerJob; + private $repoWorkerRunningJob; public function __construct( MessagePublisher $messagePublisher, - WorkerJobRepository $repoWorkerJob + WorkerJobRepository $repoWorkerJob, + WorkerRunningJobRepository $repoWorkerRunningJob ) { $this->messagePublisher = $messagePublisher; $this->repoWorkerJob = $repoWorkerJob; + $this->repoWorkerRunningJob = $repoWorkerRunningJob; } public function process(array $payload) @@ -29,6 +39,26 @@ public function process(array $payload) 'payload' => $payload ]; + $autoCancelingJob = 48; // hours + // first get the workerRunningJobs for log_docs 'subdefCreation', 'writeMetadatas' + $workerRunningJobs = $this->repoWorkerRunningJob->getRunningSinceCreated($autoCancelingJob, ['subdefCreation', 'writeMetadatas']); + + // update the status for table workerRunningJob + $this->repoWorkerRunningJob->updateStatusRunningToCanceledSinceCreated($autoCancelingJob); + + // last, treat the log_docs + $finishedDate = new \DateTime('now'); + /** @var WorkerRunningJob $workerRunningJob */ + foreach ($workerRunningJobs as $workerRunningJob) { + $databox = $this->findDataboxById($workerRunningJob->getDataboxId()); + $record = $databox->get_record($workerRunningJob->getRecordId()); + $subdefName = $workerRunningJob->getWorkOn(); + $action = $workerRunningJob->getWork(); + $status = 'canceled'; + + $this->getDataboxLogger($databox)->initOrUpdateLogDocsFromWorker($record, $databox, $workerRunningJob, $subdefName, $action, $finishedDate, $status); + } + $em = $this->repoWorkerJob->getEntityManager(); $em->getConnection()->beginTransaction(); /** @var WorkerJob $workerJob */