diff --git a/packages/core/router/router-response-controller.ts b/packages/core/router/router-response-controller.ts index 96bd06b94a3..e66544a1ded 100644 --- a/packages/core/router/router-response-controller.ts +++ b/packages/core/router/router-response-controller.ts @@ -8,7 +8,7 @@ import { import { isObject } from '@nestjs/common/utils/shared.utils'; import { IncomingMessage } from 'http'; import { EMPTY, lastValueFrom, Observable, isObservable } from 'rxjs'; -import { catchError, debounce, map } from 'rxjs/operators'; +import { catchError, concatMap, map } from 'rxjs/operators'; import { AdditionalHeaders, WritableHeaderStream, @@ -128,7 +128,7 @@ export class RouterResponseController { return { data: message as object | string }; }), - debounce( + concatMap( message => new Promise(resolve => stream.writeMessage(message, () => resolve()), @@ -153,6 +153,9 @@ export class RouterResponseController { request.on('close', () => { subscription.unsubscribe(); + if (!stream.writableEnded) { + stream.end(); + } }); } diff --git a/packages/core/router/sse-stream.ts b/packages/core/router/sse-stream.ts index 3bf666c03c3..523e745e02a 100644 --- a/packages/core/router/sse-stream.ts +++ b/packages/core/router/sse-stream.ts @@ -116,7 +116,7 @@ export class SseStream extends Transform { message.id = this.lastEventId.toString(); } - if (!this.write(message, 'utf-8', cb)) { + if (!this.write(message, 'utf-8')) { this.once('drain', cb); } else { process.nextTick(cb); diff --git a/packages/core/test/router/router-response-controller.spec.ts b/packages/core/test/router/router-response-controller.spec.ts index aac3212628e..1fdd92e7b2b 100644 --- a/packages/core/test/router/router-response-controller.spec.ts +++ b/packages/core/test/router/router-response-controller.spec.ts @@ -7,6 +7,7 @@ import { PassThrough, Writable } from 'stream'; import { HttpStatus, RequestMethod } from '../../../common'; import { RouterResponseController } from '../../router/router-response-controller'; import { NoopHttpAdapter } from '../utils/noop-adapter.spec'; +import { SseStream } from '../../router/sse-stream'; describe('RouterResponseController', () => { let adapter: NoopHttpAdapter; @@ -374,6 +375,71 @@ data: test done(); }); + describe('when writing data too densely', () => { + const DEFAULT_MAX_LISTENERS = SseStream.defaultMaxListeners; + const MAX_LISTENERS = 1; + const sandbox = sinon.createSandbox(); + + beforeEach(() => { + // Can't access to the internal sseStream, + // as a workround, set `defaultMaxListeners` of `SseStream` and reset the max listeners of `process` + const PROCESS_MAX_LISTENERS = process.getMaxListeners(); + SseStream.defaultMaxListeners = MAX_LISTENERS; + process.setMaxListeners(PROCESS_MAX_LISTENERS); + + const sseStream = sinon.createStubInstance(SseStream); + const originalWrite = SseStream.prototype.write; + // Make `.write()` always return false, so as to listen `drain` event + sseStream.write.callsFake(function (...args: any[]) { + originalWrite.apply(this, args); + return false; + }); + sandbox.replace(SseStream.prototype, 'write', sseStream.write); + }); + + afterEach(() => { + sandbox.restore(); + SseStream.defaultMaxListeners = DEFAULT_MAX_LISTENERS; + }); + + it('should not cause memory leak', async () => { + let maxDrainListenersExceededWarning = null; + process.on('warning', (warning: any) => { + if ( + warning.name === 'MaxListenersExceededWarning' && + warning.emitter instanceof SseStream && + warning.type === 'drain' && + warning.count === MAX_LISTENERS + 1 + ) { + maxDrainListenersExceededWarning = warning; + } + }); + + const result = new Subject(); + + const response = new Writable(); + response._write = () => {}; + + const request = new Writable(); + request._write = () => {}; + + routerResponseController.sse( + result, + response as unknown as ServerResponse, + request as unknown as IncomingMessage, + ); + + // Send multiple messages simultaneously + Array.from({ length: MAX_LISTENERS + 1 }).forEach((_, i) => + result.next(String(i)), + ); + + await new Promise(resolve => process.nextTick(resolve)); + + expect(maxDrainListenersExceededWarning).to.equal(null); + }); + }); + describe('when there is an error', () => { it('should close the request', done => { const result = new Subject();