diff --git a/taxonium_backend/package.json b/taxonium_backend/package.json index 4f021ee7..89dc9c07 100644 --- a/taxonium_backend/package.json +++ b/taxonium_backend/package.json @@ -19,8 +19,9 @@ "express-queue": "^0.0.13", "node-fetch": "^3.2.10", "pako": "^2.0.4", - "taxonium_data_handling": "file:../taxonium_data_handling", "readable-web-to-node-stream": "^3.0.2", + "stream-json": "^1.8.0", + "taxonium_data_handling": "file:../taxonium_data_handling", "xml2js": "^0.4.23" } } diff --git a/taxonium_backend/server.js b/taxonium_backend/server.js index 104a875a..db361718 100644 --- a/taxonium_backend/server.js +++ b/taxonium_backend/server.js @@ -11,6 +11,8 @@ var xml2js = require("xml2js"); var axios = require("axios"); var pako = require("pako"); const URL = require("url").URL; +var streamJson = require("stream-json"); +var streamValues = require("stream-json/streamers/StreamValues").streamValues; const ReadableWebToNodeStream = require("readable-web-to-node-stream"); const { execSync } = require("child_process"); const { Readable } = require("stream"); @@ -434,7 +436,9 @@ const loadData = async () => { processedData = await importing.processJsonl( supplied_object, logStatusMessage, - ReadableWebToNodeStream.ReadableWebToNodeStream + ReadableWebToNodeStream.ReadableWebToNodeStream, + streamJson.parser, + streamValues ); logStatusMessage({ diff --git a/taxonium_backend/yarn.lock b/taxonium_backend/yarn.lock index 695f0a60..97417216 100644 --- a/taxonium_backend/yarn.lock +++ b/taxonium_backend/yarn.lock @@ -502,6 +502,18 @@ setprototypeof@1.2.0: resolved "https://registry.npmjs.org/statuses/-/statuses-1.5.0.tgz" integrity sha1-Fhx9rBd2Wf2YEfQ3cfqZOBR4Yow= +stream-chain@^2.2.5: + version "2.2.5" + resolved "https://registry.yarnpkg.com/stream-chain/-/stream-chain-2.2.5.tgz#b30967e8f14ee033c5b9a19bbe8a2cba90ba0d09" + integrity sha512-1TJmBx6aSWqZ4tx7aTpBDXK0/e2hhcNSTV8+CbFJtDjbb+I1mZ8lHit0Grw9GRT+6JbIrrDd8esncgBi8aBXGA== + +stream-json@^1.8.0: + version "1.8.0" + resolved "https://registry.yarnpkg.com/stream-json/-/stream-json-1.8.0.tgz#53f486b2e3b4496c506131f8d7260ba42def151c" + integrity sha512-HZfXngYHUAr1exT4fxlbc1IOce1RYxp2ldeaf97LYCOPSoOqY/1Psp7iGvpb+6JIOgkra9zDYnPX01hGAHzEPw== + dependencies: + stream-chain "^2.2.5" + string_decoder@^1.1.1: version "1.3.0" resolved "https://registry.yarnpkg.com/string_decoder/-/string_decoder-1.3.0.tgz#42f114594a46cf1a8e30b0a84f56c78c3edac21e" diff --git a/taxonium_component/package.json b/taxonium_component/package.json index 72d5ef5c..f5723a2d 100644 --- a/taxonium_component/package.json +++ b/taxonium_component/package.json @@ -24,7 +24,9 @@ "storybook": "storybook dev -p 6006", "build-storybook": "storybook build" }, - "dependencies": {}, + "dependencies": { + "stream-json": "^1.8.0" + }, "devDependencies": { "@fontsource/roboto": "^5.0.1", "@headlessui/react": "^1.7.17", diff --git a/taxonium_component/src/webworkers/localBackendWorker.js b/taxonium_component/src/webworkers/localBackendWorker.js index 9514d15a..ba9ed835 100644 --- a/taxonium_component/src/webworkers/localBackendWorker.js +++ b/taxonium_component/src/webworkers/localBackendWorker.js @@ -7,6 +7,8 @@ import { import { processNewickAndMetadata } from "../utils/processNewick.js"; import { processNextstrain } from "../utils/processNextstrain.js"; import { ReadableWebToNodeStream } from "readable-web-to-node-stream"; +import { parser } from "stream-json"; +import { streamValues } from "stream-json/streamers/StreamValues"; console.log("worker starting"); postMessage({ data: "Worker 
starting" }); @@ -211,7 +213,9 @@ onmessage = async (event) => { processedUploadedData = await processJsonl( data.data, sendStatusMessage, - ReadableWebToNodeStream + ReadableWebToNodeStream, + parser, + streamValues ); console.log("processedUploadedData created"); } else if ( diff --git a/taxonium_component/yarn.lock b/taxonium_component/yarn.lock index 58c83a36..fd688848 100644 --- a/taxonium_component/yarn.lock +++ b/taxonium_component/yarn.lock @@ -10164,6 +10164,11 @@ stream-browserify@^3.0.0: inherits "~2.0.4" readable-stream "^3.5.0" +stream-chain@^2.2.5: + version "2.2.5" + resolved "https://registry.yarnpkg.com/stream-chain/-/stream-chain-2.2.5.tgz#b30967e8f14ee033c5b9a19bbe8a2cba90ba0d09" + integrity sha512-1TJmBx6aSWqZ4tx7aTpBDXK0/e2hhcNSTV8+CbFJtDjbb+I1mZ8lHit0Grw9GRT+6JbIrrDd8esncgBi8aBXGA== + stream-http@^3.2.0: version "3.2.0" resolved "https://registry.yarnpkg.com/stream-http/-/stream-http-3.2.0.tgz#1872dfcf24cb15752677e40e5c3f9cc1926028b5" @@ -10174,6 +10179,13 @@ stream-http@^3.2.0: readable-stream "^3.6.0" xtend "^4.0.2" +stream-json@^1.8.0: + version "1.8.0" + resolved "https://registry.yarnpkg.com/stream-json/-/stream-json-1.8.0.tgz#53f486b2e3b4496c506131f8d7260ba42def151c" + integrity sha512-HZfXngYHUAr1exT4fxlbc1IOce1RYxp2ldeaf97LYCOPSoOqY/1Psp7iGvpb+6JIOgkra9zDYnPX01hGAHzEPw== + dependencies: + stream-chain "^2.2.5" + stream-shift@^1.0.0: version "1.0.1" resolved "https://registry.yarnpkg.com/stream-shift/-/stream-shift-1.0.1.tgz#d7088281559ab2778424279b0877da3c392d5a3d" diff --git a/taxonium_data_handling/importing.js b/taxonium_data_handling/importing.js index 2a043477..05ea64c2 100644 --- a/taxonium_data_handling/importing.js +++ b/taxonium_data_handling/importing.js @@ -28,8 +28,20 @@ function reduceMaxOrMin(array, accessFunction, maxOrMin) { } } -export const setUpStream = (the_stream, data, sendStatusMessage) => { - function processLine(line, line_number) { +export const setUpStream = ( + the_stream, + data, + sendStatusMessage, + parser, + streamValues +) => { + const pipeline = the_stream + .pipe(parser({ jsonStreaming: true })) + .pipe(streamValues()); + + let line_number = 0; + + pipeline.on("data", (decoded) => { if ((line_number % 10000 === 0 && line_number > 0) || line_number == 500) { console.log(`Processed ${formatNumber(line_number)} lines`); if (data.header.total_nodes) { @@ -45,69 +57,55 @@ export const setUpStream = (the_stream, data, sendStatusMessage) => { }); } } - // console.log("LINE",line_number,line); - const decoded = JSON.parse(line); + if (line_number === 0) { - data.header = decoded; + data.header = decoded.value; data.nodes = []; data.node_to_mut = {}; } else { - data.node_to_mut[decoded.node_id] = decoded.mutations; // this is an int to ints map - data.nodes.push(decoded); - } - } - let cur_line = ""; - let line_counter = 0; - the_stream.on("data", function (data) { - cur_line += data.toString(); - if (cur_line.includes("\n")) { - const lines = cur_line.split("\n"); - cur_line = lines.pop(); - lines.forEach((line) => { - processLine(line, line_counter); - line_counter++; - }); + data.node_to_mut[decoded.value.node_id] = decoded.value.mutations; + data.nodes.push(decoded.value); } + + line_number++; }); - the_stream.on("error", function (err) { + pipeline.on("error", function (err) { console.log(err); + sendStatusMessage({ error: `Stream error: ${err.message}` }); }); - the_stream.on("end", function () { - console.log("end"); + pipeline.on("end", function () { + console.log("Stream processing completed."); + sendStatusMessage({ 
message: "Stream processing completed." }); }); }; export const processJsonl = async ( jsonl, sendStatusMessage, - ReadableWebToNodeStream + ReadableWebToNodeStream, + parser, + streamValues ) => { - console.log( - "Worker processJsonl" //, jsonl - ); + console.log("Worker processJsonl"); const data = jsonl.data; const status = jsonl.status; let the_stream; if (jsonl.filename.includes("gz")) { - // Create a stream the_stream = zlib.createGunzip(); } else { - // create a fallback stream, and process the output, initially just logging it the_stream = new stream.PassThrough(); } let new_data = {}; - setUpStream(the_stream, new_data, sendStatusMessage); + setUpStream(the_stream, new_data, sendStatusMessage, parser, streamValues); if (status === "loaded") { const dataAsArrayBuffer = data; - // In a Convert the arrayBuffer to a buffer in a series of chunks - let chunkSize = 5 * 1024 * 1024; + let chunkSize = 5 * 1024 * 1024; // 5 MB chunks for (let i = 0; i < dataAsArrayBuffer.byteLength; i += chunkSize) { const chunk = dataAsArrayBuffer.slice(i, i + chunkSize); const chunkAsBuffer = buffer.Buffer.from(chunk); - // Pipe the chunkStream to the stream the_stream.write(chunkAsBuffer); } console.log("Worker processJsonl", data); @@ -115,8 +113,7 @@ export const processJsonl = async ( } else if (status === "url_supplied") { const url = jsonl.filename; let response; - // Try fetch - console.log("STARTING FETCH"); + console.log("Starting fetch from URL:", url); try { response = await fetch(url); } catch (error) { @@ -124,11 +121,10 @@ export const processJsonl = async ( sendStatusMessage({ error: `Fetch error: ${error}` }); return; } - console.log("ALL FINE", response); + console.log("Fetch successful", response); sendStatusMessage({ message: "Loading root genome" }); const readableWebStream = response.body; - const nodeStream = new ReadableWebToNodeStream(readableWebStream); nodeStream.pipe(the_stream); } else if (status === "stream_supplied") { @@ -143,23 +139,21 @@ export const processJsonl = async ( the_stream.on("end", resolve); the_stream.on("error", reject); }); - console.log("done with stream"); + console.log("Done with stream"); const scale_y = - 24e2 / - (new_data.nodes.length > 10e3 + 2400 / + (new_data.nodes.length > 10000 ? new_data.nodes.length : new_data.nodes.length * 0.6666); - console.log("Scaling"); + console.log("Scaling y positions"); for (const node of new_data.nodes) { - // numerically round to the nearest 0.1 - node.y = roundToDp(node.y * scale_y, 6); } console.log("Calculating y positions"); const y_positions = new_data.nodes.map((node) => node.y); - console.log("Calculating coord extremes"); + console.log("Calculating coordinate extremes"); const overallMaxY = reduceMaxOrMin(new_data.nodes, (node) => node.y, "max"); const overallMinY = reduceMaxOrMin(new_data.nodes, (node) => node.y, "min"); @@ -178,7 +172,7 @@ export const processJsonl = async ( const rootMutations = root.mutations; root.mutations = []; - console.log("Creating output obj"); + console.log("Creating output object"); const overwrite_config = new_data.header.config ? new_data.header.config : {}; overwrite_config.num_tips = root.num_tips; @@ -243,20 +237,11 @@ export const generateConfig = (config, processedUploadedData) => { ? 
["x_dist"] : ["x_time"]; - config.keys_to_display = Object.keys(processedUploadedData.nodes[0]).filter( + config.keys_to_display = Object.keys(firstNode).filter( (x) => !to_remove.includes(x) ); - /*config.search_types = [ - { name: "name", label: "Name", type: "text_match" }, - { name: "meta_Lineage", label: "PANGO lineage", type: "text_exact" }, - { name: "meta_Country", label: "Country", type: "text_match" }, - { name: "mutation", label: "Mutation", type: "mutation" }, - { name: "revertant", label: "Revertant", type: "revertant" }, - { name: "genbank", label: "Genbank", type: "text_per_line" }, - ];*/ const prettyName = (x) => { - // if x starts with meta_ if (x.startsWith("meta_")) { const bit = x.substring(5); const capitalised_first_letter = @@ -318,7 +303,6 @@ export const generateConfig = (config, processedUploadedData) => { })); config.search_types.forEach((x) => { - // if "text" is found in the type if (x.type.includes("text")) { x.controls = true; } @@ -336,8 +320,6 @@ export const generateConfig = (config, processedUploadedData) => { config.colorBy = { colorByOptions }; - //check if 'meta_pangolin_lineage' is in options - config.defaultColorByField = colorByOptions.includes("meta_pangolin_lineage") ? "meta_pangolin_lineage" : colorByOptions[0]; diff --git a/taxonium_electron/package.json b/taxonium_electron/package.json index e5c5edd6..595a2a12 100644 --- a/taxonium_electron/package.json +++ b/taxonium_electron/package.json @@ -29,6 +29,7 @@ }, "dependencies": { "electron-squirrel-startup": "^1.0.0", + "stream-json": "^1.8.0", "taxonium_backend": "file:../taxonium_backend" }, "config": { diff --git a/taxonium_electron/yarn.lock b/taxonium_electron/yarn.lock index a68e6d07..2b5c45ba 100644 --- a/taxonium_electron/yarn.lock +++ b/taxonium_electron/yarn.lock @@ -3506,6 +3506,18 @@ stream-buffers@~2.2.0: resolved "https://registry.yarnpkg.com/stream-buffers/-/stream-buffers-2.2.0.tgz#91d5f5130d1cef96dcfa7f726945188741d09ee4" integrity sha512-uyQK/mx5QjHun80FLJTfaWE7JtwfRMKBLkMne6udYOmvH0CawotVa7TfgYHzAnpphn4+TweIx1QKMnRIbipmUg== +stream-chain@^2.2.5: + version "2.2.5" + resolved "https://registry.yarnpkg.com/stream-chain/-/stream-chain-2.2.5.tgz#b30967e8f14ee033c5b9a19bbe8a2cba90ba0d09" + integrity sha512-1TJmBx6aSWqZ4tx7aTpBDXK0/e2hhcNSTV8+CbFJtDjbb+I1mZ8lHit0Grw9GRT+6JbIrrDd8esncgBi8aBXGA== + +stream-json@^1.8.0: + version "1.8.0" + resolved "https://registry.yarnpkg.com/stream-json/-/stream-json-1.8.0.tgz#53f486b2e3b4496c506131f8d7260ba42def151c" + integrity sha512-HZfXngYHUAr1exT4fxlbc1IOce1RYxp2ldeaf97LYCOPSoOqY/1Psp7iGvpb+6JIOgkra9zDYnPX01hGAHzEPw== + dependencies: + stream-chain "^2.2.5" + "string-width@^1.0.2 || 2 || 3 || 4", string-width@^4.1.0, string-width@^4.2.0, string-width@^4.2.3: version "4.2.3" resolved "https://registry.npmjs.org/string-width/-/string-width-4.2.3.tgz" @@ -3594,7 +3606,8 @@ tar@^6.0.5, tar@^6.1.11, tar@^6.1.2: express-queue "^0.0.13" node-fetch "^3.2.10" pako "^2.0.4" - taxonium_data_handling "file:../../../Library/Caches/Yarn/v6/npm-taxonium-backend-1.0.0-dee063ce-02ad-484a-9869-43d898d92144-1665540516221/node_modules/taxonium_data_handling" + readable-web-to-node-stream "^3.0.2" + taxonium_data_handling "file:../../Library/Caches/Yarn/v6/npm-taxonium-backend-1.0.0-54dbd3ec-85d3-4058-a62e-263bea13e9a0-1727736806855/node_modules/taxonium_data_handling" xml2js "^0.4.23" "taxonium_data_handling@file:../taxonium_data_handling":