Skip to content

Commit

Permalink
✅ Rework and add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
matyo91 committed Aug 13, 2024
1 parent f2fde69 commit b8d78f9
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 117 deletions.
184 changes: 85 additions & 99 deletions tests/Flow/FlowTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@
namespace Flow\Test\Flow;

use ArrayObject;
use Flow\AsyncHandler\AsyncHandler;
use Flow\AsyncHandler\BatchAsyncHandler;
use Flow\AsyncHandler\DeferAsyncHandler;
use Flow\Driver\AmpDriver;
use Flow\Driver\FiberDriver;
use Flow\Driver\ReactDriver;
use Flow\DriverInterface;
use Flow\ExceptionInterface;
use Flow\Flow\Flow;
use Flow\Ip;
use Flow\IpStrategy\MaxIpStrategy;
use Flow\IpStrategyInterface;
use PHPUnit\Framework\TestCase;
use RuntimeException;

Expand All @@ -28,53 +29,16 @@ class FlowTest extends TestCase
/**
* @dataProvider provideJobCases
*
* @param DriverInterface<T1,T2> $driver
* @param IpStrategyInterface<T1> $ipStrategy
* @param array<mixed> $jobs
* @param DriverInterface<T1,T2> $driver
* @param array<mixed> $jobs
*/
public function testJob(DriverInterface $driver, IpStrategyInterface $ipStrategy, array $jobs, int $resultNumber): void
public function testJob(DriverInterface $driver, array $jobs, int $resultNumber): void
{
$flow = array_reduce(
array_map(static fn ($job) => new Flow(
$job,
static function (ExceptionInterface $exception) {
self::assertSame(RuntimeException::class, $exception->getPrevious()::class);
},
$ipStrategy,
null,
null,
$driver
), $jobs),
static fn ($flow, $flowIt) => $flow ? $flow->fn($flowIt) : $flowIt
);
$flow->fn(static function (ArrayObject $data) use ($resultNumber) {
self::assertSame(ArrayObject::class, $data::class);
self::assertSame($resultNumber, $data['number']);
});
$ip = new Ip(new ArrayObject(['number' => 0]));
($flow)($ip);

$flow->await();
}

/**
* @dataProvider provideJobCases
*
* @param DriverInterface<T1,T2> $driver
* @param IpStrategyInterface<T1> $ipStrategy
* @param array<mixed> $jobs
*/
public function testBatchAsyncJob(DriverInterface $driver, IpStrategyInterface $ipStrategy, array $jobs, int $resultNumber): void
{
if ($ipStrategy instanceof MaxIpStrategy) {
self::assertTrue(true);

return;
}

$count = 0;
$flow = array_reduce(
array_map(static function ($job) use ($ipStrategy, &$count, $driver) {
array_map(static function ($args) use ($driver, &$count) {
[$job, $ipStrategy, $asyncHandler] = $args;

return new Flow(
$job,
static function (ExceptionInterface $exception) use (&$count) {
Expand All @@ -83,7 +47,7 @@ static function (ExceptionInterface $exception) use (&$count) {
},
$ipStrategy,
null,
new BatchAsyncHandler(2),
$asyncHandler,
$driver
);
}, $jobs),
Expand All @@ -107,29 +71,32 @@ static function (ExceptionInterface $exception) use (&$count) {
/**
* @dataProvider provideJobCases
*
* @param DriverInterface<T1,T2> $driver
* @param IpStrategyInterface<T1> $ipStrategy
* @param array<mixed> $jobs
* @param DriverInterface<T1,T2> $driver
* @param array<mixed> $jobs
*/
public function testTick(DriverInterface $driver, IpStrategyInterface $ipStrategy, array $jobs, int $resultNumber): void
public function testTick(DriverInterface $driver, array $jobs, int $resultNumber): void
{
$cancel = $driver->tick(1, static function () use (&$flow) {
$ip = new Ip(new ArrayObject(['number' => 0]));
($flow)($ip); // @phpstan-ignore-line
});

$flow = array_reduce(
array_map(static fn ($job) => new Flow(
$job,
static function (ExceptionInterface $exception) use ($cancel) {
self::assertSame(RuntimeException::class, $exception->getPrevious()::class);
$cancel();
},
$ipStrategy,
null,
null,
$driver
), $jobs),
array_map(static function ($args) use ($driver, $cancel) {
[$job, $ipStrategy, $asyncHandler] = $args;

return new Flow(
$job,
static function (ExceptionInterface $exception) use ($cancel) {
self::assertSame(RuntimeException::class, $exception->getPrevious()::class);
$cancel();
},
$ipStrategy,
null,
$asyncHandler,
$driver
);
}, $jobs),
static fn ($flow, $flowIt) => $flow ? $flow->fn($flowIt) : $flowIt
);
$flow->fn(static function (ArrayObject $data) use ($resultNumber) {
Expand All @@ -149,15 +116,14 @@ static function (ExceptionInterface $exception) use ($cancel) {
/**
* @dataProvider provideDoCases
*
* @param DriverInterface<T1,T2> $driver
* @param IpStrategyInterface<T1> $ipStrategy
* @param array<mixed> $config
* @param DriverInterface<T1,T2> $driver
* @param array<mixed> $config
*/
public function testDo(DriverInterface $driver, IpStrategyInterface $ipStrategy, callable $callable, ?array $config, int $resultNumber): void
public function testDo(DriverInterface $driver, callable $callable, ?array $config, int $resultNumber): void
{
$ip = new Ip(new ArrayObject(['number' => 0]));
$flow = Flow::do($callable, [
...['driver' => $driver, 'ipStrategy' => $ipStrategy],
...['driver' => $driver],
...($config ?? []),
])->fn(static function (ArrayObject $data) use ($resultNumber) {
self::assertSame(ArrayObject::class, $data::class);
Expand All @@ -178,46 +144,66 @@ public static function provideJobCases(): iterable
{
$exception = new RuntimeException('job error');

return self::matrix(static fn (DriverInterface $driver) => [
'job' => [[static function (ArrayObject $data) {
$data['number'] = 5;

return $data;
}], 5],
'asyncJob' => [[static function (ArrayObject $data) use ($driver) {
$driver->delay(1 / 1000);
$data['number'] = 5;

return $data;
}], 5],
'exceptionJob' => [[static function () use ($exception) {
throw $exception;
}], 0],
]);
return self::matrix(static function (DriverInterface $driver, $strategyBuilder) use ($exception) {
$cases = [
'job' => [[[static function (ArrayObject $data) {
$data['number'] = 5;

return $data;
}, $strategyBuilder(), new AsyncHandler()]], 5],
'asyncJob' => [[[static function (ArrayObject $data) use ($driver) {
$driver->delay(1 / 1000);
$data['number'] = 5;

return $data;
}, $strategyBuilder(), new AsyncHandler()]], 5],
'exceptionJob' => [[[static function () use ($exception) {
throw $exception;
}, $strategyBuilder(), new AsyncHandler()]], 0],
];

if ($driver instanceof AmpDriver || $driver instanceof ReactDriver) {
$cases['deferJob'] = [[[static function ($args) {
[$data, $defer] = $args;

return $defer(static function ($complete) use ($data, $defer) {
$data['number'] = 8;
$complete([$data, $defer]);
});
}, $strategyBuilder(), new DeferAsyncHandler()]], 8];
}

$strategy = $strategyBuilder();
if (!$strategy instanceof MaxIpStrategy) {
$cases['batchJob'] = [[[static function (ArrayObject $data) {
$data['number'] = 6;

return $data;
}, $strategy, new BatchAsyncHandler(2)]], 6];
}

return $cases;
});
}

/**
* @return array<array<mixed>>
*/
public static function provideDoCases(): iterable
{
return self::matrix(static fn (DriverInterface $driver) => [
'simpleGenerator' => [static function () use ($driver) {
if ($driver::class !== AmpDriver::class) {
yield static function (ArrayObject $data) {
$data['number'] = 5;

return $data;
};
}
if ($driver::class !== FiberDriver::class) {
yield static function (ArrayObject $data) use ($driver) {
$driver->delay(1 / 1000);
$data['number'] = 10;

return $data;
};
}
return self::matrix(static fn (DriverInterface $driver, $strategyBuilder) => [
'simpleGenerator' => [static function () use ($driver, $strategyBuilder) {
yield [static function (ArrayObject $data) {
$data['number'] = 5;

return $data;
}, null, $strategyBuilder()];
yield [static function (ArrayObject $data) use ($driver) {
$driver->delay(1 / 1000);
$data['number'] = 10;

return $data;
}, null, $strategyBuilder()];
}, null, 10],
]);
}
Expand Down
7 changes: 3 additions & 4 deletions tests/Flow/FlowTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,11 @@ protected static function matrix(Closure $datas): array

$matrixDatas = [];
foreach ($drivers as $keyDriver => $driverBuilder) {
$driver = $driverBuilder();
$dataValues = $datas($driver);
foreach ($strategies as $keyStrategy => $strategyBuilder) {
$strategy = $strategyBuilder();
$driver = $driverBuilder();
$dataValues = $datas($driver, $strategyBuilder);
foreach ($dataValues as $key => $values) {
$matrixDatas["{$keyDriver}.{$keyStrategy}.{$key}"] = [$driver, $strategy, ...$values];
$matrixDatas["{$keyDriver}.{$keyStrategy}.{$key}"] = [$driver, ...$values];
}
}
}
Expand Down
24 changes: 13 additions & 11 deletions tests/Flow/TransportFlowTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
use Flow\DriverInterface;
use Flow\Flow\Flow;
use Flow\Flow\TransportFlow;
use Flow\IpStrategyInterface;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport;
Expand All @@ -24,14 +23,17 @@ class TransportFlowTest extends TestCase
/**
* @dataProvider provideJobsCases
*
* @param DriverInterface<T1,T2> $driver
* @param IpStrategyInterface<T1> $ipStrategy
* @param array<mixed> $jobs
* @param DriverInterface<T1,T2> $driver
* @param array<mixed> $jobs
*/
public function testJobs(DriverInterface $driver, IpStrategyInterface $ipStrategy, array $jobs, int $resultNumber): void
public function testJobs(DriverInterface $driver, array $jobs, int $resultNumber): void
{
$flow = array_reduce(
array_map(static fn ($job) => new Flow($job, static function () {}, $ipStrategy, null, null, $driver), $jobs),
array_map(static function ($args) use ($driver) {
[$job, $ipStrategy] = $args;

return new Flow($job, static function () {}, $ipStrategy, null, null, $driver);
}, $jobs),
static fn ($flow, $flowIt) => $flow ? $flow->fn($flowIt) : $flowIt
);
$flow->fn(static function (ArrayObject $data) use ($resultNumber) {
Expand Down Expand Up @@ -63,18 +65,18 @@ public function testJobs(DriverInterface $driver, IpStrategyInterface $ipStrateg
*/
public static function provideJobsCases(): iterable
{
return self::matrix(static fn (DriverInterface $driver) => [
'job' => [[static function (ArrayObject $data) {
return self::matrix(static fn (DriverInterface $driver, $strategyBuilder) => [
'job' => [[[static function (ArrayObject $data) {
$data['number'] = 1;

return $data;
}], 1],
'asyncJob' => [[static function (ArrayObject $data) use ($driver) {
}, $strategyBuilder()]], 1],
'asyncJob' => [[[static function (ArrayObject $data) use ($driver) {
$driver->delay(1 / 1000);
$data['number'] = 5;

return $data;
}], 5],
}, $strategyBuilder()]], 5],
]);
}
}
6 changes: 3 additions & 3 deletions tests/Flow/YFlowTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class YFlowTest extends TestCase
* @param DriverInterface<T1,T2> $driver
* @param IpStrategyInterface<T1> $ipStrategy
*/
public function testJob(DriverInterface $driver, IpStrategyInterface $ipStrategy, Closure $job, int $resultNumber): void
public function testJob(DriverInterface $driver, Closure $job, IpStrategyInterface $ipStrategy, int $resultNumber): void
{
$ip = new Ip(new ArrayObject(['number' => 6]));
$errorJob = static function () {};
Expand All @@ -45,14 +45,14 @@ public function testJob(DriverInterface $driver, IpStrategyInterface $ipStrategy
*/
public static function provideJobCases(): iterable
{
return self::matrix(static fn () => [
return self::matrix(static fn (DriverInterface $driver, $strategyBuilder) => [
'job' => [static function (callable $factorial): Closure {
return static function (ArrayObject $data) use ($factorial) {
return new ArrayObject([
'number' => ($data['number'] <= 1) ? 1 : $data['number'] * $factorial(new ArrayObject(['number' => $data['number'] - 1]))['number'],
]);
};
}, 720],
}, $strategyBuilder(), 720],
]);
}
}

0 comments on commit b8d78f9

Please sign in to comment.