From 8b3bad622236f86978d162b9f55495432b0434e2 Mon Sep 17 00:00:00 2001 From: Arthur de Moulins Date: Wed, 30 Oct 2024 00:00:14 +0100 Subject: [PATCH] WIP --- .../Admin/WorkflowStateCrudController.php | 2 +- lib/php/workflow/src/Executor/JobExecutor.php | 47 +++++++++++++++---- .../src/Message/JobConsumerHandler.php | 1 + .../State/Repository/JobStatusCacheTrait.php | 1 + lib/php/workflow/src/WorkflowOrchestrator.php | 17 +++---- 5 files changed, 46 insertions(+), 22 deletions(-) diff --git a/databox/api/src/Controller/Admin/WorkflowStateCrudController.php b/databox/api/src/Controller/Admin/WorkflowStateCrudController.php index bf4a102ea..844c90fd6 100644 --- a/databox/api/src/Controller/Admin/WorkflowStateCrudController.php +++ b/databox/api/src/Controller/Admin/WorkflowStateCrudController.php @@ -47,7 +47,7 @@ public function configureActions(Actions $actions): Actions { $viewWorkflow = Action::new('viewWorkflow', 'View', 'fa fa-eye') ->setHtmlAttributes(['target' => '_blank']) - ->linkToUrl(fn (WorkflowState $entity): string => sprintf('%s/workflows/%s', $this->databoxClientBaseUrl, $entity->getId())); + ->linkToUrl(fn (WorkflowState $entity): string => sprintf('%s/?_m=%s', $this->databoxClientBaseUrl, urlencode(sprintf('/workflows/%s', $entity->getId())))); $cancel = Action::new('cancelWorkflow', 'Cancel Workflow', 'fas fa-ban') ->displayIf(fn (WorkflowState $entity) => ModelWorkflowState::STATUS_STARTED === $entity->getStatus()) diff --git a/lib/php/workflow/src/Executor/JobExecutor.php b/lib/php/workflow/src/Executor/JobExecutor.php index eedc710d2..40337f492 100644 --- a/lib/php/workflow/src/Executor/JobExecutor.php +++ b/lib/php/workflow/src/Executor/JobExecutor.php @@ -24,18 +24,23 @@ use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\EventDispatcher\EventDispatcher; -readonly class JobExecutor +final class JobExecutor { - private LoggerInterface $logger; - private OutputInterface $output; - private EnvContainer $envs; - private EventDispatcherInterface $eventDispatcher; + private readonly LoggerInterface $logger; + private readonly OutputInterface $output; + private readonly EnvContainer $envs; + private readonly EventDispatcherInterface $eventDispatcher; + + /** + * @var JobUpdateEvent[] + */ + private array $eventsToDispatch = []; public function __construct( - private iterable $executors, - private ActionRegistryInterface $actionRegistry, - private ExpressionParser $expressionParser, - private StateRepositoryInterface $stateRepository, + private readonly iterable $executors, + private readonly ActionRegistryInterface $actionRegistry, + private readonly ExpressionParser $expressionParser, + private readonly StateRepositoryInterface $stateRepository, ?OutputInterface $output = null, ?LoggerInterface $logger = null, ?EnvContainer $envs = null, @@ -152,11 +157,24 @@ public function executeJob(WorkflowState $workflowState, Job $job, string $jobSt } }); + $this->flushEvents(); + if (null === $context) { return; } $this->runJob($context, $job); + + $this->flushEvents(); + } + + private function dispatchEvent(JobUpdateEvent $event): void + { + if ($this->stateRepository instanceof TransactionalStateRepositoryInterface) { + $this->eventsToDispatch[] = $event; + } else { + $this->eventDispatcher->dispatch($event); + } } private function persistJobState(JobState $jobState): void @@ -167,7 +185,7 @@ private function persistJobState(JobState $jobState): void $this->stateRepository->releaseJobLock($jobState->getWorkflowId(), $jobState->getId()); } - $this->eventDispatcher->dispatch(new JobUpdateEvent($jobState->getWorkflowId(), $jobState->getJobId(), $jobState->getId(), $jobState->getStatus())); + $this->dispatchEvent(new JobUpdateEvent($jobState->getWorkflowId(), $jobState->getJobId(), $jobState->getId(), $jobState->getStatus())); } private function runJob(JobExecutionContext $context, Job $job): void @@ -272,4 +290,13 @@ private function extractOutputs(Job $job, JobExecutionContext $context): void } } } + + private function flushEvents(): void + { + $events = $this->eventsToDispatch; + $this->eventsToDispatch = []; + foreach ($events as $event) { + $this->eventDispatcher->dispatch($event); + } + } } diff --git a/lib/php/workflow/src/Message/JobConsumerHandler.php b/lib/php/workflow/src/Message/JobConsumerHandler.php index c5307ef4b..377324724 100644 --- a/lib/php/workflow/src/Message/JobConsumerHandler.php +++ b/lib/php/workflow/src/Message/JobConsumerHandler.php @@ -23,6 +23,7 @@ public function __invoke(JobConsumer $message): void $message->getJobId(), $message->getJobStateId(), )); + $this->orchestrator->continueWorkflow($message->getWorkflowId()); } } diff --git a/lib/php/workflow/src/State/Repository/JobStatusCacheTrait.php b/lib/php/workflow/src/State/Repository/JobStatusCacheTrait.php index aac23cf64..5df66cf23 100644 --- a/lib/php/workflow/src/State/Repository/JobStatusCacheTrait.php +++ b/lib/php/workflow/src/State/Repository/JobStatusCacheTrait.php @@ -33,6 +33,7 @@ public function clearCache(): void $this->statuses = []; $this->statusesByJobId = []; $this->workflows = []; + $this->lastByJobId = []; } protected function cacheWorkflowState(string $workflowId, ?object $state): void diff --git a/lib/php/workflow/src/WorkflowOrchestrator.php b/lib/php/workflow/src/WorkflowOrchestrator.php index aa3b33319..2529dbbde 100644 --- a/lib/php/workflow/src/WorkflowOrchestrator.php +++ b/lib/php/workflow/src/WorkflowOrchestrator.php @@ -80,9 +80,7 @@ public function startWorkflow(string $workflowName, ?WorkflowEvent $event = null $this->doContinueWorkflow($workflowState); - if (!$this->trigger->isSynchronous()) { - $this->flush(); - } + $this->flush(); return $workflowState; } @@ -147,9 +145,7 @@ public function continueWorkflow(string $workflowId): void } }); - if (!$this->trigger->isSynchronous()) { - $this->flush(); - } + $this->flush(); } private function wrapInTransaction(callable $callback): void @@ -217,9 +213,7 @@ public function rerunJobs(string $workflowId, ?string $jobIdFilter = null, ?arra $this->doRerunJobs($workflowId, $jobIdFilter, $expectedStatuses, $jobInputs); }); - if (!$this->trigger->isSynchronous()) { - $this->flush(); - } + $this->flush(); } private function doRerunJobs(string $workflowId, ?string $jobIdFilter, ?array $expectedStatuses, ?array $jobInputs): void @@ -360,10 +354,11 @@ private function triggerJob(WorkflowState $workflowState, string $jobId, ?array } $this->stateRepository->persistJobState($jobState); + $jobTrigger = new JobTrigger($workflowId, $jobId, $jobState->getId()); if ($this->trigger->isSynchronous()) { - $this->trigger->triggerJob(new JobTrigger($workflowId, $jobId, $jobState->getId())); + $this->trigger->triggerJob($jobTrigger); } else { - $this->workflowsToTrigger[] = new JobTrigger($workflowId, $jobId, $jobState->getId()); + $this->workflowsToTrigger[] = $jobTrigger; } }