Use jsonlParser #614

Closed · wants to merge 12 commits
3 changes: 2 additions & 1 deletion taxonium_backend/package.json
@@ -19,8 +19,9 @@
"express-queue": "^0.0.13",
"node-fetch": "^3.2.10",
"pako": "^2.0.4",
"taxonium_data_handling": "file:../taxonium_data_handling",
"readable-web-to-node-stream": "^3.0.2",
"stream-json": "^1.8.0",
"taxonium_data_handling": "file:../taxonium_data_handling",
"xml2js": "^0.4.23"
}
}
6 changes: 5 additions & 1 deletion taxonium_backend/server.js
@@ -11,6 +11,8 @@ var xml2js = require("xml2js");
var axios = require("axios");
var pako = require("pako");
const URL = require("url").URL;
var streamJson = require("stream-json");
var streamValues = require("stream-json/streamers/StreamValues").streamValues;
const ReadableWebToNodeStream = require("readable-web-to-node-stream");
const { execSync } = require("child_process");
const { Readable } = require("stream");
@@ -434,7 +436,9 @@ const loadData = async () => {
processedData = await importing.processJsonl(
supplied_object,
logStatusMessage,
ReadableWebToNodeStream.ReadableWebToNodeStream
ReadableWebToNodeStream.ReadableWebToNodeStream,
streamJson.parser,
streamValues
);

logStatusMessage({
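Note on the new parser and streamValues arguments: stream-json's parser({ jsonStreaming: true }) consumes a stream of concatenated JSON values (one per line in JSONL), and streamValues() reassembles the resulting token stream into whole JavaScript values. A minimal standalone sketch of the pipeline the backend now hands to processJsonl, with illustrative sample data:

    const { parser } = require("stream-json");
    const { streamValues } = require("stream-json/streamers/StreamValues");
    const { Readable } = require("stream");

    // Two JSONL records as a readable stream (illustrative data only).
    const jsonl = Readable.from(['{"total_nodes":2}\n', '{"node_id":1,"mutations":[]}\n']);

    jsonl
      .pipe(parser({ jsonStreaming: true }))
      .pipe(streamValues())
      .on("data", (decoded) => {
        // StreamValues emits { key, value }: key is the 0-based index of the
        // value in the stream, value is the fully assembled object.
        console.log(decoded.key, decoded.value);
      });

This is why the importing.js changes below read decoded.value instead of JSON.parse-ing each line by hand.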
12 changes: 12 additions & 0 deletions taxonium_backend/yarn.lock
@@ -502,6 +502,18 @@ statuses@1.5.0:
resolved "https://registry.npmjs.org/statuses/-/statuses-1.5.0.tgz"
integrity sha1-Fhx9rBd2Wf2YEfQ3cfqZOBR4Yow=

stream-chain@^2.2.5:
version "2.2.5"
resolved "https://registry.yarnpkg.com/stream-chain/-/stream-chain-2.2.5.tgz#b30967e8f14ee033c5b9a19bbe8a2cba90ba0d09"
integrity sha512-1TJmBx6aSWqZ4tx7aTpBDXK0/e2hhcNSTV8+CbFJtDjbb+I1mZ8lHit0Grw9GRT+6JbIrrDd8esncgBi8aBXGA==

stream-json@^1.8.0:
version "1.8.0"
resolved "https://registry.yarnpkg.com/stream-json/-/stream-json-1.8.0.tgz#53f486b2e3b4496c506131f8d7260ba42def151c"
integrity sha512-HZfXngYHUAr1exT4fxlbc1IOce1RYxp2ldeaf97LYCOPSoOqY/1Psp7iGvpb+6JIOgkra9zDYnPX01hGAHzEPw==
dependencies:
stream-chain "^2.2.5"

string_decoder@^1.1.1:
version "1.3.0"
resolved "https://registry.yarnpkg.com/string_decoder/-/string_decoder-1.3.0.tgz#42f114594a46cf1a8e30b0a84f56c78c3edac21e"
4 changes: 3 additions & 1 deletion taxonium_component/package.json
@@ -24,7 +24,9 @@
"storybook": "storybook dev -p 6006",
"build-storybook": "storybook build"
},
"dependencies": {},
"dependencies": {
"stream-json": "^1.8.0"
},
"devDependencies": {
"@fontsource/roboto": "^5.0.1",
"@headlessui/react": "^1.7.17",
6 changes: 5 additions & 1 deletion taxonium_component/src/webworkers/localBackendWorker.js
@@ -7,6 +7,8 @@ import {
import { processNewickAndMetadata } from "../utils/processNewick.js";
import { processNextstrain } from "../utils/processNextstrain.js";
import { ReadableWebToNodeStream } from "readable-web-to-node-stream";
import { parser } from "stream-json";
import { streamValues } from "stream-json/streamers/StreamValues";

console.log("worker starting");
postMessage({ data: "Worker starting" });
@@ -211,7 +213,9 @@ onmessage = async (event) => {
processedUploadedData = await processJsonl(
data.data,
sendStatusMessage,
ReadableWebToNodeStream
ReadableWebToNodeStream,
parser,
streamValues
);
console.log("processedUploadedData created");
} else if (
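A hedged sketch of the resulting worker call (the import path and status callback are assumptions, not confirmed by this diff): the worker passes in its own stream and parser implementations, presumably so that taxonium_data_handling never has to import Node- or browser-specific modules itself.

    import { processJsonl } from "taxonium_data_handling/importing.js"; // assumed path
    import { ReadableWebToNodeStream } from "readable-web-to-node-stream";
    import { parser } from "stream-json";
    import { streamValues } from "stream-json/streamers/StreamValues";

    // data is the uploaded-file descriptor { data, status, filename },
    // matching the fields processJsonl reads in importing.js below.
    const processedUploadedData = await processJsonl(
      data.data,
      (msg) => postMessage({ data: msg }), // stand-in for the worker's sendStatusMessage
      ReadableWebToNodeStream,
      parser,
      streamValues
    );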
12 changes: 12 additions & 0 deletions taxonium_component/yarn.lock
@@ -10164,6 +10164,11 @@ stream-browserify@^3.0.0:
inherits "~2.0.4"
readable-stream "^3.5.0"

stream-chain@^2.2.5:
version "2.2.5"
resolved "https://registry.yarnpkg.com/stream-chain/-/stream-chain-2.2.5.tgz#b30967e8f14ee033c5b9a19bbe8a2cba90ba0d09"
integrity sha512-1TJmBx6aSWqZ4tx7aTpBDXK0/e2hhcNSTV8+CbFJtDjbb+I1mZ8lHit0Grw9GRT+6JbIrrDd8esncgBi8aBXGA==

stream-http@^3.2.0:
version "3.2.0"
resolved "https://registry.yarnpkg.com/stream-http/-/stream-http-3.2.0.tgz#1872dfcf24cb15752677e40e5c3f9cc1926028b5"
@@ -10174,6 +10179,13 @@ stream-http@^3.2.0:
readable-stream "^3.6.0"
xtend "^4.0.2"

stream-json@^1.8.0:
version "1.8.0"
resolved "https://registry.yarnpkg.com/stream-json/-/stream-json-1.8.0.tgz#53f486b2e3b4496c506131f8d7260ba42def151c"
integrity sha512-HZfXngYHUAr1exT4fxlbc1IOce1RYxp2ldeaf97LYCOPSoOqY/1Psp7iGvpb+6JIOgkra9zDYnPX01hGAHzEPw==
dependencies:
stream-chain "^2.2.5"

stream-shift@^1.0.0:
version "1.0.1"
resolved "https://registry.yarnpkg.com/stream-shift/-/stream-shift-1.0.1.tgz#d7088281559ab2778424279b0877da3c392d5a3d"
98 changes: 40 additions & 58 deletions taxonium_data_handling/importing.js
@@ -28,8 +28,20 @@
}
}

export const setUpStream = (the_stream, data, sendStatusMessage) => {
function processLine(line, line_number) {
export const setUpStream = (
the_stream,
data,
sendStatusMessage,
parser,
streamValues
) => {
const pipeline = the_stream
.pipe(parser({ jsonStreaming: true }))
.pipe(streamValues());

let line_number = 0;

pipeline.on("data", (decoded) => {
if ((line_number % 10000 === 0 && line_number > 0) || line_number == 500) {
console.log(`Processed ${formatNumber(line_number)} lines`);
if (data.header.total_nodes) {
@@ -45,90 +57,74 @@ export const setUpStream = (the_stream, data, sendStatusMessage) => {
});
}
}
// console.log("LINE",line_number,line);
const decoded = JSON.parse(line);

if (line_number === 0) {
data.header = decoded;
data.header = decoded.value;
data.nodes = [];
data.node_to_mut = {};
} else {
data.node_to_mut[decoded.node_id] = decoded.mutations; // this is an int to ints map
data.nodes.push(decoded);
}
}
let cur_line = "";
let line_counter = 0;
the_stream.on("data", function (data) {
cur_line += data.toString();
if (cur_line.includes("\n")) {
const lines = cur_line.split("\n");
cur_line = lines.pop();
lines.forEach((line) => {
processLine(line, line_counter);
line_counter++;
});
data.node_to_mut[decoded.value.node_id] = decoded.value.mutations;
data.nodes.push(decoded.value);
}

line_number++;
});

the_stream.on("error", function (err) {
pipeline.on("error", function (err) {
console.log(err);
sendStatusMessage({ error: `Stream error: ${err.message}` });
});

the_stream.on("end", function () {
console.log("end");
pipeline.on("end", function () {
console.log("Stream processing completed.");
sendStatusMessage({ message: "Stream processing completed." });
});
};

export const processJsonl = async (
jsonl,
sendStatusMessage,
ReadableWebToNodeStream
ReadableWebToNodeStream,
parser,
streamValues
) => {
console.log(
"Worker processJsonl" //, jsonl
);
console.log("Worker processJsonl");
const data = jsonl.data;
const status = jsonl.status;
let the_stream;
if (jsonl.filename.includes("gz")) {
// Create a stream
the_stream = zlib.createGunzip();
} else {
// create a fallback stream, and process the output, initially just logging it
the_stream = new stream.PassThrough();
}
let new_data = {};
setUpStream(the_stream, new_data, sendStatusMessage);
setUpStream(the_stream, new_data, sendStatusMessage, parser, streamValues);

if (status === "loaded") {
const dataAsArrayBuffer = data;
// Convert the ArrayBuffer to a Buffer in a series of chunks
let chunkSize = 5 * 1024 * 1024;
let chunkSize = 5 * 1024 * 1024; // 5 MB chunks
for (let i = 0; i < dataAsArrayBuffer.byteLength; i += chunkSize) {
const chunk = dataAsArrayBuffer.slice(i, i + chunkSize);
const chunkAsBuffer = buffer.Buffer.from(chunk);
// Pipe the chunkStream to the stream
the_stream.write(chunkAsBuffer);
}
console.log("Worker processJsonl", data);
the_stream.end();
} else if (status === "url_supplied") {
const url = jsonl.filename;
let response;
// Try fetch
console.log("STARTING FETCH");
console.log("Starting fetch from URL:", url);
try {
response = await fetch(url);
} catch (error) {
console.log("Fetch error", error);
sendStatusMessage({ error: `Fetch error: ${error}` });
return;
}
console.log("ALL FINE", response);
console.log("Fetch successful", response);
sendStatusMessage({ message: "Loading root genome" });

const readableWebStream = response.body;

const nodeStream = new ReadableWebToNodeStream(readableWebStream);
nodeStream.pipe(the_stream);
} else if (status === "stream_supplied") {
@@ -143,23 +139,21 @@ export const processJsonl = async (
the_stream.on("end", resolve);
the_stream.on("error", reject);
});
console.log("done with stream");
console.log("Done with stream");

const scale_y =
24e2 /
(new_data.nodes.length > 10e3
2400 /
(new_data.nodes.length > 10000
? new_data.nodes.length
: new_data.nodes.length * 0.6666);
console.log("Scaling");
console.log("Scaling y positions");
for (const node of new_data.nodes) {
// numerically round to the nearest 0.1

node.y = roundToDp(node.y * scale_y, 6);
}
console.log("Calculating y positions");
const y_positions = new_data.nodes.map((node) => node.y);

console.log("Calculating coord extremes");
console.log("Calculating coordinate extremes");

const overallMaxY = reduceMaxOrMin(new_data.nodes, (node) => node.y, "max");
const overallMinY = reduceMaxOrMin(new_data.nodes, (node) => node.y, "min");
@@ -178,7 +172,7 @@ export const processJsonl = async (
const rootMutations = root.mutations;
root.mutations = [];

console.log("Creating output obj");
console.log("Creating output object");

const overwrite_config = new_data.header.config ? new_data.header.config : {};
overwrite_config.num_tips = root.num_tips;
@@ -243,20 +237,11 @@ export const generateConfig = (config, processedUploadedData) => {
? ["x_dist"]
: ["x_time"];

config.keys_to_display = Object.keys(processedUploadedData.nodes[0]).filter(
config.keys_to_display = Object.keys(firstNode).filter(
(x) => !to_remove.includes(x)
);

/*config.search_types = [
{ name: "name", label: "Name", type: "text_match" },
{ name: "meta_Lineage", label: "PANGO lineage", type: "text_exact" },
{ name: "meta_Country", label: "Country", type: "text_match" },
{ name: "mutation", label: "Mutation", type: "mutation" },
{ name: "revertant", label: "Revertant", type: "revertant" },
{ name: "genbank", label: "Genbank", type: "text_per_line" },
];*/
const prettyName = (x) => {
// if x starts with meta_
if (x.startsWith("meta_")) {
const bit = x.substring(5);
const capitalised_first_letter =
@@ -318,7 +303,6 @@ export const generateConfig = (config, processedUploadedData) => {
}));

config.search_types.forEach((x) => {
// if "text" is found in the type
if (x.type.includes("text")) {
x.controls = true;
}
@@ -336,8 +320,6 @@

config.colorBy = { colorByOptions };

//check if 'meta_pangolin_lineage' is in options

config.defaultColorByField = colorByOptions.includes("meta_pangolin_lineage")
? "meta_pangolin_lineage"
: colorByOptions[0];
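The scale_y hunk above only swaps the exponent literals 24e2 and 10e3 for the plain numbers 2400 and 10000; the behavior is unchanged. As a worked illustration of that rule, with node counts chosen arbitrarily:

    // Trees with more than 10,000 nodes are scaled as 2400 / n; smaller trees
    // are treated as if they had two-thirds as many nodes, which stretches
    // them vertically.
    const scaleY = (n) => 2400 / (n > 10000 ? n : n * 0.6666);

    console.log(scaleY(20000)); // 0.12
    console.log(scaleY(3000));  // ~1.2001 (2400 / 1999.8)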
1 change: 1 addition & 0 deletions taxonium_electron/package.json
@@ -29,6 +29,7 @@
},
"dependencies": {
"electron-squirrel-startup": "^1.0.0",
"stream-json": "^1.8.0",
"taxonium_backend": "file:../taxonium_backend"
},
"config": {
15 changes: 14 additions & 1 deletion taxonium_electron/yarn.lock
@@ -3506,6 +3506,18 @@ stream-buffers@~2.2.0:
resolved "https://registry.yarnpkg.com/stream-buffers/-/stream-buffers-2.2.0.tgz#91d5f5130d1cef96dcfa7f726945188741d09ee4"
integrity sha512-uyQK/mx5QjHun80FLJTfaWE7JtwfRMKBLkMne6udYOmvH0CawotVa7TfgYHzAnpphn4+TweIx1QKMnRIbipmUg==

stream-chain@^2.2.5:
version "2.2.5"
resolved "https://registry.yarnpkg.com/stream-chain/-/stream-chain-2.2.5.tgz#b30967e8f14ee033c5b9a19bbe8a2cba90ba0d09"
integrity sha512-1TJmBx6aSWqZ4tx7aTpBDXK0/e2hhcNSTV8+CbFJtDjbb+I1mZ8lHit0Grw9GRT+6JbIrrDd8esncgBi8aBXGA==

stream-json@^1.8.0:
version "1.8.0"
resolved "https://registry.yarnpkg.com/stream-json/-/stream-json-1.8.0.tgz#53f486b2e3b4496c506131f8d7260ba42def151c"
integrity sha512-HZfXngYHUAr1exT4fxlbc1IOce1RYxp2ldeaf97LYCOPSoOqY/1Psp7iGvpb+6JIOgkra9zDYnPX01hGAHzEPw==
dependencies:
stream-chain "^2.2.5"

"string-width@^1.0.2 || 2 || 3 || 4", string-width@^4.1.0, string-width@^4.2.0, string-width@^4.2.3:
version "4.2.3"
resolved "https://registry.npmjs.org/string-width/-/string-width-4.2.3.tgz"
@@ -3594,7 +3606,8 @@ tar@^6.0.5, tar@^6.1.11, tar@^6.1.2:
express-queue "^0.0.13"
node-fetch "^3.2.10"
pako "^2.0.4"
taxonium_data_handling "file:../../../Library/Caches/Yarn/v6/npm-taxonium-backend-1.0.0-dee063ce-02ad-484a-9869-43d898d92144-1665540516221/node_modules/taxonium_data_handling"
readable-web-to-node-stream "^3.0.2"
taxonium_data_handling "file:../../Library/Caches/Yarn/v6/npm-taxonium-backend-1.0.0-54dbd3ec-85d3-4058-a62e-263bea13e9a0-1727736806855/node_modules/taxonium_data_handling"
xml2js "^0.4.23"

"taxonium_data_handling@file:../taxonium_data_handling":