diff --git a/packages/connect-web-bench/README.md b/packages/connect-web-bench/README.md index 6fc01fc42..0cdf05480 100644 --- a/packages/connect-web-bench/README.md +++ b/packages/connect-web-bench/README.md @@ -10,5 +10,5 @@ it like a web server would usually do. | code generator | bundle size | minified | compressed | |----------------|-------------------:|-----------------------:|---------------------:| -| connect | 112,186 b | 49,484 b | 13,326 b | +| connect | 112,396 b | 49,571 b | 13,350 b | | grpc-web | 414,071 b | 300,352 b | 53,255 b | diff --git a/packages/connect/src/protocol/run-call.spec.ts b/packages/connect/src/protocol/run-call.spec.ts index 6bb7233b9..0fcc38e74 100644 --- a/packages/connect/src/protocol/run-call.spec.ts +++ b/packages/connect/src/protocol/run-call.spec.ts @@ -232,4 +232,31 @@ describe("runStreamingCall()", function () { const it = req.message[Symbol.asyncIterator](); expect(await it.next()).toEqual({ done: true, value: undefined }); }); + it("should propagate the error thrown in next", async function () { + const req = makeReq(); + let reqError: Error | undefined; + req.message = { + [Symbol.asyncIterator]() { + return { + next() { + fail("unexpected call"); + throw new Error("unexpected call"); + }, + throw(e) { + reqError = e as Error; + return Promise.reject({ done: true, value: undefined }); + }, + }; + }, + }; + await expectAsync( + runStreamingCall({ + req: req, + next() { + return Promise.reject(new Error("foo")); + }, + }), + ).toBeRejectedWithError("[unknown] foo"); + expect(reqError?.message).toEqual("[unknown] foo"); + }); }); diff --git a/packages/connect/src/protocol/run-call.ts b/packages/connect/src/protocol/run-call.ts index 0edae2a15..e3a988356 100644 --- a/packages/connect/src/protocol/run-call.ts +++ b/packages/connect/src/protocol/run-call.ts @@ -105,18 +105,28 @@ export function runStreamingCall< message: normalizeIterable(opt.req.method.I, opt.req.message), signal, }; + let doneCalled = false; // Call return on the request iterable to indicate // that we will no longer consume it and it should // cleanup any allocated resources. signal.addEventListener("abort", function () { - opt.req.message[Symbol.asyncIterator]() - .return?.() - .catch(() => { - // return returns a promise, which we don't care about. + const it = opt.req.message[Symbol.asyncIterator](); + // If the signal is aborted due to an error, we want to throw + // the error to the request iterator. + if (!doneCalled) { + it.throw?.(this.reason).catch(() => { + // throw returns a promise, which we don't care about. // // Uncaught promises are thrown at sometime/somewhere by the event loop, // this is to ensure error is caught and ignored. }); + } + it.return?.().catch(() => { + // return returns a promise, which we don't care about. + // + // Uncaught promises are thrown at sometime/somewhere by the event loop, + // this is to ensure error is caught and ignored. + }); }); return next(req).then((res) => { return { @@ -128,6 +138,7 @@ export function runStreamingCall< next() { return it.next().then((r) => { if (r.done == true) { + doneCalled = true; done(); } return r;