From 5f2c099172bf79c3c2259764bd0949f6baa039d2 Mon Sep 17 00:00:00 2001 From: Aryaman Agrawal Date: Tue, 3 Dec 2024 14:29:27 -0800 Subject: [PATCH] make traces options in GraphProcessor --- packages/core/src/api/createProcessor.ts | 3 +- packages/core/src/model/GraphProcessor.ts | 118 +++++++++++++++------- 2 files changed, 84 insertions(+), 37 deletions(-) diff --git a/packages/core/src/api/createProcessor.ts b/packages/core/src/api/createProcessor.ts index b44ce2d18..7cf329a53 100644 --- a/packages/core/src/api/createProcessor.ts +++ b/packages/core/src/api/createProcessor.ts @@ -37,6 +37,7 @@ export type RunGraphOptions = { }; abortSignal?: AbortSignal; registry?: NodeRegistration; + includeTrace?: boolean; getChatNodeEndpoint?: ProcessContext['getChatNodeEndpoint']; } & { [P in keyof ProcessEvents as `on${PascalCase

}`]?: (params: ProcessEvents[P]) => void; @@ -75,7 +76,7 @@ export function coreCreateProcessor(project: Project, options: RunGraphOptions) throw new Error(`Graph not found, and no main graph specified.`); } - const processor = new GraphProcessor(project, graphId as GraphId, options.registry); + const processor = new GraphProcessor(project, graphId as GraphId, options.registry, options.includeTrace); if (options.onStart) { processor.on('start', options.onStart); diff --git a/packages/core/src/model/GraphProcessor.ts b/packages/core/src/model/GraphProcessor.ts index 6fa6fe604..1a03f7bde 100644 --- a/packages/core/src/model/GraphProcessor.ts +++ b/packages/core/src/model/GraphProcessor.ts @@ -187,6 +187,8 @@ export class GraphProcessor { readonly #registry: NodeRegistration; id = nanoid(); + #includeTrace?: boolean = true; + executor?: 'nodejs' | 'browser'; /** If set, specifies the node(s) that the graph will run TO, instead of the nodes without any dependents. */ @@ -246,7 +248,7 @@ export class GraphProcessor { return this.#running; } - constructor(project: Project, graphId?: GraphId, registry?: NodeRegistration) { + constructor(project: Project, graphId?: GraphId, registry?: NodeRegistration, includeTrace?: boolean) { this.#project = project; const graph = graphId ? project.graphs[graphId] @@ -259,6 +261,7 @@ export class GraphProcessor { } this.#graph = graph; + this.#includeTrace = includeTrace; this.#nodeInstances = {}; this.#connections = {}; this.#nodesById = {}; @@ -747,7 +750,9 @@ export class GraphProcessor { if (this.#hasPreloadedData) { for (const node of this.#graph.nodes) { if (this.#nodeResults.has(node.id)) { - this.#emitter.emit('trace', `Node ${node.title} has preloaded data`); + if(this.#includeTrace){ + this.#emitter.emit('trace', `Node ${node.title} has preloaded data`); + } await this.#emitter.emit('nodeStart', { node, @@ -869,10 +874,12 @@ export class GraphProcessor { if (!inputsReady) { return; } - this.#emitter.emit( - 'trace', - `Node ${node.title} has required inputs nodes: ${inputNodes.map((n) => n.title).join(', ')}`, - ); + if(this.#includeTrace){ + this.#emitter.emit( + 'trace', + `Node ${node.title} has required inputs nodes: ${inputNodes.map((n) => n.title).join(', ')}`, + ); + } const attachedData = this.#getAttachedDataTo(node); @@ -898,7 +905,9 @@ export class GraphProcessor { this.#processingQueue.addAll( inputNodes.map((inputNode) => { return async () => { - this.#emitter.emit('trace', `Fetching required data for node ${inputNode.title} (${inputNode.id})`); + if(this.#includeTrace){ + this.#emitter.emit('trace', `Fetching required data for node ${inputNode.title} (${inputNode.id})`); + } await this.#fetchNodeDataAndProcessNode(inputNode); }; }), @@ -912,7 +921,9 @@ export class GraphProcessor { const builtInNode = node as BuiltInNodes; if (this.#ignoreNodes.has(node.id)) { - this.#emitter.emit('trace', `Node ${node.title} is ignored`); + if(this.#includeTrace){ + this.#emitter.emit('trace', `Node ${node.title} is ignored`); + } return; } @@ -920,24 +931,32 @@ export class GraphProcessor { const dependencyNodes = this.getDependencyNodesDeep(node.id); if (this.runToNodeIds.some((runTo) => runTo != node.id && dependencyNodes.includes(runTo))) { - this.#emitter.emit('trace', `Node ${node.title} is excluded due to runToNodeIds`); + if(this.#includeTrace){ + this.#emitter.emit('trace', `Node ${node.title} is excluded due to runToNodeIds`); + } return; } } if (this.#currentlyProcessing.has(node.id)) { - this.#emitter.emit('trace', `Node ${node.title} is already being processed`); + if(this.#includeTrace){ + this.#emitter.emit('trace', `Node ${node.title} is already being processed`); + } return; } // For a loop controller, it can run multiple times, otherwise we already processed this node so bail out if (this.#visitedNodes.has(node.id) && node.type !== 'loopController') { - this.#emitter.emit('trace', `Node ${node.title} has already been processed`); + if(this.#includeTrace){ + this.#emitter.emit('trace', `Node ${node.title} has already been processed`); + } return; } if (this.#erroredNodes.has(node.id)) { - this.#emitter.emit('trace', `Node ${node.title} has already errored`); + if(this.#includeTrace){ + this.#emitter.emit('trace', `Node ${node.title} has already errored`); + } return; } @@ -946,7 +965,9 @@ export class GraphProcessor { // Check if all input nodes are free of errors for (const inputNode of inputNodes) { if (this.#erroredNodes.has(inputNode.id)) { - this.#emitter.emit('trace', `Node ${node.title} has errored input node ${inputNode.title}`); + if(this.#includeTrace){ + this.#emitter.emit('trace', `Node ${node.title} has errored input node ${inputNode.title}`); + } return; } } @@ -959,10 +980,12 @@ export class GraphProcessor { }); if (!inputsReady) { - await this.#emitter.emit( - 'trace', - `Node ${node.title} has required inputs nodes: ${inputNodes.map((n) => n.title).join(', ')}`, - ); + if(this.#includeTrace){ + await this.#emitter.emit( + 'trace', + `Node ${node.title} has required inputs nodes: ${inputNodes.map((n) => n.title).join(', ')}`, + ); + } return; } @@ -970,7 +993,9 @@ export class GraphProcessor { const inputValues = this.#getInputValuesForNode(node); if (this.#excludedDueToControlFlow(node, inputValues, nanoid() as ProcessId, 'loop-not-broken')) { - this.#emitter.emit('trace', `Node ${node.title} is excluded due to control flow`); + if(this.#includeTrace){ + this.#emitter.emit('trace', `Node ${node.title} is excluded due to control flow`); + } return; } @@ -1001,7 +1026,9 @@ export class GraphProcessor { } if (waitingForInputNode) { - this.#emitter.emit('trace', `Node ${node.title} is waiting for input node ${waitingForInputNode}`); + if(this.#includeTrace){ + this.#emitter.emit('trace', `Node ${node.title} is waiting for input node ${waitingForInputNode}`); + } return; } @@ -1018,7 +1045,9 @@ export class GraphProcessor { } if (attachedData.races?.completed) { - this.#emitter.emit('trace', `Node ${node.title} is part of a race that was completed`); + if(this.#includeTrace){ + this.#emitter.emit('trace', `Node ${node.title} is part of a race that was completed`); + } return; } @@ -1027,8 +1056,9 @@ export class GraphProcessor { if (this.slowMode) { await new Promise((resolve) => setTimeout(resolve, 250)); } - - this.#emitter.emit('trace', `Finished processing node ${node.title} (${node.id})`); + if(this.#includeTrace){ + this.#emitter.emit('trace', `Finished processing node ${node.title} (${node.id})`); + } this.#visitedNodes.add(node.id); this.#currentlyProcessing.delete(node.id); this.#remainingNodes.delete(node.id); @@ -1046,10 +1076,14 @@ export class GraphProcessor { this.#excludedDueToControlFlow(node, this.#getInputValuesForNode(node), nanoid() as ProcessId); if (!didBreak) { - this.#emitter.emit('trace', `Loop controller ${node.title} did not break, so we're looping again`); + if(this.#includeTrace){ + this.#emitter.emit('trace', `Loop controller ${node.title} did not break, so we're looping again`); + } for (const loopNodeId of attachedData.loopInfo?.nodes ?? []) { const cycleNode = this.#nodesById[loopNodeId]!; - this.#emitter.emit('trace', `Clearing cycle node ${cycleNode.title} (${cycleNode.id})`); + if(this.#includeTrace){ + this.#emitter.emit('trace', `Clearing cycle node ${cycleNode.title} (${cycleNode.id})`); + } this.#visitedNodes.delete(cycleNode.id); this.#currentlyProcessing.delete(cycleNode.id); this.#remainingNodes.add(cycleNode.id); @@ -1067,7 +1101,9 @@ export class GraphProcessor { for (const [nodeId] of allNodesForRace) { for (const [key, abortController] of this.#nodeAbortControllers.entries()) { if (key.startsWith(nodeId)) { - this.#emitter.emit('trace', `Aborting node ${nodeId} because other race branch won`); + if(this.#includeTrace){ + this.#emitter.emit('trace', `Aborting node ${nodeId} because other race branch won`); + } abortController.abort(); } } @@ -1133,10 +1169,12 @@ export class GraphProcessor { // Node is finished, check if we can run any more nodes that depend on this one this.#processingQueue.addAll( outputNodes.nodes.map((outputNode) => async () => { - this.#emitter.emit( - 'trace', - `Trying to run output node from ${node.title}: ${outputNode.title} (${outputNode.id})`, - ); + if(this.#includeTrace){ + this.#emitter.emit( + 'trace', + `Trying to run output node from ${node.title}: ${outputNode.title} (${outputNode.id})`, + ); + } await this.#processNodeIfAllInputsAvailable(outputNode); }), @@ -1385,7 +1423,9 @@ export class GraphProcessor { #nodeErrored(node: ChartNode, e: unknown, processId: ProcessId) { const error = getError(e); this.#emitter.emit('nodeError', { node, error, processId }); - this.#emitter.emit('trace', `Node ${node.title} (${node.id}-${processId}) errored: ${error.stack}`); + if(this.#includeTrace){ + this.#emitter.emit('trace', `Node ${node.title} (${node.id}-${processId}) errored: ${error.stack}`); + } this.#erroredNodes.set(node.id, error.toString()); } @@ -1545,7 +1585,9 @@ export class GraphProcessor { return processor; }, trace: (message) => { - this.#emitter.emit('trace', message); + if(this.#includeTrace){ + this.#emitter.emit('trace', message); + } }, abortGraph: (error) => { this.abort(error === undefined, error); @@ -1572,7 +1614,9 @@ export class GraphProcessor { typeOfExclusion: ControlFlowExcludedDataValue['value'] = undefined, ) { if (node.disabled) { - this.#emitter.emit('trace', `Excluding node ${node.title} because it's disabled`); + if(this.#includeTrace){ + this.#emitter.emit('trace', `Excluding node ${node.title} because it's disabled`); + } this.#visitedNodes.add(node.id); this.#markAsExcluded(node, processId, inputValues, 'disabled'); @@ -1606,10 +1650,12 @@ export class GraphProcessor { if (inputIsExcludedValue && !allowedToConsumedExcludedValue) { if (!isWaitingForLoop) { if (inputIsExcludedValue) { - this.#emitter.emit( - 'trace', - `Excluding node ${node.title} because of control flow. Input is has excluded value: ${controlFlowExcludedValues[0]?.[0]}`, - ); + if(this.#includeTrace){ + this.#emitter.emit( + 'trace', + `Excluding node ${node.title} because of control flow. Input is has excluded value: ${controlFlowExcludedValues[0]?.[0]}`, + ); + } } this.#visitedNodes.add(node.id);