Skip to content

Commit

Permalink
add tests for incremental subscription & @defer with errorLink
Browse files Browse the repository at this point in the history
  • Loading branch information
phryneas committed Jan 17, 2025
1 parent 034dd47 commit e54cfda
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 32 deletions.
12 changes: 12 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@
"fetch-mock": "9.11.0",
"glob": "8.1.0",
"graphql": "16.9.0",
"graphql-17-alpha2": "npm:[email protected]",
"graphql-ws": "5.16.0",
"jest": "29.7.0",
"jest-environment-jsdom": "29.7.0",
Expand Down
2 changes: 1 addition & 1 deletion src/link/core/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ export interface ApolloPayloadResult<
payload: SingleExecutionResult | ExecutionPatchResult | null;
// Transport layer errors (as distinct from GraphQL or NetworkErrors),
// these are fatal errors that will include done: true.
errors?: ReadonlyArray<Error | string>;
errors?: ReadonlyArray<GraphQLFormattedError | string>;
}

export type ExecutionPatchResult<
Expand Down
130 changes: 99 additions & 31 deletions src/link/error/__tests__/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import { Observable } from "../../../utilities/observables/Observable";
import { onError, ErrorLink } from "../";
import { ObservableStream } from "../../../testing/internal";
import { PROTOCOL_ERRORS_SYMBOL } from "../../../errors";
import {
mockDeferStream,
mockMultipartSubscriptionStream,
} from "../../../testing/internal/incremental";

describe("error handling", () => {
it("has an easy way to handle GraphQL errors", async () => {
Expand Down Expand Up @@ -72,12 +76,16 @@ describe("error handling", () => {
expect(called).toBe(true);
});

it("handles protocol errors", async () => {
expect.assertions(3);
it.failing("handles protocol errors (@defer)", async () => {
// TODO: this test doesn't execute the `errorHandler` yet. Should be 4, is 2.
fail();
expect.assertions(4);
const query = gql`
query Foo {
foo {
bar
... @defer {
bar
}
}
}
`;
Expand All @@ -86,46 +94,106 @@ describe("error handling", () => {
expect(operation.operationName).toBe("Foo");
expect(protocolErrors).toEqual([
{
message: "cannot read message from websocket",
extensions: [{ code: "WEBSOCKET_MESSAGE_ERROR" }],
message: "could not read data",
extensions: {
code: "INCREMENTAL_ERROR",
},
},
]);
});

const mockLink = new ApolloLink((_operation) => {
return new Observable((observer) => {
observer.next({
data: null,
extensions: {
[PROTOCOL_ERRORS_SYMBOL]: [
{
message: "cannot read message from websocket",
extensions: [
{
code: "WEBSOCKET_MESSAGE_ERROR",
},
],
const { httpLink, enqueueInitialChunk, enqueueProtocolErrorChunk } =
mockDeferStream();
const link = errorLink.concat(httpLink);
const stream = new ObservableStream(execute(link, { query }));

enqueueInitialChunk({
hasNext: true,
data: {},
});

enqueueProtocolErrorChunk([
{
message: "could not read data",
extensions: {
code: "INCREMENTAL_ERROR",
},
},
]);
await expect(stream).toEmitValue({
data: {},
hasNext: true,
});

await expect(stream).toEmitValue({
hasNext: true,
incremental: [
{
errors: [
{
message: "could not read data",
extensions: {
code: "INCREMENTAL_ERROR",
},
],
},
});
});
},
],
},
],
});
});

const link = errorLink.concat(mockLink);
const stream = new ObservableStream(execute(link, { query }));
it("handles protocol errors (multipart subscription)", async () => {
expect.assertions(4);
const sampleSubscription = gql`
subscription MySubscription {
aNewDieWasCreated {
die {
roll
sides
color
}
}
}
`;

const errorLink = onError((args) => {
const { operation, protocolErrors } = args;
expect(operation.operationName).toBe("MySubscription");
expect(protocolErrors).toEqual([
{
message: "Error field",
extensions: { code: "INTERNAL_SERVER_ERROR" },
},
]);
});

const { httpLink, enqueuePayloadResult, enqueueErrorResult } =
mockMultipartSubscriptionStream();
const link = errorLink.concat(httpLink);
const stream = new ObservableStream(
execute(link, { query: sampleSubscription })
);

enqueuePayloadResult({
data: { aNewDieWasCreated: { die: { color: "red", roll: 1, sides: 4 } } },
});

enqueueErrorResult([
{ message: "Error field", extensions: { code: "INTERNAL_SERVER_ERROR" } },
]);

await expect(stream).toEmitValue({
data: { aNewDieWasCreated: { die: { color: "red", roll: 1, sides: 4 } } },
});

await expect(stream).toEmitValue({
data: null,
extensions: {
[PROTOCOL_ERRORS_SYMBOL]: [
{
message: "cannot read message from websocket",
extensions: [
{
code: "WEBSOCKET_MESSAGE_ERROR",
},
],
extensions: {
code: "INTERNAL_SERVER_ERROR",
},
message: "Error field",
},
],
},
Expand Down
142 changes: 142 additions & 0 deletions src/testing/internal/incremental.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import { HttpLink } from "../../link/http/index.js";
import type {
GraphQLFormattedError,
InitialIncrementalExecutionResult,
SubsequentIncrementalExecutionResult,
} from "graphql-17-alpha2";
import type { GraphQLError } from "graphql";
import {
ReadableStream as NodeReadableStream,
TextEncoderStream,
TransformStream,
} from "node:stream/web";
import type { ApolloPayloadResult } from "../../core/index.js";

const hasNextSymbol = Symbol("hasNext");

export function mockIncrementalStream<Chunks>({
responseHeaders,
}: {
responseHeaders: Headers;
}) {
let streamController: ReadableStreamDefaultController<
Chunks & { [hasNextSymbol]: boolean }
>;
let sentInitialChunk = false;
const stream = new NodeReadableStream<Chunks & { [hasNextSymbol]: boolean }>({
start(c) {
streamController = c;
},
})
.pipeThrough(
new TransformStream<Chunks & { [hasNextSymbol]: boolean }, string>({
transform: (chunk, controller) => {
controller.enqueue(
(!sentInitialChunk ? "\r\n---\r\n" : "") +
"content-type: application/json; charset=utf-8\r\n\r\n" +
JSON.stringify(chunk) +
(chunk[hasNextSymbol] ? "\r\n---\r\n" : "\r\n-----\r\n")
);
sentInitialChunk = true;
},
})
)
.pipeThrough(new TextEncoderStream());

const httpLink = new HttpLink({
fetch(input, init) {
return Promise.resolve(
new Response(
stream satisfies NodeReadableStream<Uint8Array> as ReadableStream<Uint8Array>,
{
status: 200,
headers: responseHeaders,
}
)
);
},
});
return {
httpLink,
streamController: streamController!,
};
}

export function mockDeferStream<
TData = Record<string, unknown>,
TExtensions = Record<string, unknown>,
>() {
const { httpLink, streamController } = mockIncrementalStream<
| InitialIncrementalExecutionResult<TData, TExtensions>
| SubsequentIncrementalExecutionResult<TData, TExtensions>
>({
responseHeaders: new Headers({
"Content-Type": 'multipart/mixed; boundary="-"; deferSpec=20220824',
}),
});
return {
httpLink,
streamController: streamController!,
enqueueInitialChunk(
chunk: InitialIncrementalExecutionResult<TData, TExtensions>
) {
streamController.enqueue({ ...chunk, [hasNextSymbol]: chunk.hasNext });
if (!chunk.hasNext) streamController.close();
},
enqueueSubsequentChunk(
chunk: SubsequentIncrementalExecutionResult<TData, TExtensions>
) {
streamController.enqueue({ ...chunk, [hasNextSymbol]: chunk.hasNext });
if (!chunk.hasNext) streamController.close();
},
enqueueProtocolErrorChunk(errors: GraphQLFormattedError[]) {
streamController.enqueue({
hasNext: true,
[hasNextSymbol]: true,
incremental: [
{
// eslint-disable-next-line @typescript-eslint/ban-types
errors: errors as GraphQLError[],
},
],
} satisfies SubsequentIncrementalExecutionResult<TData, TExtensions> & {
[hasNextSymbol]: boolean;
});
},
};
}

export function mockMultipartSubscriptionStream<
TData = Record<string, unknown>,
TExtensions = Record<string, unknown>,
>() {
const { httpLink, streamController } = mockIncrementalStream<
ApolloPayloadResult<TData, TExtensions>
>({
responseHeaders: new Headers({
"Content-Type": "multipart/mixed",
}),
});

// send initial empty chunk back
streamController.enqueue({} as any);

return {
httpLink,
streamController: streamController!,
enqueuePayloadResult(
payload: ApolloPayloadResult["payload"],
hasNext = true
) {
streamController.enqueue({ payload, [hasNextSymbol]: hasNext });
if (!hasNext) streamController.close();
},
enqueueErrorResult(
errors: ApolloPayloadResult["errors"],
payload: ApolloPayloadResult["payload"] = null
) {
streamController.enqueue({ payload, errors, [hasNextSymbol]: false });
streamController.close();
},
};
}
5 changes: 5 additions & 0 deletions src/testing/internal/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,8 @@ export {
export { actAsync } from "./rtl/actAsync.js";
export { renderAsync } from "./rtl/renderAsync.js";
export { renderHookAsync } from "./rtl/renderHookAsync.js";
export {
mockIncrementalStream,
mockDeferStream,
mockMultipartSubscriptionStream,
} from "./incremental.js";

0 comments on commit e54cfda

Please sign in to comment.