Split stream and process mutations with streaming JSON parsing #619

Merged · 8 commits · Oct 8, 2024
1 change: 1 addition & 0 deletions taxonium_component/package.json
@@ -26,6 +26,7 @@
},
"dependencies": {},
"devDependencies": {
"stream-json": "^1.8.0",
"@fontsource/roboto": "^5.0.1",
"@headlessui/react": "^1.7.17",
"@jbrowse/core": "^2.5.0",
7 changes: 6 additions & 1 deletion taxonium_component/src/webworkers/localBackendWorker.js
@@ -7,6 +7,8 @@ import {
import { processNewickAndMetadata } from "../utils/processNewick.js";
import { processNextstrain } from "../utils/processNextstrain.js";
import { ReadableWebToNodeStream } from "readable-web-to-node-stream";
import { parser } from "stream-json";
import { streamValues } from "stream-json/streamers/StreamValues";

console.log("worker starting");
postMessage({ data: "Worker starting" });
@@ -211,8 +213,11 @@ onmessage = async (event) => {
processedUploadedData = await processJsonl(
data.data,
sendStatusMessage,
ReadableWebToNodeStream
ReadableWebToNodeStream,
parser,
streamValues
);

console.log("processedUploadedData created");
} else if (
data.type === "upload" &&
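
For context, stream-json's parser (with jsonStreaming: true) tokenizes a byte stream that may contain several concatenated top-level JSON values, and streamValues reassembles those tokens into plain JavaScript values. A minimal sketch of that pattern in isolation, with hypothetical sample values:

import { parser } from "stream-json";
import { streamValues } from "stream-json/streamers/StreamValues";

// Each complete top-level JSON value is emitted as { key, value }.
const tokens = parser({ jsonStreaming: true });
tokens.pipe(streamValues()).on("data", ({ value }) => {
  console.log("parsed value:", value);
});

// Hypothetical input: two concatenated JSON values.
tokens.write('{"version": 1}\n');
tokens.end('{"node_id": 0, "mutations": []}');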
12 changes: 12 additions & 0 deletions taxonium_component/yarn.lock
@@ -10164,6 +10164,11 @@ stream-browserify@^3.0.0:
inherits "~2.0.4"
readable-stream "^3.5.0"

stream-chain@^2.2.5:
version "2.2.5"
resolved "https://registry.yarnpkg.com/stream-chain/-/stream-chain-2.2.5.tgz#b30967e8f14ee033c5b9a19bbe8a2cba90ba0d09"
integrity sha512-1TJmBx6aSWqZ4tx7aTpBDXK0/e2hhcNSTV8+CbFJtDjbb+I1mZ8lHit0Grw9GRT+6JbIrrDd8esncgBi8aBXGA==

stream-http@^3.2.0:
version "3.2.0"
resolved "https://registry.yarnpkg.com/stream-http/-/stream-http-3.2.0.tgz#1872dfcf24cb15752677e40e5c3f9cc1926028b5"
@@ -10174,6 +10179,13 @@ stream-http@^3.2.0:
readable-stream "^3.6.0"
xtend "^4.0.2"

stream-json@^1.8.0:
version "1.8.0"
resolved "https://registry.yarnpkg.com/stream-json/-/stream-json-1.8.0.tgz#53f486b2e3b4496c506131f8d7260ba42def151c"
integrity sha512-HZfXngYHUAr1exT4fxlbc1IOce1RYxp2ldeaf97LYCOPSoOqY/1Psp7iGvpb+6JIOgkra9zDYnPX01hGAHzEPw==
dependencies:
stream-chain "^2.2.5"

stream-shift@^1.0.0:
version "1.0.1"
resolved "https://registry.yarnpkg.com/stream-shift/-/stream-shift-1.0.1.tgz#d7088281559ab2778424279b0877da3c392d5a3d"
185 changes: 153 additions & 32 deletions taxonium_data_handling/importing.js
@@ -1,6 +1,91 @@
import zlib from "zlib";
import stream from "stream";
import buffer from "buffer";

class ChunkCounterStream extends stream.PassThrough {
constructor(sendStatusMessage, options = {}) {
super(options);
this.sendStatusMessage = sendStatusMessage;
this.chunkCount = 0;
}

_transform(chunk, encoding, callback) {
this.chunkCount++;
if (this.chunkCount % 100 === 0) {
this.sendStatusMessage({
message: `Processed ${this.chunkCount} groups of mutations`,
count: this.chunkCount,
});
}

// Pass the chunk through unchanged
this.push(chunk);
callback();
}

_flush(callback) {
this.sendStatusMessage({
message: `Finished processing. Total chunks: ${this.chunkCount}`,
count: this.chunkCount,
finished: true,
});
callback();
}
}
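
ChunkCounterStream forwards bytes unchanged, reports progress every 100 chunks, and sends a final message on flush. A small self-contained usage sketch:

const counter = new ChunkCounterStream((msg) => console.log(msg.message));
counter.resume(); // discard the pass-through output; we only want the progress messages
counter.end("some header bytes"); // logs "Finished processing. Total chunks: 1"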

class StreamSplitter extends stream.Transform {
constructor(headerParser, dataParser, options = {}) {
super(options);
this.headerParser = headerParser;
this.dataParser = dataParser;
this.firstPart = true;
}

_transform(chunk, encoding, callback) {
let data = chunk;
let newlineIndex = data.indexOf(10); // ASCII code for '\n'

if (this.firstPart) {
if (newlineIndex !== -1) {
// Found newline, split the data
const headerData = data.slice(0, newlineIndex);
const restData = data.slice(newlineIndex + 1);

// Write header data to headerParser
this.headerParser.write(headerData);
this.headerParser.end();

// Write restData to dataParser
if (restData.length > 0) {
this.dataParser.write(restData);
}

this.firstPart = false;
} else {
// No newline in this chunk yet; it is all header data, stream it through
this.headerParser.write(data);
}
} else {
// After header is processed, pass data to dataParser
this.dataParser.write(data);
}

callback();
}

_flush(callback) {
if (this.firstPart) {
// Stream ended before any newline: everything already went to headerParser
this.headerParser.end();
this.firstPart = false;
}
this.dataParser.end();
callback();
}
}
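
StreamSplitter routes everything before the first newline to headerParser and all later bytes to dataParser; it is used purely for these side effects and never pushes anything downstream itself. Driving it in isolation with two stand-in PassThrough sinks:

const headerSink = new stream.PassThrough();
const dataSink = new stream.PassThrough();
headerSink.on("data", (d) => console.log("header:", d.toString()));
dataSink.on("data", (d) => console.log("rest:", d.toString()));

const splitter = new StreamSplitter(headerSink, dataSink);
splitter.end('{"total_nodes": 2}\n{"node_id": 0}\n{"node_id": 1}\n');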

const roundToDp = (number, dp) => {
return Math.round(number * Math.pow(10, dp)) / Math.pow(10, dp);
@@ -28,8 +113,57 @@ function reduceMaxOrMin(array, accessFunction, maxOrMin) {
}
}

export const setUpStream = (the_stream, data, sendStatusMessage) => {
export const setUpStream = (
the_stream,
data,
sendStatusMessage,
parser,
streamValues
) => {
// Header parser
const headerParser = parser({ jsonStreaming: true });
const headerPipeline = headerParser.pipe(streamValues());
headerPipeline.on("data", (chunk) => {
data.header = chunk.value;
data.nodes = [];
data.node_to_mut = {};
});
headerPipeline.on("error", (err) => {
console.error("Header parser error:", err);
});

// Data parser for the rest of the stream
let lineBuffer = "";
let line_number = 0;
const dataParser = new stream.Writable({
write(chunk, encoding, callback) {
const chunkStr = chunk.toString();
let start = 0;
let end = chunkStr.indexOf("\n");

while (end !== -1) {
lineBuffer += chunkStr.slice(start, end);
processLine(lineBuffer, line_number);
line_number++;
lineBuffer = "";
start = end + 1;
end = chunkStr.indexOf("\n", start);
}

lineBuffer += chunkStr.slice(start);
callback();
},
final(callback) {
if (lineBuffer) {
processLine(lineBuffer, line_number);
}
callback();
},
});

function processLine(line, line_number) {
if (line.trim() === "") return;

if ((line_number % 10000 === 0 && line_number > 0) || line_number === 500) {
console.log(`Processed ${formatNumber(line_number)} lines`);
if (data.header.total_nodes) {
@@ -45,44 +179,31 @@ export const setUpStream = (the_stream, data, sendStatusMessage) => {
});
}
}
// console.log("LINE",line_number,line);
const decoded = JSON.parse(line);
if (line_number === 0) {
data.header = decoded;
data.nodes = [];
data.node_to_mut = {};
} else {
data.node_to_mut[decoded.node_id] = decoded.mutations; // this is an int to ints map
data.nodes.push(decoded);
}
data.node_to_mut[decoded.node_id] = decoded.mutations;
data.nodes.push(decoded);
}
let cur_line = "";
let line_counter = 0;
the_stream.on("data", function (data) {
cur_line += data.toString();
if (cur_line.includes("\n")) {
const lines = cur_line.split("\n");
cur_line = lines.pop();
lines.forEach((line) => {
processLine(line, line_counter);
line_counter++;
});
}
});

the_stream.on("error", function (err) {
console.log(err);
});

the_stream.on("end", function () {
console.log("end");
const chunkCounterStream = new ChunkCounterStream(sendStatusMessage);
chunkCounterStream.pipe(headerParser);
const splitter = new StreamSplitter(chunkCounterStream, dataParser);

// Pipe the input stream through the splitter
the_stream
.pipe(splitter)
.on("error", (err) => console.error("Splitter error:", err));

// Handle the completion of the dataParser
dataParser.on("finish", () => {
console.log("Finished processing the stream");
});
};
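
Taken together, setUpStream wires: input stream → StreamSplitter → (ChunkCounterStream → stream-json header parser) for the first line, and the newline-buffering dataParser for every line after it. A hedged end-to-end sketch, assuming a hypothetical two-line JSONL payload:

import { parser } from "stream-json";
import { streamValues } from "stream-json/streamers/StreamValues";

const input = new stream.PassThrough();
const data = {};
setUpStream(input, data, (msg) => console.log(msg.message), parser, streamValues);

// Hypothetical payload: a header value on line one, then one node per line.
input.end('{"total_nodes": 1}\n{"node_id": 0, "mutations": []}\n');
// Once the stream drains, data.header, data.nodes and data.node_to_mut are populated.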

export const processJsonl = async (
jsonl,
sendStatusMessage,
ReadableWebToNodeStream
ReadableWebToNodeStream,
parser,
streamValues
) => {
console.log(
"Worker processJsonl" //, jsonl
@@ -98,7 +219,7 @@ export const processJsonl = async (
the_stream = new stream.PassThrough();
}
let new_data = {};
setUpStream(the_stream, new_data, sendStatusMessage);
setUpStream(the_stream, new_data, sendStatusMessage, parser, streamValues);

if (status === "loaded") {
const dataAsArrayBuffer = data;
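
And the worker-side call, sketched with an in-memory payload. The { data, status, filename } shape and the import path are inferred from the surrounding code, so treat them as assumptions:

import { parser } from "stream-json";
import { streamValues } from "stream-json/streamers/StreamValues";
import { ReadableWebToNodeStream } from "readable-web-to-node-stream";
import { processJsonl } from "taxonium_data_handling/importing.js"; // assumed path

const text = '{"total_nodes": 1}\n{"node_id": 0, "mutations": []}\n';
const processed = await processJsonl(
  {
    data: new TextEncoder().encode(text).buffer, // ArrayBuffer, as in the "loaded" branch
    status: "loaded",
    filename: "tree.jsonl", // assumed field; a .gz name would presumably select the gunzip path
  },
  (msg) => console.log(msg),
  ReadableWebToNodeStream,
  parser,
  streamValues
);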