From f8d3cf3be403db61f95002b06b3d490e47169aa5 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Tue, 13 Feb 2024 21:48:05 +0200 Subject: [PATCH] Introduce `@stream` --- spec/Section 6 -- Execution.md | 101 ++++++++++++++++++++++++++++++--- 1 file changed, 94 insertions(+), 7 deletions(-) diff --git a/spec/Section 6 -- Execution.md b/spec/Section 6 -- Execution.md index 17a99dc88..8387d51dd 100644 --- a/spec/Section 6 -- Execution.md +++ b/spec/Section 6 -- Execution.md @@ -546,6 +546,22 @@ serial): - Let {hasNext} be {true}. - Let {update} be an unordered map consisting of {errors}, {data}, {pending}, and {hasNext}. + - Otherwise, if {result} incrementally completes a Stream: + - Let {stream}, {items}, {errors}, {newPendingResults}, and {futures} be + the corresponding entries on {result}. + - Let {id} be the entry on {ids} for {stream}. + - If {items} is not defined or is {null}: + - Let {completedEntry} be an unordered map containing {id}. + - If {items} is {null}, set the corresponding entry on {completedEntry} + to {errors}. + - Append {completedEntry} to {completed}. + - Remove {stream} from {pendingResults}. + - Let {update} be an unordered map consisting of {completed}. + - Otherwise: + - Let {incrementalEntry} be an unordered map containing {id}, {items}, + and {errors}. + - Append {incrementalEntry} to {incremental}. + - Let {update} be an unordered map consisting of {incremental}. - Otherwise, {result} incrementally completes Deferred Fragments: - Let {deferredFragments} be those Deferred Fragments. - If {data} is {null}: @@ -583,13 +599,19 @@ serial): - Append {completedEntry} to {completed}. - Remove {deferredFragment} from {pendingResults}. - For each {newPendingResult} in {newPendingResults}: - - Let {parent} be the corresponding entry on {newPendingResult}. - - Add {newPendingResult} to {pendingResults} as a new node directed from - {parent}, if it is also present in {pendingResults}. + - If {newPendingResult} represents a Stream: + - Add {newPendingResult} to {pendingResults}. + - Otherwise: + - Let {parent} be the corresponding entry on {newPendingResult}. + - Add {newPendingResult} to {pendingResults} as a new node directed from + {parent}, if it is also present in {pendingResults}. - For each {future} of {futures}: - - Let {deferredFragments} be the Deferred Fragments completed by {future}. - - Add {future} to {pendingResults} as a node directed from each of - {deferredFragments}. + - If {future} incrementally completes a Stream: + - If {future} has not been initiated, initiate {future}. + - Otherwise: + - Let {deferredFragments} be the Deferred Fragments completed by {future}. + - Add {future} to {pendingResults} as a node directed from each of + {deferredFragments}. - While any root nodes in {pendingResults} representing Deferred Fragments contain no direct child Futures, remove those root nodes from {pendingResults}. @@ -1026,8 +1048,37 @@ pendingResults, path, deferUsageSet, deferMap): - Initialize {newPendingResults} and {futures} to empty lists. - Let {fieldDetails} be the first entry in {fieldDetailsList}. - Let {field} be the corresponding entry on {fieldDetails}. +- If {field} provides the directive `@stream` and its {if} argument is not + {false} and is not a variable in {variableValues} with the value {false} and + {innerType} is the outermost inner type of the list type defined for + {fieldDetailsList}: + - Let {streamDirective} be that directive. + - If this execution is for a subscription operation, raise a _field error_. + - Let {initialCount} be the value or variable provided to {streamDirective}'s + {initialCount} argument. + - If {initialCount} is less than zero, raise a _field error_. + - Let {label} be the value or variable provided to {streamDirective}'s {label} + argument. +- Let {iterator} be an iterator for {result}. - Let {items} be an empty list. -- For each {resultItem} of {result}: +- Let {index} be zero. +- While {result} is not closed: + - If {streamDirective} is defined and {index} is greater than or equal to + {initialCount}: + - Let {stream} be an unordered map containing {path} and {label}. + - Let {streamFieldDetails} be the result of + {GetStreamFieldDetailsList(fieldDetailsList)}. + - Let {future} represent the future execution of {ExecuteStreamField(stream, + iterator, streamFieldDetailsList, index, innerType, variableValues, + pendingResults)}. + - If early execution is desired, following any implementation specific + deferral, whichever occurs first. + - Append {future} to {futures}. + - Return {items}, {newPendingResults}, and {futures}. + - Wait for the next item from {result} via the {iterator}. + - If an item is not retrieved because of an error, raise a _field error_. + - Let {item} be the item retrieved from {result}. + - Let {itemPath} be {path} with {index} appended. - Let {completedItem}, {itemNewPendingResults}, and {itemFutures} be the result of calling {CompleteValue(innerType, fieldDetailsList, item, variableValues, pendingResults, itemPath)}. @@ -1036,6 +1087,42 @@ pendingResults, path, deferUsageSet, deferMap): {newPendingResults}, and {futures}, respectively. - Return {items}, {newPendingResults}, and {futures}. +GetStreamFieldDetailsList(fieldDetailsList): + +- Let {streamFields} be an empty list. +- For each {fieldDetails} in {fieldDetailsList}: + - Let {field} be the corresponding entry on {fieldDetails}. + - Let {newFieldDetails} be a new Field Details record created from {field}. + - Append {newFieldDetails} to {streamFields}. +- Return {streamFields}. + +#### Execute Stream Field + +ExecuteStreamField(stream, iterator, fieldDetailsList, index, innerType, +variableValues, pendingResults): + +- Let {path} be the corresponding entry on {stream}. +- Let {itemPath} be {path} with {index} appended. +- Wait for the next item from {iterator}. +- If {iterator} is closed, complete this data stream and return. +- Let {item} be the next item retrieved via {iterator}. +- Let {nextIndex} be {index} plus one. +- Let {completedItem}, {newPendingResults}, and {futures} be the result of + {CompleteValue(innerType, fields, item, variableValues, itemPath)}. +- Initialize {items} to an empty list. +- Append {completedItem} to {items}. +- Let {errors} be the list of all _field error_ raised while completing the + item. +- Let {future} represent the future execution of {ExecuteStreamField(stream, + path, iterator, fieldDetailsList, nextIndex, innerType, variableValues, + pendingResults)}. +- If early execution of streamed fields is desired: + - Following any implementation specific deferral of further execution, + initiate {future}. +- Append {future} to {futures}. +- Return an unordered map containing {items}, {errors}, {newPendingResults}, and + {futures}. + **Coercing Results** The primary purpose of value completion is to ensure that the values returned by