Skip to content

Commit

Permalink
add more logging
Browse files Browse the repository at this point in the history
  • Loading branch information
theosanderson committed Oct 8, 2024
1 parent bca1578 commit 6fb42aa
Showing 1 changed file with 37 additions and 3 deletions.
40 changes: 37 additions & 3 deletions taxonium_data_handling/importing.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,40 @@
import zlib from "zlib";
import stream from "stream";
import buffer from "buffer";
import { send } from "process";


/**
 * Pass-through stream that tallies the chunks flowing through it and reports
 * progress via a caller-supplied callback.
 *
 * A status object is sent after every 100th chunk, and a final one (flagged
 * with `finished: true`) is sent when the stream is flushed. Chunk contents
 * are forwarded downstream untouched.
 */
class ChunkCounterStream extends stream.PassThrough {
  /**
   * @param {Function} sendStatusMessage - Called with a status object
   *   (`{ message, count }`, plus `finished: true` on the final report).
   * @param {object} [options] - Forwarded as-is to the PassThrough constructor.
   */
  constructor(sendStatusMessage, options = {}) {
    super(options);
    this.chunkCount = 0;
    this.sendStatusMessage = sendStatusMessage;
  }

  /**
   * Count the incoming chunk, report on every 100th one, then forward it.
   *
   * @param {*} chunk - Data to forward unchanged.
   * @param {string} encoding - Unused; part of the Transform contract.
   * @param {Function} callback - Invoked once the chunk has been pushed.
   */
  _transform(chunk, encoding, callback) {
    this.chunkCount += 1;

    const seenSoFar = this.chunkCount;
    const isReportingPoint = seenSoFar % 100 === 0;
    if (isReportingPoint) {
      const status = {
        message: `Processed ${seenSoFar} groups of mutations`,
        count: seenSoFar
      };
      this.sendStatusMessage(status);
    }

    // Forward the data exactly as received; this stream only observes it.
    this.push(chunk);
    callback();
  }

  /**
   * Emit the final status report once all input has been consumed.
   *
   * @param {Function} callback - Invoked after the final report is sent.
   */
  _flush(callback) {
    const total = this.chunkCount;
    const finalStatus = {
      message: `Finished processing. Total chunks: ${total}`,
      count: total,
      finished: true
    };
    this.sendStatusMessage(finalStatus);
    callback();
  }
}


class StreamSplitter extends stream.Transform {
constructor(headerParser, dataParser, options = {}) {
Expand Down Expand Up @@ -153,9 +187,9 @@ export const setUpStream = (
data.node_to_mut[decoded.node_id] = decoded.mutations;
data.nodes.push(decoded);
}

// Set up the splitter with headerParser and dataParser
const splitter = new StreamSplitter(headerParser, dataParser);
const chunkCounterStream = new ChunkCounterStream(sendStatusMessage);
chunkCounterStream.pipe(headerParser);
const splitter = new StreamSplitter(chunkCounterStream, dataParser);

// Pipe the input stream through the splitter
the_stream.pipe(splitter).on("error", (err) => console.error("Splitter error:", err));
Expand Down

0 comments on commit 6fb42aa

Please sign in to comment.