Skip to content

Commit

Permalink
Merge pull request #4 from aternosorg/socket-chunks
Browse files Browse the repository at this point in the history
send socket messages in chunks, test larger messages
  • Loading branch information
matthi4s authored Jan 26, 2024
2 parents 65af214 + 7942845 commit d12f73e
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 7 deletions.
34 changes: 27 additions & 7 deletions src/Communication/Socket/Socket.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Aternos\Taskmaster\Communication\MessageInterface;
use Aternos\Taskmaster\Communication\Socket\Exception\SocketReadException;
use Aternos\Taskmaster\Communication\Socket\Exception\SocketWriteException;
use Aternos\Taskmaster\Taskmaster;
use Generator;

/**
Expand All @@ -22,6 +23,8 @@ class Socket implements SocketInterface, SelectableSocketInterface
*/
protected mixed $socket;

protected string $receiveBuffer = "";

/**
* @param resource|Socket $socket
*/
Expand Down Expand Up @@ -62,10 +65,23 @@ public function receiveRaw(): Generator
if (!is_resource($this->socket) || feof($this->socket)) {
throw new SocketReadException("Could not read from socket.");
}
$result = fgets($this->socket);
$result = $this->receiveBuffer;
do {
$chunk = fgets($this->socket, 10_001);
if ($chunk === false || strlen($chunk) === 0) {
break;
}

$result .= $chunk;
} while (!str_ends_with($result, PHP_EOL));
if (!$result) {
break;
}
if (!str_ends_with($result, PHP_EOL)) {
$this->receiveBuffer = $result;
break;
}
$this->receiveBuffer = "";
$decoded = base64_decode($result);
yield $decoded;
} while (true);
Expand Down Expand Up @@ -102,18 +118,22 @@ public function sendRaw(string $data): bool
}
$data = base64_encode($data);
$data .= PHP_EOL;
$total = 0;
$expected = strlen($data);
$current = 0;
$total = strlen($data);
do {
if (!is_resource($this->socket) || feof($this->socket)) {
throw new SocketWriteException("Could not write to socket.");
}
$result = @fwrite($this->socket, $data);
if ($result === false || $result === 0) {
$chunk = substr($data, $current, 10_000);
$result = @fwrite($this->socket, $chunk);
if ($result === false) {
throw new SocketWriteException("Could not write to socket.");
}
$total += $result;
} while ($total < $expected);
if ($result === 0) {
usleep(Taskmaster::SOCKET_WAIT_TIME);
}
$current += $result;
} while ($current < $total);
return true;
}

Expand Down
9 changes: 9 additions & 0 deletions test/Integration/WorkerTestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Aternos\Taskmaster\Test\Util\Task\CallbackTask;
use Aternos\Taskmaster\Test\Util\Task\ChildExceptionTask;
use Aternos\Taskmaster\Test\Util\Task\EmptyTask;
use Aternos\Taskmaster\Test\Util\Task\LargeTask;
use Aternos\Taskmaster\Test\Util\Task\ParentExceptionTask;
use Aternos\Taskmaster\Test\Util\Task\SynchronizedFieldTask;
use Aternos\Taskmaster\Test\Util\Task\UnsynchronizedFieldTask;
Expand Down Expand Up @@ -68,6 +69,14 @@ public function testGetTaskResult(): void
$this->assertEquals(3, $task->getResult());
}

public function testRunLargeTask(): void
{
$task = new LargeTask(1_000_000);
$this->taskmaster->runTask($task);
$this->taskmaster->wait();
$this->assertEquals(1_000_000, strlen($task->getResult()));
}

public function testGetTaskResultFromPromise(): void
{
$task = new AdditionTask(1, 2);
Expand Down
25 changes: 25 additions & 0 deletions test/Util/Task/LargeTask.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

namespace Aternos\Taskmaster\Test\Util\Task;

use Aternos\Taskmaster\Task\OnBoth;
use Aternos\Taskmaster\Task\OnChild;
use Aternos\Taskmaster\Task\Task;

class LargeTask extends Task
{
#[OnBoth] protected string $data;

/**
* @param int $length
*/
public function __construct(int $length = 100_000)
{
$this->data = str_repeat("T", $length);
}

#[OnChild] public function run()
{
return $this->data;
}
}

0 comments on commit d12f73e

Please sign in to comment.