diff --git a/tests/Flow/FlowTest.php b/tests/Flow/FlowTest.php index e8c48a2..858903b 100644 --- a/tests/Flow/FlowTest.php +++ b/tests/Flow/FlowTest.php @@ -5,15 +5,16 @@ namespace Flow\Test\Flow; use ArrayObject; +use Flow\AsyncHandler\AsyncHandler; use Flow\AsyncHandler\BatchAsyncHandler; +use Flow\AsyncHandler\DeferAsyncHandler; use Flow\Driver\AmpDriver; -use Flow\Driver\FiberDriver; +use Flow\Driver\ReactDriver; use Flow\DriverInterface; use Flow\ExceptionInterface; use Flow\Flow\Flow; use Flow\Ip; use Flow\IpStrategy\MaxIpStrategy; -use Flow\IpStrategyInterface; use PHPUnit\Framework\TestCase; use RuntimeException; @@ -28,53 +29,16 @@ class FlowTest extends TestCase /** * @dataProvider provideJobCases * - * @param DriverInterface $driver - * @param IpStrategyInterface $ipStrategy - * @param array $jobs + * @param DriverInterface $driver + * @param array $jobs */ - public function testJob(DriverInterface $driver, IpStrategyInterface $ipStrategy, array $jobs, int $resultNumber): void + public function testJob(DriverInterface $driver, array $jobs, int $resultNumber): void { - $flow = array_reduce( - array_map(static fn ($job) => new Flow( - $job, - static function (ExceptionInterface $exception) { - self::assertSame(RuntimeException::class, $exception->getPrevious()::class); - }, - $ipStrategy, - null, - null, - $driver - ), $jobs), - static fn ($flow, $flowIt) => $flow ? $flow->fn($flowIt) : $flowIt - ); - $flow->fn(static function (ArrayObject $data) use ($resultNumber) { - self::assertSame(ArrayObject::class, $data::class); - self::assertSame($resultNumber, $data['number']); - }); - $ip = new Ip(new ArrayObject(['number' => 0])); - ($flow)($ip); - - $flow->await(); - } - - /** - * @dataProvider provideJobCases - * - * @param DriverInterface $driver - * @param IpStrategyInterface $ipStrategy - * @param array $jobs - */ - public function testBatchAsyncJob(DriverInterface $driver, IpStrategyInterface $ipStrategy, array $jobs, int $resultNumber): void - { - if ($ipStrategy instanceof MaxIpStrategy) { - self::assertTrue(true); - - return; - } - $count = 0; $flow = array_reduce( - array_map(static function ($job) use ($ipStrategy, &$count, $driver) { + array_map(static function ($args) use ($driver, &$count) { + [$job, $ipStrategy, $asyncHandler] = $args; + return new Flow( $job, static function (ExceptionInterface $exception) use (&$count) { @@ -83,7 +47,7 @@ static function (ExceptionInterface $exception) use (&$count) { }, $ipStrategy, null, - new BatchAsyncHandler(2), + $asyncHandler, $driver ); }, $jobs), @@ -107,11 +71,10 @@ static function (ExceptionInterface $exception) use (&$count) { /** * @dataProvider provideJobCases * - * @param DriverInterface $driver - * @param IpStrategyInterface $ipStrategy - * @param array $jobs + * @param DriverInterface $driver + * @param array $jobs */ - public function testTick(DriverInterface $driver, IpStrategyInterface $ipStrategy, array $jobs, int $resultNumber): void + public function testTick(DriverInterface $driver, array $jobs, int $resultNumber): void { $cancel = $driver->tick(1, static function () use (&$flow) { $ip = new Ip(new ArrayObject(['number' => 0])); @@ -119,17 +82,21 @@ public function testTick(DriverInterface $driver, IpStrategyInterface $ipStrateg }); $flow = array_reduce( - array_map(static fn ($job) => new Flow( - $job, - static function (ExceptionInterface $exception) use ($cancel) { - self::assertSame(RuntimeException::class, $exception->getPrevious()::class); - $cancel(); - }, - $ipStrategy, - null, - null, - $driver - ), $jobs), + array_map(static function ($args) use ($driver, $cancel) { + [$job, $ipStrategy, $asyncHandler] = $args; + + return new Flow( + $job, + static function (ExceptionInterface $exception) use ($cancel) { + self::assertSame(RuntimeException::class, $exception->getPrevious()::class); + $cancel(); + }, + $ipStrategy, + null, + $asyncHandler, + $driver + ); + }, $jobs), static fn ($flow, $flowIt) => $flow ? $flow->fn($flowIt) : $flowIt ); $flow->fn(static function (ArrayObject $data) use ($resultNumber) { @@ -149,15 +116,14 @@ static function (ExceptionInterface $exception) use ($cancel) { /** * @dataProvider provideDoCases * - * @param DriverInterface $driver - * @param IpStrategyInterface $ipStrategy - * @param array $config + * @param DriverInterface $driver + * @param array $config */ - public function testDo(DriverInterface $driver, IpStrategyInterface $ipStrategy, callable $callable, ?array $config, int $resultNumber): void + public function testDo(DriverInterface $driver, callable $callable, ?array $config, int $resultNumber): void { $ip = new Ip(new ArrayObject(['number' => 0])); $flow = Flow::do($callable, [ - ...['driver' => $driver, 'ipStrategy' => $ipStrategy], + ...['driver' => $driver], ...($config ?? []), ])->fn(static function (ArrayObject $data) use ($resultNumber) { self::assertSame(ArrayObject::class, $data::class); @@ -178,22 +144,46 @@ public static function provideJobCases(): iterable { $exception = new RuntimeException('job error'); - return self::matrix(static fn (DriverInterface $driver) => [ - 'job' => [[static function (ArrayObject $data) { - $data['number'] = 5; - - return $data; - }], 5], - 'asyncJob' => [[static function (ArrayObject $data) use ($driver) { - $driver->delay(1 / 1000); - $data['number'] = 5; - - return $data; - }], 5], - 'exceptionJob' => [[static function () use ($exception) { - throw $exception; - }], 0], - ]); + return self::matrix(static function (DriverInterface $driver, $strategyBuilder) use ($exception) { + $cases = [ + 'job' => [[[static function (ArrayObject $data) { + $data['number'] = 5; + + return $data; + }, $strategyBuilder(), new AsyncHandler()]], 5], + 'asyncJob' => [[[static function (ArrayObject $data) use ($driver) { + $driver->delay(1 / 1000); + $data['number'] = 5; + + return $data; + }, $strategyBuilder(), new AsyncHandler()]], 5], + 'exceptionJob' => [[[static function () use ($exception) { + throw $exception; + }, $strategyBuilder(), new AsyncHandler()]], 0], + ]; + + if ($driver instanceof AmpDriver || $driver instanceof ReactDriver) { + $cases['deferJob'] = [[[static function ($args) { + [$data, $defer] = $args; + + return $defer(static function ($complete) use ($data, $defer) { + $data['number'] = 8; + $complete([$data, $defer]); + }); + }, $strategyBuilder(), new DeferAsyncHandler()]], 8]; + } + + $strategy = $strategyBuilder(); + if (!$strategy instanceof MaxIpStrategy) { + $cases['batchJob'] = [[[static function (ArrayObject $data) { + $data['number'] = 6; + + return $data; + }, $strategy, new BatchAsyncHandler(2)]], 6]; + } + + return $cases; + }); } /** @@ -201,23 +191,19 @@ public static function provideJobCases(): iterable */ public static function provideDoCases(): iterable { - return self::matrix(static fn (DriverInterface $driver) => [ - 'simpleGenerator' => [static function () use ($driver) { - if ($driver::class !== AmpDriver::class) { - yield static function (ArrayObject $data) { - $data['number'] = 5; - - return $data; - }; - } - if ($driver::class !== FiberDriver::class) { - yield static function (ArrayObject $data) use ($driver) { - $driver->delay(1 / 1000); - $data['number'] = 10; - - return $data; - }; - } + return self::matrix(static fn (DriverInterface $driver, $strategyBuilder) => [ + 'simpleGenerator' => [static function () use ($driver, $strategyBuilder) { + yield [static function (ArrayObject $data) { + $data['number'] = 5; + + return $data; + }, null, $strategyBuilder()]; + yield [static function (ArrayObject $data) use ($driver) { + $driver->delay(1 / 1000); + $data['number'] = 10; + + return $data; + }, null, $strategyBuilder()]; }, null, 10], ]); } diff --git a/tests/Flow/FlowTrait.php b/tests/Flow/FlowTrait.php index dafd02f..f38d67f 100644 --- a/tests/Flow/FlowTrait.php +++ b/tests/Flow/FlowTrait.php @@ -37,12 +37,11 @@ protected static function matrix(Closure $datas): array $matrixDatas = []; foreach ($drivers as $keyDriver => $driverBuilder) { - $driver = $driverBuilder(); - $dataValues = $datas($driver); foreach ($strategies as $keyStrategy => $strategyBuilder) { - $strategy = $strategyBuilder(); + $driver = $driverBuilder(); + $dataValues = $datas($driver, $strategyBuilder); foreach ($dataValues as $key => $values) { - $matrixDatas["{$keyDriver}.{$keyStrategy}.{$key}"] = [$driver, $strategy, ...$values]; + $matrixDatas["{$keyDriver}.{$keyStrategy}.{$key}"] = [$driver, ...$values]; } } } diff --git a/tests/Flow/TransportFlowTest.php b/tests/Flow/TransportFlowTest.php index da3f6ec..b88e71b 100644 --- a/tests/Flow/TransportFlowTest.php +++ b/tests/Flow/TransportFlowTest.php @@ -8,7 +8,6 @@ use Flow\DriverInterface; use Flow\Flow\Flow; use Flow\Flow\TransportFlow; -use Flow\IpStrategyInterface; use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport; @@ -24,14 +23,17 @@ class TransportFlowTest extends TestCase /** * @dataProvider provideJobsCases * - * @param DriverInterface $driver - * @param IpStrategyInterface $ipStrategy - * @param array $jobs + * @param DriverInterface $driver + * @param array $jobs */ - public function testJobs(DriverInterface $driver, IpStrategyInterface $ipStrategy, array $jobs, int $resultNumber): void + public function testJobs(DriverInterface $driver, array $jobs, int $resultNumber): void { $flow = array_reduce( - array_map(static fn ($job) => new Flow($job, static function () {}, $ipStrategy, null, null, $driver), $jobs), + array_map(static function ($args) use ($driver) { + [$job, $ipStrategy] = $args; + + return new Flow($job, static function () {}, $ipStrategy, null, null, $driver); + }, $jobs), static fn ($flow, $flowIt) => $flow ? $flow->fn($flowIt) : $flowIt ); $flow->fn(static function (ArrayObject $data) use ($resultNumber) { @@ -63,18 +65,18 @@ public function testJobs(DriverInterface $driver, IpStrategyInterface $ipStrateg */ public static function provideJobsCases(): iterable { - return self::matrix(static fn (DriverInterface $driver) => [ - 'job' => [[static function (ArrayObject $data) { + return self::matrix(static fn (DriverInterface $driver, $strategyBuilder) => [ + 'job' => [[[static function (ArrayObject $data) { $data['number'] = 1; return $data; - }], 1], - 'asyncJob' => [[static function (ArrayObject $data) use ($driver) { + }, $strategyBuilder()]], 1], + 'asyncJob' => [[[static function (ArrayObject $data) use ($driver) { $driver->delay(1 / 1000); $data['number'] = 5; return $data; - }], 5], + }, $strategyBuilder()]], 5], ]); } } diff --git a/tests/Flow/YFlowTest.php b/tests/Flow/YFlowTest.php index 592acca..07c8dc8 100644 --- a/tests/Flow/YFlowTest.php +++ b/tests/Flow/YFlowTest.php @@ -26,7 +26,7 @@ class YFlowTest extends TestCase * @param DriverInterface $driver * @param IpStrategyInterface $ipStrategy */ - public function testJob(DriverInterface $driver, IpStrategyInterface $ipStrategy, Closure $job, int $resultNumber): void + public function testJob(DriverInterface $driver, Closure $job, IpStrategyInterface $ipStrategy, int $resultNumber): void { $ip = new Ip(new ArrayObject(['number' => 6])); $errorJob = static function () {}; @@ -45,14 +45,14 @@ public function testJob(DriverInterface $driver, IpStrategyInterface $ipStrategy */ public static function provideJobCases(): iterable { - return self::matrix(static fn () => [ + return self::matrix(static fn (DriverInterface $driver, $strategyBuilder) => [ 'job' => [static function (callable $factorial): Closure { return static function (ArrayObject $data) use ($factorial) { return new ArrayObject([ 'number' => ($data['number'] <= 1) ? 1 : $data['number'] * $factorial(new ArrayObject(['number' => $data['number'] - 1]))['number'], ]); }; - }, 720], + }, $strategyBuilder(), 720], ]); } }