diff --git a/packages/microservices/server/server-grpc.ts b/packages/microservices/server/server-grpc.ts index f7cefe34b68..54911ed8689 100644 --- a/packages/microservices/server/server-grpc.ts +++ b/packages/microservices/server/server-grpc.ts @@ -384,7 +384,8 @@ export class ServerGrpc extends Server implements CustomTransportStrategy { // Needs to be a Proxy in order to buffer messages that come before handler is executed // This could happen if handler has any async guards or interceptors registered that would delay // the execution. - const { subject, next, error, complete } = this.bufferUntilDrained(); + const { subject, next, error, complete, cleanup } = + this.bufferUntilDrained(); call.on('data', (m: any) => next(m)); call.on('error', (e: any) => { // Check if error means that stream ended on other end @@ -397,7 +398,10 @@ export class ServerGrpc extends Server implements CustomTransportStrategy { // If another error then just pass it along error(e); }); - call.on('end', () => complete()); + call.on('end', () => { + complete(); + cleanup(); + }); const handler = methodHandler( subject.asObservable(), @@ -633,7 +637,7 @@ export class ServerGrpc extends Server implements CustomTransportStrategy { let hasDrained = false; function drainBuffer(this: DrainableSubject) { - if (hasDrained) { + if (hasDrained || !replayBuffer) { return; } hasDrained = true; @@ -689,6 +693,12 @@ export class ServerGrpc extends Server implements CustomTransportStrategy { } subject.complete(); }, + cleanup: () => { + if (hasDrained) { + return; + } + replayBuffer = null; + }, }; } }