diff --git a/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php b/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php index cd32204602..3e0b281c59 100644 --- a/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php +++ b/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php @@ -328,14 +328,16 @@ private function releaseMutex(int $recordMutexId) * mark a job a "finished" * nb : after a long job, connection may be lost so we reconnect. * But sometimes (?) a first commit fails (due to reconnect ?), while the second one is ok. - * So here we try 2 times, just in case... + * So here we try 4 times, just in case... * * @param int $workerRunningJobId + * @param MessagePublisher $messagePublisher + * @param $jobType * @param null $info */ - public function markFinished(int $workerRunningJobId, $info = null) + public function markFinished(int $workerRunningJobId, MessagePublisher $messagePublisher, $jobType, $info = null) { - for($tryout=1; $tryout<=2; $tryout++) { + for($wait = 2, $tryout=1; $tryout<=4; $tryout++) { try { $this->reconnect(); $cnx = $this->getEntityManager()->getConnection()->getWrappedConnection(); @@ -356,8 +358,10 @@ public function markFinished(int $workerRunningJobId, $info = null) throw new Exception(sprintf("updating WorkerRunningJob should return 1 row affected, got %s", $a)); } catch (Exception $e) { - if($tryout < 2) { - sleep(1); // retry in 1 sec + if($tryout < 4) { + $messagePublisher->pushLog(sprintf("failed updating WorkerRunningJob to finished with id=%d for %s, attempt %d", $workerRunningJobId, $jobType, $tryout)); + sleep($wait); // retry after more sec + $wait *= 2; } } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Controller/AdminConfigurationController.php b/lib/Alchemy/Phrasea/WorkerManager/Controller/AdminConfigurationController.php index 5f4e2684c1..38f3a6a8b0 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Controller/AdminConfigurationController.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Controller/AdminConfigurationController.php @@ -5,6 +5,7 @@ use Alchemy\Phrasea\Application as PhraseaApplication; use Alchemy\Phrasea\Application\Helper\DataboxLoggerAware; use Alchemy\Phrasea\Controller\Controller; +use Alchemy\Phrasea\Filesystem\FilesystemService; use Alchemy\Phrasea\Model\Entities\WorkerRunningJob; use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository; use Alchemy\Phrasea\Plugin\Exception\JsonValidationException; @@ -224,12 +225,34 @@ public function changeStatusAction(Request $request, $workerId) /** @var WorkerRunningJob $workerRunningJob */ $workerRunningJob = $repoWorker->find($workerId); - - $workerRunningJob->setStatus($request->request->get('status')); + $subdefOK = false; $finishedDate = new \DateTime('now'); - if($request->request->get('finished') == '1') { - $workerRunningJob->setFinished($finishedDate)->setFlock(null); + if ($workerRunningJob->getWork() == 'subdefCreation') { + try { + $databox = $this->findDataboxById($workerRunningJob->getDataboxId()); + $record = $databox->get_record($workerRunningJob->getRecordId()); + if ($record->has_subdef($workerRunningJob->getWorkOn()) ) { + $filePathToCheck = $record->get_subdef($workerRunningJob->getWorkOn())->getRealPath(); + if ($this->getFileSystem()->exists($filePathToCheck)) { + // the subdefinition exist + // so mark as finished + $subdefOK = true; + $workerRunningJob->setStatus(WorkerRunningJob::FINISHED); + $workerRunningJob->setFinished($finishedDate)->setFlock(null); + } + } + } catch (\Exception $e) { + } + } + + if (!$subdefOK || $workerRunningJob->getWork() != 'subdefCreation') { + $workerRunningJob->setStatus($request->request->get('status')); + + + if($request->request->get('finished') == '1') { + $workerRunningJob->setFinished($finishedDate)->setFlock(null); + } } $em = $repoWorker->getEntityManager(); @@ -259,14 +282,48 @@ public function doChangeStatusToCanceledAction(PhraseaApplication $app, Request { /** @var WorkerRunningJobRepository $repoWorker */ $repoWorker = $this->app['repo.worker-running-job']; + $finishedDate = new \DateTime('now'); + $em = $repoWorker->getEntityManager(); + $workerRunningJobs = $repoWorker->getRunningSinceCreated($request->request->get('hour'), ['subdefCreation', 'writeMetadatas']); + $workerRunningJobsForOnlySubdefcreation = $repoWorker->getRunningSinceCreated($request->request->get('hour'), ['subdefCreation']); + // treat the subdefinition case + /** @var WorkerRunningJob $ws */ + foreach ($workerRunningJobsForOnlySubdefcreation as $ws) { + $subdefOK = false; + try { + $databox = $this->findDataboxById($ws->getDataboxId()); + $record = $databox->get_record($ws->getRecordId()); + if ($record->has_subdef($ws->getWorkOn()) ) { + $filePathToCheck = $record->get_subdef($ws->getWorkOn())->getRealPath(); + if ($this->getFileSystem()->exists($filePathToCheck)) { + // the subdefinition exist + // so mark as finished + $subdefOK = true; + $ws->setStatus(WorkerRunningJob::FINISHED); + $ws->setFinished($finishedDate)->setFlock(null); + } + } + + } catch (\Exception $e) { + } + + if (!$subdefOK) { + $ws->setStatus(WorkerRunningJob::INTERRUPT); + $ws->setFinished($finishedDate)->setFlock(null); + } + $em->persist($ws); + } + $em->flush(); + + // treat all the rest case $repoWorker->updateStatusRunningToCanceledSinceCreated($request->request->get('hour')); - $finishedDate = new \DateTime('now'); + // "log docs" the subdefCreation and writeMetadatas action /** @var WorkerRunningJob $workerRunningJob */ foreach ($workerRunningJobs as $workerRunningJob) { - $this->updateLogDocs($workerRunningJob, 'canceled', $finishedDate); + $this->updateLogDocs($workerRunningJob, $workerRunningJob->getStatus(), $finishedDate); } return $this->app->json(['success' => true]); @@ -791,4 +848,11 @@ private function getUrlGenerator() return $this->app['url_generator']; } + /** + * @return FilesystemService + */ + private function getFileSystem() + { + return $this->app['phraseanet.filesystem']; + } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Provider/AlchemyWorkerServiceProvider.php b/lib/Alchemy/Phrasea/WorkerManager/Provider/AlchemyWorkerServiceProvider.php index 71aba39dfe..c947fd6023 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Provider/AlchemyWorkerServiceProvider.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Provider/AlchemyWorkerServiceProvider.php @@ -159,7 +159,7 @@ public function register(Application $app) })); $app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::SUBTITLE_TYPE, new CallableWorkerFactory(function () use ($app) { - return (new SubtitleWorker($app['repo.worker-running-job'], $app['conf'], new LazyLocator($app, 'phraseanet.appbox'), $app['alchemy_worker.logger'], $app['dispatcher'])) + return (new SubtitleWorker($app['repo.worker-running-job'], $app['conf'], new LazyLocator($app, 'phraseanet.appbox'), $app['alchemy_worker.logger'], $app['dispatcher'], $app['alchemy_worker.message.publisher'])) ->setFileSystemLocator(new LazyLocator($app, 'filesystem')) ->setTemporaryFileSystemLocator(new LazyLocator($app, 'temporary-filesystem')); })); diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/CreateRecordWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/CreateRecordWorker.php index 03847d2fa2..18168332e2 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/CreateRecordWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/CreateRecordWorker.php @@ -137,24 +137,6 @@ public function process(array $payload) return; } - if ($workerRunningJob != null) { - $em->beginTransaction(); - try { - $workerRunningJob - ->setStatus(WorkerRunningJob::FINISHED) - ->setFinished(new \DateTime('now')) - ; - - $em->persist($workerRunningJob); - - $em->flush(); - $em->commit(); - } catch (\Exception $e) { - $em->rollback(); - } - - } - $lazaretSession = new LazaretSession(); $userRepository = $this->getUserRepository(); @@ -257,6 +239,11 @@ public function process(array $payload) ] ] ); + + if ($workerRunningJob != null) { + $this->repoWorkerJob->markFinished($workerRunningJob->getId(), $this->messagePublisher, MessagePublisher::CREATE_RECORD_TYPE); + } + } /** diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/DeleteRecordWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/DeleteRecordWorker.php index ad1fc0a492..59a2184baa 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/DeleteRecordWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/DeleteRecordWorker.php @@ -69,14 +69,7 @@ public function process(array $payload) // tell that the delete is finished if ($workerRunningJob != null) { - $workerRunningJob - ->setStatus(WorkerRunningJob::FINISHED) - ->setFinished(new \DateTime('now')) - ; - - $em->persist($workerRunningJob); - - $em->flush(); + $this->repoWorker->markFinished($workerRunningJob->getId(), $this->messagePublisher, MessagePublisher::DELETE_RECORD_TYPE); } } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/DownloadAsyncWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/DownloadAsyncWorker.php index e567530ac9..35270ab176 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/DownloadAsyncWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/DownloadAsyncWorker.php @@ -385,15 +385,7 @@ public function process(array $payload) ); if ($workerRunningJob != null) { - $this->repoWorkerJob->reconnect(); - $workerRunningJob - ->setStatus(WorkerRunningJob::FINISHED) - ->setFinished(new \DateTime('now')) - ; - - $em->persist($workerRunningJob); - - $em->flush(); + $this->repoWorkerJob->markFinished($workerRunningJob->getId(), $this->getMessagePublisher(),MessagePublisher::DOWNLOAD_ASYNC_TYPE); } sleep(1); @@ -436,6 +428,11 @@ private function getWorkerRunningJobRepository() return $this->app['repo.worker-running-job']; } + private function getMessagePublisher() + { + return $this->app['alchemy_worker.message.publisher']; + } + private function cellRefFromColumnAndRow(int $col, int $row = null) { $r = Coordinate::stringFromColumnIndex($col); diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/EditRecordWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/EditRecordWorker.php index 0997204cf5..0886615a61 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/EditRecordWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/EditRecordWorker.php @@ -200,17 +200,7 @@ public function process(array $payload) ); // tell that we have finished to work on edit - $this->repoWorker->reconnect(); - $em->getConnection()->beginTransaction(); - try { - $workerRunningJob->setStatus(WorkerRunningJob::FINISHED); - $workerRunningJob->setFinished(new \DateTime('now')); - $em->persist($workerRunningJob); - $em->flush(); - $em->commit(); - } catch (\Exception $e) { - $em->rollback(); - } + $this->repoWorker->markFinished($workerRunningJob->getId(), $this->messagePublisher, MessagePublisher::EDIT_RECORD_TYPE); $this->messagePublisher->pushLog(sprintf("record edited databoxname=%s databoxid=%d recordid=%d", $databox->get_viewname(), $payload['databoxId'], $payload['record_id'])); } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/ExportMailWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/ExportMailWorker.php index 0e33106341..a836277085 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/ExportMailWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/ExportMailWorker.php @@ -216,16 +216,7 @@ public function process(array $payload) } if ($workerRunningJob != null) { - $this->repoWorkerJob->reconnect(); - $workerRunningJob - ->setWorkOn(implode(',', $deliverEmails)) - ->setStatus(WorkerRunningJob::FINISHED) - ->setFinished(new \DateTime('now')) - ; - - $em->persist($workerRunningJob); - - $em->flush(); + $this->repoWorkerJob->markFinished($workerRunningJob->getId(), $this->getMessagePublisher(), MessagePublisher::EXPORT_MAIL_TYPE); } sleep(30); @@ -250,4 +241,12 @@ private function getWorkerRunningJobRepository() { return $this->app['repo.worker-running-job']; } + + /** + * @return MessagePublisher + */ + private function getMessagePublisher() + { + return $this->app['alchemy_worker.message.publisher']; + } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/ExposeUploadWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/ExposeUploadWorker.php index 1050e8df37..2bab163278 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/ExposeUploadWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/ExposeUploadWorker.php @@ -373,7 +373,7 @@ public function process(array $payload) } // tell that the upload is finished - $this->finishedJob($workerRunningJob, $em); + $this->repoWorker->markFinished($workerRunningJob->getId(), $this->messagePublisher, MessagePublisher::EXPOSE_UPLOAD_TYPE); } private function getClientAnnotationProfile(Client $exposeClient, $publicationId) diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/FtpWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/FtpWorker.php index 6db13c3bd9..af710f16ad 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/FtpWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/FtpWorker.php @@ -355,18 +355,7 @@ private function doExport(FtpExport $export, array $payload) if (!$processError && $workerRunningJob) { // tell that we have finished to work on this file - $this->repoWorker->reconnect(); - $em->beginTransaction(); - try { - $workerRunningJob->setStatus(WorkerRunningJob::FINISHED); - $workerRunningJob->setFinished(new \DateTime('now')); - $em->persist($workerRunningJob); - $em->flush(); - $em->commit(); - } - catch (Exception $e) { - $em->rollback(); - } + $this->repoWorker->markFinished($workerRunningJob->getId(), $this->getMessagePublisher(), MessagePublisher::FTP_TYPE); } else { // if there is an error $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; @@ -537,5 +526,4 @@ private function getMessagePublisher() { return $this->app['alchemy_worker.message.publisher']; } - } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/PopulateIndexWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/PopulateIndexWorker.php index 465599fae9..98e45a44fa 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/PopulateIndexWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/PopulateIndexWorker.php @@ -145,15 +145,7 @@ public function process(array $payload) // tell that the populate is finished if ($workerRunningJob != null) { - $this->repoWorker->reconnect(); - $workerRunningJob - ->setStatus(WorkerRunningJob::FINISHED) - ->setFinished(new \DateTime('now')) - ; - - $em->persist($workerRunningJob); - - $em->flush(); + $this->repoWorker->markFinished($workerRunningJob->getId(), $this->messagePublisher, MessagePublisher::POPULATE_INDEX_TYPE); } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/ShareBasketWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/ShareBasketWorker.php index 3fc10e136a..e8d23471f8 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/ShareBasketWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/ShareBasketWorker.php @@ -15,6 +15,7 @@ use Alchemy\Phrasea\Model\Manipulator\TokenManipulator; use Alchemy\Phrasea\Model\Repositories\BasketRepository; use Alchemy\Phrasea\Model\Repositories\UserRepository; +use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository; use Alchemy\Phrasea\Record\RecordReference; use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher; use DateTime; @@ -405,14 +406,7 @@ public function process(array $payload) $this->getLogger()->info("Basket with Id " . $basket->getId() . " successfully shared !"); if ($workerRunningJob != null) { - $workerRunningJob - ->setStatus(WorkerRunningJob::FINISHED) - ->setFinished(new \DateTime('now')) - ; - - $manager->persist($workerRunningJob); - - $manager->flush(); + $this->getRepoWorkerRunningJob()->markFinished($workerRunningJob->getId(), $this->getMessagePublisher(), MessagePublisher::SHARE_BASKET_TYPE); } // file_put_contents("./tmp/phraseanet-log.txt", sprintf("\n%s; ==== END (N = %d ; dT = %d ==> %0.2f / sec) ====\n\n", time(), $n_participants, time()-$_t0, $n_participants/(max(time()-$_t0, 0.001))), FILE_APPEND); @@ -500,4 +494,20 @@ private function getLogger() { return $this->app['alchemy_worker.logger']; } + + /** + * @return WorkerRunningJobRepository + */ + private function getRepoWorkerRunningJob() + { + return $this->app['repo.worker-running-job']; + } + + /** + * @return MessagePublisher + */ + private function getMessagePublisher() + { + return $this->app['alchemy_worker.message.publisher']; + } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php index 9e647191f6..6869c36f99 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php @@ -199,7 +199,7 @@ public function process(array $payload) $payload['subdefName'], $databox->get_viewname(), $databoxId, $recordId)); // tell that we have finished to work on this file (=unlock) - $this->repoWorker->markFinished($workerRunningJobId); + $this->repoWorker->markFinished($workerRunningJobId, $this->messagePublisher, MessagePublisher::SUBDEF_CREATION_TYPE); $this->getDataboxLogger($databox)->initOrUpdateLogDocsFromWorker($record, $databox, $workerRunningJob, $subdefName, \Session_Logger::EVENT_SUBDEFCREATION, new \DateTime('now'), WorkerRunningJob::FINISHED); } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/SubtitleWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/SubtitleWorker.php index 8119f509a1..74be8f339c 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/SubtitleWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/SubtitleWorker.php @@ -38,13 +38,19 @@ class SubtitleWorker implements WorkerInterface private $workerRunningJob; private $transcriptionsId; - public function __construct(WorkerRunningJobRepository $repoWorker, PropertyAccess $conf, callable $appboxLocator, LoggerInterface $logger, EventDispatcherInterface $dispatcher) + /** + * @var MessagePublisher + */ + private $messagePublisher; + + public function __construct(WorkerRunningJobRepository $repoWorker, PropertyAccess $conf, callable $appboxLocator, LoggerInterface $logger, EventDispatcherInterface $dispatcher, $messagePublisher) { $this->repoWorker = $repoWorker; $this->conf = $conf; $this->appboxLocator = $appboxLocator; $this->logger = $logger; $this->dispatcher = $dispatcher; + $this->messagePublisher = $messagePublisher; } public function process(array $payload) @@ -265,7 +271,7 @@ public function process(array $payload) // $this->deleteTranscription($transcriptionId); // } - $this->jobFinished(); + $this->repoWorker->markFinished($this->workerRunningJob->getId(), $this->messagePublisher, MessagePublisher::SUBTITLE_TYPE); return 0; } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/WebhookWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/WebhookWorker.php index 593afbee5b..ed78038f7d 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/WebhookWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/WebhookWorker.php @@ -137,14 +137,7 @@ public function process(array $payload) } if ($workerRunningJob != null) { - $workerRunningJob - ->setStatus(WorkerRunningJob::FINISHED) - ->setFinished(new \DateTime('now')) - ; - - $em->persist($workerRunningJob); - - $em->flush(); + $this->repoWorkerJob->markFinished($workerRunningJob->getId(), $this->messagePublisher, MessagePublisher::WEBHOOK_TYPE); } } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php index a32ceadce7..a2ddca88b0 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php @@ -96,7 +96,7 @@ public function process(array $payload) try { $record = $databox->get_record($recordId); } catch (\Exception $e) { - $this->repoWorker->markFinished($workerRunningJobId, "error " . $e->getMessage()); + $this->repoWorker->markFinished($workerRunningJobId, $this->messagePublisher, MessagePublisher::WRITE_METADATAS_TYPE, "error " . $e->getMessage()); return; } @@ -112,7 +112,7 @@ public function process(array $payload) $this->logger->error("Can't write meta on svg file!"); // tell that we have finished to work on this file ("unlock") - $this->repoWorker->markFinished($workerRunningJobId, "Can't write meta on svg file!"); + $this->repoWorker->markFinished($workerRunningJobId, $this->messagePublisher, MessagePublisher::WRITE_METADATAS_TYPE, "Can't write meta on svg file!"); $this->getDataboxLogger($databox)->initOrUpdateLogDocsFromWorker($record, $databox, $workerRunningJob, $subdefName, \Session_Logger::EVENT_WRITEMETADATAS, new \DateTime('now'), WorkerRunningJob::ERROR); @@ -323,7 +323,7 @@ public function process(array $payload) $this->updateJeton($record); // tell that we have finished to work on this file (=unlock) - $this->repoWorker->markFinished($workerRunningJobId, $stopInfo); + $this->repoWorker->markFinished($workerRunningJobId, $this->messagePublisher, MessagePublisher::WRITE_METADATAS_TYPE, $stopInfo); $this->getDataboxLogger($databox)->initOrUpdateLogDocsFromWorker($record, $databox, $workerRunningJob, $subdefName, \Session_Logger::EVENT_WRITEMETADATAS, new \DateTime('now'), WorkerRunningJob::ERROR); } return ; @@ -333,7 +333,7 @@ public function process(array $payload) $this->updateJeton($record); // tell that we have finished to work on this file (=unlock) - $this->repoWorker->markFinished($workerRunningJobId); + $this->repoWorker->markFinished($workerRunningJobId, $this->messagePublisher, MessagePublisher::WRITE_METADATAS_TYPE); $this->getDataboxLogger($databox)->initOrUpdateLogDocsFromWorker($record, $databox, $workerRunningJob, $subdefName, \Session_Logger::EVENT_WRITEMETADATAS, new \DateTime('now'), WorkerRunningJob::FINISHED); }