diff --git a/lib/ResourceInputStream.php b/lib/ResourceInputStream.php index 3fe21e1..ac02b5a 100644 --- a/lib/ResourceInputStream.php +++ b/lib/ResourceInputStream.php @@ -31,6 +31,12 @@ final class ResourceInputStream implements InputStream { /** @var bool */ private $useSingleRead; + /** @var callable */ + private $immediateCallable; + + /** @var string */ + private $immediateWatcher; + /** * @param resource $stream Stream resource. * @param int $chunkSize Chunk size per read operation. @@ -88,6 +94,12 @@ public function __construct($stream, int $chunkSize = self::DEFAULT_CHUNK_SIZE) } }); + $this->immediateCallable = static function ($watcherId, $data) use (&$deferred) { + $temp = $deferred; + $deferred = null; + $temp->resolve($data); + }; + Loop::disable($this->watcher); } @@ -126,7 +138,12 @@ public function read(): Promise { } } - return new Success($data); + // Prevent an immediate read → write loop from blocking everything + // See e.g. examples/benchmark-throughput.php + $this->deferred = new Deferred; + $this->immediateWatcher = Loop::defer($this->immediateCallable, $data); + + return $this->deferred->promise(); } /** @@ -163,6 +180,10 @@ private function free() { } Loop::cancel($this->watcher); + + if ($this->immediateWatcher !== null) { + Loop::cancel($this->immediateWatcher); + } } /**