Skip to content

Commit

Permalink
Ignore only stream_select errors due to signal interruptions (#338)
Browse files Browse the repository at this point in the history
This also fixes signal interruptions not being ignored on PHP 8, as the error message changed slightly.

Fixes #265.
  • Loading branch information
kelunik authored Jan 10, 2021
1 parent dbb3c28 commit 73f6e71
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 117 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ install:
script:
# Run testNoMemoryLeak separately, as those are skipped with enabled coverage
- php vendor/bin/phpunit --verbose --group memoryleak
- php vendor/bin/phpunit --verbose --exclude-group memoryleak --coverage-php coverage/cov/main.cov
- php -dxdebug.mode=coverage vendor/bin/phpunit --verbose --exclude-group memoryleak --coverage-php coverage/cov/main.cov
- PHP_CS_FIXER_IGNORE_ENV=1 php vendor/bin/php-cs-fixer --diff --dry-run -v fix
- if [[ ${TRAVIS_PHP_VERSION:0:3} == "7.0" ]]; then echo "Skipped psalm static analysis"; else vendor/bin/psalm.phar; fi

Expand Down
262 changes: 148 additions & 114 deletions lib/Loop/NativeDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,44 @@ class NativeDriver extends Driver
/** @var bool */
private $signalHandling;

/** @var callable */
private $streamSelectErrorHandler;

/** @var bool */
private $streamSelectIgnoreResult = false;

public function __construct()
{
$this->timerQueue = new Internal\TimerQueue;
$this->signalHandling = \extension_loaded("pcntl");
$this->nowOffset = getCurrentTime();
$this->now = \random_int(0, $this->nowOffset);
$this->nowOffset -= $this->now;
$this->streamSelectErrorHandler = function ($errno, $message) {
// Casing changed in PHP 8 from 'unable' to 'Unable'
if (\stripos($message, "stream_select(): unable to select [4]: ") === 0) { // EINTR
$this->streamSelectIgnoreResult = true;

return;
}

if (\strpos($message, 'FD_SETSIZE') !== false) {
$message = \str_replace(["\r\n", "\n", "\r"], " ", $message);
$pattern = '(stream_select\(\): You MUST recompile PHP with a larger value of FD_SETSIZE. It is set to (\d+), but you have descriptors numbered at least as high as (\d+)\.)';

if (\preg_match($pattern, $message, $match)) {
$helpLink = 'https://amphp.org/amp/event-loop/#implementations';

$message = 'You have reached the limits of stream_select(). It has a FD_SETSIZE of ' . $match[1]
. ', but you have file descriptors numbered at least as high as ' . $match[2] . '. '
. "You can install one of the extensions listed on {$helpLink} to support a higher number of "
. "concurrent file descriptors. If a large number of open file descriptors is unexpected, you "
. "might be leaking file descriptors that aren't closed correctly.";
}
}

throw new \Exception($message, $errno);
};
}

/**
Expand Down Expand Up @@ -131,6 +162,110 @@ protected function dispatch(bool $blocking)
}
}

/**
* {@inheritdoc}
*
* @return void
*/
protected function activate(array $watchers)
{
foreach ($watchers as $watcher) {
switch ($watcher->type) {
case Watcher::READABLE:
\assert(\is_resource($watcher->value));

$streamId = (int) $watcher->value;
$this->readWatchers[$streamId][$watcher->id] = $watcher;
$this->readStreams[$streamId] = $watcher->value;
break;

case Watcher::WRITABLE:
\assert(\is_resource($watcher->value));

$streamId = (int) $watcher->value;
$this->writeWatchers[$streamId][$watcher->id] = $watcher;
$this->writeStreams[$streamId] = $watcher->value;
break;

case Watcher::DELAY:
case Watcher::REPEAT:
\assert(\is_int($watcher->value));
$this->timerQueue->insert($watcher);
break;

case Watcher::SIGNAL:
\assert(\is_int($watcher->value));

if (!isset($this->signalWatchers[$watcher->value])) {
if (!@\pcntl_signal($watcher->value, $this->callableFromInstanceMethod('handleSignal'))) {
$message = "Failed to register signal handler";
if ($error = \error_get_last()) {
$message .= \sprintf("; Errno: %d; %s", $error["type"], $error["message"]);
}
throw new \Error($message);
}
}

$this->signalWatchers[$watcher->value][$watcher->id] = $watcher;
break;

default:
// @codeCoverageIgnoreStart
throw new \Error("Unknown watcher type");
// @codeCoverageIgnoreEnd
}
}
}

/**
* {@inheritdoc}
*
* @return void
*/
protected function deactivate(Watcher $watcher)
{
switch ($watcher->type) {
case Watcher::READABLE:
$streamId = (int) $watcher->value;
unset($this->readWatchers[$streamId][$watcher->id]);
if (empty($this->readWatchers[$streamId])) {
unset($this->readWatchers[$streamId], $this->readStreams[$streamId]);
}
break;

case Watcher::WRITABLE:
$streamId = (int) $watcher->value;
unset($this->writeWatchers[$streamId][$watcher->id]);
if (empty($this->writeWatchers[$streamId])) {
unset($this->writeWatchers[$streamId], $this->writeStreams[$streamId]);
}
break;

case Watcher::DELAY:
case Watcher::REPEAT:
$this->timerQueue->remove($watcher);
break;

case Watcher::SIGNAL:
\assert(\is_int($watcher->value));

if (isset($this->signalWatchers[$watcher->value])) {
unset($this->signalWatchers[$watcher->value][$watcher->id]);

if (empty($this->signalWatchers[$watcher->value])) {
unset($this->signalWatchers[$watcher->value]);
@\pcntl_signal($watcher->value, \SIG_DFL);
}
}
break;

default:
// @codeCoverageIgnoreStart
throw new \Error("Unknown watcher type");
// @codeCoverageIgnoreEnd
}
}

/**
* @param resource[] $read
* @param resource[] $write
Expand All @@ -153,19 +288,22 @@ private function selectStreams(array $read, array $write, int $timeout)

$except = null;

// Error reporting suppressed since stream_select() emits an E_WARNING if it is interrupted by a signal.
if (!($result = @\stream_select($read, $write, $except, $seconds, $microseconds))) {
if ($result === 0) {
return;
}
\set_error_handler($this->streamSelectErrorHandler);

$error = \error_get_last();
try {
$result = \stream_select($read, $write, $except, $seconds, $microseconds);
} finally {
\restore_error_handler();
}

if (\strpos($error["message"] ?? '', "unable to select") !== 0) {
return;
}
if ($this->streamSelectIgnoreResult || $result === 0) {
$this->streamSelectIgnoreResult = false;
return;
}

$this->error(new \Exception($error["message"] ?? 'Unknown error during stream_select'));
if (!$result) {
$this->error(new \Exception('Unknown error during stream_select'));
return;
}

foreach ($read as $stream) {
Expand Down Expand Up @@ -261,110 +399,6 @@ private function getTimeout(): int
return $expiration > 0 ? $expiration : 0;
}

/**
* {@inheritdoc}
*
* @return void
*/
protected function activate(array $watchers)
{
foreach ($watchers as $watcher) {
switch ($watcher->type) {
case Watcher::READABLE:
\assert(\is_resource($watcher->value));

$streamId = (int) $watcher->value;
$this->readWatchers[$streamId][$watcher->id] = $watcher;
$this->readStreams[$streamId] = $watcher->value;
break;

case Watcher::WRITABLE:
\assert(\is_resource($watcher->value));

$streamId = (int) $watcher->value;
$this->writeWatchers[$streamId][$watcher->id] = $watcher;
$this->writeStreams[$streamId] = $watcher->value;
break;

case Watcher::DELAY:
case Watcher::REPEAT:
\assert(\is_int($watcher->value));
$this->timerQueue->insert($watcher);
break;

case Watcher::SIGNAL:
\assert(\is_int($watcher->value));

if (!isset($this->signalWatchers[$watcher->value])) {
if (!@\pcntl_signal($watcher->value, $this->callableFromInstanceMethod('handleSignal'))) {
$message = "Failed to register signal handler";
if ($error = \error_get_last()) {
$message .= \sprintf("; Errno: %d; %s", $error["type"], $error["message"]);
}
throw new \Error($message);
}
}

$this->signalWatchers[$watcher->value][$watcher->id] = $watcher;
break;

default:
// @codeCoverageIgnoreStart
throw new \Error("Unknown watcher type");
// @codeCoverageIgnoreEnd
}
}
}

/**
* {@inheritdoc}
*
* @return void
*/
protected function deactivate(Watcher $watcher)
{
switch ($watcher->type) {
case Watcher::READABLE:
$streamId = (int) $watcher->value;
unset($this->readWatchers[$streamId][$watcher->id]);
if (empty($this->readWatchers[$streamId])) {
unset($this->readWatchers[$streamId], $this->readStreams[$streamId]);
}
break;

case Watcher::WRITABLE:
$streamId = (int) $watcher->value;
unset($this->writeWatchers[$streamId][$watcher->id]);
if (empty($this->writeWatchers[$streamId])) {
unset($this->writeWatchers[$streamId], $this->writeStreams[$streamId]);
}
break;

case Watcher::DELAY:
case Watcher::REPEAT:
$this->timerQueue->remove($watcher);
break;

case Watcher::SIGNAL:
\assert(\is_int($watcher->value));

if (isset($this->signalWatchers[$watcher->value])) {
unset($this->signalWatchers[$watcher->value][$watcher->id]);

if (empty($this->signalWatchers[$watcher->value])) {
unset($this->signalWatchers[$watcher->value]);
@\pcntl_signal($watcher->value, \SIG_DFL);
}
}
break;

default:
// @codeCoverageIgnoreStart
throw new \Error("Unknown watcher type");
// @codeCoverageIgnoreEnd
}
}

/**
* @param int $signo
*
Expand Down
62 changes: 61 additions & 1 deletion test/Loop/NativeDriverTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,66 @@ public function testHandle()
$this->assertNull($this->loop->getHandle());
}

public function testTooLargeFileDescriptorSet()
{
$sockets = [];
$domain = \stripos(PHP_OS, 'win') === 0 ? STREAM_PF_INET : STREAM_PF_UNIX;

for ($i = 0; $i < 1001; $i++) {
$sockets[] = \stream_socket_pair($domain, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
}

$this->expectException(\Exception::class);
$this->expectExceptionMessage("You have reached the limits of stream_select(). It has a FD_SETSIZE of 1024, but you have file descriptors numbered at least as high as 200");

$this->start(function (Driver $loop) use ($sockets) {
$loop->delay(100, function () {
// here to provide timeout to stream_select, as the warning is only issued after the system call returns
});

foreach ($sockets as list($left, $right)) {
$loop->onReadable($left, function () {
// nothing
});

$loop->onReadable($right, function () {
// nothing
});
}
});
}

public function testSignalDuringStreamSelectIgnored()
{
$domain = \stripos(PHP_OS, 'win') === 0 ? STREAM_PF_INET : STREAM_PF_UNIX;
$sockets = \stream_socket_pair($domain, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

$this->start(function (Driver $loop) use ($sockets) {
$socketWatchers = [
$loop->onReadable($sockets[0], function () {
// nothing
}),
$loop->onReadable($sockets[1], function () {
// nothing
}),
];

$loop->onSignal(\SIGUSR2, function ($signalWatcher) use ($socketWatchers, $loop) {
$loop->cancel($signalWatcher);

foreach ($socketWatchers as $watcher) {
$loop->cancel($watcher);
}

$this->assertTrue(true);
});

$loop->delay(100, function () {
\proc_open('sh -c "sleep 1; kill -USR2 ' . \getmypid() . '"', [], $pipes);
});
});
}

/**
* @requires PHP 7.1
*/
Expand All @@ -28,7 +88,7 @@ public function testAsyncSignals()

try {
$this->start(function (Driver $loop) use (&$invoked) {
$watcher = $loop->onSignal(SIGUSR1, function () use (&$invoked) {
$watcher = $loop->onSignal(\SIGUSR1, function () use (&$invoked) {
$invoked = true;
});
$loop->unreference($watcher);
Expand Down
2 changes: 1 addition & 1 deletion travis/install-uv.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
set -e

wget https://github.com/libuv/libuv/archive/v1.24.1.tar.gz -O /tmp/libuv.tar.gz -q &
wget https://github.com/bwoebi/php-uv/archive/master.tar.gz -O /tmp/php-uv.tar.gz -q &
wget https://github.com/bwoebi/php-uv/archive/444acfb9636094c2c96d1e7b43332027f464efc5.tar.gz -O /tmp/php-uv.tar.gz -q &
wait

mkdir libuv && tar -xf /tmp/libuv.tar.gz -C libuv --strip-components=1
Expand Down

0 comments on commit 73f6e71

Please sign in to comment.