diff --git a/README.md b/README.md index 0f927c22..4d653759 100644 --- a/README.md +++ b/README.md @@ -522,7 +522,7 @@ You can call `unsubscribe` method to unsubscribe from a channel: sub.unsubscribe(); ``` -**Important thing to know** is that unsubscribing from subscription does not remove event handlers you already set to that Subscription object. This allows to simply subscribe to channel again later calling `.subscribe()` method of subscription (see below). But there are cases when your code structured in a way that you need to remove event handlers after unsubscribe **to prevent them be executed twice** in the future. To do this remove event listeners explicitly after calling `unsubscribe()`: +**Important thing to mention** is that unsubscribing from subscription does not remove event handlers you already set to that Subscription object. This allows to simply subscribe to channel again later calling `.subscribe()` method of subscription (see below). But there are cases when your code structured in a way that you need to remove event handlers after unsubscribe **to prevent them be executed twice** in the future. To do this remove event listeners explicitly after calling `unsubscribe()`: ```javascript sub.unsubscribe(); diff --git a/docker-compose.yml b/docker-compose.yml index 268937e4..3f78bf2c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,7 +2,7 @@ version: '3.8' services: centrifugo: - image: centrifugo/centrifugo:v5 + image: centrifugo/centrifugo:v5.4.0 command: - centrifugo ports: @@ -12,3 +12,5 @@ services: - CENTRIFUGO_HTTP_STREAM=true - CENTRIFUGO_SSE=true - CENTRIFUGO_PRESENCE=true + - CENTRIFUGO_CLIENT_CONCURRENCY=8 + - CENTRIFUGO_LOG_LEVEL=debug diff --git a/src/centrifuge.test.ts b/src/centrifuge.test.ts index dc941414..8b81a9c2 100644 --- a/src/centrifuge.test.ts +++ b/src/centrifuge.test.ts @@ -366,6 +366,19 @@ test.each(transportCases)("%s: subscribe and unsubscribe loop", async (transport expect(sub.state).toBe(SubscriptionState.Unsubscribed); await unsubscribedPromise; + sub.subscribe() + const presenceStats = await sub.presenceStats(); + expect(presenceStats.numClients).toBe(1) + expect(presenceStats.numUsers).toBe(1); + const presence = await sub.presence(); + expect(Object.keys(presence.clients).length).toBe(1) + await sub.unsubscribe() + const presenceStats2 = await c.presenceStats('test'); + expect(presenceStats2.numClients).toBe(0) + expect(presenceStats2.numUsers).toBe(0); + const presence2 = await c.presence('test'); + expect(Object.keys(presence2.clients).length).toBe(0) + let disconnectCalled: any; const disconnectedPromise = new Promise((resolve, _) => { disconnectCalled = resolve; diff --git a/src/centrifuge.ts b/src/centrifuge.ts index 0852aae7..d1e6b59c 100644 --- a/src/centrifuge.ts +++ b/src/centrifuge.ts @@ -1374,7 +1374,7 @@ export class Centrifuge extends (EventEmitter as new () => TypedEventEmitter TypedEventEmitter { - // @ts-ignore - improve later. - if (resolveCtx.next) { + const unsubscribePromise = new Promise((resolve, _) => { + this._call(cmd, false).then(resolveCtx => { + resolve() // @ts-ignore - improve later. - resolveCtx.next(); - } - }, rejectCtx => { - if (rejectCtx.next) { - rejectCtx.next(); - } - self._disconnect(connectingCodes.unsubscribeError, 'unsubscribe error', true); + if (resolveCtx.next) { + // @ts-ignore - improve later. + resolveCtx.next(); + } + }, rejectCtx => { + resolve() + if (rejectCtx.next) { + rejectCtx.next(); + } + self._disconnect(connectingCodes.unsubscribeError, 'unsubscribe error', true); + }); }); + + return unsubscribePromise; } private _getSub(channel: string) { diff --git a/src/subscription.ts b/src/subscription.ts index 769cc923..c829d45c 100644 --- a/src/subscription.ts +++ b/src/subscription.ts @@ -37,6 +37,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter) { @@ -65,6 +66,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter TypedEventEmitter TypedEventEmitter TypedEventEmitter TypedEventEmitter { if (this._isUnsubscribed()) { - return; + return Promise.resolve(); } + let promise = Promise.resolve(); if (this._isSubscribed()) { if (sendUnsubscribe) { // @ts-ignore – we are hiding some methods from public API autocompletion. - this._centrifuge._unsubscribe(this); + promise = this._centrifuge._unsubscribe(this); } this._clearSubscribedState(); - } - if (this._isSubscribing()) { + } else if (this._isSubscribing()) { if (this._inflight && sendUnsubscribe) { // @ts-ignore – we are hiding some methods from public API autocompletion. - this._centrifuge._unsubscribe(this); + promise = this._centrifuge._unsubscribe(this); } this._clearSubscribingState(); } @@ -459,6 +468,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter