Skip to content

Commit

Permalink
Merge pull request #13368 from ssilve1989/refactor/cleanup-grpc-call-…
Browse files Browse the repository at this point in the history
…handling

refactor(microservices): prevent grpc write promise from throwing
  • Loading branch information
kamilmysliwiec authored Jun 3, 2024
2 parents 24755d6 + 22a9245 commit 75fa678
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@ import { INestApplication } from '@nestjs/common';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { Test } from '@nestjs/testing';
import { fail } from 'assert';
import { expect } from 'chai';
import { expect, use } from 'chai';
import * as chaiAsPromised from 'chai-as-promised';
import { join } from 'path';
import * as sinon from 'sinon';
import * as request from 'supertest';
import { GrpcController } from '../src/grpc/grpc.controller';

use(chaiAsPromised);

describe('GRPC transport', () => {
let server;
let app: INestApplication;
Expand All @@ -32,6 +36,7 @@ describe('GRPC transport', () => {
],
},
});

// Start gRPC microservice
await app.startAllMicroservices();
await app.init();
Expand Down Expand Up @@ -149,6 +154,50 @@ describe('GRPC transport', () => {
expect(receivedIds).to.deep.equal(expectedIds);
});

describe('streaming calls that error', () => {
// We want to assert that the application does not crash when an error is encountered with an unhandledRejection
// the best way to do that is to listen for the unhandledRejection event and fail the test if it is called
let processSpy: sinon.SinonSpy;

beforeEach(() => {
processSpy = sinon.spy();
process.on('unhandledRejection', processSpy);
});

afterEach(() => {
process.off('unhandledRejection', processSpy);
});

it('should not crash when replying with an error', async () => {
const call = new Promise<void>((resolve, reject) => {
const stream = client.streamDivide({
data: [{ dividend: 1, divisor: 0 }],
});

stream.on('data', () => {
fail('Stream should not have emitted any data');
});

stream.on('error', err => {
if (err.code !== GRPC.status.CANCELLED) {
reject(err);
}
});

stream.on('end', () => {
resolve();
});
});

await expect(call).to.eventually.be.rejectedWith(
'3 INVALID_ARGUMENT: dividing by 0 is not possible',
);

// if this fails the application has crashed
expect(processSpy.called).to.be.false;
});
});

after(async () => {
await app.close();
});
Expand Down
13 changes: 12 additions & 1 deletion integration/microservices/src/grpc/grpc.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
RpcException,
} from '@nestjs/microservices';
import { join } from 'path';
import { Observable, of, catchError } from 'rxjs';
import { Observable, of, catchError, from, mergeMap } from 'rxjs';

class ErrorHandlingProxy extends ClientGrpcProxy {
serializeError(err) {
Expand Down Expand Up @@ -107,6 +107,17 @@ export class GrpcController {
};
}

// contrived example meant to show when an error is encountered, like dividing by zero, the
// application does not crash and the error is returned appropriately to the client
@GrpcMethod('Math', 'StreamDivide')
streamDivide({
data,
}: {
data: { dividend: number; divisor: number }[];
}): Observable<any> {
return from(data).pipe(mergeMap(request => this.divide(request)));
}

@GrpcMethod('Math2')
async sum2({ data }: { data: number[] }): Promise<any> {
return of({
Expand Down
12 changes: 11 additions & 1 deletion integration/microservices/src/grpc/math.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ service Math {
rpc SumStream(stream RequestSum) returns(stream SumResult);
rpc SumStreamPass(stream RequestSum) returns(stream SumResult);
rpc Divide (RequestDivide) returns (DivideResult);
rpc StreamLargeMessages(Empty) returns (stream BackpressureData) {}
rpc StreamLargeMessages(Empty) returns (stream BackpressureData);
/* Given a series of dividend and divisor, stream back the division results for each */
rpc StreamDivide (StreamDivideRequest) returns (stream StreamDivideResponse);
}

message BackpressureData {
Expand All @@ -33,3 +35,11 @@ message RequestDivide {
message DivideResult {
int32 result = 1;
}

message StreamDivideRequest {
repeated RequestDivide data = 1;
}

message StreamDivideResponse {
DivideResult data = 1;
}
32 changes: 10 additions & 22 deletions packages/microservices/server/server-grpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,13 +245,7 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
return async (call: GrpcCall, callback: Function) => {
const handler = methodHandler(call.request, call.metadata, call);
const result$ = this.transformToObservable(await handler);

try {
await this.writeObservableToGrpc(result$, call);
} catch (err) {
call.emit('error', err);
return;
}
await this.writeObservableToGrpc(result$, call);
};
}

Expand All @@ -265,11 +259,13 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
* @param call The GRPC call we want to write to.
* @returns A promise that resolves when we're done writing to the call.
*/
public writeObservableToGrpc<T>(
private writeObservableToGrpc<T>(
source: Observable<T>,
call: GrpcCall<T>,
): Promise<void> {
return new Promise((resolve, reject) => {
// this promise should **not** reject, as we're handling errors in the observable for the Call
// the promise is only needed to signal when writing/draining has been completed
return new Promise((resolve, _doNotUse) => {
const valuesWaitingToBeDrained: T[] = [];
let shouldErrorAfterDraining = false;
let error: any;
Expand All @@ -282,17 +278,14 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
// If the call is cancelled, unsubscribe from the source
const cancelHandler = () => {
subscription.unsubscribe();
// The call has been cancelled, so we need to either resolve
// or reject the promise. We're resolving in this case because
// rejection is noisy. If at any point in the future, we need to
// know that cancellation happened, we can either reject or
// start resolving with some sort of outcome value.
// Calls that are cancelled by the client should be successfully resolved here
resolve();
};
call.on(CANCEL_EVENT, cancelHandler);
subscription.add(() => call.off(CANCEL_EVENT, cancelHandler));

// In all cases, when we finalize, end the writable stream
// being careful that errors and writes must be emitted _before_ this call is ended
subscription.add(() => call.end());

const drain = () => {
Expand All @@ -316,7 +309,7 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
} else if (shouldErrorAfterDraining) {
call.emit('error', error);
subscription.unsubscribe();
reject(error);
resolve();
}
};

Expand All @@ -341,7 +334,7 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
// reject and teardown.
call.emit('error', err);
subscription.unsubscribe();
reject(err);
resolve();
} else {
// We're waiting for a drain event, record the
// error so it can be handled after everything is drained.
Expand Down Expand Up @@ -390,12 +383,7 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
const handler = methodHandler(req.asObservable(), call.metadata, call);
const res = this.transformToObservable(await handler);
if (isResponseStream) {
try {
await this.writeObservableToGrpc(res, call);
} catch (err) {
call.emit('error', err);
return;
}
await this.writeObservableToGrpc(res, call);
} else {
const response = await lastValueFrom(
res.pipe(
Expand Down
21 changes: 21 additions & 0 deletions packages/microservices/test/server/server-grpc.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ describe('ServerGrpc', () => {
const fn = server.createStreamServiceMethod(sinon.spy());
expect(fn).to.be.a('function');
});

describe('on call', () => {
it('should call native method', async () => {
const call = {
Expand All @@ -405,6 +406,26 @@ describe('ServerGrpc', () => {
expect(call.off.calledWith('cancelled')).to.be.true;
});

it('should handle error thrown in handler', async () => {
const call = {
write: sinon.spy(() => true),
end: sinon.spy(),
on: sinon.spy(),
off: sinon.spy(),
emit: sinon.spy(),
};

const callback = sinon.spy();
const error = new Error('handler threw');
const native = sinon.spy(() => throwError(() => error));

// implicit assertion that this will never throw when call.emit emits an error event
await server.createStreamServiceMethod(native)(call, callback);
expect(native.called).to.be.true;
expect(call.emit.calledWith('error', error)).to.be.ok;
expect(call.end.called).to.be.true;
});

it(`should close the result observable when receiving an 'cancelled' event from the client`, async () => {
const et = new EventTarget();
const cancel = () => et.dispatchEvent(new Event('cancelled'));
Expand Down

0 comments on commit 75fa678

Please sign in to comment.