-
Notifications
You must be signed in to change notification settings - Fork 255
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make GraphProcessor trace optional #460
Open
AryamanAgrawalIronclad
wants to merge
1
commit into
main
Choose a base branch
from
aryamanagrawal/optional-traces
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+84
−37
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -187,6 +187,8 @@ export class GraphProcessor { | |
readonly #registry: NodeRegistration; | ||
id = nanoid(); | ||
|
||
#includeTrace?: boolean = true; | ||
|
||
executor?: 'nodejs' | 'browser'; | ||
|
||
/** If set, specifies the node(s) that the graph will run TO, instead of the nodes without any dependents. */ | ||
|
@@ -246,7 +248,7 @@ export class GraphProcessor { | |
return this.#running; | ||
} | ||
|
||
constructor(project: Project, graphId?: GraphId, registry?: NodeRegistration) { | ||
constructor(project: Project, graphId?: GraphId, registry?: NodeRegistration, includeTrace?: boolean) { | ||
this.#project = project; | ||
const graph = graphId | ||
? project.graphs[graphId] | ||
|
@@ -259,6 +261,7 @@ export class GraphProcessor { | |
} | ||
this.#graph = graph; | ||
|
||
this.#includeTrace = includeTrace; | ||
this.#nodeInstances = {}; | ||
this.#connections = {}; | ||
this.#nodesById = {}; | ||
|
@@ -747,7 +750,9 @@ export class GraphProcessor { | |
if (this.#hasPreloadedData) { | ||
for (const node of this.#graph.nodes) { | ||
if (this.#nodeResults.has(node.id)) { | ||
this.#emitter.emit('trace', `Node ${node.title} has preloaded data`); | ||
if(this.#includeTrace){ | ||
this.#emitter.emit('trace', `Node ${node.title} has preloaded data`); | ||
} | ||
|
||
await this.#emitter.emit('nodeStart', { | ||
node, | ||
|
@@ -869,10 +874,12 @@ export class GraphProcessor { | |
if (!inputsReady) { | ||
return; | ||
} | ||
this.#emitter.emit( | ||
'trace', | ||
`Node ${node.title} has required inputs nodes: ${inputNodes.map((n) => n.title).join(', ')}`, | ||
); | ||
if(this.#includeTrace){ | ||
this.#emitter.emit( | ||
'trace', | ||
`Node ${node.title} has required inputs nodes: ${inputNodes.map((n) => n.title).join(', ')}`, | ||
); | ||
} | ||
Comment on lines
+877
to
+882
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also we should consider adding a small util to wrap this so future calls to emitting trace don't miss the check |
||
|
||
const attachedData = this.#getAttachedDataTo(node); | ||
|
||
|
@@ -898,7 +905,9 @@ export class GraphProcessor { | |
this.#processingQueue.addAll( | ||
inputNodes.map((inputNode) => { | ||
return async () => { | ||
this.#emitter.emit('trace', `Fetching required data for node ${inputNode.title} (${inputNode.id})`); | ||
if(this.#includeTrace){ | ||
this.#emitter.emit('trace', `Fetching required data for node ${inputNode.title} (${inputNode.id})`); | ||
} | ||
await this.#fetchNodeDataAndProcessNode(inputNode); | ||
}; | ||
}), | ||
|
@@ -912,32 +921,42 @@ export class GraphProcessor { | |
const builtInNode = node as BuiltInNodes; | ||
|
||
if (this.#ignoreNodes.has(node.id)) { | ||
this.#emitter.emit('trace', `Node ${node.title} is ignored`); | ||
if(this.#includeTrace){ | ||
this.#emitter.emit('trace', `Node ${node.title} is ignored`); | ||
} | ||
return; | ||
} | ||
|
||
if (this.runToNodeIds) { | ||
const dependencyNodes = this.getDependencyNodesDeep(node.id); | ||
|
||
if (this.runToNodeIds.some((runTo) => runTo != node.id && dependencyNodes.includes(runTo))) { | ||
this.#emitter.emit('trace', `Node ${node.title} is excluded due to runToNodeIds`); | ||
if(this.#includeTrace){ | ||
this.#emitter.emit('trace', `Node ${node.title} is excluded due to runToNodeIds`); | ||
} | ||
return; | ||
} | ||
} | ||
|
||
if (this.#currentlyProcessing.has(node.id)) { | ||
this.#emitter.emit('trace', `Node ${node.title} is already being processed`); | ||
if(this.#includeTrace){ | ||
this.#emitter.emit('trace', `Node ${node.title} is already being processed`); | ||
} | ||
return; | ||
} | ||
|
||
// For a loop controller, it can run multiple times, otherwise we already processed this node so bail out | ||
if (this.#visitedNodes.has(node.id) && node.type !== 'loopController') { | ||
this.#emitter.emit('trace', `Node ${node.title} has already been processed`); | ||
if(this.#includeTrace){ | ||
this.#emitter.emit('trace', `Node ${node.title} has already been processed`); | ||
} | ||
return; | ||
} | ||
|
||
if (this.#erroredNodes.has(node.id)) { | ||
this.#emitter.emit('trace', `Node ${node.title} has already errored`); | ||
if(this.#includeTrace){ | ||
this.#emitter.emit('trace', `Node ${node.title} has already errored`); | ||
} | ||
return; | ||
} | ||
|
||
|
@@ -946,7 +965,9 @@ export class GraphProcessor { | |
// Check if all input nodes are free of errors | ||
for (const inputNode of inputNodes) { | ||
if (this.#erroredNodes.has(inputNode.id)) { | ||
this.#emitter.emit('trace', `Node ${node.title} has errored input node ${inputNode.title}`); | ||
if(this.#includeTrace){ | ||
this.#emitter.emit('trace', `Node ${node.title} has errored input node ${inputNode.title}`); | ||
} | ||
return; | ||
} | ||
} | ||
|
@@ -959,18 +980,22 @@ export class GraphProcessor { | |
}); | ||
|
||
if (!inputsReady) { | ||
await this.#emitter.emit( | ||
'trace', | ||
`Node ${node.title} has required inputs nodes: ${inputNodes.map((n) => n.title).join(', ')}`, | ||
); | ||
if(this.#includeTrace){ | ||
await this.#emitter.emit( | ||
'trace', | ||
`Node ${node.title} has required inputs nodes: ${inputNodes.map((n) => n.title).join(', ')}`, | ||
); | ||
} | ||
return; | ||
} | ||
|
||
// Excluded because control flow is still in a loop - difference between "will not execute" and "has not executed yet" | ||
const inputValues = this.#getInputValuesForNode(node); | ||
|
||
if (this.#excludedDueToControlFlow(node, inputValues, nanoid() as ProcessId, 'loop-not-broken')) { | ||
this.#emitter.emit('trace', `Node ${node.title} is excluded due to control flow`); | ||
if(this.#includeTrace){ | ||
this.#emitter.emit('trace', `Node ${node.title} is excluded due to control flow`); | ||
} | ||
return; | ||
} | ||
|
||
|
@@ -1001,7 +1026,9 @@ export class GraphProcessor { | |
} | ||
|
||
if (waitingForInputNode) { | ||
this.#emitter.emit('trace', `Node ${node.title} is waiting for input node ${waitingForInputNode}`); | ||
if(this.#includeTrace){ | ||
this.#emitter.emit('trace', `Node ${node.title} is waiting for input node ${waitingForInputNode}`); | ||
} | ||
return; | ||
} | ||
|
||
|
@@ -1018,7 +1045,9 @@ export class GraphProcessor { | |
} | ||
|
||
if (attachedData.races?.completed) { | ||
this.#emitter.emit('trace', `Node ${node.title} is part of a race that was completed`); | ||
if(this.#includeTrace){ | ||
this.#emitter.emit('trace', `Node ${node.title} is part of a race that was completed`); | ||
} | ||
return; | ||
} | ||
|
||
|
@@ -1027,8 +1056,9 @@ export class GraphProcessor { | |
if (this.slowMode) { | ||
await new Promise((resolve) => setTimeout(resolve, 250)); | ||
} | ||
|
||
this.#emitter.emit('trace', `Finished processing node ${node.title} (${node.id})`); | ||
if(this.#includeTrace){ | ||
this.#emitter.emit('trace', `Finished processing node ${node.title} (${node.id})`); | ||
} | ||
this.#visitedNodes.add(node.id); | ||
this.#currentlyProcessing.delete(node.id); | ||
this.#remainingNodes.delete(node.id); | ||
|
@@ -1046,10 +1076,14 @@ export class GraphProcessor { | |
this.#excludedDueToControlFlow(node, this.#getInputValuesForNode(node), nanoid() as ProcessId); | ||
|
||
if (!didBreak) { | ||
this.#emitter.emit('trace', `Loop controller ${node.title} did not break, so we're looping again`); | ||
if(this.#includeTrace){ | ||
this.#emitter.emit('trace', `Loop controller ${node.title} did not break, so we're looping again`); | ||
} | ||
for (const loopNodeId of attachedData.loopInfo?.nodes ?? []) { | ||
const cycleNode = this.#nodesById[loopNodeId]!; | ||
this.#emitter.emit('trace', `Clearing cycle node ${cycleNode.title} (${cycleNode.id})`); | ||
if(this.#includeTrace){ | ||
this.#emitter.emit('trace', `Clearing cycle node ${cycleNode.title} (${cycleNode.id})`); | ||
} | ||
this.#visitedNodes.delete(cycleNode.id); | ||
this.#currentlyProcessing.delete(cycleNode.id); | ||
this.#remainingNodes.add(cycleNode.id); | ||
|
@@ -1067,7 +1101,9 @@ export class GraphProcessor { | |
for (const [nodeId] of allNodesForRace) { | ||
for (const [key, abortController] of this.#nodeAbortControllers.entries()) { | ||
if (key.startsWith(nodeId)) { | ||
this.#emitter.emit('trace', `Aborting node ${nodeId} because other race branch won`); | ||
if(this.#includeTrace){ | ||
this.#emitter.emit('trace', `Aborting node ${nodeId} because other race branch won`); | ||
} | ||
abortController.abort(); | ||
} | ||
} | ||
|
@@ -1133,10 +1169,12 @@ export class GraphProcessor { | |
// Node is finished, check if we can run any more nodes that depend on this one | ||
this.#processingQueue.addAll( | ||
outputNodes.nodes.map((outputNode) => async () => { | ||
this.#emitter.emit( | ||
'trace', | ||
`Trying to run output node from ${node.title}: ${outputNode.title} (${outputNode.id})`, | ||
); | ||
if(this.#includeTrace){ | ||
this.#emitter.emit( | ||
'trace', | ||
`Trying to run output node from ${node.title}: ${outputNode.title} (${outputNode.id})`, | ||
); | ||
} | ||
|
||
await this.#processNodeIfAllInputsAvailable(outputNode); | ||
}), | ||
|
@@ -1385,7 +1423,9 @@ export class GraphProcessor { | |
#nodeErrored(node: ChartNode, e: unknown, processId: ProcessId) { | ||
const error = getError(e); | ||
this.#emitter.emit('nodeError', { node, error, processId }); | ||
this.#emitter.emit('trace', `Node ${node.title} (${node.id}-${processId}) errored: ${error.stack}`); | ||
if(this.#includeTrace){ | ||
this.#emitter.emit('trace', `Node ${node.title} (${node.id}-${processId}) errored: ${error.stack}`); | ||
} | ||
this.#erroredNodes.set(node.id, error.toString()); | ||
} | ||
|
||
|
@@ -1545,7 +1585,9 @@ export class GraphProcessor { | |
return processor; | ||
}, | ||
trace: (message) => { | ||
this.#emitter.emit('trace', message); | ||
if(this.#includeTrace){ | ||
this.#emitter.emit('trace', message); | ||
} | ||
}, | ||
abortGraph: (error) => { | ||
this.abort(error === undefined, error); | ||
|
@@ -1572,7 +1614,9 @@ export class GraphProcessor { | |
typeOfExclusion: ControlFlowExcludedDataValue['value'] = undefined, | ||
) { | ||
if (node.disabled) { | ||
this.#emitter.emit('trace', `Excluding node ${node.title} because it's disabled`); | ||
if(this.#includeTrace){ | ||
this.#emitter.emit('trace', `Excluding node ${node.title} because it's disabled`); | ||
} | ||
|
||
this.#visitedNodes.add(node.id); | ||
this.#markAsExcluded(node, processId, inputValues, 'disabled'); | ||
|
@@ -1606,10 +1650,12 @@ export class GraphProcessor { | |
if (inputIsExcludedValue && !allowedToConsumedExcludedValue) { | ||
if (!isWaitingForLoop) { | ||
if (inputIsExcludedValue) { | ||
this.#emitter.emit( | ||
'trace', | ||
`Excluding node ${node.title} because of control flow. Input is has excluded value: ${controlFlowExcludedValues[0]?.[0]}`, | ||
); | ||
if(this.#includeTrace){ | ||
this.#emitter.emit( | ||
'trace', | ||
`Excluding node ${node.title} because of control flow. Input is has excluded value: ${controlFlowExcludedValues[0]?.[0]}`, | ||
); | ||
} | ||
} | ||
|
||
this.#visitedNodes.add(node.id); | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be preferred for optional properties to be grouped into an options object so they can be specified in any order, but this class is exported out of core which technically makes it part of the public interface. While I think most users would use it through createGraphProcessor we can't update this without a breaking change, but we can at least note this for the future