diff --git a/packages/core/router/router-response-controller.ts b/packages/core/router/router-response-controller.ts index 96bd06b94a3..e66544a1ded 100644 --- a/packages/core/router/router-response-controller.ts +++ b/packages/core/router/router-response-controller.ts @@ -8,7 +8,7 @@ import { import { isObject } from '@nestjs/common/utils/shared.utils'; import { IncomingMessage } from 'http'; import { EMPTY, lastValueFrom, Observable, isObservable } from 'rxjs'; -import { catchError, debounce, map } from 'rxjs/operators'; +import { catchError, concatMap, map } from 'rxjs/operators'; import { AdditionalHeaders, WritableHeaderStream, @@ -128,7 +128,7 @@ export class RouterResponseController { return { data: message as object | string }; }), - debounce( + concatMap( message => new Promise(resolve => stream.writeMessage(message, () => resolve()), @@ -153,6 +153,9 @@ export class RouterResponseController { request.on('close', () => { subscription.unsubscribe(); + if (!stream.writableEnded) { + stream.end(); + } }); } diff --git a/packages/core/router/sse-stream.ts b/packages/core/router/sse-stream.ts index 3bf666c03c3..523e745e02a 100644 --- a/packages/core/router/sse-stream.ts +++ b/packages/core/router/sse-stream.ts @@ -116,7 +116,7 @@ export class SseStream extends Transform { message.id = this.lastEventId.toString(); } - if (!this.write(message, 'utf-8', cb)) { + if (!this.write(message, 'utf-8')) { this.once('drain', cb); } else { process.nextTick(cb); diff --git a/packages/core/test/router/router-response-controller.spec.ts b/packages/core/test/router/router-response-controller.spec.ts index 7fdc31d13fa..1fdd92e7b2b 100644 --- a/packages/core/test/router/router-response-controller.spec.ts +++ b/packages/core/test/router/router-response-controller.spec.ts @@ -436,9 +436,7 @@ data: test await new Promise(resolve => process.nextTick(resolve)); - expect(() => { - expect(maxDrainListenersExceededWarning).to.equal(null); - }, 'it will fail as there is an issue here to be addressed').to.throw(); + expect(maxDrainListenersExceededWarning).to.equal(null); }); });