From b09dd8dd5991b99b21c8a1b1c4bd2af4a5002582 Mon Sep 17 00:00:00 2001
From: Garrett Stevens
Date: Wed, 25 Oct 2023 00:47:02 +0000
Subject: [PATCH] Use more streams

---
Review notes (this free-text area is ignored when the patch is applied):
line breaks and indentation were restored from a flattened copy, so context
whitespace should be checked against the tree before applying. FastaTransform
now validates fastaWidth; the postimage blob id on the features.service.ts
index line predates that change.

 .../.development.env                          |   5 +-
 .../src/features/features.controller.ts       |   9 +-
 .../src/features/features.service.ts          | 234 +++++++++++-------
 3 files changed, 155 insertions(+), 93 deletions(-)

diff --git a/packages/apollo-collaboration-server/.development.env b/packages/apollo-collaboration-server/.development.env
index 0841a7bb7..27a243cdb 100644
--- a/packages/apollo-collaboration-server/.development.env
+++ b/packages/apollo-collaboration-server/.development.env
@@ -60,10 +60,7 @@ MICROSOFT_CLIENT_SECRET=~Gr8Q~h6RTU7SMC-fjNxXy_~nabTD-ME_rFyLa.M
 LOG_LEVELS=error,warn,log,debug
 
 # Reference sequence chunk size, defaults to 262144 (256 KiB)
-# CHUNK_SIZE=500
-
-# Sequence line lenght in exported GFF3 file
-SEQ_LINE_LEN=90
+CHUNK_SIZE=500
 
 # Default new user role, possible values are admin, user, readOnly, and none
 # Defaults to none
diff --git a/packages/apollo-collaboration-server/src/features/features.controller.ts b/packages/apollo-collaboration-server/src/features/features.controller.ts
index d3043ec6c..4a1908bf1 100644
--- a/packages/apollo-collaboration-server/src/features/features.controller.ts
+++ b/packages/apollo-collaboration-server/src/features/features.controller.ts
@@ -32,12 +32,13 @@ export class FeaturesController {
   @Public()
   @Get('exportGFF3')
   async exportGFF3(
-    @Query() request: { exportID: string },
+    @Query() request: { exportID: string; fastaWidth?: number },
     @Response({ passthrough: true }) res: ExpressResponse,
   ) {
-    const [stream, assembly] = await this.featuresService.exportGFF3(
-      request.exportID,
-    )
+    const { exportID, ...rest } = request
+    const [stream, assembly] = await this.featuresService.exportGFF3(exportID, {
+      ...rest,
+    })
     const assemblyName = await this.featuresService.getAssemblyName(assembly)
     res.set({
       'Content-Type': 'application/text',
diff --git a/packages/apollo-collaboration-server/src/features/features.service.ts b/packages/apollo-collaboration-server/src/features/features.service.ts
index a0a1adcc0..458c2c001 100644
--- a/packages/apollo-collaboration-server/src/features/features.service.ts
+++ b/packages/apollo-collaboration-server/src/features/features.service.ts
@@ -1,4 +1,10 @@
-import { Readable, Transform, pipeline } from 'node:stream'
+import {
+  Readable,
+  Transform,
+  TransformCallback,
+  TransformOptions,
+  pipeline,
+} from 'node:stream'
 
 import gff, { GFF3Feature } from '@gmod/gff'
 import { Injectable, Logger, NotFoundException } from '@nestjs/common'
@@ -24,6 +30,100 @@ import { OperationsService } from '../operations/operations.service'
 import { RefSeqChunksService } from '../refSeqChunks/refSeqChunks.service'
 import { FeatureCountRequest } from './dto/feature.dto'
 
+interface FastaTransformOptions extends TransformOptions {
+  fastaWidth?: number
+}
+
+/**
+ * Transform stream that consumes reference sequence chunk documents (with
+ * `refSeq` populated) and emits the `##FASTA` section of a GFF3 file,
+ * wrapping sequence lines at `fastaWidth` characters (default 80).
+ */
+class FastaTransform extends Transform {
+  lineBuffer = ''
+  currentRefSeq?: string = undefined
+  fastaWidth: number
+
+  constructor(opts: FastaTransformOptions) {
+    super({ ...opts, objectMode: true })
+    // A width taken from the query string can still be a string at runtime,
+    // and a zero or negative width would make splitStringIntoChunks loop
+    // forever, so coerce it and fall back to the default when it is not a
+    // positive integer
+    const width = Number(opts.fastaWidth ?? 80)
+    this.fastaWidth = Number.isInteger(width) && width > 0 ? width : 80
+    this.push('##FASTA\n')
+  }
+
+  _transform(
+    refSeqChunkDoc: RefSeqChunkDocument,
+    encoding: BufferEncoding,
+    callback: TransformCallback,
+  ): void {
+    const refSeqDoc = refSeqChunkDoc.refSeq
+    const refSeqDocId = refSeqDoc._id.toString()
+    // A new reference sequence starts here: finish the previous one's last
+    // line and emit the FASTA header for this one
+    if (refSeqDocId !== this.currentRefSeq) {
+      this.flushLineBuffer()
+      const refSeqDescription = refSeqDoc.description
+        ? ` ${refSeqDoc.description}`
+        : ''
+      const fastaHeader = `>${refSeqDoc.name}${refSeqDescription}\n`
+      this.push(fastaHeader)
+      this.currentRefSeq = refSeqDocId
+    }
+    let { sequence } = refSeqChunkDoc
+    // Top up the partial line left over from the previous chunk first
+    if (this.lineBuffer) {
+      const neededLength = this.fastaWidth - this.lineBuffer.length
+      const bufferFiller = sequence.slice(0, neededLength)
+      sequence = sequence.slice(neededLength)
+      this.lineBuffer += bufferFiller
+      if (this.lineBuffer.length === this.fastaWidth) {
+        this.flushLineBuffer()
+      } else {
+        return callback()
+      }
+    }
+    const seqLines = splitStringIntoChunks(sequence, this.fastaWidth)
+    const lastLine = seqLines.at(-1) ?? ''
+    // Hold back a trailing partial line until more sequence arrives
+    if (lastLine.length > 0 && lastLine.length !== this.fastaWidth) {
+      this.lineBuffer = seqLines.pop() ?? ''
+    }
+    if (seqLines.length > 0) {
+      this.push(`${seqLines.join('\n')}\n`)
+    }
+    callback()
+  }
+
+  flushLineBuffer() {
+    if (this.lineBuffer) {
+      this.push(`${this.lineBuffer}\n`)
+      this.lineBuffer = ''
+    }
+  }
+
+  _flush(callback: TransformCallback): void {
+    this.flushLineBuffer()
+    callback()
+  }
+}
+
+/**
+ * Splits `input` into consecutive pieces of at most `chunkSize` characters.
+ * `chunkSize` must be a positive number, otherwise the loop never ends.
+ */
+function splitStringIntoChunks(input: string, chunkSize: number): string[] {
+  const chunks: string[] = []
+  for (let i = 0; i < input.length; i += chunkSize) {
+    const chunk = input.slice(i, i + chunkSize)
+    chunks.push(chunk)
+  }
+  return chunks
+}
+
 function makeGFF3Feature(
   featureDocument: Feature,
   refSeqs: RefSeqDocument[],
@@ -201,95 +301,43 @@ export class FeaturesService {
     return assemblyDoc.name
   }
 
-  splitStringIntoChunks(input: string, chunkSize: number): string {
-    const chunks: string[] = []
-    for (let i = 0; i < input.length; i += chunkSize) {
-      // eslint-disable-next-line unicorn/prefer-string-slice
-      const chunk = input.substring(i, i + chunkSize)
-      chunks.push(chunk)
-    }
-    return chunks.join('\n')
-  }
-  // eslint-disable-next-line @typescript-eslint/no-explicit-any
-  async exportGFF3(exportID: string): Promise<any> {
+  async exportGFF3(
+    exportID: string,
+    opts: { fastaWidth?: number },
+  ): Promise<[Readable, string]> {
     const exportDoc = await this.exportModel.findById(exportID)
     if (!exportDoc) {
       throw new NotFoundException()
     }
 
+    const { fastaWidth } = opts
     const { assembly } = exportDoc
 
     const refSeqs = await this.refSeqModel.find({ assembly }).exec()
     const refSeqIds = refSeqs.map((refSeq) => refSeq._id)
-    let printFasta = true
-    let printSeqName = true
-    const { SEQ_LINE_LEN } = process.env
-    if (!SEQ_LINE_LEN) {
-      throw new NotFoundException()
-    }
-    const seqLineLenght = Number(SEQ_LINE_LEN)
-    const headerStream = new Readable({ objectMode: true })
-    const sequenceStream = new Readable({ objectMode: true })
-
-    // Implement the _read() method for the custom stream. This is just a placeholder; we can leave it empty
-    // eslint-disable-next-line @typescript-eslint/no-empty-function
-    headerStream._read = function () {}
-    // eslint-disable-next-line @typescript-eslint/no-empty-function
-    sequenceStream._read = function () {}
-
-    headerStream.push('##gff-version 3\n')
-    for (const refSeqDoc of refSeqs) {
-      headerStream.push(
-        `##sequence-region ${refSeqDoc.name} 1 ${refSeqDoc.length}\n`,
-      )
-      let remainingLastLine = ''
-      if (printFasta) {
-        sequenceStream.push('##FASTA\n')
-      }
-      for await (const doc of this.refSeqChunksModel
-        .find({ refSeq: refSeqDoc.id })
-        .sort({ n: 1 })
-        .cursor()) {
-        if (printSeqName) {
-          refSeqDoc.description
-            ? sequenceStream.push(
-                `>${refSeqDoc.name} ${refSeqDoc.description}\n`,
-              )
-            : sequenceStream.push(`>${refSeqDoc.name}\n`)
-        }
-        let seqLine = doc.sequence
-        // If previous's chunk last line was not "seqLineLenght" characters long then take the first characters from the first line and make one full line
-        if (remainingLastLine.length > 0) {
-          const tmp1: string = doc.sequence.slice(
-            0,
-            seqLineLenght - remainingLastLine.length,
-          )
-          sequenceStream.push(`${remainingLastLine}${tmp1}\n`)
-          seqLine = doc.sequence.slice(seqLineLenght - remainingLastLine.length)
-          remainingLastLine = ''
-        }
-        const seqData = this.splitStringIntoChunks(seqLine, seqLineLenght)
-        const lines: string[] = seqData.split('\n')
-        const lastLine: string = lines.at(-1) ?? ''
-        if (lastLine.length === seqLineLenght) {
-          sequenceStream.push(`${seqData}\n`)
-        } else {
-          for (let i = 0; i < lines.length - 1; i++) {
-            sequenceStream.push(`${lines[i]}\n`)
-          }
-          remainingLastLine = lastLine
+
+    const headerStream = pipeline(
+      // unicorn thinks this is an Array.prototype.find, so we ignore it
+      // eslint-disable-next-line unicorn/no-array-callback-reference
+      this.refSeqModel.find({ assembly }).cursor(),
+      new Transform({
+        objectMode: true,
+        construct(callback) {
+          this.push('##gff-version 3\n')
+          callback()
+        },
+        transform(chunk: RefSeqDocument, encoding, callback) {
+          this.push(`##sequence-region ${chunk.name} 1 ${chunk.length}\n`)
+          callback()
+        },
+      }),
+      (error) => {
+        if (error) {
+          this.logger.error('GFF3 export failed')
+          this.logger.error(error)
         }
-        printSeqName = false
-      }
-      if (remainingLastLine.length > 0) {
-        sequenceStream.push(`${remainingLastLine}\n`)
-        remainingLastLine = ''
-      }
-      printFasta = false
-      printSeqName = true
-    }
-    headerStream.push(null)
-    sequenceStream.push(null)
+      },
+    )
 
     const query = { refSeq: { $in: refSeqIds } }
     const featureStream = pipeline(
@@ -297,9 +345,8 @@ export class FeaturesService {
       // eslint-disable-next-line unicorn/no-array-callback-reference
       this.featureModel.find(query).cursor(),
       new Transform({
-        writableObjectMode: true,
-        readableObjectMode: true,
-        transform: (chunk, encoding, callback) => {
+        objectMode: true,
+        transform: (chunk: FeatureDocument, encoding, callback) => {
           try {
             const flattened = chunk.toObject({ flattenMaps: true })
             const gff3Feature = makeGFF3Feature(flattened, refSeqs)
@@ -318,12 +365,29 @@ export class FeaturesService {
       },
     )
 
-    const combinedStream = new StreamConcat([
+    const sequenceStream = pipeline(
+      this.refSeqChunksModel
+        // unicorn thinks this is an Array.prototype.find, so we ignore it
+        // eslint-disable-next-line unicorn/no-array-callback-reference
+        .find(query)
+        .sort({ refSeq: 1, n: 1 })
+        .populate('refSeq')
+        .cursor(),
+      new FastaTransform({ fastaWidth }),
+      (error) => {
+        if (error) {
+          this.logger.error('GFF3 export failed')
+          this.logger.error(error)
+        }
+      },
+    )
+
+    const combinedStream: Readable = new StreamConcat([
       headerStream,
       featureStream,
       sequenceStream,
     ])
-    return [combinedStream, assembly]
+    return [combinedStream, assembly.toString()]
   }
 
   /**