diff --git a/src/Adapter/Composer.php b/src/Adapter/Composer.php index bce2453b..f58ad39e 100644 --- a/src/Adapter/Composer.php +++ b/src/Adapter/Composer.php @@ -6,7 +6,10 @@ use Psr\Log\AbstractLogger; use Psr\Log\LoggerInterface; -use Symfony\Component\Process\Process; +use React\ChildProcess\Process; +use React\Promise\Deferred; +use function React\Async\await; +use function React\Promise\Timer\timeout; final class Composer { @@ -21,52 +24,48 @@ public function log($level, $message, array $context = []): void }; } - private function execute(Process $process): void + private function execute(Process $process, float $timeout = 300): void { - $process->run(function ($type, $buffer): void { - if (Process::ERR === $type) { - $this->logger->info($buffer); - } else { - $this->logger->debug($buffer); - } + $process->stdout->on('data', function ($chunk) { + $this->logger->debug($chunk); + }); + $process->stderr->on('data', function ($chunk) { + $this->logger->info($chunk); }); - if (0 !== $process->getExitCode()) { - throw new ComposerFailureException($process->getCommandLine(), sprintf('Process exited unexpectedly with output: %s', $process->getErrorOutput()), $process->getExitCode()); - } - } + $deferred = new Deferred(); - private function command(string ...$command): void - { - $process = new Process($command); - $process->setWorkingDirectory($this->workdir); + $process->on('exit', function () use ($deferred) { + $deferred->resolve(); + }); - $process->setTimeout(300); + $process->start(); + $this->logger->notice(sprintf('Starting process "%s".', $process->getCommand())); - $this->execute($process); + await(timeout($deferred->promise(), $timeout)); + + if (0 !== $process->getExitCode()) { + throw new ComposerFailureException($process->getCommand(), sprintf('Process exited unexpectedly with output: %s', $process->getExitCode()), $process->getExitCode()); + } } - private function pipe(Process ...$processes): void + private function command(string ...$command): void { - $process = Process::fromShellCommandline(implode('|', array_map(fn (Process $process) => $process->getCommandLine(), $processes))); - $process->setWorkingDirectory($this->workdir); - - $process->setTimeout(300); + $process = new Process( + implode (' ', array_map(fn ($part) => escapeshellarg($part), $command)), + $this->workdir, + ); $this->execute($process); } - private function subcommand(string ...$command): Process - { - return new Process($command); - } - public function require(string ...$packages): void { $this->command( 'composer', 'require', '--with-dependencies', + '--with-all-dependencies', '--prefer-dist', '--no-progress', '--prefer-stable', diff --git a/src/Adapter/Docker/Satellite.php b/src/Adapter/Docker/Satellite.php index e5742f2a..e93019b9 100644 --- a/src/Adapter/Docker/Satellite.php +++ b/src/Adapter/Docker/Satellite.php @@ -6,10 +6,15 @@ use Kiboko\Component\Dockerfile; use Kiboko\Component\Packaging\TarArchive; +use Kiboko\Component\Satellite\Adapter\ComposerFailureException; use Kiboko\Contract\Configurator; use Kiboko\Contract\Packaging; use Psr\Log\LoggerInterface; -use Symfony\Component\Process\Process; +use React\ChildProcess\Process; +use React\Promise\Deferred; +use React\Stream\ReadableResourceStream; +use function React\Async\await; +use function React\Promise\Timer\timeout; final class Satellite implements Configurator\SatelliteInterface { @@ -65,24 +70,48 @@ public function build( } }; - $process = new Process([ - 'docker', 'build', '--rm', '-', ...iterator_to_array($iterator($this->imageTags)), - ]); + $command = ['docker', 'build', '--rm', '-', ...iterator_to_array($iterator($this->imageTags))]; - $process->setInput($archive->asResource()); + $process = new Process( + implode (' ', array_map(fn ($part) => escapeshellarg($part), $command)), + $this->workdir, + ); - $process->setTimeout(300); + $input = new ReadableResourceStream($archive->asResource()); + $input->pipe($process->stdin); - $process->run(function ($type, $buffer) use ($logger): void { - if (Process::ERR === $type) { - $logger->info($buffer); - } else { - $logger->debug($buffer); - } - }); + $this->execute($logger, $process); if (0 !== $process->getExitCode()) { throw new \RuntimeException('Process exited unexpectedly.'); } } + + private function execute( + LoggerInterface $logger, + Process $process, + float $timeout = 300 + ): void { + $process->stdout->on('data', function ($chunk) use ($logger) { + $logger->debug($chunk); + }); + $process->stderr->on('data', function ($chunk) use ($logger) { + $logger->info($chunk); + }); + + $deferred = new Deferred(); + + $process->on('exit', function () use ($deferred) { + $deferred->resolve(); + }); + + $process->start(); + $logger->notice(sprintf('Starting process "%s".', $process->getCommand())); + + await(timeout($deferred->promise(), $timeout)); + + if (0 !== $process->getExitCode()) { + throw new ComposerFailureException($process->getCommand(), sprintf('Process exited unexpectedly with output: %s', $process->getExitCode()), $process->getExitCode()); + } + } } diff --git a/src/Cloud/DTO/Pipeline.php b/src/Cloud/DTO/Pipeline.php index 7112f487..92bc205c 100644 --- a/src/Cloud/DTO/Pipeline.php +++ b/src/Cloud/DTO/Pipeline.php @@ -10,10 +10,10 @@ public function __construct( private string $label, private string $code, private StepList $steps, - private Autoload $autoload, - private PackageList $packages, - private RepositoryList $repositories, - private AuthList $auths, + private Autoload $autoload = new Autoload(), + private PackageList $packages = new PackageList(), + private RepositoryList $repositories = new RepositoryList(), + private AuthList $auths = new AuthList(), ) {} public function code(): string diff --git a/src/Console/Command/HookRunCommand.php b/src/Console/Command/HookRunCommand.php index 6a538c29..87954c7f 100644 --- a/src/Console/Command/HookRunCommand.php +++ b/src/Console/Command/HookRunCommand.php @@ -4,12 +4,14 @@ namespace Kiboko\Component\Satellite\Console\Command; +use React\ChildProcess\Process; +use React\Promise\Deferred; use Symfony\Component\Console; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; -use Symfony\Component\Process\Process; +use function React\Async\await; -class HookRunCommand extends Console\Command\Command +final class HookRunCommand extends Console\Command\Command { protected static $defaultName = 'run:hook'; protected static $defaultDescription = 'Run the hook.'; @@ -31,20 +33,50 @@ protected function execute(InputInterface $input, OutputInterface $output): int if (!file_exists($input->getArgument('path').'/vendor/autoload.php')) { $style->error('Nothing is compiled at the provided path'); - return \Symfony\Component\Console\Command\Command::FAILURE; + return Console\Command\Command::FAILURE; } $cwd = getcwd(); chdir($input->getArgument('path')); - $process = new Process(['php', '-S', 'localhost:8000', 'main.php']); - $process->setTimeout(null); - $process->run(function ($type, $buffer): void { - echo $buffer; + $command = ['php', '-S', 'localhost:8000', 'main.php']; + + $process = new Process(implode (' ', array_map(fn ($part) => escapeshellarg($part), $command)), $cwd); + + if (!$this->executeWorker($style, $process)) { + return Console\Command\Command::FAILURE; + } + + return Console\Command\Command::SUCCESS; + } + + private function executeWorker( + Console\Style\SymfonyStyle $style, + Process $process + ): bool { + $process->stdout->on('data', function ($chunk) use ($style) { + $style->text($chunk); + }); + $process->stderr->on('data', function ($chunk) use ($style) { + $style->info($chunk); }); - chdir($cwd); + $deferred = new Deferred(); + + $process->on('exit', function () use ($deferred) { + $deferred->resolve(); + }); + + $process->start(); + $style->note(sprintf('Starting process "%s".', $process->getCommand())); + + await($deferred->promise()); + + if (0 !== $process->getExitCode()) { + $style->error(sprintf('Process exited unexpectedly with exit code %d', $process->getExitCode())); + return false; + } - return \Symfony\Component\Console\Command\Command::SUCCESS; + return true; } } diff --git a/src/Console/Command/PipelineRunCommand.php b/src/Console/Command/PipelineRunCommand.php index 6f31bf52..85925091 100644 --- a/src/Console/Command/PipelineRunCommand.php +++ b/src/Console/Command/PipelineRunCommand.php @@ -4,18 +4,13 @@ namespace Kiboko\Component\Satellite\Console\Command; -use Composer\Autoload\ClassLoader; -use Kiboko\Component\Runtime\Pipeline\Console as PipelineConsoleRuntime; use Symfony\Component\Console; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; -use Symfony\Component\Dotenv\Dotenv; -final class PipelineRunCommand extends Console\Command\Command +#[Console\Attribute\AsCommand('run:pipeline', 'Run a data flow satellite (pipeline or workflow).', hidden: true)] +final class PipelineRunCommand extends RunCommand { - protected static $defaultName = 'run:pipeline'; - protected static $defaultDescription = 'Run the pipeline satellite.'; - protected function configure(): void { $this->addArgument('path', Console\Input\InputArgument::REQUIRED); @@ -28,99 +23,11 @@ protected function execute(InputInterface $input, OutputInterface $output): int $output, ); - $style->writeln(sprintf('Running pipeline in %s', $input->getArgument('path'))); - - if (!file_exists($input->getArgument('path').'/vendor/autoload.php')) { - $style->error('There is no compiled pipeline at the provided path'); - - return \Symfony\Component\Console\Command\Command::FAILURE; - } - - $cwd = getcwd(); - chdir($input->getArgument('path')); - - $dotenv = new Dotenv(); - $dotenv->usePutenv(); - - if (file_exists($file = $cwd.'/.env')) { - $dotenv->loadEnv($file); - } - if (file_exists($file = $cwd.'/'.$input->getArgument('path').'/.env')) { - $dotenv->loadEnv($file); - } - - /** @var ClassLoader $autoload */ - $autoload = include 'vendor/autoload.php'; - $autoload->addClassMap([ - /* @phpstan-ignore-next-line */ - \ProjectServiceContainer::class => 'container.php', + $style->warning([ + 'The command "run:pipeline is deprecated and will be removed in future releases.', + 'Please use the "run" command as a replacement.' ]); - $autoload->register(); - - $runtime = new PipelineConsoleRuntime( - $output, - new \Kiboko\Component\Pipeline\Pipeline( - new \Kiboko\Component\Pipeline\PipelineRunner( - new \Psr\Log\NullLogger() - ) - ), - ); - - if (!file_exists('pipeline.php')) { - $style->error('The provided path does not contain one single pipeline, did you mean to run "run:workflow"?'); - - return \Symfony\Component\Console\Command\Command::FAILURE; - } - /** @var callable(runtime: PipelineRuntimeInterface): \Runtime $pipeline */ - $pipeline = include 'pipeline.php'; - - $start = microtime(true); - $pipeline($runtime); - $runtime->run(); - $end = microtime(true); - - $autoload->unregister(); - - $style->writeln(sprintf('time: %s', $this->formatTime($end - $start))); - - chdir($cwd); - - return \Symfony\Component\Console\Command\Command::SUCCESS; - } - - private function formatTime(float $time): string - { - if ($time < .00001) { - return sprintf('%sµs', number_format($time * 1_000_000, 2)); - } - if ($time < .0001) { - return sprintf('%sµs', number_format($time * 1_000_000, 1)); - } - if ($time < .001) { - return sprintf('%sµs', number_format($time * 1_000_000)); - } - if ($time < .01) { - return sprintf('%sms', number_format($time * 1000, 2)); - } - if ($time < .1) { - return sprintf('%sms', number_format($time * 1000, 1)); - } - if ($time < 1) { - return sprintf('%sms', number_format($time * 1000)); - } - if ($time < 10) { - return sprintf('%ss', number_format($time, 2)); - } - if ($time < 3600) { - $minutes = floor($time / 60); - $seconds = $time - (60 * $minutes); - - return sprintf('%smin %ss', number_format($minutes), number_format($seconds, 2)); - } - $hours = floor($time / 3600); - $minutes = floor(($time - (3600 * $hours)) / 60); - $seconds = $time - (3600 * $hours) - (60 * $minutes); - return sprintf('%sh %smin %ss', number_format($hours), number_format($minutes), number_format($seconds, 2)); + return parent::execute($input, $output); } } diff --git a/src/Console/Command/RunCommand.php b/src/Console/Command/RunCommand.php new file mode 100644 index 00000000..1b198c36 --- /dev/null +++ b/src/Console/Command/RunCommand.php @@ -0,0 +1,175 @@ +addArgument('path', Console\Input\InputArgument::REQUIRED); + } + + protected function execute(InputInterface $input, OutputInterface $output): int + { + $style = new Console\Style\SymfonyStyle( + $input, + $output, + ); + + $style->writeln(sprintf('Running pipeline in %s', $input->getArgument('path'))); + + if (!file_exists($input->getArgument('path').'/vendor/autoload.php')) { + $style->error('There is no compiled pipeline at the provided path'); + + return Console\Command\Command::FAILURE; + } + + $cwd = getcwd(); + chdir($input->getArgument('path')); + + if (file_exists('pipeline.php')) { + $source = 'pipeline.php'; + } else if (file_exists('workflow.php')) { + $source = 'workflow.php'; + } else { + $style->error('The provided path does not contain either a workflow or a pipeline satellite, did you mean to run "run:api"?'); + return Console\Command\Command::FAILURE; + } + + $source =<<usePutenv(); + + if (file_exists(\$file = $cwd.'/.env')) { + \$dotenv->loadEnv(\$file); + } + if (file_exists(\$file = $cwd.'/'.{$input->getArgument('path')}.'/.env')) { + \$dotenv->loadEnv(\$file); + } + + /** @var ClassLoader \$autoload */ + \$autoload = include 'vendor/autoload.php'; + \$autoload->addClassMap([ + /* @phpstan-ignore-next-line */ + \ProjectServiceContainer::class => 'container.php', + ]); + \$autoload->register(); + + \$runtime = new PipelineConsoleRuntime( + $output, + new \Kiboko\Component\Pipeline\Pipeline( + new \Kiboko\Component\Pipeline\PipelineRunner( + new \Psr\Log\NullLogger() + ) + ), + ); + + \$satellite = include '$source'; + + \$satellite(\$runtime); + \$runtime->run(); + + \$autoload->unregister(); + PHP; + + $stream = fopen('php://temp', 'r+'); + fwrite($stream, $source); + fseek($stream, 0, SEEK_SET); + + $input = new ReadableResourceStream($stream); + + $start = microtime(true); + $end = microtime(true); + + $style->writeln(sprintf('time: %s', $this->formatTime($end - $start))); + + chdir($cwd); + + $command = ['php', '-r', 'localhost:8000', 'main.php']; + + $process = new Process(implode (' ', array_map(fn ($part) => escapeshellarg($part), $command)), $cwd); + + if (!$this->executeWorker($style, $process)) { + return Console\Command\Command::FAILURE; + } + + return Console\Command\Command::SUCCESS; + } + + private function formatTime(float $time): string + { + if ($time < .00001) { + return sprintf('%sµs', number_format($time * 1_000_000, 2)); + } + if ($time < .0001) { + return sprintf('%sµs', number_format($time * 1_000_000, 1)); + } + if ($time < .001) { + return sprintf('%sµs', number_format($time * 1_000_000)); + } + if ($time < .01) { + return sprintf('%sms', number_format($time * 1000, 2)); + } + if ($time < .1) { + return sprintf('%sms', number_format($time * 1000, 1)); + } + if ($time < 1) { + return sprintf('%sms', number_format($time * 1000)); + } + if ($time < 10) { + return sprintf('%ss', number_format($time, 2)); + } + if ($time < 3600) { + $minutes = floor($time / 60); + $seconds = $time - (60 * $minutes); + + return sprintf('%smin %ss', number_format($minutes), number_format($seconds, 2)); + } + $hours = floor($time / 3600); + $minutes = floor(($time - (3600 * $hours)) / 60); + $seconds = $time - (3600 * $hours) - (60 * $minutes); + + return sprintf('%sh %smin %ss', number_format($hours), number_format($minutes), number_format($seconds, 2)); + } + + private function executeWorker( + Console\Style\SymfonyStyle $style, + Process $process + ): bool { + $process->stdout->on('data', function ($chunk) use ($style) { + $style->text($chunk); + }); + $process->stderr->on('data', function ($chunk) use ($style) { + $style->info($chunk); + }); + + $deferred = new Deferred(); + + $process->on('exit', function () use ($deferred) { + $deferred->resolve(); + }); + + $process->start(); + $style->note(sprintf('Starting process "%s".', $process->getCommand())); + + await($deferred->promise()); + + if (0 !== $process->getExitCode()) { + $style->error(sprintf('Process exited unexpectedly with exit code %d', $process->getExitCode())); + return false; + } + + return true; + } +} diff --git a/src/Console/Command/WorkflowRunCommand.php b/src/Console/Command/WorkflowRunCommand.php index 56cf2c15..75cda29f 100644 --- a/src/Console/Command/WorkflowRunCommand.php +++ b/src/Console/Command/WorkflowRunCommand.php @@ -4,18 +4,13 @@ namespace Kiboko\Component\Satellite\Console\Command; -use Composer\Autoload\ClassLoader; -use Kiboko\Component\Runtime\Workflow\Console as WorkflowConsoleRuntime; use Symfony\Component\Console; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; -use Symfony\Component\Dotenv\Dotenv; +#[Console\Attribute\AsCommand('run:workflow', 'Run a data flow satellite (pipeline or workflow).', hidden: true)] final class WorkflowRunCommand extends Console\Command\Command { - protected static $defaultName = 'run:workflow'; - protected static $defaultDescription = 'Run the workflow satellite.'; - protected function configure(): void { $this->addArgument('path', Console\Input\InputArgument::REQUIRED); @@ -28,94 +23,11 @@ protected function execute(InputInterface $input, OutputInterface $output): int $output, ); - $style->writeln(sprintf('Running workflow in %s', $input->getArgument('path'))); - - if (!file_exists($input->getArgument('path').'/vendor/autoload.php')) { - $style->error('There is no compiled workflow at the provided path'); - - return \Symfony\Component\Console\Command\Command::FAILURE; - } - - $cwd = getcwd(); - chdir($input->getArgument('path')); - - $dotenv = new Dotenv(); - $dotenv->usePutenv(); - if (file_exists($file = \dirname($cwd).'/.env')) { - $dotenv->loadEnv($file); - } - if (file_exists($file = $cwd.'/'.$input->getArgument('path').'/.env')) { - $dotenv->loadEnv($file); - } - - /** @var ClassLoader $autoload */ - $autoload = include 'vendor/autoload.php'; - $autoload->addClassMap([ - /* @phpstan-ignore-next-line */ - \ProjectServiceContainer::class => 'container.php', + $style->warning([ + 'The command "run:pipeline is deprecated and will be removed in future releases.', + 'Please use the "run" command as a replacement.' ]); - $autoload->register(); - - $runtime = new WorkflowConsoleRuntime( - $output, - new \Kiboko\Component\Pipeline\PipelineRunner(new \Psr\Log\NullLogger()), - ); - - if (!file_exists('workflow.php')) { - $style->error('The provided path does not contain a workflow, did you mean to run "run:pipeline"?'); - - return \Symfony\Component\Console\Command\Command::FAILURE; - } - /** @var callable(runtime: WorkflowRuntimeInterface): \Runtime $workflow */ - $workflow = include 'workflow.php'; - - $start = microtime(true); - $workflow($runtime); - $runtime->run(); - $end = microtime(true); - - $autoload->unregister(); - - $style->writeln(sprintf('time: %s', $this->formatTime($end - $start))); - - chdir($cwd); - - return \Symfony\Component\Console\Command\Command::SUCCESS; - } - - private function formatTime(float $time): string - { - if ($time < .00001) { - return sprintf('%sµs', number_format($time * 1_000_000, 2)); - } - if ($time < .0001) { - return sprintf('%sµs', number_format($time * 1_000_000, 1)); - } - if ($time < .001) { - return sprintf('%sµs', number_format($time * 1_000_000)); - } - if ($time < .01) { - return sprintf('%sms', number_format($time * 1000, 2)); - } - if ($time < .1) { - return sprintf('%sms', number_format($time * 1000, 1)); - } - if ($time < 1) { - return sprintf('%sms', number_format($time * 1000)); - } - if ($time < 10) { - return sprintf('%ss', number_format($time, 2)); - } - if ($time < 3600) { - $minutes = floor($time / 60); - $seconds = $time - (60 * $minutes); - - return sprintf('%smin %ss', number_format($minutes), number_format($seconds, 2)); - } - $hours = floor($time / 3600); - $minutes = floor(($time - (3600 * $hours)) / 60); - $seconds = $time - (3600 * $hours) - (60 * $minutes); - return sprintf('%sh %smin %ss', number_format($hours), number_format($minutes), number_format($seconds, 2)); + return parent::execute($input, $output); } }