Skip to content

Commit

Permalink
remove extra ticks from executeStreamIterator
Browse files Browse the repository at this point in the history
by inlining executeStreamIteratorItem and using an async helper
  • Loading branch information
yaacovCR committed Dec 27, 2022
1 parent 3ffb33c commit ea8d0f2
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 112 deletions.
42 changes: 13 additions & 29 deletions src/execution/__tests__/stream-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -678,9 +678,6 @@ describe('Execute: stream directive', () => {
path: ['friendList', 2],
},
],
hasNext: true,
},
{
hasNext: false,
},
]);
Expand Down Expand Up @@ -720,7 +717,7 @@ describe('Execute: stream directive', () => {
}
}
`);
const result = await completeAsync(document, 3, {
const result = await completeAsync(document, 2, {
async *friendList() {
yield await Promise.resolve(friends[0]);
yield await Promise.resolve(friends[1]);
Expand Down Expand Up @@ -749,10 +746,9 @@ describe('Execute: stream directive', () => {
path: ['friendList', 2],
},
],
hasNext: true,
hasNext: false,
},
},
{ done: false, value: { hasNext: false } },
{ done: true, value: undefined },
]);
});
Expand Down Expand Up @@ -1214,9 +1210,6 @@ describe('Execute: stream directive', () => {
],
},
],
hasNext: true,
},
{
hasNext: false,
},
]);
Expand All @@ -1240,25 +1233,19 @@ describe('Execute: stream directive', () => {
} /* c8 ignore stop */,
},
});
expectJSON(result).toDeepEqual([
{
errors: [
{
message:
'Cannot return null for non-nullable field NestedObject.nonNullScalarField.',
locations: [{ line: 4, column: 11 }],
path: ['nestedObject', 'nonNullScalarField'],
},
],
data: {
nestedObject: null,
expectJSON(result).toDeepEqual({
errors: [
{
message:
'Cannot return null for non-nullable field NestedObject.nonNullScalarField.',
locations: [{ line: 4, column: 11 }],
path: ['nestedObject', 'nonNullScalarField'],
},
hasNext: true,
},
{
hasNext: false,
],
data: {
nestedObject: null,
},
]);
});
});
it('Filters payloads that are nulled by a later synchronous error', async () => {
const document = parse(`
Expand Down Expand Up @@ -1399,9 +1386,6 @@ describe('Execute: stream directive', () => {
],
},
],
hasNext: true,
},
{
hasNext: false,
},
]);
Expand Down
178 changes: 95 additions & 83 deletions src/execution/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1950,59 +1950,49 @@ function executeStreamField(
return asyncPayloadRecord;
}

async function executeStreamIteratorItem(
async function completedItemsFromPromisedCompletedStreamedItem(
iterator: AsyncIterator<unknown>,
exeContext: ExecutionContext,
fieldNodes: ReadonlyArray<FieldNode>,
info: GraphQLResolveInfo,
itemType: GraphQLOutputType,
asyncPayloadRecord: StreamRecord,
fieldNodes: ReadonlyArray<FieldNode>,
path: Path,
itemPath: Path,
): Promise<IteratorResult<unknown>> {
let item;
completedItem: Promise<unknown>,
asyncPayloadRecord: AsyncPayloadRecord,
): Promise<[unknown] | null> {
try {
const { value, done } = await iterator.next();
if (done) {
asyncPayloadRecord.setIsCompletedIterator();
return { done, value: undefined };
try {
return [await completedItem];
} catch (rawError) {
const error = locatedError(rawError, fieldNodes, pathToArray(itemPath));
const handledError = handleFieldError(
error,
itemType,
asyncPayloadRecord.errors,
);
filterSubsequentPayloads(exeContext, itemPath, asyncPayloadRecord);
return [handledError];
}
item = value;
} catch (rawError) {
const error = locatedError(rawError, fieldNodes, pathToArray(itemPath));
const value = handleFieldError(error, itemType, asyncPayloadRecord.errors);
// don't continue if iterator throws
return { done: true, value };
} catch (error) {
handleStreamError(iterator, exeContext, path, asyncPayloadRecord, error);
return null;
}
let completedItem;
try {
completedItem = completeValue(
exeContext,
itemType,
fieldNodes,
info,
itemPath,
item,
asyncPayloadRecord,
);
}

if (isPromise(completedItem)) {
completedItem = completedItem.then(undefined, (rawError) => {
const error = locatedError(rawError, fieldNodes, pathToArray(itemPath));
const handledError = handleFieldError(
error,
itemType,
asyncPayloadRecord.errors,
);
filterSubsequentPayloads(exeContext, itemPath, asyncPayloadRecord);
return handledError;
});
}
return { done: false, value: completedItem };
} catch (rawError) {
const error = locatedError(rawError, fieldNodes, pathToArray(itemPath));
const value = handleFieldError(error, itemType, asyncPayloadRecord.errors);
filterSubsequentPayloads(exeContext, itemPath, asyncPayloadRecord);
return { done: false, value };
function handleStreamError(
iterator: AsyncIterator<unknown>,
exeContext: ExecutionContext,
path: Path,
asyncPayloadRecord: AsyncPayloadRecord,
error: GraphQLError,
): void {
asyncPayloadRecord.errors.push(error);
filterSubsequentPayloads(exeContext, path, asyncPayloadRecord);
// entire stream has errored and bubbled upwards
if (iterator?.return) {
iterator.return().catch(() => {
// ignore errors
});
}
}

Expand Down Expand Up @@ -2032,50 +2022,72 @@ async function executeStreamIterator(

let iteration;
try {
// eslint-disable-next-line no-await-in-loop
iteration = await executeStreamIteratorItem(
iterator,
exeContext,
fieldNodes,
info,
itemType,
asyncPayloadRecord,
itemPath,
);
} catch (error) {
asyncPayloadRecord.errors.push(error);
filterSubsequentPayloads(exeContext, path, asyncPayloadRecord);
asyncPayloadRecord.addItems(null);
// entire stream has errored and bubbled upwards
if (iterator?.return) {
iterator.return().catch(() => {
// ignore errors
});
try {
// eslint-disable-next-line no-await-in-loop
iteration = await iterator.next();
} catch (rawError) {
const error = locatedError(rawError, fieldNodes, pathToArray(itemPath));
const value = handleFieldError(
error,
itemType,
asyncPayloadRecord.errors,
);
// don't continue if iterator throws
asyncPayloadRecord.addItems([value]);
break;
}
return;
}

const { done, value: completedItem } = iteration;
const { done, value: item } = iteration;

let completedItems: PromiseOrValue<Array<unknown> | null>;
if (isPromise(completedItem)) {
completedItems = completedItem.then(
(value) => [value],
(error) => {
asyncPayloadRecord.errors.push(error);
filterSubsequentPayloads(exeContext, path, asyncPayloadRecord);
return null;
},
);
} else {
completedItems = [completedItem];
}
if (done) {
asyncPayloadRecord.setIsCompletedIterator();
asyncPayloadRecord.addItems(null);
break;
}

asyncPayloadRecord.addItems(completedItems);
let completedItem;
try {
completedItem = completeValue(
exeContext,
itemType,
fieldNodes,
info,
itemPath,
item,
asyncPayloadRecord,
);
} catch (rawError) {
const error = locatedError(rawError, fieldNodes, pathToArray(itemPath));
completedItem = handleFieldError(
error,
itemType,
asyncPayloadRecord.errors,
);
filterSubsequentPayloads(exeContext, itemPath, asyncPayloadRecord);
}

if (done) {
break;
if (isPromise(completedItem)) {
asyncPayloadRecord.addItems(
completedItemsFromPromisedCompletedStreamedItem(
iterator,
exeContext,
itemType,
fieldNodes,
path,
itemPath,
completedItem,
asyncPayloadRecord,
),
);
} else {
asyncPayloadRecord.addItems([completedItem]);
}
} catch (error) {
handleStreamError(iterator, exeContext, path, asyncPayloadRecord, error);
asyncPayloadRecord.addItems(null);
return;
}

previousAsyncPayloadRecord = asyncPayloadRecord;
index++;
}
Expand Down

0 comments on commit ea8d0f2

Please sign in to comment.