Skip to content

Commit

Permalink
Migrated code from symfony/process to react/child-process
Browse files Browse the repository at this point in the history
  • Loading branch information
gplanchat committed Nov 21, 2023
1 parent 3f706e0 commit 2062684
Show file tree
Hide file tree
Showing 7 changed files with 300 additions and 246 deletions.
55 changes: 27 additions & 28 deletions src/Adapter/Composer.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@

use Psr\Log\AbstractLogger;
use Psr\Log\LoggerInterface;
use Symfony\Component\Process\Process;
use React\ChildProcess\Process;
use React\Promise\Deferred;
use function React\Async\await;

Check failure on line 11 in src/Adapter/Composer.php

View workflow job for this annotation

GitHub Actions / phpstan

Used function React\Async\await not found.

Check failure on line 11 in src/Adapter/Composer.php

View workflow job for this annotation

GitHub Actions / phpstan5

Used function React\Async\await not found.
use function React\Promise\Timer\timeout;

Check failure on line 12 in src/Adapter/Composer.php

View workflow job for this annotation

GitHub Actions / phpstan

Used function React\Promise\Timer\timeout not found.

Check failure on line 12 in src/Adapter/Composer.php

View workflow job for this annotation

GitHub Actions / phpstan5

Used function React\Promise\Timer\timeout not found.

final class Composer
{
Expand All @@ -21,52 +24,48 @@ public function log($level, $message, array $context = []): void
};
}

private function execute(Process $process): void
private function execute(Process $process, float $timeout = 300): void

Check failure on line 27 in src/Adapter/Composer.php

View workflow job for this annotation

GitHub Actions / phpstan

Parameter $process of method Kiboko\Component\Satellite\Adapter\Composer::execute() has invalid type React\ChildProcess\Process.

Check failure on line 27 in src/Adapter/Composer.php

View workflow job for this annotation

GitHub Actions / phpstan5

Parameter $process of method Kiboko\Component\Satellite\Adapter\Composer::execute() has invalid type React\ChildProcess\Process.
{
$process->run(function ($type, $buffer): void {
if (Process::ERR === $type) {
$this->logger->info($buffer);
} else {
$this->logger->debug($buffer);
}
$process->stdout->on('data', function ($chunk) {

Check failure on line 29 in src/Adapter/Composer.php

View workflow job for this annotation

GitHub Actions / phpstan

Access to property $stdout on an unknown class React\ChildProcess\Process.

Check failure on line 29 in src/Adapter/Composer.php

View workflow job for this annotation

GitHub Actions / phpstan5

Access to property $stdout on an unknown class React\ChildProcess\Process.
$this->logger->debug($chunk);
});
$process->stderr->on('data', function ($chunk) {

Check failure on line 32 in src/Adapter/Composer.php

View workflow job for this annotation

GitHub Actions / phpstan

Access to property $stderr on an unknown class React\ChildProcess\Process.

Check failure on line 32 in src/Adapter/Composer.php

View workflow job for this annotation

GitHub Actions / phpstan5

Access to property $stderr on an unknown class React\ChildProcess\Process.
$this->logger->info($chunk);
});

if (0 !== $process->getExitCode()) {
throw new ComposerFailureException($process->getCommandLine(), sprintf('Process exited unexpectedly with output: %s', $process->getErrorOutput()), $process->getExitCode());
}
}
$deferred = new Deferred();

private function command(string ...$command): void
{
$process = new Process($command);
$process->setWorkingDirectory($this->workdir);
$process->on('exit', function () use ($deferred) {

Check failure on line 38 in src/Adapter/Composer.php

View workflow job for this annotation

GitHub Actions / phpstan

Call to method on() on an unknown class React\ChildProcess\Process.

Check failure on line 38 in src/Adapter/Composer.php

View workflow job for this annotation

GitHub Actions / phpstan5

Call to method on() on an unknown class React\ChildProcess\Process.
$deferred->resolve();
});

$process->setTimeout(300);
$process->start();

Check failure on line 42 in src/Adapter/Composer.php

View workflow job for this annotation

GitHub Actions / phpstan

Call to method start() on an unknown class React\ChildProcess\Process.
$this->logger->notice(sprintf('Starting process "%s".', $process->getCommand()));

Check failure on line 43 in src/Adapter/Composer.php

View workflow job for this annotation

GitHub Actions / phpstan

Call to method getCommand() on an unknown class React\ChildProcess\Process.

$this->execute($process);
await(timeout($deferred->promise(), $timeout));

Check failure on line 45 in src/Adapter/Composer.php

View workflow job for this annotation

GitHub Actions / phpstan

Function React\Async\await not found.

Check failure on line 45 in src/Adapter/Composer.php

View workflow job for this annotation

GitHub Actions / phpstan

Function React\Promise\Timer\timeout not found.

if (0 !== $process->getExitCode()) {
throw new ComposerFailureException($process->getCommand(), sprintf('Process exited unexpectedly with output: %s', $process->getExitCode()), $process->getExitCode());
}
}

private function pipe(Process ...$processes): void
private function command(string ...$command): void
{
$process = Process::fromShellCommandline(implode('|', array_map(fn (Process $process) => $process->getCommandLine(), $processes)));
$process->setWorkingDirectory($this->workdir);

$process->setTimeout(300);
$process = new Process(
implode (' ', array_map(fn ($part) => escapeshellarg($part), $command)),
$this->workdir,
);

$this->execute($process);
}

private function subcommand(string ...$command): Process
{
return new Process($command);
}

public function require(string ...$packages): void
{
$this->command(
'composer',
'require',
'--with-dependencies',
'--with-all-dependencies',
'--prefer-dist',
'--no-progress',
'--prefer-stable',
Expand Down
55 changes: 42 additions & 13 deletions src/Adapter/Docker/Satellite.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@

use Kiboko\Component\Dockerfile;
use Kiboko\Component\Packaging\TarArchive;
use Kiboko\Component\Satellite\Adapter\ComposerFailureException;
use Kiboko\Contract\Configurator;
use Kiboko\Contract\Packaging;
use Psr\Log\LoggerInterface;
use Symfony\Component\Process\Process;
use React\ChildProcess\Process;
use React\Promise\Deferred;
use React\Stream\ReadableResourceStream;
use function React\Async\await;
use function React\Promise\Timer\timeout;

final class Satellite implements Configurator\SatelliteInterface
{
Expand Down Expand Up @@ -65,24 +70,48 @@ public function build(
}
};

$process = new Process([
'docker', 'build', '--rm', '-', ...iterator_to_array($iterator($this->imageTags)),
]);
$command = ['docker', 'build', '--rm', '-', ...iterator_to_array($iterator($this->imageTags))];

$process->setInput($archive->asResource());
$process = new Process(
implode (' ', array_map(fn ($part) => escapeshellarg($part), $command)),
$this->workdir,
);

$process->setTimeout(300);
$input = new ReadableResourceStream($archive->asResource());
$input->pipe($process->stdin);

$process->run(function ($type, $buffer) use ($logger): void {
if (Process::ERR === $type) {
$logger->info($buffer);
} else {
$logger->debug($buffer);
}
});
$this->execute($logger, $process);

if (0 !== $process->getExitCode()) {
throw new \RuntimeException('Process exited unexpectedly.');
}
}

private function execute(
LoggerInterface $logger,
Process $process,
float $timeout = 300
): void {
$process->stdout->on('data', function ($chunk) use ($logger) {
$logger->debug($chunk);
});
$process->stderr->on('data', function ($chunk) use ($logger) {
$logger->info($chunk);
});

$deferred = new Deferred();

$process->on('exit', function () use ($deferred) {
$deferred->resolve();
});

$process->start();
$logger->notice(sprintf('Starting process "%s".', $process->getCommand()));

await(timeout($deferred->promise(), $timeout));

if (0 !== $process->getExitCode()) {
throw new ComposerFailureException($process->getCommand(), sprintf('Process exited unexpectedly with output: %s', $process->getExitCode()), $process->getExitCode());
}
}
}
8 changes: 4 additions & 4 deletions src/Cloud/DTO/Pipeline.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ public function __construct(
private string $label,
private string $code,
private StepList $steps,
private Autoload $autoload,
private PackageList $packages,
private RepositoryList $repositories,
private AuthList $auths,
private Autoload $autoload = new Autoload(),
private PackageList $packages = new PackageList(),
private RepositoryList $repositories = new RepositoryList(),
private AuthList $auths = new AuthList(),
) {}

public function code(): string
Expand Down
50 changes: 41 additions & 9 deletions src/Console/Command/HookRunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@

namespace Kiboko\Component\Satellite\Console\Command;

use React\ChildProcess\Process;
use React\Promise\Deferred;
use Symfony\Component\Console;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Process\Process;
use function React\Async\await;

class HookRunCommand extends Console\Command\Command
final class HookRunCommand extends Console\Command\Command
{
protected static $defaultName = 'run:hook';
protected static $defaultDescription = 'Run the hook.';
Expand All @@ -31,20 +33,50 @@ protected function execute(InputInterface $input, OutputInterface $output): int
if (!file_exists($input->getArgument('path').'/vendor/autoload.php')) {
$style->error('Nothing is compiled at the provided path');

return \Symfony\Component\Console\Command\Command::FAILURE;
return Console\Command\Command::FAILURE;
}

$cwd = getcwd();
chdir($input->getArgument('path'));

$process = new Process(['php', '-S', 'localhost:8000', 'main.php']);
$process->setTimeout(null);
$process->run(function ($type, $buffer): void {
echo $buffer;
$command = ['php', '-S', 'localhost:8000', 'main.php'];

$process = new Process(implode (' ', array_map(fn ($part) => escapeshellarg($part), $command)), $cwd);

if (!$this->executeWorker($style, $process)) {
return Console\Command\Command::FAILURE;
}

return Console\Command\Command::SUCCESS;
}

private function executeWorker(
Console\Style\SymfonyStyle $style,
Process $process
): bool {
$process->stdout->on('data', function ($chunk) use ($style) {
$style->text($chunk);
});
$process->stderr->on('data', function ($chunk) use ($style) {
$style->info($chunk);
});

chdir($cwd);
$deferred = new Deferred();

$process->on('exit', function () use ($deferred) {
$deferred->resolve();
});

$process->start();
$style->note(sprintf('Starting process "%s".', $process->getCommand()));

await($deferred->promise());

if (0 !== $process->getExitCode()) {
$style->error(sprintf('Process exited unexpectedly with exit code %d', $process->getExitCode()));
return false;
}

return \Symfony\Component\Console\Command\Command::SUCCESS;
return true;
}
}
105 changes: 6 additions & 99 deletions src/Console/Command/PipelineRunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,13 @@

namespace Kiboko\Component\Satellite\Console\Command;

use Composer\Autoload\ClassLoader;
use Kiboko\Component\Runtime\Pipeline\Console as PipelineConsoleRuntime;
use Symfony\Component\Console;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Dotenv\Dotenv;

final class PipelineRunCommand extends Console\Command\Command
#[Console\Attribute\AsCommand('run:pipeline', 'Run a data flow satellite (pipeline or workflow).', hidden: true)]
final class PipelineRunCommand extends RunCommand
{
protected static $defaultName = 'run:pipeline';
protected static $defaultDescription = 'Run the pipeline satellite.';

protected function configure(): void
{
$this->addArgument('path', Console\Input\InputArgument::REQUIRED);
Expand All @@ -28,99 +23,11 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$output,
);

$style->writeln(sprintf('<fg=cyan>Running pipeline in %s</>', $input->getArgument('path')));

if (!file_exists($input->getArgument('path').'/vendor/autoload.php')) {
$style->error('There is no compiled pipeline at the provided path');

return \Symfony\Component\Console\Command\Command::FAILURE;
}

$cwd = getcwd();
chdir($input->getArgument('path'));

$dotenv = new Dotenv();
$dotenv->usePutenv();

if (file_exists($file = $cwd.'/.env')) {
$dotenv->loadEnv($file);
}
if (file_exists($file = $cwd.'/'.$input->getArgument('path').'/.env')) {
$dotenv->loadEnv($file);
}

/** @var ClassLoader $autoload */
$autoload = include 'vendor/autoload.php';
$autoload->addClassMap([
/* @phpstan-ignore-next-line */
\ProjectServiceContainer::class => 'container.php',
$style->warning([
'The command "run:pipeline is deprecated and will be removed in future releases.',
'Please use the "run" command as a replacement.'
]);
$autoload->register();

$runtime = new PipelineConsoleRuntime(
$output,
new \Kiboko\Component\Pipeline\Pipeline(
new \Kiboko\Component\Pipeline\PipelineRunner(
new \Psr\Log\NullLogger()
)
),
);

if (!file_exists('pipeline.php')) {
$style->error('The provided path does not contain one single pipeline, did you mean to run "run:workflow"?');

return \Symfony\Component\Console\Command\Command::FAILURE;
}
/** @var callable(runtime: PipelineRuntimeInterface): \Runtime $pipeline */
$pipeline = include 'pipeline.php';

$start = microtime(true);
$pipeline($runtime);
$runtime->run();
$end = microtime(true);

$autoload->unregister();

$style->writeln(sprintf('time: %s', $this->formatTime($end - $start)));

chdir($cwd);

return \Symfony\Component\Console\Command\Command::SUCCESS;
}

private function formatTime(float $time): string
{
if ($time < .00001) {
return sprintf('<fg=cyan>%sµs</>', number_format($time * 1_000_000, 2));
}
if ($time < .0001) {
return sprintf('<fg=cyan>%sµs</>', number_format($time * 1_000_000, 1));
}
if ($time < .001) {
return sprintf('<fg=cyan>%sµs</>', number_format($time * 1_000_000));
}
if ($time < .01) {
return sprintf('<fg=cyan>%sms</>', number_format($time * 1000, 2));
}
if ($time < .1) {
return sprintf('<fg=cyan>%sms</>', number_format($time * 1000, 1));
}
if ($time < 1) {
return sprintf('<fg=cyan>%sms</>', number_format($time * 1000));
}
if ($time < 10) {
return sprintf('<fg=cyan>%ss</>', number_format($time, 2));
}
if ($time < 3600) {
$minutes = floor($time / 60);
$seconds = $time - (60 * $minutes);

return sprintf('<fg=cyan>%smin</> <fg=cyan>%ss</>', number_format($minutes), number_format($seconds, 2));
}
$hours = floor($time / 3600);
$minutes = floor(($time - (3600 * $hours)) / 60);
$seconds = $time - (3600 * $hours) - (60 * $minutes);

return sprintf('<fg=cyan>%sh</> <fg=cyan>%smin</> <fg=cyan>%ss</>', number_format($hours), number_format($minutes), number_format($seconds, 2));
return parent::execute($input, $output);
}
}
Loading

0 comments on commit 2062684

Please sign in to comment.