From e58f99bd9ca8e9b1b576bae8a9f96e15475a2185 Mon Sep 17 00:00:00 2001 From: kirills-morozovs <94073530+kirills-morozovs@users.noreply.github.com> Date: Tue, 20 Aug 2024 13:07:51 +0300 Subject: [PATCH] Remove expired members from subscription topic stored in Redis set map --- .../Storage/RedisStorageManager.php | 35 ++++++++++++++----- .../Storage/RedisStorageManagerTest.php | 7 ++-- 2 files changed, 32 insertions(+), 10 deletions(-) diff --git a/src/Subscriptions/Storage/RedisStorageManager.php b/src/Subscriptions/Storage/RedisStorageManager.php index 4f945ba82..53709ddb5 100644 --- a/src/Subscriptions/Storage/RedisStorageManager.php +++ b/src/Subscriptions/Storage/RedisStorageManager.php @@ -12,6 +12,7 @@ /** * Stores subscribers and topics in redis. + * * - Topics are subscriptions like "userCreated" or "userDeleted". * - Subscribers are clients that are listening to channels like "private-lighthouse-a7ef3d". * @@ -61,10 +62,13 @@ public function subscribersByTopic(string $topic): Collection // As explained in storeSubscriber, we use redis sets to store the names of subscribers of a topic. // We can retrieve all members of a set using the command smembers. $subscriberIds = $this->connection->command('smembers', [$this->topicKey($topic)]); - if (count($subscriberIds) === 0) { + if ($subscriberIds === []) { return new Collection(); } + // Store all keys as missing keys to remove the ones which are expired later. + $missingKeys = $subscriberIds; + // Since we store the individual subscribers with a prefix, // but not in the set, we have to add the prefix here. $subscriberIds = array_map([$this, 'channelKey'], $subscriberIds); @@ -73,22 +77,37 @@ public function subscribersByTopic(string $topic): Collection // This is like using multiple get calls (getSubscriber uses the get command). $subscribers = $this->connection->command('mget', [$subscriberIds]); - return (new Collection($subscribers)) + $subscribersCollection = (new Collection($subscribers)) ->filter() - ->map(static function (?string $subscriber): ?Subscriber { - // Some entries may be expired + ->map(static function (?string $subscriber) use (&$missingKeys): ?Subscriber { + // Some entries may be expired. if ($subscriber === null) { return null; } - // Other entries may contain invalid values + // Other entries may contain invalid values. try { - return unserialize($subscriber); + $subscriber = unserialize($subscriber); + + // This key exists so remove it from the list of missing keys. + $missingKeys = array_diff($missingKeys, [$subscriber->channel]); + + return $subscriber; } catch (\ErrorException) { return null; } }) ->filter(); + + // Remove expired subscribers from the set of subscribers of this topic. + if ($missingKeys !== []) { + $this->connection->command('srem', [ + $this->topicKey($topic), + ...$missingKeys, + ]); + } + + return $subscribersCollection; } public function storeSubscriber(Subscriber $subscriber, string $topic): void @@ -96,7 +115,7 @@ public function storeSubscriber(Subscriber $subscriber, string $topic): void $subscriber->topic = $topic; // In contrast to the CacheStorageManager, we use redis sets. - // Instead of reading the entire list, adding the subscriber and storing the list; + // Instead of reading the entire list, adding the subscriber, and storing the list; // we simply add the name of the subscriber to the set of subscribers of this topic using the sadd command... $topicKey = $this->topicKey($topic); $this->connection->command('sadd', [ @@ -108,7 +127,7 @@ public function storeSubscriber(Subscriber $subscriber, string $topic): void $this->connection->command('expire', [$topicKey, $this->ttl]); } - // Lastly, we store the subscriber as a serialized string... + // Lastly, we store the subscriber as a serialized string. $setCommand = 'set'; $setArguments = [ $this->channelKey($subscriber->channel), diff --git a/tests/Unit/Subscriptions/Storage/RedisStorageManagerTest.php b/tests/Unit/Subscriptions/Storage/RedisStorageManagerTest.php index dc5fb433f..2b8e51e1a 100644 --- a/tests/Unit/Subscriptions/Storage/RedisStorageManagerTest.php +++ b/tests/Unit/Subscriptions/Storage/RedisStorageManagerTest.php @@ -197,7 +197,7 @@ public function testSubscribersByTopic(): void $subscriber2, ]; - $redisConnection->expects($this->exactly(2)) + $redisConnection->expects($this->exactly(3)) ->method('command') ->with(...$this->withConsecutive( ['smembers', ["graphql.topic.{$topic}"]], @@ -205,10 +205,12 @@ public function testSubscribersByTopic(): void 'graphql.subscriber.foo1', 'graphql.subscriber.foo2', 'graphql.subscriber.foo3', + 'graphql.subscriber.foo4', ]]], + ['srem', ["graphql.topic.{$topic}", 'foo3', 'foo4']], )) ->willReturnOnConsecutiveCalls( - ['foo1', 'foo2', 'foo3'], + ['foo1', 'foo2', 'foo3', 'foo4'], [ serialize($subscriber1), serialize($subscriber2), @@ -219,6 +221,7 @@ public function testSubscribersByTopic(): void // mget non-existing-entry false, ], + null, ); $this->assertEquals(