Skip to content

Commit

Permalink
wait unsubscribe promise
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Jun 19, 2024
1 parent da57e27 commit 5f551ae
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 15 deletions.
6 changes: 2 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -519,15 +519,13 @@ sub.publish({"input": "hello world"}).then(function() {
You can call `unsubscribe` method to unsubscribe from a channel:

```javascript
await sub.unsubscribe();
sub.unsubscribe();
```

Note, `sub.unsubscribe()` is asynchronous and may be used without `await`, but in some cases it may be very important to await it before calling subscribe again. For example, this becomes important when using HTTP-based fallback transports and quick unsubscribe/subscribe calls – awaiting unsubscribe call allows to properly synchronize unsubscribe and subscribe requests (to avoid subscribe request be processed before unsubscribe request on the server). To WebSocket and Webtransport this does not apply - since requests go through a direct connection to a server instance that holds connection.

**Important thing to mention** is that unsubscribing from subscription does not remove event handlers you already set to that Subscription object. This allows to simply subscribe to channel again later calling `.subscribe()` method of subscription (see below). But there are cases when your code structured in a way that you need to remove event handlers after unsubscribe **to prevent them be executed twice** in the future. To do this remove event listeners explicitly after calling `unsubscribe()`:

```javascript
await sub.unsubscribe();
sub.unsubscribe();
sub.removeAllListeners();
```

Expand Down
6 changes: 3 additions & 3 deletions src/centrifuge.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,13 +360,13 @@ test.each(transportCases)("%s: subscribe and unsubscribe loop", async (transport
})

for (let index = 0; index < 10; index++) {
await sub.subscribe();
await sub.unsubscribe();
sub.subscribe();
sub.unsubscribe();
}
expect(sub.state).toBe(SubscriptionState.Unsubscribed);
await unsubscribedPromise;

await sub.subscribe()
sub.subscribe()
const presenceStats = await sub.presenceStats();
expect(presenceStats.numClients).toBe(1)
expect(presenceStats.numUsers).toBe(1);
Expand Down
18 changes: 10 additions & 8 deletions src/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
// @ts-ignore – this is used by a client in centrifuge.ts.
private _inflight: boolean;
private _prevValue: any;
private _unsubPromise: any;

/** Subscription constructor should not be used directly, create subscriptions using Client method. */
constructor(centrifuge: Centrifuge, channel: string, options?: Partial<SubscriptionOptions>) {
Expand Down Expand Up @@ -65,6 +66,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
this._delta = '';
this._delta_negotiated = false;
this._prevValue = null;
this._unsubPromise = Promise.resolve();
this._setOptions(options);
// @ts-ignore – we are hiding some symbols from public API autocompletion.
if (this._centrifuge._debugEnabled) {
Expand Down Expand Up @@ -107,22 +109,18 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
}

/** subscribe to a channel.*/
async subscribe() {
subscribe() {
if (this._isSubscribed()) {
return;
}
this._resubscribeAttempts = 0;
this._setSubscribing(subscribingCodes.subscribeCalled, 'subscribe called');
try {
await this.ready();
} catch (e) {
// do nothing.
}
}

/** unsubscribe from a channel, keeping position state.*/
async unsubscribe() {
await this._setUnsubscribed(unsubscribedCodes.unsubscribeCalled, 'unsubscribe called', true);
this._unsubPromise = this._setUnsubscribed(unsubscribedCodes.unsubscribeCalled, 'unsubscribe called', true);
return this._unsubPromise;
}

/** publish data to a channel.*/
Expand Down Expand Up @@ -259,7 +257,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
}
}

private _setSubscribing(code: number, reason: string) {
private async _setSubscribing(code: number, reason: string) {
if (this._isSubscribing()) {
return;
}
Expand All @@ -269,6 +267,10 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
if (this._setState(SubscriptionState.Subscribing)) {
this.emit('subscribing', { channel: this.channel, code: code, reason: reason });
}
await this._unsubPromise;
if (!this._isSubscribing()) {
return;
}
this._subscribe();
}

Expand Down

0 comments on commit 5f551ae

Please sign in to comment.