Added backend workers
Added two files, nodeMapper.js and extract.js, which are used to generate the output for the SNP distance search. nodeMapper builds a map of the entire tree so that downward traversal is possible, and extract performs the actual traversal and the SNP distance calculation.
DLiarakos authored May 20, 2024
1 parent 141b560 commit 519006b
Showing 2 changed files with 281 additions and 0 deletions.
119 changes: 119 additions & 0 deletions taxonium_component/src/utils/extract.js
@@ -0,0 +1,119 @@
import processJsonLines from './nodeMapper.js';

/*
getParsimonySamples function outline:
inputs:
sampleID: the explicit node or sample name, not the internal ID
maxParsimony: the SNP distance threshold of interest
Nested functions:
processJsonLines: the main backend worker; reads in the JSONL file, constructs a map of all internal nodes, their children, and their mutations, and checks whether the sample exists in the tree
findNodesWithinDistance: a helper function that finds all nodes within a certain distance of a given node;
traverses up and down the tree, adding nodes to a results array if they are within the distance threshold,
and returns the results array
traverseUp: helper function that traverses up the tree, adding nodes to the results array if they are within the distance threshold
traverseDown: helper function that traverses down the tree, adding nodes to the results array if they are within the distance threshold
outputs: a simple list/array of internal IDs and their SNP distances from the queried node, flags that indicate whether a valid search was performed, and a map of each node's name to its GenBank accession and Pangolin lineage
with this list as a result, the backend can then be queried for more information about each node (name, mutations, etc.) if needed
this also covers the case where specificMut is passed, which would allow filtering on whether or not a node has a specific mutation,
but that filtering is done after all nodes within the SNP distance are found, to reduce processing time when the flag isn't specified
once the list is obtained, snpComponent formats it for output into Taxonium (big step)
*/
async function getParsimonySamples(sampleID, maxParsimony) {
return processJsonLines("https://cov2tree.nyc3.cdn.digitaloceanspaces.com/latest_public.jsonl.gz", sampleID).then(myResult => {
//myResult = [nodes, foundSample, foundSampleID, foundParentID, foundSNPCount, isBranch]
if (myResult === "Error parsing JSON") {//if there was an error parsing the JSON, propagate it
return "Error parsing JSON";
}
var nodeMap = myResult[0];//index of all internal nodes and their children
// Main function to find all nodes within a certain distance of a given node
function findNodesWithinDistance(node, distanceThreshold) {
// Helper function to traverse up (towards the parent)
function traverseUp(node, currentDistance) {
var parent_id = nodeMap[node].parent_id;
var snpCount = nodeMap[node].snpCount;
if (parent_id === node || currentDistance > distanceThreshold) {//if this is the root node (the root has itself as parent), or the threshold is exceeded,
//console.log("reached root node or threshold, returning at distance "+currentDistance+" from node "+node+" with parent "+parent_id+" and snpCount "+snpCount)
return;//end traversal
}
//console.log("traversing up, new node is "+parent_id+" with distance "+(currentDistance + snpCount))
if (!visited.has(parent_id)) {// Check if this node has already been visited to avoid infinite loops
visited.add(parent_id);
traverseDown(parent_id, currentDistance + snpCount);//Traverse down from the parent
traverseUp(parent_id, currentDistance + snpCount);// Traverse further up
}
}
// Helper function to traverse down (towards the children)
function traverseDown(node, currentDistance) {
if (!nodeMap[node] || currentDistance > distanceThreshold) {return;}//if node is a leaf node, or if the threshold is exceeded, return
for (const child of nodeMap[node].children) {// Traverse all children
let decodedChild = child.split("=");//split the encoded child into internal ID and SNP distance
let childId = decodedChild[0];//internal ID of the child
let childSnpDist = parseInt(decodedChild[1]);//SNP distance of the child
let newTotal = currentDistance + childSnpDist;//add the child's SNP distance to the current distance
//console.log("traversing down, new node is "+childId+" with distance "+childSnpDist+" for new total "+newTotal)
if (!visited.has(childId) && !visited.has(decodedChild[3])) {//track children by GenBank accession or node ID, since some sample names are repeated
if (childId.match(/^\d+$/)){//if it's all digits, it's an internal node, so add it to visited as-is
visited.add(childId);
}
else {visited.add(decodedChild[3]);}//if it's not all digits, it's a leaf node, so add its GenBank accession to visited
if ((newTotal <= distanceThreshold)){//only record the child if it is within the threshold
//console.log("adding node to results:"+childId+" with distance "+newTotal)
if (!nodeMap[childId]){//if it has no entry in the node map, it's not an internal node, so add it to the results
//console.log("adding node to results:"+decodedChild)
results.push([decodedChild[0], newTotal, decodedChild[2], decodedChild[3]]);//[name, SNP distance, Pangolin lineage, GenBank accession]
}
}
if (nodeMap[childId]){//if the child is an internal node, traverse down
traverseDown(childId,newTotal);// Traverse further down; pass ID, not node info itself
}
}
}
}

// Start of the main function
//boolean obtained during traversal of the whole tree; if the queried sample exists in Taxonium, this flags as true
if (!myResult[1]) {//if the boolean is falsy
console.log("Node not found in the tree");//it's not a valid node, so return an error statement
return "Node not found in the tree";
}

let visited = new Set(); // To keep track of visited nodes
let results = []; // To store nodes within the distance threshold
visited.add(myResult[2]); //add ID of queried sample to visited
if (myResult[5]){//if the queried node is an internal node
traverseDown(myResult[2], 0);//start the traversal from the internal node itself, with a starting distance of 0
traverseUp(myResult[2], 0);
}
else{//otherwise start from the sample's parent (myResult[3]), seeded with the sample's own SNP count (myResult[4])
traverseDown(myResult[3], myResult[4]);
traverseUp(myResult[3], myResult[4]);
}
//traverse as far down as possible first, then go up, and traverse down again, ignoring visited nodes
return results;
}

let goodSamples = findNodesWithinDistance(sampleID, maxParsimony);
nodeMap = null;//release the map so it can be garbage collected
return goodSamples;
})
.catch(error => {
// Catch any errors from processJsonLines or thrown in the then block
console.error('Error in getParsimonySamples:', error);
return "Error processing samples";
});
}
/*
getParsimonySamples("node_960478", 5)
.then(result => {
console.log("Results:", result);
})
.catch(error => {
console.error("Error processing samples:", error);
});
*/
export default getParsimonySamples;

/*
NOTES:
*/
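
For context, here is a minimal sketch of how a caller might consume getParsimonySamples; the sample name is hypothetical, and the result shape follows the results.push([...]) call above (name, SNP distance, Pangolin lineage, GenBank accession):

import getParsimonySamples from './utils/extract.js';

getParsimonySamples("USA/CA-EXAMPLE-001/2021", 3).then((result) => {//hypothetical sample name
if (!Array.isArray(result)) {//failures come back as plain strings, e.g. "Node not found in the tree"
console.error(result);
return;
}
for (const [name, snpDist, lineage, accession] of result) {
console.log(`${name}: ${snpDist} SNPs (${lineage}, ${accession})`);
}
});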
162 changes: 162 additions & 0 deletions taxonium_component/src/utils/nodeMapper.js
@@ -0,0 +1,162 @@
/*
TODO:
*/

async function processJsonLines(url,sampleID) {
// Fetch the gzipped JSONL file
//const startTime = new Date(); // Start timing
const response = await fetch(url);

// Ensure the fetch was successful
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}

// Stream the response through decompression and decoding
const decompressedStream = response.body.pipeThrough(new DecompressionStream('gzip'));
const textStream = decompressedStream.pipeThrough(new TextDecoderStream());

// Reader to read the stream line by line
const reader = textStream.getReader();
let remainder = '';
let result;
let nodes = {};
let foundSample = false;//we will be looking for a specific sample while we construct the map
let foundSampleID = "";
let foundParentID = "";
let foundSNPCount = 0;
let isBranch = false;
while (!(result = await reader.read()).done) {
const chunk = remainder + result.value;
const lines = chunk.split('\n');
remainder = lines.pop(); // Save the last line in case it's incomplete
for (const line of lines) {
if (line) {
var snpCount=0;
try {
const json = JSON.parse(line);
if (json.config){//if this line holds the config, skip it to avoid an error
continue;//this first line also holds the mutations dictionary for decoding, if we need that later
}
for (const mut of json.mutations){
if (mut>107435){//count only mutation indices above this cutoff (assumed to select nucleotide mutations in this tree's mutation dictionary)
snpCount+=1;
}
}
if (json.name===sampleID){//check if this is the sample we will be searching for
foundSample=true;//if it is, we have found it
foundSampleID=json.node_id;//store its ID so we can use it later
foundParentID=json.parent_id;//store the parent ID of the found node as a jumping-off point, since leaf nodes themselves are not stored in the map
foundSNPCount=snpCount;
if (json.name.includes("node_")) {
isBranch=true;
}
//console.log(json)
}

if (json.name.includes("node_")) {// Check if the node is internal
var encodedChild=(String(json.node_id)+"="+String(snpCount));//encode the child and its SNP count as a flat string, since storing them as separate objects causes a JSON.stringify error due to excessive nesting
if (!nodes[json.node_id]) {//if internal, but not added to list
nodes[json.node_id] = {//create new node
parent_id: json.parent_id,
snpCount: snpCount,
children: []
};
if (!nodes[nodes[json.node_id].parent_id]){//if the parent is not yet in the list,
nodes[nodes[json.node_id].parent_id] = {//add it, with null parent and SNP count, since we won't have that info until we read in the parent's own line
parent_id: null,
snpCount: null,
children: [encodedChild]//store the node ID and the number of mutations
};
}
else{
nodes[nodes[json.node_id].parent_id].children.push(encodedChild);// if the parent node has been added, add this node to its children
}
}
if(nodes[json.node_id] && nodes[json.node_id].parent_id===null){//if this node was added earlier as a placeholder parent, and we have now reached its own line in the JSONL
//console.log("Node ID being updated:"+json.name)
nodes[json.node_id].parent_id=json.parent_id;//fill in the parent ID
nodes[json.node_id].snpCount=snpCount;//fill in the SNP count
if (!nodes[nodes[json.node_id].parent_id]){//this node was added in a previous step, so it doesn't trigger the new-internal-node branch above; its parent may still be missing from the list
nodes[nodes[json.node_id].parent_id] = {// so add it
parent_id: null,
snpCount: null,
children: [encodedChild]//store the node ID and the number of mutations
};
}
else{
nodes[nodes[json.node_id].parent_id].children.push(encodedChild);// if the parent node has been added, add this node to its children
}
}
}
else {//if the name doesn't contain "node_", it's a leaf node
var encodedChild=(String(json.name)+"="+String(snpCount)+"="+String(json.meta_pangolin_lineage)+"="+String(json.meta_genbank_accession));//encode the child, its SNP count, lineage, and accession as a flat string, again to avoid JSON.stringify nesting errors
if (!nodes[json.parent_id]) {//we don't track leaf nodes as map entries, so if the parent node is not in the list, add it
nodes[json.parent_id] = {//its null values are filled in when we read the parent node's own line
parent_id: null,
snpCount: null,
children: [encodedChild]
};
} else {
nodes[json.parent_id].children.push(encodedChild);//if the parent node is in the list, add this node to its children
}
}
} catch (e) {
console.error('Error parsing JSON:', e);
return "Error parsing JSON"
}
}
}
}


var answersArray=[nodes, foundSample, foundSampleID, foundParentID, foundSNPCount, isBranch];
return answersArray;
}
/*
processJsonLines('https://cov2tree.nyc3.cdn.digitaloceanspaces.com/latest_public.jsonl.gz', "node_3").then(result => {
let sliced = Object.fromEntries(Object.entries(result[0]).slice(0,3));//get the first 3 entries of the node map
console.log("First 3 entries: ",sliced)
//saveObjectToJson(result[0], 'C:/Users/david/my-app/src/InternalNodeMap.json');
})
.catch(error => {
console.error("Error processing samples:", error);
});
function saveObjectToJson(dataObject, outputPath) {
const fs = require('fs');
const JSONStream = require('JSONStream');
return new Promise((resolve, reject) => {
const writeStream = fs.createWriteStream(outputPath);
const stringifyStream = JSONStream.stringifyObject();
stringifyStream.pipe(writeStream);
writeStream.on('finish', () => {
console.log('JSON file has been written successfully.');
resolve();
});
writeStream.on('error', (error) => {
console.error('Stream write error:', error);
reject(error);
});
stringifyStream.on('error', (error) => {
console.error('JSON stringify error:', error);
reject(error);
});
for (const key in dataObject) {
stringifyStream.write([key, dataObject[key]]);
}
stringifyStream.end();
});
}
*/
export default processJsonLines;

// Performance notes (usage example above)
//at ~2 GB of RAM, 4.2 GHz with 6 cores: a little under 60 s when reading from the URL
//writing the map to a file takes longer, but ideally isn't a factor if it happens in the backend
//time to query the backend for a single node: ~0.6 s
//time to add SNP distances while reading is negligible
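
For reference, a minimal sketch of the flat "key=value" child encoding that nodeMapper.js produces and extract.js decodes; the values below are illustrative, and the flat strings exist to avoid the JSON.stringify nesting errors noted in the comments above:

const internalChild = "960479=2";//node_id=snpCount (illustrative values)
const leafChild = "Sample/ABC/2021=1=B.1.1.7=OK000001";//name=snpCount=lineage=genbank_accession (illustrative values)
const [childId, childSnp] = internalChild.split("=");
console.log(childId, parseInt(childSnp));//"960479" 2
const parts = leafChild.split("=");
console.log(parts[0], parseInt(parts[1]), parts[2], parts[3]);//name, SNP distance, lineage, accession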
