
Commit

wip
theosanderson committed Sep 30, 2024
1 parent c442ade commit b804aa5
Showing 4 changed files with 49 additions and 79 deletions.
4 changes: 3 additions & 1 deletion taxonium_component/package.json
@@ -24,7 +24,9 @@
"storybook": "storybook dev -p 6006",
"build-storybook": "storybook build"
},
"dependencies": {},
"dependencies": {
"stream-json": "^1.8.0"
},
"devDependencies": {
"@fontsource/roboto": "^5.0.1",
"@headlessui/react": "^1.7.17",
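The new stream-json dependency provides incremental JSON parsing; its jsonl flavour parses one JSON document per line as the data streams in. A minimal sketch of the parser this commit wires in (the two-line payload is hypothetical):

```js
import { Readable } from "stream";
import { parser as jsonlParser } from "stream-json/jsonl/Parser";

// Hypothetical JSONL payload: a header line followed by a node line.
const jsonl = '{"total_nodes":1}\n{"node_id":0,"mutations":[]}\n';

Readable.from([jsonl])
  .pipe(jsonlParser())
  // Each parsed line arrives as { key: <zero-based line index>, value: <parsed object> }.
  .on("data", ({ key, value }) => console.log(key, value))
  .on("end", () => console.log("done"));
```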
5 changes: 4 additions & 1 deletion taxonium_component/src/webworkers/localBackendWorker.js
@@ -8,6 +8,8 @@ import { processNewickAndMetadata } from "../utils/processNewick.js";
import { processNextstrain } from "../utils/processNextstrain.js";
import { ReadableWebToNodeStream } from "readable-web-to-node-stream";

import {parser as jsonlParser} from "stream-json/jsonl/Parser";

console.log("worker starting");
postMessage({ data: "Worker starting" });

Expand Down Expand Up @@ -211,7 +213,8 @@ onmessage = async (event) => {
processedUploadedData = await processJsonl(
data.data,
sendStatusMessage,
ReadableWebToNodeStream
ReadableWebToNodeStream,
jsonlParser
);
console.log("processedUploadedData created");
} else if (
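The worker now passes the jsonl parser into processJsonl alongside ReadableWebToNodeStream, presumably so taxonium_data_handling itself never has to resolve these packages. A sketch of the resulting call shape (`fileArrayBuffer` is a hypothetical uploaded .jsonl file; the object fields match those read by processJsonl below):

```js
// Hypothetical call site: both stream helpers are injected as arguments.
const processedUploadedData = await processJsonl(
  { data: fileArrayBuffer, status: "loaded", filename: "tree.jsonl" },
  sendStatusMessage,
  ReadableWebToNodeStream,
  jsonlParser
);
```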
12 changes: 12 additions & 0 deletions taxonium_component/yarn.lock
@@ -10164,6 +10164,11 @@ stream-browserify@^3.0.0:
inherits "~2.0.4"
readable-stream "^3.5.0"

stream-chain@^2.2.5:
version "2.2.5"
resolved "https://registry.yarnpkg.com/stream-chain/-/stream-chain-2.2.5.tgz#b30967e8f14ee033c5b9a19bbe8a2cba90ba0d09"
integrity sha512-1TJmBx6aSWqZ4tx7aTpBDXK0/e2hhcNSTV8+CbFJtDjbb+I1mZ8lHit0Grw9GRT+6JbIrrDd8esncgBi8aBXGA==

stream-http@^3.2.0:
version "3.2.0"
resolved "https://registry.yarnpkg.com/stream-http/-/stream-http-3.2.0.tgz#1872dfcf24cb15752677e40e5c3f9cc1926028b5"
@@ -10174,6 +10179,13 @@ stream-http@^3.2.0:
readable-stream "^3.6.0"
xtend "^4.0.2"

stream-json@^1.8.0:
version "1.8.0"
resolved "https://registry.yarnpkg.com/stream-json/-/stream-json-1.8.0.tgz#53f486b2e3b4496c506131f8d7260ba42def151c"
integrity sha512-HZfXngYHUAr1exT4fxlbc1IOce1RYxp2ldeaf97LYCOPSoOqY/1Psp7iGvpb+6JIOgkra9zDYnPX01hGAHzEPw==
dependencies:
stream-chain "^2.2.5"

stream-shift@^1.0.0:
version "1.0.1"
resolved "https://registry.yarnpkg.com/stream-shift/-/stream-shift-1.0.1.tgz#d7088281559ab2778424279b0877da3c392d5a3d"
107 changes: 30 additions & 77 deletions taxonium_data_handling/importing.js
@@ -1,3 +1,4 @@

import zlib from "zlib";
import stream from "stream";
import buffer from "buffer";
@@ -28,8 +29,12 @@ function reduceMaxOrMin(array, accessFunction, maxOrMin) {
}
}

export const setUpStream = (the_stream, data, sendStatusMessage) => {
function processLine(line, line_number) {
export const setUpStream = (the_stream, data, sendStatusMessage, jsonlParser) => {
const pipeline = the_stream.pipe(jsonlParser());

let line_number = 0;

pipeline.on('data', decoded => {
if ((line_number % 10000 === 0 && line_number > 0) || line_number == 500) {
console.log(`Processed ${formatNumber(line_number)} lines`);
if (data.header.total_nodes) {
@@ -45,77 +50,59 @@ export const setUpStream = (the_stream, data, sendStatusMessage) => {
});
}
}
// console.log("LINE",line_number,line);
const decoded = JSON.parse(line);

if (line_number === 0) {
data.header = decoded;
data.header = decoded.value;
data.nodes = [];
data.node_to_mut = {};
} else {
data.node_to_mut[decoded.node_id] = decoded.mutations; // this is an int to ints map
data.nodes.push(decoded);
}
}
let cur_line = "";
let line_counter = 0;
the_stream.on("data", function (data) {
cur_line += data.toString();
if (cur_line.includes("\n")) {
const lines = cur_line.split("\n");
cur_line = lines.pop();
lines.forEach((line) => {
processLine(line, line_counter);
line_counter++;
});
data.node_to_mut[decoded.value.node_id] = decoded.value.mutations;
data.nodes.push(decoded.value);
}

line_number++;
});

the_stream.on("error", function (err) {
pipeline.on('error', function (err) {
console.log(err);
});

the_stream.on("end", function () {
pipeline.on('end', function () {
console.log("end");
});
};
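setUpStream treats line 0 of the JSONL as a header object and every later line as a node. A self-contained sketch of that contract against an in-memory stream (field names come from the code above; values are illustrative only):

```js
import { Readable } from "stream";
import { parser as jsonlParser } from "stream-json/jsonl/Parser";

// Two-line fixture: header first, then one node.
const lines = [
  JSON.stringify({ total_nodes: 1, mutations: [] }),
  JSON.stringify({ node_id: 0, parent_id: 0, x_dist: 0, y: 1, mutations: [] }),
].join("\n");

const data = {};
setUpStream(Readable.from([lines]), data, () => {}, jsonlParser);
// Once the pipeline emits "end": data.header is the first line's object,
// data.nodes holds the rest, and data.node_to_mut maps node_id -> mutations.
```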

export const processJsonl = async (
jsonl,
sendStatusMessage,
ReadableWebToNodeStream
ReadableWebToNodeStream,
jsonlParser,
) => {
console.log(
"Worker processJsonl" //, jsonl
);
console.log("Worker processJsonl");
const data = jsonl.data;
const status = jsonl.status;
let the_stream;
if (jsonl.filename.includes("gz")) {
// Create a stream
the_stream = zlib.createGunzip();
} else {
// create a fallback stream, and process the output, initially just logging it
the_stream = new stream.PassThrough();
}
let new_data = {};
setUpStream(the_stream, new_data, sendStatusMessage);
setUpStream(the_stream, new_data, sendStatusMessage, jsonlParser);

if (status === "loaded") {
const dataAsArrayBuffer = data;
// In a Convert the arrayBuffer to a buffer in a series of chunks
let chunkSize = 5 * 1024 * 1024;
for (let i = 0; i < dataAsArrayBuffer.byteLength; i += chunkSize) {
const chunk = dataAsArrayBuffer.slice(i, i + chunkSize);
const chunkAsBuffer = buffer.Buffer.from(chunk);
// Pipe the chunkStream to the stream
the_stream.write(chunkAsBuffer);
}
console.log("Worker processJsonl", data);
the_stream.end();
} else if (status === "url_supplied") {
const url = jsonl.filename;
let response;
// Try fetch
console.log("STARTING FETCH");
try {
response = await fetch(url);
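The "loaded" branch above writes the uploaded ArrayBuffer into the stream in 5 MB slices rather than as one giant Buffer. The same logic as a standalone helper (the function name is mine, not the codebase's):

```js
import buffer from "buffer";

// Restatement of the chunked write above. ArrayBuffer#slice copies each
// 5 MB window, which Buffer.from then wraps for the writable stream.
// Note that, like the original loop, this ignores write() backpressure.
function writeArrayBufferInChunks(arrayBuffer, writable, chunkSize = 5 * 1024 * 1024) {
  for (let i = 0; i < arrayBuffer.byteLength; i += chunkSize) {
    writable.write(buffer.Buffer.from(arrayBuffer.slice(i, i + chunkSize)));
  }
  writable.end();
}
```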
@@ -128,7 +115,6 @@ export const processJsonl = async (
sendStatusMessage({ message: "Loading root genome" });

const readableWebStream = response.body;

const nodeStream = new ReadableWebToNodeStream(readableWebStream);
nodeStream.pipe(the_stream);
} else if (status === "stream_supplied") {
@@ -140,20 +126,14 @@

// Wait for the stream to finish
await new Promise((resolve, reject) => {
the_stream.on("end", resolve);
the_stream.on("error", reject);
the_stream.on('end', resolve);
the_stream.on('error', reject);
});
console.log("done with stream");

const scale_y =
24e2 /
(new_data.nodes.length > 10e3
? new_data.nodes.length
: new_data.nodes.length * 0.6666);
const scale_y = 24e2 / (new_data.nodes.length > 10e3 ? new_data.nodes.length : new_data.nodes.length * 0.6666);
console.log("Scaling");
for (const node of new_data.nodes) {
// numerically round to the nearest 0.1

node.y = roundToDp(node.y * scale_y, 6);
}
console.log("Calculating y positions");
@@ -163,16 +143,8 @@

const overallMaxY = reduceMaxOrMin(new_data.nodes, (node) => node.y, "max");
const overallMinY = reduceMaxOrMin(new_data.nodes, (node) => node.y, "min");
const overallMaxX = reduceMaxOrMin(
new_data.nodes,
(node) => node.x_dist,
"max"
);
const overallMinX = reduceMaxOrMin(
new_data.nodes,
(node) => node.x_dist,
"min"
);
const overallMaxX = reduceMaxOrMin(new_data.nodes, (node) => node.x_dist, "max");
const overallMinX = reduceMaxOrMin(new_data.nodes, (node) => node.x_dist, "min");

const root = new_data.nodes.find((node) => node.parent_id === node.node_id);
const rootMutations = root.mutations;
Expand All @@ -190,9 +162,7 @@ export const processJsonl = async (
overallMinX,
overallMinY,
y_positions,
mutations: new_data.header.mutations
? new_data.header.mutations
: new_data.header.aa_mutations,
mutations: new_data.header.mutations ? new_data.header.mutations : new_data.header.aa_mutations,
node_to_mut: new_data.node_to_mut,
rootMutations: rootMutations,
rootId: root.node_id,
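reduceMaxOrMin computes the overall bounds in a single pass per axis; its body falls outside this diff, so the following is a reconstruction consistent with its call sites above (accessFunction extracts the compared value, maxOrMin is the string "max" or "min"):

```js
// Sketch of reduceMaxOrMin, inferred from its call sites; not the
// codebase's own implementation.
function reduceMaxOrMin(array, accessFunction, maxOrMin) {
  if (maxOrMin === "max") {
    return array.reduce((acc, item) => Math.max(acc, accessFunction(item)), -Infinity);
  } else {
    return array.reduce((acc, item) => Math.min(acc, accessFunction(item)), Infinity);
  }
}
```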
@@ -204,14 +174,10 @@

export const generateConfig = (config, processedUploadedData) => {
config.num_nodes = processedUploadedData.nodes.length;
config.initial_x =
(processedUploadedData.overallMaxX + processedUploadedData.overallMinX) / 2;
config.initial_y =
(processedUploadedData.overallMaxY + processedUploadedData.overallMinY) / 2;
config.initial_x = (processedUploadedData.overallMaxX + processedUploadedData.overallMinX) / 2;
config.initial_y = (processedUploadedData.overallMaxY + processedUploadedData.overallMinY) / 2;
config.initial_zoom = config.initial_zoom ? config.initial_zoom : -2;
config.genes = [
...new Set(processedUploadedData.mutations.map((x) => (x ? x.gene : null))),
]
config.genes = [...new Set(processedUploadedData.mutations.map((x) => (x ? x.gene : null)))]
.filter((x) => x)
.sort();

Expand Down Expand Up @@ -247,20 +213,10 @@ export const generateConfig = (config, processedUploadedData) => {
(x) => !to_remove.includes(x)
);

/*config.search_types = [
{ name: "name", label: "Name", type: "text_match" },
{ name: "meta_Lineage", label: "PANGO lineage", type: "text_exact" },
{ name: "meta_Country", label: "Country", type: "text_match" },
{ name: "mutation", label: "Mutation", type: "mutation" },
{ name: "revertant", label: "Revertant", type: "revertant" },
{ name: "genbank", label: "Genbank", type: "text_per_line" },
];*/
const prettyName = (x) => {
// if x starts with meta_
if (x.startsWith("meta_")) {
const bit = x.substring(5);
const capitalised_first_letter =
bit.charAt(0).toUpperCase() + bit.slice(1);
const capitalised_first_letter = bit.charAt(0).toUpperCase() + bit.slice(1);
return capitalised_first_letter;
}
if (x === "mutation") {
Expand Down Expand Up @@ -318,7 +274,6 @@ export const generateConfig = (config, processedUploadedData) => {
}));

config.search_types.forEach((x) => {
// if "text" is found in the type
if (x.type.includes("text")) {
x.controls = true;
}
@@ -336,11 +291,9 @@

config.colorBy = { colorByOptions };

//check if 'meta_pangolin_lineage' is in options

config.defaultColorByField = colorByOptions.includes("meta_pangolin_lineage")
? "meta_pangolin_lineage"
: colorByOptions[0];
};

export default { processJsonl, generateConfig };
export default { processJsonl, generateConfig };
