Skip to content

Commit

Permalink
Streaming bug fixes and renames etc
Browse files Browse the repository at this point in the history
  • Loading branch information
abrenneke committed Oct 20, 2023
1 parent 5ffe0e8 commit 2ffc9a3
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 18 deletions.
10 changes: 8 additions & 2 deletions packages/node/src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ import type { PascalCase } from 'type-fest';
import { NodeNativeApi } from './native/NodeNativeApi.js';
import { mapValues } from 'lodash-es';
import * as events from 'node:events';
import { getProcessorEventStream, getProcessorEvents, type RivetEventStreamFilterSpec } from './streaming.js';
import {
getProcessorSSEStream,
getProcessorEvents,
type RivetEventStreamFilterSpec,
getSingleNodeStream,
} from './streaming.js';

export async function loadProjectFromFile(path: string): Promise<Project> {
const content = await readFile(path, { encoding: 'utf8' });
Expand Down Expand Up @@ -200,7 +205,8 @@ export function createProcessor(project: Project, options: RunGraphOptions) {
inputs: resolvedInputs,
contextValues: resolvedContextValues,
getEvents: (spec: RivetEventStreamFilterSpec) => getProcessorEvents(processor, spec),
getEventStream: (spec: RivetEventStreamFilterSpec) => getProcessorEventStream(processor, spec),
getSSEStream: (spec: RivetEventStreamFilterSpec) => getProcessorSSEStream(processor, spec),
streamNode: (nodeIdOrTitle: string) => getSingleNodeStream(processor, nodeIdOrTitle),
async run() {
const outputs = await processor.processGraph(
{
Expand Down
1 change: 1 addition & 0 deletions packages/node/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ export * from '@ironclad/rivet-core';

export * from './native/NodeNativeApi.js';
export * from './api.js';
export * from './streaming.js';
export * from './debugger.js';
export * from './native/NodeDatasetProvider.js';
export * from './native/DebuggerDatasetProvider.js';
55 changes: 40 additions & 15 deletions packages/node/src/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,28 +67,27 @@ export async function* getProcessorEvents(
spec: RivetEventStreamFilterSpec,
): AsyncGenerator<RivetEventStreamEventInfo, void> {
const previousIndexes = new Map<NodeId, number>();

for await (const event of processor.events()) {
if (event.type === 'partialOutput') {
if (
spec.partialOutputs === true ||
!spec.partialOutputs?.includes(event.node.id) ||
!spec.partialOutputs?.includes(event.node.title)
spec.partialOutputs?.includes(event.node.id) ||
spec.partialOutputs?.includes(event.node.title)
) {
return;
}

const currentOutput = coerceType(event.outputs['response' as PortId], 'string');
const currentOutput = coerceType(event.outputs['response' as PortId], 'string');

const delta = currentOutput.slice(previousIndexes.get(event.node.id) ?? 0);
const delta = currentOutput.slice(previousIndexes.get(event.node.id) ?? 0);

yield {
type: 'partialOutput',
nodeId: event.node.id,
nodeTitle: event.node.title,
delta,
};
yield {
type: 'partialOutput',
nodeId: event.node.id,
nodeTitle: event.node.title,
delta,
};

previousIndexes.set(event.node.id, currentOutput.length);
previousIndexes.set(event.node.id, currentOutput.length);
}
} else if (event.type === 'done') {
if (spec.done) {
yield {
Expand Down Expand Up @@ -140,7 +139,7 @@ export async function* getProcessorEvents(
* Includes configuration for what events to send to the client, for example you can stream the partial output deltas
* for specific nodes, and/or the graph output when done.
*/
export function getProcessorEventStream(
export function getProcessorSSEStream(
processor: GraphProcessor,

/** The spec for what you're streaming to the client */
Expand Down Expand Up @@ -170,3 +169,29 @@ export function getProcessorEventStream(
},
});
}

export function getSingleNodeStream(processor: GraphProcessor, nodeIdOrTitle: string) {
return new ReadableStream<string>({
async start(controller) {
try {
for await (const event of getProcessorEvents(processor, {
partialOutputs: [nodeIdOrTitle],
nodeFinish: [nodeIdOrTitle],
})) {
if (event.type === 'partialOutput' && (event.nodeId === nodeIdOrTitle || event.nodeTitle === nodeIdOrTitle)) {
controller.enqueue(`data: ${JSON.stringify(event.delta)}\n\n`);
} else if (
event.type === 'nodeFinish' &&
(event.nodeId === nodeIdOrTitle || event.nodeTitle === nodeIdOrTitle)
) {
controller.close();
}
}

controller.close();
} catch (err) {
controller.error(err);
}
},
});
}
2 changes: 1 addition & 1 deletion packages/node/test/api.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ describe('api', () => {
processor.run();

const reader = processor
.getEventStream({
.getSSEStream({
nodeFinish: true,
})
.getReader();
Expand Down

0 comments on commit 2ffc9a3

Please sign in to comment.