Skip to content

Commit

Permalink
fix(microservices): call cleanup function on grpc call end
Browse files Browse the repository at this point in the history
  • Loading branch information
kamilmysliwiec committed Dec 9, 2024
1 parent d0fb875 commit 909001d
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions packages/microservices/server/server-grpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,8 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
// Needs to be a Proxy in order to buffer messages that come before handler is executed
// This could happen if handler has any async guards or interceptors registered that would delay
// the execution.
const { subject, next, error, complete } = this.bufferUntilDrained();
const { subject, next, error, complete, cleanup } =
this.bufferUntilDrained();
call.on('data', (m: any) => next(m));
call.on('error', (e: any) => {
// Check if error means that stream ended on other end
Expand All @@ -397,7 +398,10 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
// If another error then just pass it along
error(e);
});
call.on('end', () => complete());
call.on('end', () => {
complete();
cleanup();
});

const handler = methodHandler(
subject.asObservable(),
Expand Down Expand Up @@ -633,7 +637,7 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
let hasDrained = false;

function drainBuffer(this: DrainableSubject<T>) {
if (hasDrained) {
if (hasDrained || !replayBuffer) {
return;
}
hasDrained = true;
Expand Down Expand Up @@ -689,6 +693,12 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
}
subject.complete();
},
cleanup: () => {
if (hasDrained) {
return;
}
replayBuffer = null;
},
};
}
}

0 comments on commit 909001d

Please sign in to comment.