From e9f3b788e13fb70aadd8a138d9fbe94e0176c116 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Sun, 25 Feb 2024 13:01:11 +0200 Subject: [PATCH] Introduce `@stream` directive --- spec/Section 6 -- Execution.md | 102 +++++++++++++++++++++++++++++---- 1 file changed, 92 insertions(+), 10 deletions(-) diff --git a/spec/Section 6 -- Execution.md b/spec/Section 6 -- Execution.md index 4ccc3565e..81bdb1030 100644 --- a/spec/Section 6 -- Execution.md +++ b/spec/Section 6 -- Execution.md @@ -366,7 +366,7 @@ YieldIncrementalResults(data, errors, futures, eventEmitter): {newPendingResults} as pending. - For each completed child Future node of a root node in {graph}: - Let {completedFuture} be that Future; let {result} be its result. - - If {data} on {result} is {null}: + - If {FutureCompletedWithoutData(completedFuture, result)} is {true}: - Initialize {completed} to an empty list. - Let {parents} be the parents of {completedFuture}. - Initialize {completed} to an empty list. @@ -380,8 +380,8 @@ YieldIncrementalResults(data, errors, futures, eventEmitter): - Add each {future} of {futures} on {result} to {graph} via the same procedure as above. - Let {futuresToRelease} be the set of completed Future nodes in {graph} - completing a Deferred Fragment root node where all of the sibling Futures - are also complete. + either completing a Stream root node, or completing a Deferred Fragment root + node where all of the sibling Futures are also complete. - If {futuresToRelease} is empty, continue to the next completed child Future node in {graph}. - Initialize {incremental} to an empty lists. @@ -397,9 +397,8 @@ YieldIncrementalResults(data, errors, futures, eventEmitter): nodes. - Prune root nodes of {graph} containing no direct child Futures, as above. - Let {hasNext} be {false} if {graph} is empty. - - Let {incrementalResult} be an unordered map containing {hasNext}. - - If {incremental} is not empty, set the corresponding entry on - {incrementalResult} to {incremental}. + - Let {incrementalResult} be an unordered map containing {incremental} and + {hasNext}. - If {completed} is not empty, set the corresponding entry on {incrementalResult} to {completed}. - Let {newPendingResults} be the set of new root nodes in {graph}, promoted by @@ -409,7 +408,8 @@ YieldIncrementalResults(data, errors, futures, eventEmitter): - Set the corresponding entry on {incrementalResult} to {pending}. - Yield {incrementalResult}. - Emit events on {eventEmitter} signalling the release of each of the Deferred - Fragments in {newPendingResults} as pending. + Fragments in {newPendingResults} as pending, as well as each of the {items} + entries on the Futures in {futuresToRelease}. - Complete this incremental result stream. GetPending(newPendingResults): @@ -422,8 +422,21 @@ GetPending(newPendingResults): - Append {pendingEntry} to {pending}. - Return {pending}. +FutureCompletedWithoutData(completedFuture, result): + +- If {completedFuture} incrementally completes Deferred Fragments: + - If {data} on {result} is {null}, return {true}. +- Otherwise: + - If {items} on {result} is not defined or {null}, return {true}. +- Return {false}. + GetIncrementalEntry(future, graph): +- If {future} completes a Stream: + - Let {stream} be the Stream incrementally completed by {future}. + - Let {items} and {errors} be the corresponding entries on {result}. + - Let {id} be the unique identifier for {stream}. + - Return an unordered map containing {id}, {items}, and {errors}. - Let {deferredFragments} be the Deferred Fragments incrementally completed by {future} at {path}. - Let {result} be the result of {future}. @@ -967,17 +980,86 @@ CompleteListValue(innerType, fieldDetailsList, result, variableValues, eventEmitter, path, deferUsageSet, deferMap): - Initialize {items} and {futures} to empty lists. -- Let {index} be {0}. -- For each {resultItem} of {result}: +- Let {fieldDetails} be the first entry in {fieldDetailsList}. +- Let {field} be the corresponding entry on {fieldDetails}. +- If {field} provides the directive `@stream` and its {if} argument is not + {false} and is not a variable in {variableValues} with the value {false} and + {innerType} is the outermost inner type of the list type defined for + {fieldDetailsList}: + - Let {streamDirective} be that directive. + - If this execution is for a subscription operation, raise a _field error_. + - Let {initialCount} be the value or variable provided to {streamDirective}'s + {initialCount} argument. + - If {initialCount} is less than zero, raise a _field error_. + - Let {label} be the value or variable provided to {streamDirective}'s {label} + argument. +- Let {iterator} be an iterator for {result}. +- Let {index} be zero. +- While {result} is not closed: + - If {streamDirective} is defined and {index} is greater than or equal to + {initialCount}: + - Initialize {parents} to an empty list. + - For each {deferUsage} in {deferUsageSet}: + - Let {parent} be the entry in {deferMap} for {deferUsage}. + - Append {parent} to {parents}. + - Let {stream} be an unordered map containing {parents}, {path}, and + {label}. + - Let {streamFieldDetails} be the result of + {GetStreamFieldDetailsList(fieldDetailsList)}. + - Let {future} represent the future execution of {ExecuteStreamField(stream, + iterator, streamFieldDetailsList, index, innerType, variableValues, + eventEmitter)}. + - Defer the execution of {future} until {stream} is released as pending, as + signalled by {eventEmitter}, or if early execution is desired, following + any implementation specific deferral, whichever occurs first. + - Append {future} to {futures}. + - Return {items} and {futures}. + - Wait for the next item from {result} via the {iterator}. + - If an item is not retrieved because of an error, raise a _field error_. + - Let {item} be the item retrieved from {result}. - Let {itemPath} be {path} with {index} appended. - Let {completedItem} and {itemFutures} be the result of calling {CompleteValue(innerType, fieldDetailsList, item, variableValues, eventEmitter, itemPath)}. - Append {completedItem} to {items}. - Append all items in {itemFutures} to {futures}. - - Increment {index} by {1}. - Return {items} and {futures}. +GetStreamFieldDetailsList(fieldDetailsList): + +- Let {streamFields} be an empty list. +- For each {fieldDetails} in {fieldDetailsList}: + - Let {field} be the corresponding entry on {fieldDetails}. + - Let {newFieldDetails} be a new Field Details record created from {field}. + - Append {newFieldDetails} to {streamFields}. +- Return {streamFields}. + +#### Execute Stream Field + +ExecuteStreamField(stream, iterator, fieldDetailsList, index, innerType, +variableValues, eventEmitter): + +- Let {path} be the corresponding entry on {stream}. +- Let {itemPath} be {path} with {index} appended. +- Wait for the next item from {iterator}. +- If {iterator} is closed, complete this data stream and return. +- Let {item} be the next item retrieved via {iterator}. +- Let {nextIndex} be {index} plus one. +- Let {completedItem} and {futures} be the result of {CompleteValue(innerType, + fields, item, variableValues, itemPath)}. +- Initialize {items} to an empty list. +- Append {completedItem} to {items}. +- Let {errors} be the list of all _field error_ raised while completing the + item. +- Let {future} represent the future execution of {ExecuteStreamField(stream, + path, iterator, fieldDetailsList, nextIndex, innerType, variableValues, + eventEmitter)}. +- Defer the execution of {future} until {items} is released, or if early + execution is desired, following any implementation specific deferral, + whichever occurs first. +- Append {future} to {futures}. +- Return an unordered map containing {items}, {errors}, and {futures}. + **Coercing Results** The primary purpose of value completion is to ensure that the values returned by