Skip to content

Commit

Permalink
Merge branch 'master' into pub_channel
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Jun 22, 2024
2 parents a48e5ad + ffe0b66 commit a57d110
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 33 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ You can call `unsubscribe` method to unsubscribe from a channel:
sub.unsubscribe();
```

**Important thing to know** is that unsubscribing from subscription does not remove event handlers you already set to that Subscription object. This allows to simply subscribe to channel again later calling `.subscribe()` method of subscription (see below). But there are cases when your code structured in a way that you need to remove event handlers after unsubscribe **to prevent them be executed twice** in the future. To do this remove event listeners explicitly after calling `unsubscribe()`:
**Important thing to mention** is that unsubscribing from subscription does not remove event handlers you already set to that Subscription object. This allows to simply subscribe to channel again later calling `.subscribe()` method of subscription (see below). But there are cases when your code structured in a way that you need to remove event handlers after unsubscribe **to prevent them be executed twice** in the future. To do this remove event listeners explicitly after calling `unsubscribe()`:

```javascript
sub.unsubscribe();
Expand Down
4 changes: 3 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '3.8'

services:
centrifugo:
image: centrifugo/centrifugo:v5
image: centrifugo/centrifugo:v5.4.0
command:
- centrifugo
ports:
Expand All @@ -12,3 +12,5 @@ services:
- CENTRIFUGO_HTTP_STREAM=true
- CENTRIFUGO_SSE=true
- CENTRIFUGO_PRESENCE=true
- CENTRIFUGO_CLIENT_CONCURRENCY=8
- CENTRIFUGO_LOG_LEVEL=debug
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "centrifuge",
"version": "5.2.0",
"version": "5.2.1",
"description": "JavaScript client SDK for bidirectional communication with Centrifugo and Centrifuge-based server from browser, NodeJS and React Native",
"main": "build/index.js",
"types": "build/index.d.ts",
Expand Down
13 changes: 13 additions & 0 deletions src/centrifuge.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,19 @@ test.each(transportCases)("%s: subscribe and unsubscribe loop", async (transport
expect(sub.state).toBe(SubscriptionState.Unsubscribed);
await unsubscribedPromise;

sub.subscribe()
const presenceStats = await sub.presenceStats();
expect(presenceStats.numClients).toBe(1)
expect(presenceStats.numUsers).toBe(1);
const presence = await sub.presence();
expect(Object.keys(presence.clients).length).toBe(1)
await sub.unsubscribe()
const presenceStats2 = await c.presenceStats('test');
expect(presenceStats2.numClients).toBe(0)
expect(presenceStats2.numUsers).toBe(0);
const presence2 = await c.presence('test');
expect(Object.keys(presence2.clients).length).toBe(0)

let disconnectCalled: any;
const disconnectedPromise = new Promise<DisconnectedContext>((resolve, _) => {
disconnectCalled = resolve;
Expand Down
28 changes: 17 additions & 11 deletions src/centrifuge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1374,7 +1374,7 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli

protected _unsubscribe(sub: Subscription) {
if (!this._transportIsOpen) {
return;
return Promise.resolve();
}
const req = {
channel: sub.channel
Expand All @@ -1383,18 +1383,24 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter<Cli

const self = this;

this._call(cmd, false).then(resolveCtx => {
// @ts-ignore - improve later.
if (resolveCtx.next) {
const unsubscribePromise = new Promise<void>((resolve, _) => {
this._call(cmd, false).then(resolveCtx => {
resolve()
// @ts-ignore - improve later.
resolveCtx.next();
}
}, rejectCtx => {
if (rejectCtx.next) {
rejectCtx.next();
}
self._disconnect(connectingCodes.unsubscribeError, 'unsubscribe error', true);
if (resolveCtx.next) {
// @ts-ignore - improve later.
resolveCtx.next();
}
}, rejectCtx => {
resolve()
if (rejectCtx.next) {
rejectCtx.next();
}
self._disconnect(connectingCodes.unsubscribeError, 'unsubscribe error', true);
});
});

return unsubscribePromise;
}

private _getSub(channel: string) {
Expand Down
26 changes: 18 additions & 8 deletions src/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
// @ts-ignore – this is used by a client in centrifuge.ts.
private _inflight: boolean;
private _prevValue: any;
private _unsubPromise: any;

/** Subscription constructor should not be used directly, create subscriptions using Client method. */
constructor(centrifuge: Centrifuge, channel: string, options?: Partial<SubscriptionOptions>) {
Expand Down Expand Up @@ -65,6 +66,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
this._delta = '';
this._delta_negotiated = false;
this._prevValue = null;
this._unsubPromise = Promise.resolve();
this._setOptions(options);
// @ts-ignore – we are hiding some symbols from public API autocompletion.
if (this._centrifuge._debugEnabled) {
Expand Down Expand Up @@ -117,7 +119,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S

/** unsubscribe from a channel, keeping position state.*/
unsubscribe() {
this._setUnsubscribed(unsubscribedCodes.unsubscribeCalled, 'unsubscribe called', true);
this._unsubPromise = this._setUnsubscribed(unsubscribedCodes.unsubscribeCalled, 'unsubscribe called', true);
}

/** publish data to a channel.*/
Expand Down Expand Up @@ -254,7 +256,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
}
}

private _setSubscribing(code: number, reason: string) {
private async _setSubscribing(code: number, reason: string) {
if (this._isSubscribing()) {
return;
}
Expand All @@ -264,6 +266,13 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
if (this._setState(SubscriptionState.Subscribing)) {
this.emit('subscribing', { channel: this.channel, code: code, reason: reason });
}
// @ts-ignore – for performance reasons only await _unsubPromise for emulution case where it's required.
if (this._centrifuge._transport && this._centrifuge._transport.emulation()) {
await this._unsubPromise;
}
if (!this._isSubscribing()) {
return;
}
this._subscribe();
}

Expand Down Expand Up @@ -437,28 +446,29 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
this._setSubscribed(result);
}

private _setUnsubscribed(code, reason, sendUnsubscribe) {
private _setUnsubscribed(code, reason, sendUnsubscribe): Promise<void> {
if (this._isUnsubscribed()) {
return;
return Promise.resolve();
}
let promise = Promise.resolve();
if (this._isSubscribed()) {
if (sendUnsubscribe) {
// @ts-ignore – we are hiding some methods from public API autocompletion.
this._centrifuge._unsubscribe(this);
promise = this._centrifuge._unsubscribe(this);
}
this._clearSubscribedState();
}
if (this._isSubscribing()) {
} else if (this._isSubscribing()) {
if (this._inflight && sendUnsubscribe) {
// @ts-ignore – we are hiding some methods from public API autocompletion.
this._centrifuge._unsubscribe(this);
promise = this._centrifuge._unsubscribe(this);
}
this._clearSubscribingState();
}
if (this._setState(SubscriptionState.Unsubscribed)) {
this.emit('unsubscribed', { channel: this.channel, code: code, reason: reason });
}
this._rejectPromises({ code: errorCodes.subscriptionUnsubscribed, message: this.state });
return promise;
}

private _handlePublication(pub: any) {
Expand Down
22 changes: 11 additions & 11 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1335,11 +1335,11 @@ brace-expansion@^2.0.1:
balanced-match "^1.0.0"

braces@^3.0.2:
version "3.0.2"
resolved "https://registry.npmjs.org/braces/-/braces-3.0.2.tgz"
integrity sha512-b8um+L1RzM3WDSzvhm6gIz1yfTbBt6YTlcEKAvsmqCZZFw46z626lVj9j1yEPW33H5H+lBQpZMP1k8l+78Ha0A==
version "3.0.3"
resolved "https://registry.yarnpkg.com/braces/-/braces-3.0.3.tgz#490332f40919452272d55a8480adc0c441358789"
integrity sha512-yQbXgO/OSZVD2IsiLlro+7Hf6Q18EJrKSEsdoMzKePKXct3gvD8oLcOQdIzGupr5Fj+EDe8gO/lxc1BzfMpxvA==
dependencies:
fill-range "^7.0.1"
fill-range "^7.1.1"

browserslist@^4.21.9:
version "4.21.10"
Expand Down Expand Up @@ -1873,10 +1873,10 @@ file-entry-cache@^6.0.1:
dependencies:
flat-cache "^3.0.4"

fill-range@^7.0.1:
version "7.0.1"
resolved "https://registry.npmjs.org/fill-range/-/fill-range-7.0.1.tgz"
integrity sha512-qOo9F+dMUmC2Lcb4BbVvnKJxTPjCm+RRpe4gDuGrzkL7mEVl/djYSu2OdQ2Pa302N4oqkSg9ir6jaLWJ2USVpQ==
fill-range@^7.1.1:
version "7.1.1"
resolved "https://registry.yarnpkg.com/fill-range/-/fill-range-7.1.1.tgz#44265d3cac07e3ea7dc247516380643754a05292"
integrity sha512-YsGpe3WHLK8ZYi4tWDg2Jy3ebRz2rXowDxnld4bkQB00cc/1Zw9AWnC0i9ztDJitivtQvaI9KaLyKrc+hBW0yg==
dependencies:
to-regex-range "^5.0.1"

Expand Down Expand Up @@ -3632,9 +3632,9 @@ write-file-atomic@^4.0.2:
signal-exit "^3.0.7"

ws@^8.14.1:
version "8.14.1"
resolved "https://registry.npmjs.org/ws/-/ws-8.14.1.tgz"
integrity sha512-4OOseMUq8AzRBI/7SLMUwO+FEDnguetSk7KMb1sHwvF2w2Wv5Hoj0nlifx8vtGsftE/jWHojPy8sMMzYLJ2G/A==
version "8.17.1"
resolved "https://registry.yarnpkg.com/ws/-/ws-8.17.1.tgz#9293da530bb548febc95371d90f9c878727d919b"
integrity sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==

xmlcreate@^2.0.4:
version "2.0.4"
Expand Down

0 comments on commit a57d110

Please sign in to comment.