From 73f6e71b34dfaf1811e85854638c9cafb617589c Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Sun, 10 Jan 2021 17:01:27 +0100 Subject: [PATCH] Ignore only stream_select errors due to signal interruptions (#338) This also fixes signal interruptions not being ignored on PHP 8, as the error message changed slightly. Fixes #265. --- .travis.yml | 2 +- lib/Loop/NativeDriver.php | 262 +++++++++++++++++++-------------- test/Loop/NativeDriverTest.php | 62 +++++++- travis/install-uv.sh | 2 +- 4 files changed, 211 insertions(+), 117 deletions(-) diff --git a/.travis.yml b/.travis.yml index b678ac5a..55902897 100644 --- a/.travis.yml +++ b/.travis.yml @@ -43,7 +43,7 @@ install: script: # Run testNoMemoryLeak separately, as those are skipped with enabled coverage - php vendor/bin/phpunit --verbose --group memoryleak - - php vendor/bin/phpunit --verbose --exclude-group memoryleak --coverage-php coverage/cov/main.cov + - php -dxdebug.mode=coverage vendor/bin/phpunit --verbose --exclude-group memoryleak --coverage-php coverage/cov/main.cov - PHP_CS_FIXER_IGNORE_ENV=1 php vendor/bin/php-cs-fixer --diff --dry-run -v fix - if [[ ${TRAVIS_PHP_VERSION:0:3} == "7.0" ]]; then echo "Skipped psalm static analysis"; else vendor/bin/psalm.phar; fi diff --git a/lib/Loop/NativeDriver.php b/lib/Loop/NativeDriver.php index fdbc2a69..038a7b60 100644 --- a/lib/Loop/NativeDriver.php +++ b/lib/Loop/NativeDriver.php @@ -40,6 +40,12 @@ class NativeDriver extends Driver /** @var bool */ private $signalHandling; + /** @var callable */ + private $streamSelectErrorHandler; + + /** @var bool */ + private $streamSelectIgnoreResult = false; + public function __construct() { $this->timerQueue = new Internal\TimerQueue; @@ -47,6 +53,31 @@ public function __construct() $this->nowOffset = getCurrentTime(); $this->now = \random_int(0, $this->nowOffset); $this->nowOffset -= $this->now; + $this->streamSelectErrorHandler = function ($errno, $message) { + // Casing changed in PHP 8 from 'unable' to 'Unable' + if (\stripos($message, "stream_select(): unable to select [4]: ") === 0) { // EINTR + $this->streamSelectIgnoreResult = true; + + return; + } + + if (\strpos($message, 'FD_SETSIZE') !== false) { + $message = \str_replace(["\r\n", "\n", "\r"], " ", $message); + $pattern = '(stream_select\(\): You MUST recompile PHP with a larger value of FD_SETSIZE. It is set to (\d+), but you have descriptors numbered at least as high as (\d+)\.)'; + + if (\preg_match($pattern, $message, $match)) { + $helpLink = 'https://amphp.org/amp/event-loop/#implementations'; + + $message = 'You have reached the limits of stream_select(). It has a FD_SETSIZE of ' . $match[1] + . ', but you have file descriptors numbered at least as high as ' . $match[2] . '. ' + . "You can install one of the extensions listed on {$helpLink} to support a higher number of " + . "concurrent file descriptors. If a large number of open file descriptors is unexpected, you " + . "might be leaking file descriptors that aren't closed correctly."; + } + } + + throw new \Exception($message, $errno); + }; } /** @@ -131,6 +162,110 @@ protected function dispatch(bool $blocking) } } + /** + * {@inheritdoc} + * + * @return void + */ + protected function activate(array $watchers) + { + foreach ($watchers as $watcher) { + switch ($watcher->type) { + case Watcher::READABLE: + \assert(\is_resource($watcher->value)); + + $streamId = (int) $watcher->value; + $this->readWatchers[$streamId][$watcher->id] = $watcher; + $this->readStreams[$streamId] = $watcher->value; + break; + + case Watcher::WRITABLE: + \assert(\is_resource($watcher->value)); + + $streamId = (int) $watcher->value; + $this->writeWatchers[$streamId][$watcher->id] = $watcher; + $this->writeStreams[$streamId] = $watcher->value; + break; + + case Watcher::DELAY: + case Watcher::REPEAT: + \assert(\is_int($watcher->value)); + $this->timerQueue->insert($watcher); + break; + + case Watcher::SIGNAL: + \assert(\is_int($watcher->value)); + + if (!isset($this->signalWatchers[$watcher->value])) { + if (!@\pcntl_signal($watcher->value, $this->callableFromInstanceMethod('handleSignal'))) { + $message = "Failed to register signal handler"; + if ($error = \error_get_last()) { + $message .= \sprintf("; Errno: %d; %s", $error["type"], $error["message"]); + } + throw new \Error($message); + } + } + + $this->signalWatchers[$watcher->value][$watcher->id] = $watcher; + break; + + default: + // @codeCoverageIgnoreStart + throw new \Error("Unknown watcher type"); + // @codeCoverageIgnoreEnd + } + } + } + + /** + * {@inheritdoc} + * + * @return void + */ + protected function deactivate(Watcher $watcher) + { + switch ($watcher->type) { + case Watcher::READABLE: + $streamId = (int) $watcher->value; + unset($this->readWatchers[$streamId][$watcher->id]); + if (empty($this->readWatchers[$streamId])) { + unset($this->readWatchers[$streamId], $this->readStreams[$streamId]); + } + break; + + case Watcher::WRITABLE: + $streamId = (int) $watcher->value; + unset($this->writeWatchers[$streamId][$watcher->id]); + if (empty($this->writeWatchers[$streamId])) { + unset($this->writeWatchers[$streamId], $this->writeStreams[$streamId]); + } + break; + + case Watcher::DELAY: + case Watcher::REPEAT: + $this->timerQueue->remove($watcher); + break; + + case Watcher::SIGNAL: + \assert(\is_int($watcher->value)); + + if (isset($this->signalWatchers[$watcher->value])) { + unset($this->signalWatchers[$watcher->value][$watcher->id]); + + if (empty($this->signalWatchers[$watcher->value])) { + unset($this->signalWatchers[$watcher->value]); + @\pcntl_signal($watcher->value, \SIG_DFL); + } + } + break; + + default: + // @codeCoverageIgnoreStart + throw new \Error("Unknown watcher type"); + // @codeCoverageIgnoreEnd + } + } + /** * @param resource[] $read * @param resource[] $write @@ -153,19 +288,22 @@ private function selectStreams(array $read, array $write, int $timeout) $except = null; - // Error reporting suppressed since stream_select() emits an E_WARNING if it is interrupted by a signal. - if (!($result = @\stream_select($read, $write, $except, $seconds, $microseconds))) { - if ($result === 0) { - return; - } + \set_error_handler($this->streamSelectErrorHandler); - $error = \error_get_last(); + try { + $result = \stream_select($read, $write, $except, $seconds, $microseconds); + } finally { + \restore_error_handler(); + } - if (\strpos($error["message"] ?? '', "unable to select") !== 0) { - return; - } + if ($this->streamSelectIgnoreResult || $result === 0) { + $this->streamSelectIgnoreResult = false; + return; + } - $this->error(new \Exception($error["message"] ?? 'Unknown error during stream_select')); + if (!$result) { + $this->error(new \Exception('Unknown error during stream_select')); + return; } foreach ($read as $stream) { @@ -261,110 +399,6 @@ private function getTimeout(): int return $expiration > 0 ? $expiration : 0; } - /** - * {@inheritdoc} - * - * @return void - */ - protected function activate(array $watchers) - { - foreach ($watchers as $watcher) { - switch ($watcher->type) { - case Watcher::READABLE: - \assert(\is_resource($watcher->value)); - - $streamId = (int) $watcher->value; - $this->readWatchers[$streamId][$watcher->id] = $watcher; - $this->readStreams[$streamId] = $watcher->value; - break; - - case Watcher::WRITABLE: - \assert(\is_resource($watcher->value)); - - $streamId = (int) $watcher->value; - $this->writeWatchers[$streamId][$watcher->id] = $watcher; - $this->writeStreams[$streamId] = $watcher->value; - break; - - case Watcher::DELAY: - case Watcher::REPEAT: - \assert(\is_int($watcher->value)); - $this->timerQueue->insert($watcher); - break; - - case Watcher::SIGNAL: - \assert(\is_int($watcher->value)); - - if (!isset($this->signalWatchers[$watcher->value])) { - if (!@\pcntl_signal($watcher->value, $this->callableFromInstanceMethod('handleSignal'))) { - $message = "Failed to register signal handler"; - if ($error = \error_get_last()) { - $message .= \sprintf("; Errno: %d; %s", $error["type"], $error["message"]); - } - throw new \Error($message); - } - } - - $this->signalWatchers[$watcher->value][$watcher->id] = $watcher; - break; - - default: - // @codeCoverageIgnoreStart - throw new \Error("Unknown watcher type"); - // @codeCoverageIgnoreEnd - } - } - } - - /** - * {@inheritdoc} - * - * @return void - */ - protected function deactivate(Watcher $watcher) - { - switch ($watcher->type) { - case Watcher::READABLE: - $streamId = (int) $watcher->value; - unset($this->readWatchers[$streamId][$watcher->id]); - if (empty($this->readWatchers[$streamId])) { - unset($this->readWatchers[$streamId], $this->readStreams[$streamId]); - } - break; - - case Watcher::WRITABLE: - $streamId = (int) $watcher->value; - unset($this->writeWatchers[$streamId][$watcher->id]); - if (empty($this->writeWatchers[$streamId])) { - unset($this->writeWatchers[$streamId], $this->writeStreams[$streamId]); - } - break; - - case Watcher::DELAY: - case Watcher::REPEAT: - $this->timerQueue->remove($watcher); - break; - - case Watcher::SIGNAL: - \assert(\is_int($watcher->value)); - - if (isset($this->signalWatchers[$watcher->value])) { - unset($this->signalWatchers[$watcher->value][$watcher->id]); - - if (empty($this->signalWatchers[$watcher->value])) { - unset($this->signalWatchers[$watcher->value]); - @\pcntl_signal($watcher->value, \SIG_DFL); - } - } - break; - - default: - // @codeCoverageIgnoreStart - throw new \Error("Unknown watcher type"); - // @codeCoverageIgnoreEnd - } - } - /** * @param int $signo * diff --git a/test/Loop/NativeDriverTest.php b/test/Loop/NativeDriverTest.php index 04cf4cf8..a2271124 100644 --- a/test/Loop/NativeDriverTest.php +++ b/test/Loop/NativeDriverTest.php @@ -19,6 +19,66 @@ public function testHandle() $this->assertNull($this->loop->getHandle()); } + public function testTooLargeFileDescriptorSet() + { + $sockets = []; + $domain = \stripos(PHP_OS, 'win') === 0 ? STREAM_PF_INET : STREAM_PF_UNIX; + + for ($i = 0; $i < 1001; $i++) { + $sockets[] = \stream_socket_pair($domain, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); + } + + $this->expectException(\Exception::class); + $this->expectExceptionMessage("You have reached the limits of stream_select(). It has a FD_SETSIZE of 1024, but you have file descriptors numbered at least as high as 200"); + + $this->start(function (Driver $loop) use ($sockets) { + $loop->delay(100, function () { + // here to provide timeout to stream_select, as the warning is only issued after the system call returns + }); + + foreach ($sockets as list($left, $right)) { + $loop->onReadable($left, function () { + // nothing + }); + + $loop->onReadable($right, function () { + // nothing + }); + } + }); + } + + public function testSignalDuringStreamSelectIgnored() + { + $domain = \stripos(PHP_OS, 'win') === 0 ? STREAM_PF_INET : STREAM_PF_UNIX; + $sockets = \stream_socket_pair($domain, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); + + $this->start(function (Driver $loop) use ($sockets) { + $socketWatchers = [ + $loop->onReadable($sockets[0], function () { + // nothing + }), + $loop->onReadable($sockets[1], function () { + // nothing + }), + ]; + + $loop->onSignal(\SIGUSR2, function ($signalWatcher) use ($socketWatchers, $loop) { + $loop->cancel($signalWatcher); + + foreach ($socketWatchers as $watcher) { + $loop->cancel($watcher); + } + + $this->assertTrue(true); + }); + + $loop->delay(100, function () { + \proc_open('sh -c "sleep 1; kill -USR2 ' . \getmypid() . '"', [], $pipes); + }); + }); + } + /** * @requires PHP 7.1 */ @@ -28,7 +88,7 @@ public function testAsyncSignals() try { $this->start(function (Driver $loop) use (&$invoked) { - $watcher = $loop->onSignal(SIGUSR1, function () use (&$invoked) { + $watcher = $loop->onSignal(\SIGUSR1, function () use (&$invoked) { $invoked = true; }); $loop->unreference($watcher); diff --git a/travis/install-uv.sh b/travis/install-uv.sh index ab32ca0a..7e544484 100755 --- a/travis/install-uv.sh +++ b/travis/install-uv.sh @@ -3,7 +3,7 @@ set -e wget https://github.com/libuv/libuv/archive/v1.24.1.tar.gz -O /tmp/libuv.tar.gz -q & -wget https://github.com/bwoebi/php-uv/archive/master.tar.gz -O /tmp/php-uv.tar.gz -q & +wget https://github.com/bwoebi/php-uv/archive/444acfb9636094c2c96d1e7b43332027f464efc5.tar.gz -O /tmp/php-uv.tar.gz -q & wait mkdir libuv && tar -xf /tmp/libuv.tar.gz -C libuv --strip-components=1