Skip to content

Commit

Permalink
Synchronize subscribe with unsubscribe to properly work in emulation …
Browse files Browse the repository at this point in the history
…case (#288)
  • Loading branch information
FZambia authored Jun 19, 2024
1 parent a5384ce commit ec9a35f
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 21 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ You can call `unsubscribe` method to unsubscribe from a channel:
sub.unsubscribe();
```

**Important thing to know** is that unsubscribing from subscription does not remove event handlers you already set to that Subscription object. This allows to simply subscribe to channel again later calling `.subscribe()` method of subscription (see below). But there are cases when your code structured in a way that you need to remove event handlers after unsubscribe **to prevent them be executed twice** in the future. To do this remove event listeners explicitly after calling `unsubscribe()`:
**Important thing to mention** is that unsubscribing from subscription does not remove event handlers you already set to that Subscription object. This allows to simply subscribe to channel again later calling `.subscribe()` method of subscription (see below). But there are cases when your code structured in a way that you need to remove event handlers after unsubscribe **to prevent them be executed twice** in the future. To do this remove event listeners explicitly after calling `unsubscribe()`:

```javascript
sub.unsubscribe();
Expand Down
4 changes: 3 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '3.8'

services:
centrifugo:
image: centrifugo/centrifugo:v5
image: centrifugo/centrifugo:v5.4.0
command:
- centrifugo
ports:
Expand All @@ -12,3 +12,5 @@ services:
- CENTRIFUGO_HTTP_STREAM=true
- CENTRIFUGO_SSE=true
- CENTRIFUGO_PRESENCE=true
- CENTRIFUGO_CLIENT_CONCURRENCY=8
- CENTRIFUGO_LOG_LEVEL=debug
13 changes: 13 additions & 0 deletions src/centrifuge.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,19 @@ test.each(transportCases)("%s: subscribe and unsubscribe loop", async (transport
expect(sub.state).toBe(SubscriptionState.Unsubscribed);
await unsubscribedPromise;

sub.subscribe()
const presenceStats = await sub.presenceStats();
expect(presenceStats.numClients).toBe(1)
expect(presenceStats.numUsers).toBe(1);
const presence = await sub.presence();
expect(Object.keys(presence.clients).length).toBe(1)
await sub.unsubscribe()
const presenceStats2 = await c.presenceStats('test');
expect(presenceStats2.numClients).toBe(0)
expect(presenceStats2.numUsers).toBe(0);
const presence2 = await c.presence('test');
expect(Object.keys(presence2.clients).length).toBe(0)

let disconnectCalled: any;
const disconnectedPromise = new Promise<DisconnectedContext>((resolve, _) => {
disconnectCalled = resolve;
Expand Down
28 changes: 17 additions & 11 deletions src/centrifuge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1374,7 +1374,7 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli

protected _unsubscribe(sub: Subscription) {
if (!this._transportIsOpen) {
return;
return Promise.resolve();
}
const req = {
channel: sub.channel
Expand All @@ -1383,18 +1383,24 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli

const self = this;

this._call(cmd, false).then(resolveCtx => {
// @ts-ignore - improve later.
if (resolveCtx.next) {
const unsubscribePromise = new Promise<void>((resolve, _) => {
this._call(cmd, false).then(resolveCtx => {
resolve()
// @ts-ignore - improve later.
resolveCtx.next();
}
}, rejectCtx => {
if (rejectCtx.next) {
rejectCtx.next();
}
self._disconnect(connectingCodes.unsubscribeError, 'unsubscribe error', true);
if (resolveCtx.next) {
// @ts-ignore - improve later.
resolveCtx.next();
}
}, rejectCtx => {
resolve()
if (rejectCtx.next) {
rejectCtx.next();
}
self._disconnect(connectingCodes.unsubscribeError, 'unsubscribe error', true);
});
});

return unsubscribePromise;
}

private _getSub(channel: string) {
Expand Down
26 changes: 18 additions & 8 deletions src/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
// @ts-ignore – this is used by a client in centrifuge.ts.
private _inflight: boolean;
private _prevValue: any;
private _unsubPromise: any;

/** Subscription constructor should not be used directly, create subscriptions using Client method. */
constructor(centrifuge: Centrifuge, channel: string, options?: Partial<SubscriptionOptions>) {
Expand Down Expand Up @@ -65,6 +66,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
this._delta = '';
this._delta_negotiated = false;
this._prevValue = null;
this._unsubPromise = Promise.resolve();
this._setOptions(options);
// @ts-ignore – we are hiding some symbols from public API autocompletion.
if (this._centrifuge._debugEnabled) {
Expand Down Expand Up @@ -117,7 +119,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S

/** unsubscribe from a channel, keeping position state.*/
unsubscribe() {
this._setUnsubscribed(unsubscribedCodes.unsubscribeCalled, 'unsubscribe called', true);
this._unsubPromise = this._setUnsubscribed(unsubscribedCodes.unsubscribeCalled, 'unsubscribe called', true);
}

/** publish data to a channel.*/
Expand Down Expand Up @@ -254,7 +256,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
}
}

private _setSubscribing(code: number, reason: string) {
private async _setSubscribing(code: number, reason: string) {
if (this._isSubscribing()) {
return;
}
Expand All @@ -264,6 +266,13 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
if (this._setState(SubscriptionState.Subscribing)) {
this.emit('subscribing', { channel: this.channel, code: code, reason: reason });
}
// @ts-ignore – for performance reasons only await _unsubPromise for emulution case where it's required.
if (this._centrifuge._transport && this._centrifuge._transport.emulation()) {
await this._unsubPromise;
}
if (!this._isSubscribing()) {
return;
}
this._subscribe();
}

Expand Down Expand Up @@ -437,28 +446,29 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
this._setSubscribed(result);
}

private _setUnsubscribed(code, reason, sendUnsubscribe) {
private _setUnsubscribed(code, reason, sendUnsubscribe): Promise<void> {
if (this._isUnsubscribed()) {
return;
return Promise.resolve();
}
let promise = Promise.resolve();
if (this._isSubscribed()) {
if (sendUnsubscribe) {
// @ts-ignore – we are hiding some methods from public API autocompletion.
this._centrifuge._unsubscribe(this);
promise = this._centrifuge._unsubscribe(this);
}
this._clearSubscribedState();
}
if (this._isSubscribing()) {
} else if (this._isSubscribing()) {
if (this._inflight && sendUnsubscribe) {
// @ts-ignore – we are hiding some methods from public API autocompletion.
this._centrifuge._unsubscribe(this);
promise = this._centrifuge._unsubscribe(this);
}
this._clearSubscribingState();
}
if (this._setState(SubscriptionState.Unsubscribed)) {
this.emit('unsubscribed', { channel: this.channel, code: code, reason: reason });
}
this._rejectPromises({ code: errorCodes.subscriptionUnsubscribed, message: this.state });
return promise;
}

private _handlePublication(pub: any) {
Expand Down

0 comments on commit ec9a35f

Please sign in to comment.