Skip to content

Commit

Permalink
PHRAS-4058 auto cancelling job
Browse files Browse the repository at this point in the history
  • Loading branch information
aynsix committed Apr 25, 2024
1 parent 475c52b commit cd8ddea
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,10 @@ public function register(Application $app)
}));

$app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::MAIN_QUEUE_TYPE, new CallableWorkerFactory(function () use ($app) {
return new MainQueueWorker($app['alchemy_worker.message.publisher'], $app['repo.worker-job']);
return (new MainQueueWorker($app['alchemy_worker.message.publisher'], $app['repo.worker-job'], $app['repo.worker-running-job']))
->setDataboxLoggerLocator($app['phraseanet.logger'])
->setApplicationBox($app['phraseanet.appbox'])
;
}));

$app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::FTP_TYPE, new CallableWorkerFactory(function () use ($app) {
Expand Down
32 changes: 31 additions & 1 deletion lib/Alchemy/Phrasea/WorkerManager/Worker/MainQueueWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,34 @@

namespace Alchemy\Phrasea\WorkerManager\Worker;

use Alchemy\Phrasea\Application\Helper\ApplicationBoxAware;
use Alchemy\Phrasea\Application\Helper\DataboxLoggerAware;
use Alchemy\Phrasea\Model\Entities\WorkerJob;
use Alchemy\Phrasea\Model\Entities\WorkerRunningJob;
use Alchemy\Phrasea\Model\Repositories\WorkerJobRepository;
use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository;
use Alchemy\Phrasea\WorkerManager\Event\RecordEditInWorkerEvent;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;

class MainQueueWorker implements WorkerInterface
{
use ApplicationBoxAware;
use DataboxLoggerAware;

private $messagePublisher;

private $repoWorkerJob;
private $repoWorkerRunningJob;

public function __construct(
MessagePublisher $messagePublisher,
WorkerJobRepository $repoWorkerJob
WorkerJobRepository $repoWorkerJob,
WorkerRunningJobRepository $repoWorkerRunningJob
)
{
$this->messagePublisher = $messagePublisher;
$this->repoWorkerJob = $repoWorkerJob;
$this->repoWorkerRunningJob = $repoWorkerRunningJob;
}

public function process(array $payload)
Expand All @@ -29,6 +39,26 @@ public function process(array $payload)
'payload' => $payload
];

$autoCancelingJob = 48; // hours
// first get the workerRunningJobs for log_docs 'subdefCreation', 'writeMetadatas'
$workerRunningJobs = $this->repoWorkerRunningJob->getRunningSinceCreated($autoCancelingJob, ['subdefCreation', 'writeMetadatas']);

// update the status for table workerRunningJob
$this->repoWorkerRunningJob->updateStatusRunningToCanceledSinceCreated($autoCancelingJob);

// last, treat the log_docs
$finishedDate = new \DateTime('now');
/** @var WorkerRunningJob $workerRunningJob */
foreach ($workerRunningJobs as $workerRunningJob) {
$databox = $this->findDataboxById($workerRunningJob->getDataboxId());
$record = $databox->get_record($workerRunningJob->getRecordId());
$subdefName = $workerRunningJob->getWorkOn();
$action = $workerRunningJob->getWork();
$status = 'canceled';

$this->getDataboxLogger($databox)->initOrUpdateLogDocsFromWorker($record, $databox, $workerRunningJob, $subdefName, $action, $finishedDate, $status);
}

$em = $this->repoWorkerJob->getEntityManager();
$em->getConnection()->beginTransaction();
/** @var WorkerJob $workerJob */
Expand Down

0 comments on commit cd8ddea

Please sign in to comment.