diff --git a/config/configuration.sample.yml b/config/configuration.sample.yml index 7a65ab6ead..db9ea7542e 100644 --- a/config/configuration.sample.yml +++ b/config/configuration.sample.yml @@ -367,6 +367,7 @@ geocoding-providers: provincefields: Province countryfields: Country, Pays workers: + auto-cancelingJob: 24 # period in hours, not define or null to not execute queue: worker-queue: registry: alchemy_worker.queue_registry diff --git a/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php b/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php index 965e554537..cd32204602 100644 --- a/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php +++ b/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php @@ -503,7 +503,7 @@ public function updateStatusRunningToCanceledSinceCreated($hour = 0) ]); } - public function getRunningSinceCreated($hour = 0) + public function getRunningSinceCreated($hour = 0, array $action = null) { $rsm = new ResultSetMappingBuilder($this->_em); $rsm->addRootEntityFromClassMetadata('Alchemy\Phrasea\Model\Entities\WorkerRunningJob', 'w'); @@ -516,6 +516,11 @@ public function getRunningSinceCreated($hour = 0) AND (TO_SECONDS(CURRENT_TIMESTAMP()) - TO_SECONDS(w.created)) > :second' ; + if ($action != null) { + $action = join('" ,"', $action); + $sql .= ' AND work IN("' . $action . '")'; + } + $q = $this->_em->createNativeQuery($sql, $rsm); $q->setParameters([ 'second' => $hour * 3600, diff --git a/lib/Alchemy/Phrasea/WorkerManager/Controller/AdminConfigurationController.php b/lib/Alchemy/Phrasea/WorkerManager/Controller/AdminConfigurationController.php index 9c890c7e21..5f4e2684c1 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Controller/AdminConfigurationController.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Controller/AdminConfigurationController.php @@ -3,6 +3,7 @@ namespace Alchemy\Phrasea\WorkerManager\Controller; use Alchemy\Phrasea\Application as PhraseaApplication; +use Alchemy\Phrasea\Application\Helper\DataboxLoggerAware; use Alchemy\Phrasea\Controller\Controller; use Alchemy\Phrasea\Model\Entities\WorkerRunningJob; use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository; @@ -32,6 +33,8 @@ class AdminConfigurationController extends Controller { + use DataboxLoggerAware; + public function indexAction(PhraseaApplication $app, Request $request) { return $this->render('admin/worker-manager/index.html.twig', [ @@ -223,8 +226,10 @@ public function changeStatusAction(Request $request, $workerId) $workerRunningJob = $repoWorker->find($workerId); $workerRunningJob->setStatus($request->request->get('status')); + $finishedDate = new \DateTime('now'); + if($request->request->get('finished') == '1') { - $workerRunningJob->setFinished(new \DateTime('now'))->setFlock(null); + $workerRunningJob->setFinished($finishedDate)->setFlock(null); } $em = $repoWorker->getEntityManager(); @@ -232,6 +237,10 @@ public function changeStatusAction(Request $request, $workerId) $em->flush(); + if (in_array($workerRunningJob->getWork(), ['subdefCreation', 'writeMetadatas'])) { + $this->updateLogDocs($workerRunningJob, $workerRunningJob->getStatus(), $finishedDate); + } + return $this->app->json(['success' => true]); } @@ -250,8 +259,16 @@ public function doChangeStatusToCanceledAction(PhraseaApplication $app, Request { /** @var WorkerRunningJobRepository $repoWorker */ $repoWorker = $this->app['repo.worker-running-job']; + $workerRunningJobs = $repoWorker->getRunningSinceCreated($request->request->get('hour'), ['subdefCreation', 'writeMetadatas']); + $repoWorker->updateStatusRunningToCanceledSinceCreated($request->request->get('hour')); + $finishedDate = new \DateTime('now'); + /** @var WorkerRunningJob $workerRunningJob */ + foreach ($workerRunningJobs as $workerRunningJob) { + $this->updateLogDocs($workerRunningJob, 'canceled', $finishedDate); + } + return $this->app->json(['success' => true]); } @@ -557,6 +574,17 @@ public function populateStatusAction(PhraseaApplication $app, Request $request) return $repoWorkerJob->checkPopulateStatusByDataboxIds($databoxIds); } + private function updateLogDocs(WorkerRunningJob $workerRunningJob, $status, $finishedDate) + { + $databox = $this->findDataboxById($workerRunningJob->getDataboxId()); + $record = $databox->get_record($workerRunningJob->getRecordId()); + $subdefName = $workerRunningJob->getWorkOn(); + $action = $workerRunningJob->getWork(); + + $this->getDataboxLogger($databox)->initOrUpdateLogDocsFromWorker($record, $databox, $workerRunningJob, $subdefName, $action, $finishedDate, $status); + + } + private function getDefaultRecordsActionsSettings() { return <<addFactory(MessagePublisher::MAIN_QUEUE_TYPE, new CallableWorkerFactory(function () use ($app) { - return new MainQueueWorker($app['alchemy_worker.message.publisher'], $app['repo.worker-job']); + return (new MainQueueWorker($app['alchemy_worker.message.publisher'], $app['repo.worker-job'], $app['repo.worker-running-job'], $app['conf'])) + ->setDataboxLoggerLocator($app['phraseanet.logger']) + ->setApplicationBox($app['phraseanet.appbox']) + ; })); $app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::FTP_TYPE, new CallableWorkerFactory(function () use ($app) { diff --git a/lib/Alchemy/Phrasea/WorkerManager/Provider/ControllerServiceProvider.php b/lib/Alchemy/Phrasea/WorkerManager/Provider/ControllerServiceProvider.php index 25080a06a6..1531113b25 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Provider/ControllerServiceProvider.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Provider/ControllerServiceProvider.php @@ -22,7 +22,9 @@ class ControllerServiceProvider implements ControllerProviderInterface, ServiceP public function register(Application $app) { $app['controller.worker.admin.configuration'] = $app->share(function (PhraseaApplication $app) { - return new AdminConfigurationController($app); + return (new AdminConfigurationController($app)) + ->setDataboxLoggerLocator($app['phraseanet.logger']) + ; }); // example of route to check webhook diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/MainQueueWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/MainQueueWorker.php index 498c902118..975590f642 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/MainQueueWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/MainQueueWorker.php @@ -2,24 +2,38 @@ namespace Alchemy\Phrasea\WorkerManager\Worker; +use Alchemy\Phrasea\Application\Helper\ApplicationBoxAware; +use Alchemy\Phrasea\Application\Helper\DataboxLoggerAware; +use Alchemy\Phrasea\Core\Configuration\PropertyAccess; use Alchemy\Phrasea\Model\Entities\WorkerJob; +use Alchemy\Phrasea\Model\Entities\WorkerRunningJob; use Alchemy\Phrasea\Model\Repositories\WorkerJobRepository; +use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository; use Alchemy\Phrasea\WorkerManager\Event\RecordEditInWorkerEvent; use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher; class MainQueueWorker implements WorkerInterface { + use ApplicationBoxAware; + use DataboxLoggerAware; + private $messagePublisher; private $repoWorkerJob; + private $repoWorkerRunningJob; + private $conf; public function __construct( MessagePublisher $messagePublisher, - WorkerJobRepository $repoWorkerJob + WorkerJobRepository $repoWorkerJob, + WorkerRunningJobRepository $repoWorkerRunningJob, + PropertyAccess $conf ) { $this->messagePublisher = $messagePublisher; $this->repoWorkerJob = $repoWorkerJob; + $this->repoWorkerRunningJob = $repoWorkerRunningJob; + $this->conf = $conf; } public function process(array $payload) @@ -29,6 +43,31 @@ public function process(array $payload) 'payload' => $payload ]; + $autoCancelingJob = $this->conf->get(['workers', 'auto-cancelingJob'], null); + + if (!empty($autoCancelingJob)) { + $autoCancelingJob = intval($autoCancelingJob); + + // first get the workerRunningJobs for log_docs 'subdefCreation', 'writeMetadatas' + $workerRunningJobs = $this->repoWorkerRunningJob->getRunningSinceCreated($autoCancelingJob, ['subdefCreation', 'writeMetadatas']); + + // update the status for table workerRunningJob + $this->repoWorkerRunningJob->updateStatusRunningToCanceledSinceCreated($autoCancelingJob); + + // last, treat the log_docs + $finishedDate = new \DateTime('now'); + /** @var WorkerRunningJob $workerRunningJob */ + foreach ($workerRunningJobs as $workerRunningJob) { + $databox = $this->findDataboxById($workerRunningJob->getDataboxId()); + $record = $databox->get_record($workerRunningJob->getRecordId()); + $subdefName = $workerRunningJob->getWorkOn(); + $action = $workerRunningJob->getWork(); + $status = 'canceled'; + + $this->getDataboxLogger($databox)->initOrUpdateLogDocsFromWorker($record, $databox, $workerRunningJob, $subdefName, $action, $finishedDate, $status); + } + } + $em = $this->repoWorkerJob->getEntityManager(); $em->getConnection()->beginTransaction(); /** @var WorkerJob $workerJob */ diff --git a/lib/classes/patch/418RC12PHRAS4058.php b/lib/classes/patch/418RC12PHRAS4058.php new file mode 100644 index 0000000000..c286880cbc --- /dev/null +++ b/lib/classes/patch/418RC12PHRAS4058.php @@ -0,0 +1,73 @@ +release; + } + + /** + * {@inheritdoc} + */ + public function getDoctrineMigrations() + { + return []; + } + + /** + * {@inheritdoc} + */ + public function require_all_upgrades() + { + return false; + } + + /** + * {@inheritdoc} + */ + public function concern() + { + return $this->concern; + } + + /** + * {@inheritdoc} + */ + public function apply(base $base, Application $app) + { + if ($base->get_base_type() === base::DATA_BOX) { + $this->patch_databox($base, $app); + } elseif ($base->get_base_type() === base::APPLICATION_BOX) { + $this->patch_appbox($base, $app); + } + + return true; + } + + private function patch_databox(databox $databox, Application $app) + { + } + + private function patch_appbox(base $appbox, Application $app) + { + /** @var PropertyAccess $conf */ + $conf = $app['conf']; + + if (!$conf->has(['workers', 'auto-cancelingJob'])) { + $conf->set(['workers', 'auto-cancelingJob'], 24); + } + } +} diff --git a/lib/conf.d/configuration.yml b/lib/conf.d/configuration.yml index 7d4f72dc12..390aebe395 100644 --- a/lib/conf.d/configuration.yml +++ b/lib/conf.d/configuration.yml @@ -364,6 +364,7 @@ video-editor: - '1.5' - 3 workers: + auto-cancelingJob: 24 # period in hours, not define or null to not execute queue: worker-queue: registry: alchemy_worker.queue_registry