Skip to content

Commit

Permalink
Add better error handling (#4)
Browse files Browse the repository at this point in the history
* Add better error handling
---------

Co-authored-by: Maxime Rainville <[email protected]>
  • Loading branch information
maxime-rainville and Maxime Rainville authored Dec 13, 2024
1 parent e33f638 commit f925a65
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 17 deletions.
8 changes: 8 additions & 0 deletions examples/basic-usage.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,11 @@ function () use ($event) {
$event = new UserCreatedEvent('789', '[email protected]');
$future = $dispatcher->dispatch($event, new TimeoutCancellation(30));
EventLoop::run();

// Set up logging for your dispatcher - all errors will be logged to PSR logger
$dispatcher = new AsyncEventDispatcher(
$listenerProvider,
function (Throwable $exception) {
error_log($exception->getMessage());
}
);
22 changes: 17 additions & 5 deletions src/AsyncEventDispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@
use function Amp\async;

use Amp\Cancellation;

use Amp\Future;

use function Amp\Future\awaitAll;

use Amp\NullCancellation;
use Closure;
use Psr\EventDispatcher\EventDispatcherInterface;
use Psr\EventDispatcher\ListenerProviderInterface;
use Psr\EventDispatcher\StoppableEventInterface;
use Throwable;

/**
* Asynchronous implementation of PSR-14 EventDispatcherInterface using Revolt and AMPHP.
Expand All @@ -24,12 +27,22 @@
*/
class AsyncEventDispatcher implements EventDispatcherInterface
{
/** @var Closure(Throwable): (void) */
private Closure $errorHandler;

/**
* @param ListenerProviderInterface $listenerProvider The provider of event listeners
* @param callable(Throwable): (void) $errorHandler The handler for errors thrown by listeners
*/
public function __construct(
private readonly ListenerProviderInterface $listenerProvider
private readonly ListenerProviderInterface $listenerProvider,
?callable $errorHandler = null
) {
if ($errorHandler === null) {
$this->errorHandler = function (Throwable $exception): void {};
} else {
$this->errorHandler = Closure::fromCallable($errorHandler);
}
}

/**
Expand Down Expand Up @@ -85,7 +98,7 @@ private function dispatchStoppableEvent(
// that doesn't mean we want to block other listeners outside this loop.
$future = async(function () use ($event, $listener) {
$listener($event);
});
})->catch($this->errorHandler);

$future->await($cancellation);

Expand Down Expand Up @@ -120,14 +133,13 @@ private function dispatchNonStoppableEvent(
foreach ($listeners as $listener) {
$futures[] = async(function () use ($event, $listener) {
$listener($event);
});
})->catch($this->errorHandler);
}

// Wait for all listeners to complete
// Wait for all listeners to complete. This will carry on despite errors.
awaitAll($futures, $cancellation);

return $event;
});
}

}
35 changes: 23 additions & 12 deletions tests/AsyncEventDispatcherTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
use ArchiPro\EventDispatcher\Tests\Fixture\TestEvent;
use Exception;
use PHPUnit\Framework\TestCase;
use Revolt\EventLoop;

use Throwable;

/**
Expand All @@ -34,18 +32,21 @@ class AsyncEventDispatcherTest extends TestCase
private ListenerProvider $listenerProvider;
private AsyncEventDispatcher $dispatcher;

/** @var array<Throwable> */
private array $errors = [];

/**
* Sets up the test environment before each test.
*/
protected function setUp(): void
{
$this->listenerProvider = new ListenerProvider();
$this->dispatcher = new AsyncEventDispatcher($this->listenerProvider);

EventLoop::setErrorHandler(function (Throwable $err) {
throw $err;
});

$this->dispatcher = new AsyncEventDispatcher(
$this->listenerProvider,
function (Throwable $exception) {
$this->errors[] = $exception;
}
);
}

/**
Expand Down Expand Up @@ -80,6 +81,8 @@ public function testDispatchEventToMultipleListeners(): void
$this->assertCount(2, $results);
$this->assertContains('listener1: test data', $results);
$this->assertContains('listener2: test data', $results);

$this->assertCount(0, $this->errors, 'No errors are logged');
}

/**
Expand All @@ -103,6 +106,8 @@ public function testSynchronousStoppableEvent(): void

$this->assertCount(1, $results);
$this->assertEquals(['listener1'], $results);

$this->assertCount(0, $this->errors, 'No errors are logged');
}

/**
Expand All @@ -114,6 +119,7 @@ public function testNoListenersForEvent(): void
$dispatchedEvent = $this->dispatcher->dispatch($event);

$this->assertSame($event, $dispatchedEvent->await());
$this->assertCount(0, $this->errors, 'No errors are logged');
}

/**
Expand Down Expand Up @@ -158,16 +164,18 @@ public function testDispatchesFailureInOneListenerDoesNotAffectOthers(): void

$futureEvent = $this->dispatcher->dispatch($event);

$futureEvent = $futureEvent->await();
$futureEvent->await();

$this->assertTrue(
$futureEvent->calledOnce,
$event->calledOnce,
'The first listener should have been called'
);
$this->assertTrue(
$futureEvent->calledTwice,
$event->calledTwice,
'The second listener should have been called despite the failure of the first listener'
);

$this->assertCount(2, $this->errors, 'Errors are caught for both listeners');
}

public function testCancellationOfStoppableEvent(): void
Expand All @@ -187,6 +195,8 @@ public function testCancellationOfStoppableEvent(): void
$this->expectException(CancelledException::class);

$this->dispatcher->dispatch($event, $cancellation)->await();

$this->assertCount(0, $this->errors, 'No errors are caught');
}

public function testCancellationOfNonStoppableEvent(): void
Expand All @@ -206,6 +216,7 @@ public function testCancellationOfNonStoppableEvent(): void
$this->expectException(CancelledException::class);

$this->dispatcher->dispatch($event, $cancellation)->await();
}

$this->assertCount(0, $this->errors, 'No errors are caught');
}
}

0 comments on commit f925a65

Please sign in to comment.