Skip to content

Commit

Permalink
Defer immediate reads
Browse files Browse the repository at this point in the history
Immediate reads have been introduced to support in-memory streams and STDIN on Windows, but this causes problems during piping of streams that always have data available, because it blocks everything else. This is resolved by deferring the promise resolution to the next tick.
  • Loading branch information
kelunik committed Mar 9, 2018
1 parent a4739c8 commit 062f16d
Showing 1 changed file with 22 additions and 1 deletion.
23 changes: 22 additions & 1 deletion lib/ResourceInputStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ final class ResourceInputStream implements InputStream {
/** @var bool */
private $useSingleRead;

/** @var callable */
private $immediateCallable;

/** @var string */
private $immediateWatcher;

/**
* @param resource $stream Stream resource.
* @param int $chunkSize Chunk size per read operation.
Expand Down Expand Up @@ -88,6 +94,12 @@ public function __construct($stream, int $chunkSize = self::DEFAULT_CHUNK_SIZE)
}
});

$this->immediateCallable = static function ($watcherId, $data) use (&$deferred) {
$temp = $deferred;
$deferred = null;
$temp->resolve($data);
};

Loop::disable($this->watcher);
}

Expand Down Expand Up @@ -126,7 +138,12 @@ public function read(): Promise {
}
}

return new Success($data);
// Prevent an immediate read → write loop from blocking everything
// See e.g. examples/benchmark-throughput.php
$this->deferred = new Deferred;
$this->immediateWatcher = Loop::defer($this->immediateCallable, $data);

return $this->deferred->promise();
}

/**
Expand Down Expand Up @@ -163,6 +180,10 @@ private function free() {
}

Loop::cancel($this->watcher);

if ($this->immediateWatcher !== null) {
Loop::cancel($this->immediateWatcher);
}
}

/**
Expand Down

0 comments on commit 062f16d

Please sign in to comment.