Skip to content

Commit

Permalink
[pre-commit.ci] auto fixes from pre-commit.com hooks
Browse files Browse the repository at this point in the history
for more information, see https://pre-commit.ci
  • Loading branch information
pre-commit-ci[bot] committed Oct 8, 2024
1 parent 2fb64d7 commit 87e5d71
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 16 deletions.
9 changes: 5 additions & 4 deletions taxonium_component/src/webworkers/localBackendWorker.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import {
import { processNewickAndMetadata } from "../utils/processNewick.js";
import { processNextstrain } from "../utils/processNextstrain.js";
import { ReadableWebToNodeStream } from "readable-web-to-node-stream";
import {parser } from "stream-json";
import {streamValues } from "stream-json/streamers/StreamValues";
import { parser } from "stream-json";
import { streamValues } from "stream-json/streamers/StreamValues";

console.log("worker starting");
postMessage({ data: "Worker starting" });
Expand Down Expand Up @@ -214,9 +214,10 @@ onmessage = async (event) => {
data.data,
sendStatusMessage,
ReadableWebToNodeStream,
parser,streamValues
parser,
streamValues
);

console.log("processedUploadedData created");
} else if (
data.type === "upload" &&
Expand Down
28 changes: 16 additions & 12 deletions taxonium_data_handling/importing.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ class StreamSplitter extends stream.Transform {
const headerData = data.slice(0, newlineIndex);
const restData = data.slice(newlineIndex + 1);


// Write header data to headerParser
this.headerParser.write(headerData);
this.headerParser.end();
Expand All @@ -33,7 +32,7 @@ class StreamSplitter extends stream.Transform {
this.firstPart = false;
} else {
// No newline found, store data in buffer
this.headerParser.write(data);
this.headerParser.write(data);
}
} else {
// After header is processed, pass data to dataParser
Expand All @@ -55,7 +54,6 @@ class StreamSplitter extends stream.Transform {
}
}


const roundToDp = (number, dp) => {
return Math.round(number * Math.pow(10, dp)) / Math.pow(10, dp);
};
Expand Down Expand Up @@ -92,21 +90,24 @@ export const setUpStream = (
// Header parser
const headerParser = parser({ jsonStreaming: true });
const headerPipeline = headerParser.pipe(streamValues());

let headerBytesProcessed = 0;
const HEADER_PROGRESS_INTERVAL = 1024 * 1024; // 1MB

headerParser.on('data', (chunk) => {
headerParser.on("data", (chunk) => {
headerBytesProcessed += chunk.length;
if (headerBytesProcessed >= HEADER_PROGRESS_INTERVAL) {
sendStatusMessage({
message: `Processing header: ${(headerBytesProcessed / (1024 * 1024)).toFixed(2)} MB processed`,
message: `Processing header: ${(
headerBytesProcessed /
(1024 * 1024)
).toFixed(2)} MB processed`,
});
headerBytesProcessed = 0; // Reset the counter
}
});

headerPipeline.on('data', (chunk) => {
headerPipeline.on("data", (chunk) => {
data.header = chunk.value;
data.nodes = [];
data.node_to_mut = {};
Expand All @@ -115,7 +116,7 @@ export const setUpStream = (
});
});

headerPipeline.on('error', (err) => {
headerPipeline.on("error", (err) => {
console.error("Header parser error:", err);
sendStatusMessage({
error: `Header parser error: ${err.message}`,
Expand Down Expand Up @@ -177,19 +178,22 @@ export const setUpStream = (
const splitter = new StreamSplitter(headerParser, dataParser);

// Pipe the input stream through the splitter
the_stream.pipe(splitter).on("error", (err) => console.error("Splitter error:", err));
the_stream
.pipe(splitter)
.on("error", (err) => console.error("Splitter error:", err));

// Handle the completion of the dataParser
dataParser.on("finish", () => {
console.log("Finished processing the stream");
});
};


export const processJsonl = async (
jsonl,
sendStatusMessage,
ReadableWebToNodeStream, parser, streamValues
ReadableWebToNodeStream,
parser,
streamValues
) => {
console.log(
"Worker processJsonl" //, jsonl
Expand All @@ -205,7 +209,7 @@ export const processJsonl = async (
the_stream = new stream.PassThrough();
}
let new_data = {};
setUpStream(the_stream, new_data, sendStatusMessage,parser, streamValues);
setUpStream(the_stream, new_data, sendStatusMessage, parser, streamValues);

if (status === "loaded") {
const dataAsArrayBuffer = data;
Expand Down

0 comments on commit 87e5d71

Please sign in to comment.