Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
4rthem committed Oct 29, 2024
1 parent 8a820d0 commit 8b3bad6
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public function configureActions(Actions $actions): Actions
{
$viewWorkflow = Action::new('viewWorkflow', 'View', 'fa fa-eye')
->setHtmlAttributes(['target' => '_blank'])
->linkToUrl(fn (WorkflowState $entity): string => sprintf('%s/workflows/%s', $this->databoxClientBaseUrl, $entity->getId()));
->linkToUrl(fn (WorkflowState $entity): string => sprintf('%s/?_m=%s', $this->databoxClientBaseUrl, urlencode(sprintf('/workflows/%s', $entity->getId()))));

$cancel = Action::new('cancelWorkflow', 'Cancel Workflow', 'fas fa-ban')
->displayIf(fn (WorkflowState $entity) => ModelWorkflowState::STATUS_STARTED === $entity->getStatus())
Expand Down
47 changes: 37 additions & 10 deletions lib/php/workflow/src/Executor/JobExecutor.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,23 @@
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\EventDispatcher\EventDispatcher;

readonly class JobExecutor
final class JobExecutor
{
private LoggerInterface $logger;
private OutputInterface $output;
private EnvContainer $envs;
private EventDispatcherInterface $eventDispatcher;
private readonly LoggerInterface $logger;
private readonly OutputInterface $output;
private readonly EnvContainer $envs;
private readonly EventDispatcherInterface $eventDispatcher;

/**
* @var JobUpdateEvent[]
*/
private array $eventsToDispatch = [];

public function __construct(
private iterable $executors,
private ActionRegistryInterface $actionRegistry,
private ExpressionParser $expressionParser,
private StateRepositoryInterface $stateRepository,
private readonly iterable $executors,
private readonly ActionRegistryInterface $actionRegistry,
private readonly ExpressionParser $expressionParser,
private readonly StateRepositoryInterface $stateRepository,
?OutputInterface $output = null,
?LoggerInterface $logger = null,
?EnvContainer $envs = null,
Expand Down Expand Up @@ -152,11 +157,24 @@ public function executeJob(WorkflowState $workflowState, Job $job, string $jobSt
}
});

$this->flushEvents();

if (null === $context) {
return;
}

$this->runJob($context, $job);

$this->flushEvents();
}

private function dispatchEvent(JobUpdateEvent $event): void
{
if ($this->stateRepository instanceof TransactionalStateRepositoryInterface) {
$this->eventsToDispatch[] = $event;
} else {
$this->eventDispatcher->dispatch($event);
}
}

private function persistJobState(JobState $jobState): void
Expand All @@ -167,7 +185,7 @@ private function persistJobState(JobState $jobState): void
$this->stateRepository->releaseJobLock($jobState->getWorkflowId(), $jobState->getId());
}

$this->eventDispatcher->dispatch(new JobUpdateEvent($jobState->getWorkflowId(), $jobState->getJobId(), $jobState->getId(), $jobState->getStatus()));
$this->dispatchEvent(new JobUpdateEvent($jobState->getWorkflowId(), $jobState->getJobId(), $jobState->getId(), $jobState->getStatus()));
}

private function runJob(JobExecutionContext $context, Job $job): void
Expand Down Expand Up @@ -272,4 +290,13 @@ private function extractOutputs(Job $job, JobExecutionContext $context): void
}
}
}

private function flushEvents(): void
{
$events = $this->eventsToDispatch;
$this->eventsToDispatch = [];
foreach ($events as $event) {
$this->eventDispatcher->dispatch($event);
}
}
}
1 change: 1 addition & 0 deletions lib/php/workflow/src/Message/JobConsumerHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public function __invoke(JobConsumer $message): void
$message->getJobId(),
$message->getJobStateId(),
));

$this->orchestrator->continueWorkflow($message->getWorkflowId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public function clearCache(): void
$this->statuses = [];
$this->statusesByJobId = [];
$this->workflows = [];
$this->lastByJobId = [];
}

protected function cacheWorkflowState(string $workflowId, ?object $state): void
Expand Down
17 changes: 6 additions & 11 deletions lib/php/workflow/src/WorkflowOrchestrator.php
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,7 @@ public function startWorkflow(string $workflowName, ?WorkflowEvent $event = null

$this->doContinueWorkflow($workflowState);

if (!$this->trigger->isSynchronous()) {
$this->flush();
}
$this->flush();

return $workflowState;
}
Expand Down Expand Up @@ -147,9 +145,7 @@ public function continueWorkflow(string $workflowId): void
}
});

if (!$this->trigger->isSynchronous()) {
$this->flush();
}
$this->flush();
}

private function wrapInTransaction(callable $callback): void
Expand Down Expand Up @@ -217,9 +213,7 @@ public function rerunJobs(string $workflowId, ?string $jobIdFilter = null, ?arra
$this->doRerunJobs($workflowId, $jobIdFilter, $expectedStatuses, $jobInputs);
});

if (!$this->trigger->isSynchronous()) {
$this->flush();
}
$this->flush();
}

private function doRerunJobs(string $workflowId, ?string $jobIdFilter, ?array $expectedStatuses, ?array $jobInputs): void
Expand Down Expand Up @@ -360,10 +354,11 @@ private function triggerJob(WorkflowState $workflowState, string $jobId, ?array
}
$this->stateRepository->persistJobState($jobState);

$jobTrigger = new JobTrigger($workflowId, $jobId, $jobState->getId());
if ($this->trigger->isSynchronous()) {
$this->trigger->triggerJob(new JobTrigger($workflowId, $jobId, $jobState->getId()));
$this->trigger->triggerJob($jobTrigger);
} else {
$this->workflowsToTrigger[] = new JobTrigger($workflowId, $jobId, $jobState->getId());
$this->workflowsToTrigger[] = $jobTrigger;
}
}

Expand Down

0 comments on commit 8b3bad6

Please sign in to comment.