Skip to content

Commit

Permalink
Replace TimeoutCache with PriorityQueue from amphp/sync
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Dec 14, 2024
1 parent 9f1b874 commit 84554fc
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 136 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
"amphp/http": "^2",
"amphp/pipeline": "^1",
"amphp/socket": "^2.1",
"amphp/sync": "^2",
"amphp/sync": "^2.2",
"league/uri": "^6.8 | ^7.1",
"league/uri-interfaces": "^2.3 | ^7.1",
"psr/http-message": "^1 | ^2",
Expand Down
128 changes: 0 additions & 128 deletions src/Driver/Internal/TimeoutCache.php

This file was deleted.

15 changes: 8 additions & 7 deletions src/Driver/Internal/TimeoutQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
namespace Amp\Http\Server\Driver\Internal;

use Amp\Http\Server\Driver\Client;
use Amp\Sync\PriorityQueue;
use Revolt\EventLoop;
use function Amp\async;
use function Amp\weakClosure;

/** @internal */
final class TimeoutQueue
{
private readonly TimeoutCache $timeoutCache;
private readonly PriorityQueue $priorityQueue;

private readonly \WeakMap $streamNames;

Expand All @@ -23,15 +24,15 @@ final class TimeoutQueue

public function __construct()
{
$this->timeoutCache = new TimeoutCache();
$this->priorityQueue = new PriorityQueue();
$this->streamNames = new \WeakMap();
$this->now = \time();

$this->callbackId = EventLoop::unreference(
EventLoop::repeat(1, weakClosure(function (): void {
$this->now = \time();

while ($id = $this->timeoutCache->extract($this->now)) {
while ($id = $this->priorityQueue->extract($this->now)) {
\assert(isset($this->callbacks[$id]), "Timeout cache contains an invalid client ID");

// Client is either idle or taking too long to send request, so simply close the connection.
Expand Down Expand Up @@ -59,7 +60,7 @@ public function insert(Client $client, int $streamId, \Closure $onTimeout, int $
\assert(!isset($this->callbacks[$cacheId]));

$this->callbacks[$cacheId] = [$client, $streamId, $onTimeout];
$this->timeoutCache->update($cacheId, $this->now + $timeout);
$this->priorityQueue->insert($cacheId, $this->now + $timeout);
}

private function makeId(Client $client, int $streamId): string
Expand All @@ -77,15 +78,15 @@ public function update(Client $client, int $streamId, int $timeout): void
$cacheId = $this->makeId($client, $streamId);
\assert(isset($this->callbacks[$cacheId], $this->streamNames[$client][$streamId]));

$this->timeoutCache->update($cacheId, $this->now + $timeout);
$this->priorityQueue->insert($cacheId, $this->now + $timeout);
}

public function suspend(Client $client, int $streamId): void
{
$cacheId = $this->makeId($client, $streamId);
\assert(isset($this->callbacks[$cacheId], $this->streamNames[$client][$streamId]));

$this->timeoutCache->clear($cacheId);
$this->priorityQueue->remove($cacheId);
}

/**
Expand All @@ -95,7 +96,7 @@ public function remove(Client $client, int $streamId): void
{
$cacheId = $this->makeId($client, $streamId);

$this->timeoutCache->clear($cacheId);
$this->priorityQueue->remove($cacheId);
unset($this->callbacks[$cacheId], $this->streamNames[$client][$streamId]);
}
}

0 comments on commit 84554fc

Please sign in to comment.