From f7e151d001cf7b3b66f2f5050baa4182b8a62ca0 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Tue, 26 Nov 2024 22:59:41 +0200 Subject: [PATCH] transmit source event stream errors as an error payload 1. akin to a request error, without any data field 2. and close the response event stream ensuring no further events are sent --- src/execution/__tests__/cancellation-test.ts | 25 +++++++++++----- .../__tests__/mapAsyncIterable-test.ts | 22 ++++++++++++++ src/execution/__tests__/subscribe-test.ts | 30 ++++++++++++------- src/execution/execute.ts | 1 + src/execution/mapAsyncIterable.ts | 20 +++++++++---- 5 files changed, 74 insertions(+), 24 deletions(-) diff --git a/src/execution/__tests__/cancellation-test.ts b/src/execution/__tests__/cancellation-test.ts index 3c2f41553f..ba6b367990 100644 --- a/src/execution/__tests__/cancellation-test.ts +++ b/src/execution/__tests__/cancellation-test.ts @@ -2,7 +2,6 @@ import { assert, expect } from 'chai'; import { describe, it } from 'mocha'; import { expectJSON } from '../../__testUtils__/expectJSON.js'; -import { expectPromise } from '../../__testUtils__/expectPromise.js'; import { resolveOnNextTick } from '../../__testUtils__/resolveOnNextTick.js'; import { isAsyncIterable } from '../../jsutils/isAsyncIterable.js'; @@ -902,9 +901,15 @@ describe('Execute: Cancellation', () => { abortController.abort(); - await expectPromise(subscription.next()).toRejectWith( - 'This operation was aborted', - ); + expectJSON(await subscription.next()).toDeepEqual({ + value: { errors: [{ message: 'This operation was aborted' }] }, + done: false, + }); + + expectJSON(await subscription.next()).toDeepEqual({ + value: undefined, + done: true, + }); }); it('should stop the execution when aborted during subscription returned asynchronously', async () => { @@ -941,8 +946,14 @@ describe('Execute: Cancellation', () => { abortController.abort(); - await expectPromise(subscription.next()).toRejectWith( - 'This operation was aborted', - ); + expectJSON(await subscription.next()).toDeepEqual({ + value: { errors: [{ message: 'This operation was aborted' }] }, + done: false, + }); + + expectJSON(await subscription.next()).toDeepEqual({ + value: undefined, + done: true, + }); }); }); diff --git a/src/execution/__tests__/mapAsyncIterable-test.ts b/src/execution/__tests__/mapAsyncIterable-test.ts index 599e15f05e..7a51abc33e 100644 --- a/src/execution/__tests__/mapAsyncIterable-test.ts +++ b/src/execution/__tests__/mapAsyncIterable-test.ts @@ -89,6 +89,26 @@ describe('mapAsyncIterable', () => { }); }); + it('calls onError with iterator errors', async () => { + async function* source() { + yield 1; + throw new Error('Oops'); + } + + const doubles = mapAsyncIterable( + source(), + (x) => Promise.resolve(x + x), + () => Promise.resolve(0), + ); + + expect(await doubles.next()).to.deep.equal({ value: 2, done: false }); + expect(await doubles.next()).to.deep.equal({ value: 0, done: false }); + expect(await doubles.next()).to.deep.equal({ + value: undefined, + done: true, + }); + }); + it('calls done when completes', async () => { async function* source() { yield 1; @@ -100,6 +120,7 @@ describe('mapAsyncIterable', () => { const doubles = mapAsyncIterable( source(), (x) => Promise.resolve(x + x), + undefined, () => { done = true; }, @@ -126,6 +147,7 @@ describe('mapAsyncIterable', () => { const doubles = mapAsyncIterable( source(), (x) => Promise.resolve(x + x), + undefined, () => { done = true; }, diff --git a/src/execution/__tests__/subscribe-test.ts b/src/execution/__tests__/subscribe-test.ts index ffa1c85276..2d1a0e85a7 100644 --- a/src/execution/__tests__/subscribe-test.ts +++ b/src/execution/__tests__/subscribe-test.ts @@ -1013,7 +1013,7 @@ describe('Subscription Publish Phase', () => { }); }); - it('should not trigger when subscription is thrown', async () => { + it('should terminate when subscription is thrown', async () => { const pubsub = new SimplePubSub(); const subscription = createSubscription(pubsub); assert(isAsyncIterable(subscription)); @@ -1050,15 +1050,14 @@ describe('Subscription Publish Phase', () => { payload = subscription.next(); - // Throw error - let caughtError; - try { - /* c8 ignore next 2 */ - await subscription.throw('ouch'); - } catch (e) { - caughtError = e; - } - expect(caughtError).to.equal('ouch'); + const thrown = subscription.throw('ouch'); + + expectJSON(await thrown).toDeepEqual({ + done: false, + value: { + errors: [{ message: 'Unexpected error value: "ouch"' }], + }, + }); expect(await payload).to.deep.equal({ done: true, @@ -1230,7 +1229,16 @@ describe('Subscription Publish Phase', () => { }, }); - await expectPromise(subscription.next()).toRejectWith('test error'); + expectJSON(await subscription.next()).toDeepEqual({ + done: false, + value: { + errors: [ + { + message: 'test error', + }, + ], + }, + }); expect(await subscription.next()).to.deep.equal({ done: true, diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 00f0e7b1ae..5a3a17fc0a 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -2137,6 +2137,7 @@ function mapSourceToResponse( }; return validatedExecutionArgs.perEventExecutor(perEventExecutionArgs); }, + (error) => ({ errors: [locatedError(error, undefined)] }), () => abortSignalListener?.disconnect(), ); } diff --git a/src/execution/mapAsyncIterable.ts b/src/execution/mapAsyncIterable.ts index e0f942fd53..c65b281964 100644 --- a/src/execution/mapAsyncIterable.ts +++ b/src/execution/mapAsyncIterable.ts @@ -1,16 +1,21 @@ import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js'; /** - * Given an AsyncIterable and a callback function, return an AsyncIterator - * which produces values mapped via calling the callback function. + * Given an AsyncIterable and a onValue function, return an AsyncIterator + * which produces values mapped via calling the onValue function. */ export function mapAsyncIterable( iterable: AsyncGenerator | AsyncIterable, - callback: (value: T) => PromiseOrValue, + onValue: (value: T) => PromiseOrValue, + onError: (error: any) => PromiseOrValue = (error: any) => { + throw error; + }, onDone?: (() => void) | undefined, ): AsyncGenerator { const iterator = iterable[Symbol.asyncIterator](); + let errored = false; + async function mapResult( promise: Promise>, ): Promise> { @@ -23,12 +28,13 @@ export function mapAsyncIterable( } value = result.value; } catch (error) { + errored = true; onDone?.(); - throw error; + return { value: await onError(error), done: false }; } try { - return { value: await callback(value), done: false }; + return { value: await onValue(value), done: false }; } catch (error) { /* c8 ignore start */ // FIXME: add test case @@ -46,7 +52,9 @@ export function mapAsyncIterable( return { async next() { - return mapResult(iterator.next()); + return errored + ? Promise.resolve({ value: undefined as any, done: true }) + : mapResult(iterator.next()); }, async return(): Promise> { // If iterator.return() does not exist, then type R must be undefined.