Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Presence channel handler #25

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions js-src/Channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ export class Channel extends BaseChannel implements PresenceChannel {
}

here(callback: Function): Channel {
// TODO: implement
this.on('subscription_succeeded', (data) => {
callback(data)
})

return this
}
Expand All @@ -126,7 +128,9 @@ export class Channel extends BaseChannel implements PresenceChannel {
* Listen for someone joining the channel.
*/
joining(callback: Function): Channel {
// TODO: implement
this.on('member_added', (data) => {
callback(data)
})

return this
}
Expand All @@ -135,7 +139,9 @@ export class Channel extends BaseChannel implements PresenceChannel {
* Listen for someone leaving the channel.
*/
leaving(callback: Function): Channel {
// TODO: implement
this.on('member_removed', (data) => {
callback(data)
})

return this
}
Expand Down
5 changes: 3 additions & 2 deletions src/ConnectionRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Aws\ApiGatewayManagementApi\ApiGatewayManagementApiClient;
use Aws\ApiGatewayManagementApi\Exception\ApiGatewayManagementApiException;
use GuzzleHttp\Exception\ClientException;
use Symfony\Component\HttpFoundation\Response;

class ConnectionRepository
{
Expand All @@ -28,8 +29,8 @@ public function sendMessage(string $connectionId, string $data): void
'Data' => $data,
]);
} catch (ApiGatewayManagementApiException $e) {
// GoneException: The connection with the provided id no longer exists.
if ($e->getAwsErrorCode() === 'GoneException') {
// GoneException: The connection with the provided id no longer exists.
if ($e->getStatusCode() === Response::HTTP_GONE) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you test this? AWS doesnt return a standard HTTP gone exception.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually had to change it because I was not getting the 'GoneException' but was getting the HTTP Gone exception instead. Maybe we should keep both cases?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've pushed an update that responds to both exceptions.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So AWS does a real poor job on documenting this. Their Python and Java SDK's speak about a separate GoneException, not a HTTP exception: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/apigatewaymanagementapi.html#ApiGatewayManagementApi.Client.post_to_connection

This sets me to believe the error will just be 400 and the actual error is in the respone body. The SDK will map it to the approriate exception.

Is this really the case? I don't know. The string version doesnt seem to work so perhaps you are right.

Have to tested this behavior with an actual API? If so, I believe you and will be happy to merge.

Sorry for all the questions. But this has been a long standing issue, and really want to get it fixed for good ;)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I was working directly with the AWS API Gateway when I made this change. It was the only way I could avoid errors when doing presence removals. I did update the PR to include both cases, in case there is a situation where the other one is necessary.

$this->subscriptionRepository->clearConnection($connectionId);

return;
Expand Down
64 changes: 61 additions & 3 deletions src/Handler.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Bref\Event\Http\HttpResponse;
use Illuminate\Support\Arr;
use Illuminate\Support\Str;
use Symfony\Component\HttpFoundation\Response;
use Throwable;

class Handler extends WebsocketHandler
Expand Down Expand Up @@ -42,6 +43,7 @@ public function handleWebsocket(WebsocketEvent $event, Context $context): HttpRe

protected function handleDisconnect(WebsocketEvent $event, Context $context): void
{
$this->sendPresenceDisconnectNotices($event);
$this->subscriptionRepository->clearConnection($event->getConnectionId());
}

Expand Down Expand Up @@ -115,12 +117,28 @@ protected function subscribe(WebsocketEvent $event, Context $context): void
}
}

$this->subscriptionRepository->subscribeToChannel($event->getConnectionId(), $channel);
if (Str::startsWith($channel, 'presence-')) {
$this->subscriptionRepository->subscribeToPresenceChannel(
$event->getConnectionId(),
$channelData,
$channel
);
$data = $this->subscriptionRepository->getUserListForPresenceChannel($channel)
->transform(function ($user) {
$user = json_decode($user, true);
return Arr::get($user, 'user_info', json_encode($user));
})
->toArray();
$this->sendPresenceAdd($event, $channel, Arr::get(json_decode($channelData, true), 'user_info'));
} else {
$this->subscriptionRepository->subscribeToChannel($event->getConnectionId(), $channel);
$data = [];
}

$this->sendMessage($event, $context, [
'event' => 'subscription_succeeded',
'channel' => $channel,
'data' => [],
'data' => $data,
]);
}

Expand All @@ -138,6 +156,18 @@ protected function unsubscribe(WebsocketEvent $event, Context $context): void
]);
}

public function sendPresenceDisconnectNotices(WebsocketEvent $event): void
{
$channels = $this->subscriptionRepository->getChannelsSubscribedToByConnectionId($event->getConnectionId());
$channels->filter(function ($info) {
return Str::startsWith(Arr::get($info, 'channel'), 'presence-');
})->each(function ($info) use ($event) {
$channel = Arr::get($info, 'channel');
$userData = json_decode(Arr::get($info, 'userData'), true);
$this->sendPresenceRemove($event, $channel, Arr::get($userData, 'user_info'));
});
}

public function broadcastToChannel(WebsocketEvent $event, Context $context): void
{
$skipConnectionId = $event->getConnectionId();
Expand All @@ -158,6 +188,34 @@ public function broadcastToChannel(WebsocketEvent $event, Context $context): voi
->each(fn (string $connectionId) => $this->sendMessageToConnection($connectionId, $data));
}

public function sendPresenceAdd(WebsocketEvent $event, string $channel, array $data): void
{
$skipConnectionId = $event->getConnectionId();
$eventBody = json_decode($event->getBody(), true);
$data = json_encode([
'event'=>'member_added',
'channel'=>$channel,
'data'=>$data
]) ?: '';
$this->subscriptionRepository->getConnectionIdsForChannel($channel)
->reject(fn ($connectionId) => $connectionId === $skipConnectionId)
->each(fn (string $connectionId) => $this->sendMessageToConnection($connectionId, $data));
}

public function sendPresenceRemove(WebsocketEvent $event, string $channel, array $data): void
{
$skipConnectionId = $event->getConnectionId();
$eventBody = json_decode($event->getBody(), true);
$data = json_encode([
'event'=>'member_removed',
'channel'=>$channel,
'data'=>$data
]) ?: '';
$this->subscriptionRepository->getConnectionIdsForChannel($channel)
->reject(fn ($connectionId) => $connectionId === $skipConnectionId)
->each(fn (string $connectionId) => $this->sendMessageToConnection($connectionId, $data));
}

public function sendMessage(WebsocketEvent $event, Context $context, array $data): void
{
$this->connectionRepository->sendMessage($event->getConnectionId(), json_encode($data, JSON_THROW_ON_ERROR));
Expand All @@ -168,7 +226,7 @@ protected function sendMessageToConnection(string $connectionId, string $data):
try {
$this->connectionRepository->sendMessage($connectionId, $data);
} catch (ApiGatewayManagementApiException $exception) {
if ($exception->getAwsErrorCode() === 'GoneException') {
if ($exception->getStatusCode() === Response::HTTP_GONE) {
$this->subscriptionRepository->clearConnection($connectionId);
return;
}
Expand Down
49 changes: 49 additions & 0 deletions src/SubscriptionRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,43 @@ public function getConnectionIdsForChannel(string ...$channels): Collection
->unique();
}

public function getUserListForPresenceChannel(string ...$channels): Collection
{
$promises = collect($channels)->map(fn ($channel) => $this->dynamoDb->queryAsync([
'TableName' => $this->table,
'IndexName' => 'lookup-by-channel',
'KeyConditionExpression' => 'channel = :channel',
'ExpressionAttributeValues' => [
':channel' => ['S' => $channel],
],
]))->toArray();

$responses = Utils::all($promises)->wait();

return collect($responses)
->flatmap(fn (\Aws\Result $result): array => $result['Items'])
->map(fn (array $item): string => Arr::get($item, 'userData.S', ''))
->unique();
}

public function getChannelsSubscribedToByConnectionId(string $connectionId): Collection
{
$response = $this->dynamoDb->query([
'TableName' => $this->table,
'KeyConditionExpression' => 'connectionId = :connectionId',
'ExpressionAttributeValues' => [
':connectionId' => ['S' => $connectionId],
],
]);
return collect(Arr::get($response, 'Items', []))
->transform(function ($item) {
return [
'channel'=>Arr::get($item, 'channel.S'),
'userData'=>Arr::get($item, 'userData.S'),
];
});
}

public function clearConnection(string $connectionId): void
{
$response = $this->dynamoDb->query([
Expand Down Expand Up @@ -86,4 +123,16 @@ public function unsubscribeFromChannel(string $connectionId, string $channel): v
],
]);
}

public function subscribeToPresenceChannel(string $connectionId, string $userData, string $channel): void
{
$this->dynamoDb->putItem([
'TableName' => $this->table,
'Item' => [
'connectionId' => ['S' => $connectionId],
'userData' => ['S' => $userData],
'channel' => ['S' => $channel],
],
]);
}
}
58 changes: 57 additions & 1 deletion tests/HandlerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use GuzzleHttp\Psr7\Response;
use Mockery\Mock;
use Psr\Http\Message\RequestInterface;
use Symfony\Component\HttpFoundation\Response as SymfonyResponse;

it('can subscribe to open channels', function () {
app()->instance(SubscriptionRepository::class, Mockery::mock(SubscriptionRepository::class, function ($mock) {
Expand Down Expand Up @@ -113,11 +114,66 @@
], $context);
});

it('leaves presence channels', function () {
app()->instance(SubscriptionRepository::class, Mockery::mock(SubscriptionRepository::class, function ($mock) {
/** @var Mock $mock */
$mock->shouldReceive('getChannelsSubscribedToByConnectionId')->withArgs(function (string $connectionId): bool {
return $connectionId === 'connection-id-1';
})->once()
->andReturn(collect([
[
'channel'=>'presence-channel',
'userData'=>json_encode(['user_info'=>['the user info']]),
],
[
'channel'=>'other-channel',
]
]));
$mock->shouldReceive('getConnectionIdsForChannel')->withArgs(function (string $channel) {
return $channel === 'presence-channel';
})->once()
->andReturn(collect(['connection-id-1', 'connection-id-2']));
$mock->shouldReceive('clearConnection')->withArgs(function (string $connectionId) {
return $connectionId === 'connection-id-1';
})->once();
}));

app()->instance(ConnectionRepository::class, Mockery::mock(ConnectionRepository::class, function ($mock) {
/** @var Mock $mock */
$mock->shouldReceive('sendMessage')->withArgs(function (string $connectionId, string $data): bool {
return $connectionId === 'connection-id-2' and $data === '{"event":"member_removed","channel":"presence-channel","data":["the user info"]}';
})->once();
}));

/** @var Handler $handler */
$handler = app(Handler::class);

$context = new Context('request-id-1', 50_000, 'function-arn', 'trace-id-1');

$handler->handle([
'requestContext' => [
'routeKey' => 'my-test-route-key',
'eventType' => 'DISCONNECT',
'connectionId' => 'connection-id-1',
'domainName' => 'test-domain',
'apiId' => 'api-id-1',
'stage' => 'stage-test',
],
'body' => json_encode(['event' => 'disconnect']),
], $context);
});

it('handles dropped connections', function () {
$mock = new MockHandler();

$mock->append(function (CommandInterface $cmd, RequestInterface $req) {
return new ApiGatewayManagementApiException('', $cmd, ['code' => 'GoneException']);
$mock = Mockery::mock(SymfonyResponse::class, function ($mock) {
$mock->shouldReceive('getStatusCode')
->andReturn(SymfonyResponse::HTTP_GONE);
});
return new ApiGatewayManagementApiException('', $cmd, [
'response' => $mock
]);
});

/** @var SubscriptionRepository */
Expand Down