Skip to content

Commit

Permalink
Fix unresolved promises on lost connection in client
Browse files Browse the repository at this point in the history
  • Loading branch information
brstgt committed Jan 16, 2018
1 parent 378ec31 commit 08f442d
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 17 deletions.
7 changes: 7 additions & 0 deletions src/ConnectionLostException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?php

namespace Zikarsky\React\Gearman;

class ConnectionLostException extends \Exception
{
}
15 changes: 10 additions & 5 deletions src/Protocol/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use React\Stream\Stream;
use React\Promise\Deferred;
use BadMethodCallException;
use Zikarsky\React\Gearman\ConnectionLostException;

/**
* The connection wraps around the non-async version of the protocol buffers
Expand All @@ -24,7 +25,6 @@
* @event close When the connection closed "close" is emitted
*
* @author Benjamin Zikarsky <[email protected]>
*/
class Connection extends EventEmitter
{
Expand Down Expand Up @@ -67,14 +67,14 @@ class Connection extends EventEmitter
* Creates the connection on top of the async stream and with the given
* command-factory/specification
*
* @param Stream $stream
* @param Stream $stream
* @param CommandFactoryInterface $commandFactory
*/
public function __construct(Stream $stream, CommandFactoryInterface $commandFactory)
{
$this->commandFactory = $commandFactory;
$this->writeBuffer = new WriteBuffer();
$this->readBuffer = new ReadBuffer($commandFactory);
$this->writeBuffer = new WriteBuffer();
$this->readBuffer = new ReadBuffer($commandFactory);
$this->stream = $stream;
$this->logger = new NullLogger();

Expand All @@ -95,6 +95,11 @@ public function __construct(Stream $stream, CommandFactoryInterface $commandFact
}
$this->commandSendQueue = [];
});
$this->on('close', function () {
foreach ($this->commandSendQueue as $deferred) {
$deferred->reject(new ConnectionLostException());
}
});
}

/**
Expand Down Expand Up @@ -143,7 +148,7 @@ protected function handleData($data)
/**
* Sends a command over the stream
*
* @param CommandInterface $command
* @param CommandInterface $command
* @throws BadMethodCallException when the connection is closed
* @return \React\Promise\Promise|\React\Promise\PromiseInterface
*/
Expand Down
33 changes: 23 additions & 10 deletions src/Protocol/Participant.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Zikarsky\React\Gearman\Command\Exception as ProtocolException;
use React\Promise\Promise;
use React\Promise\Deferred;
use Zikarsky\React\Gearman\ConnectionLostException;

/**
* A participant is a async participant in the Gearman protocol, such as Clients, Workers and Servers
Expand Down Expand Up @@ -98,8 +99,8 @@ protected function blockingActionEnd()
* All other sent commands are queued until the handler resolves the initial action-promise
*
* @param CommandInterface $command
* @param string $eventName
* @param callable $handler
* @param string $eventName
* @param callable $handler
* @return Promise
*/
protected function blockingAction(CommandInterface $command, $eventName, callable $handler)
Expand All @@ -111,11 +112,16 @@ protected function blockingAction(CommandInterface $command, $eventName, callabl

// send command
$this->send($command, $actionPromise)->then(
// as soon as the command is sent, register a one-time event-handler
// on the expected response event, which executes the handler
// as soon as the command is sent, register a one-time event-handler
// on the expected response event, which executes the handler
function (CommandInterface $sentCommand) use ($deferred, $eventName, $handler) {
$this->connection->once($eventName, function (CommandInterface $recvCommand) use ($sentCommand, $deferred, $handler) {

$successListener = null;
$failListener = function () use ($deferred, $eventName, &$successListener) {
$this->connection->removeListener($eventName, $successListener);
$deferred->reject(new ConnectionLostException());
};
$successListener = function (CommandInterface $recvCommand) use ($sentCommand, $deferred, $handler, &$failListener) {
$this->connection->removeListener('close', $failListener);
// if the result is not NULL resolve the deferred action with the handler's result
// if the result is NULL we assume the handler communicated the result on the passed in deferred
// itself
Expand All @@ -124,7 +130,12 @@ function (CommandInterface $sentCommand) use ($deferred, $eventName, $handler) {
$this->blockingActionEnd();
$deferred->resolve($result);
}
});
};
$this->connection->once('close', $failListener);
$this->connection->once($eventName, $successListener);
},
function ($e) use ($deferred) {
$deferred->reject($e);
}
);

Expand All @@ -138,7 +149,7 @@ function (CommandInterface $sentCommand) use ($deferred, $eventName, $handler) {
* promise resolves
*
* @param CommandInterface $command
* @param Promise $lock
* @param Promise $lock
* @return Promise
*/
protected function send(CommandInterface $command, Promise $lock = null)
Expand All @@ -159,8 +170,8 @@ protected function send(CommandInterface $command, Promise $lock = null)
* Other commands are queued until an optional promise resolves (unlocks)
*
* @param CommandInterface $command
* @param Deferred $deferred
* @param Promise $lock
* @param Deferred $deferred
* @param Promise $lock
*/
private function sendDeferred(CommandInterface $command, Deferred $deferred, Promise $lock = null)
{
Expand Down Expand Up @@ -193,6 +204,8 @@ function () {
$this->connection->send($command)->then(function () use ($deferred, $command) {
// resolve the the promise to send the data
$deferred->resolve($command);
}, function ($e) use ($deferred) {
$deferred->reject($e);
});
}

Expand Down
75 changes: 73 additions & 2 deletions tests/ClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ class ClientTest extends PHPUnit_Framework_TestCase
protected $connection;
protected $client;
protected $factory;
protected $buffer;

public function setUp()
{
$stream = $this->createMock(\React\Stream\Stream::class);
$buffer = $this->createMock(\React\Stream\Buffer::class);
$stream->expects($this->any())->method('getBuffer')->willReturn($buffer);
$this->buffer = $this->createPartialMock(\React\Stream\Buffer::class, ['isWritable']);
$stream->expects($this->any())->method('getBuffer')->willReturn($this->buffer);
$this->factory = new \Zikarsky\React\Gearman\Command\Binary\DefaultCommandFactory();

$this->connection = $this->getMockBuilder(\Zikarsky\React\Gearman\Protocol\Connection::class)
Expand Down Expand Up @@ -305,6 +306,76 @@ private function submitBackground($f, $data, $prio = TaskInterface::PRIORITY_NOR
return $eventTask;
}

public function testSubmitBackgroundWithLostConnectionBeforeWrite()
{
$promiseTask = null;
$exception = null;

// Connection is lost before command has been written
$this->client->submitBackground('func', 'data')->then(function ($createdTask) use (&$promiseTask) {
$promiseTask = $createdTask;
}, function ($e) use (&$exception) {
$exception = $e;
});
$this->connection->emit('close');

$this->assertNull($promiseTask);
$this->assertInstanceOf(\Zikarsky\React\Gearman\ConnectionLostException::class, $exception);
}

public function testSubmitBackgroundWithLostConnectionAfterWrite()
{
// Connection is lost after command has been written but no response received
$promiseTask = null;
$exception = null;

$this->client->submitBackground('func', 'data')->then(function ($createdTask) use (&$promiseTask) {
$promiseTask = $createdTask;
}, function ($e) use (&$exception) {
$exception = $e;
});
$this->buffer->emit('full-drain');
$this->connection->emit('close');

$this->assertNull($promiseTask);
$this->assertInstanceOf(\Zikarsky\React\Gearman\ConnectionLostException::class, $exception);
}

public function testSubmitWithLostConnectionBeforeWrite()
{
$eventTask = null;
$promiseTask = null;
$exception = null;

$this->client->submit('func', 'data')->then(function ($createdTask) use (&$promiseTask) {
$promiseTask = $createdTask;
}, function ($e) use (&$exception) {
$exception = $e;
});
$this->connection->emit('close');

$this->assertNull($promiseTask);
$this->assertInstanceOf(\Zikarsky\React\Gearman\ConnectionLostException::class, $exception);
}

public function testSubmitWithLostConnectionAfterWrite()
{
// Connection is lost after command has been written but no response received
$promiseTask = null;
$exception = null;

$this->client->submit('func', 'data')->then(function ($createdTask) use (&$promiseTask) {
$promiseTask = $createdTask;
}, function ($e) use (&$exception) {
$exception = $e;
});
$this->buffer->emit('full-drain');
$this->connection->emit('close');

$this->assertNull($promiseTask);
$this->assertInstanceOf(\Zikarsky\React\Gearman\ConnectionLostException::class, $exception);
}

public function testSetOption()
{
$confirmed = false;
Expand Down
22 changes: 22 additions & 0 deletions tests/Protocol/ConnectionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ class ConnectionTest extends PHPUnit_Framework_TestCase
protected $connection;

public function setUp()
{
$this->setUpStream();
}

protected function setUpStream()
{
$this->type = new CommandType("TEST", 1, ["a", "b"]);

Expand Down Expand Up @@ -71,6 +76,23 @@ public function testSendFailOnClosedConnection(Connection $connection)
$this->assertInstanceOf(BadMethodCallException::class, $thrown);
}

/**
* @depends testSendFailOnClosedConnection
*/
public function testSendFailsWhenConnectionIsClosedDuringSend()
{
// Need to re-establish connection
$this->setUpStream();
$thrown = null;
$this->connection->send($this->packet)->otherwise(function ($e) use (&$thrown) {
$thrown = $e;
});
$this->stream->emit('close');

$this->assertTrue($this->connection->isClosed());
$this->assertInstanceOf(\Zikarsky\React\Gearman\ConnectionLostException::class, $thrown);
}

public function testHandledPacketEvent()
{
$testCalled = false;
Expand Down

0 comments on commit 08f442d

Please sign in to comment.