Skip to content

Commit

Permalink
Use more streams
Browse files Browse the repository at this point in the history
  • Loading branch information
garrettjstevens committed Oct 25, 2023
1 parent fd57b0c commit 699a5e8
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 94 deletions.
5 changes: 1 addition & 4 deletions packages/apollo-collaboration-server/.development.env
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,7 @@ MICROSOFT_CLIENT_SECRET=~Gr8Q~h6RTU7SMC-fjNxXy_~nabTD-ME_rFyLa.M
LOG_LEVELS=error,warn,log,debug

# Reference sequence chunk size, defaults to 262144 (256 KiB)
# CHUNK_SIZE=500

# Sequence line lenght in exported GFF3 file
SEQ_LINE_LEN=90
CHUNK_SIZE=500

# Default new user role, possible values are admin, user, readOnly, and none
# Defaults to none
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,13 @@ export class FeaturesController {
@Public()
@Get('exportGFF3')
async exportGFF3(
@Query() request: { exportID: string },
@Query() request: { exportID: string; fastaWidth?: number },
@Response({ passthrough: true }) res: ExpressResponse,
) {
const [stream, assembly] = await this.featuresService.exportGFF3(
request.exportID,
)
const { exportID, ...rest } = request
const [stream, assembly] = await this.featuresService.exportGFF3(exportID, {
...rest,
})
const assemblyName = await this.featuresService.getAssemblyName(assembly)
res.set({
'Content-Type': 'application/text',
Expand Down
216 changes: 130 additions & 86 deletions packages/apollo-collaboration-server/src/features/features.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
import { Readable, Transform, pipeline } from 'node:stream'
import {
Readable,
Transform,
TransformCallback,
TransformOptions,
pipeline,
} from 'node:stream'

import gff, { GFF3Feature } from '@gmod/gff'
import { Injectable, Logger, NotFoundException } from '@nestjs/common'
Expand All @@ -24,6 +30,83 @@ import { OperationsService } from '../operations/operations.service'
import { RefSeqChunksService } from '../refSeqChunks/refSeqChunks.service'
import { FeatureCountRequest } from './dto/feature.dto'

interface FastaTransformOptions extends TransformOptions {
fastaWidth?: number
}

class FastaTransform extends Transform {
lineBuffer = ''
currentRefSeq?: string = undefined
fastaWidth

constructor(opts: FastaTransformOptions) {
super({ ...opts, objectMode: true })
const { fastaWidth = 80 } = opts
this.fastaWidth = fastaWidth
this.push('##FASTA\n')
}

_transform(
refSeqChunkDoc: RefSeqChunkDocument,
encoding: BufferEncoding,
callback: TransformCallback,
): void {
const refSeqDoc = refSeqChunkDoc.refSeq
const refSeqDocId = refSeqDoc._id.toString()
if (refSeqDocId !== this.currentRefSeq) {
this.flushLineBuffer()
const refSeqDescription = refSeqDoc.description
? ` ${refSeqDoc.description}`
: ''
const fastaHeader = `>${refSeqDoc.name}${refSeqDescription}\n`
this.push(fastaHeader)
this.currentRefSeq = refSeqDocId
}
let { sequence } = refSeqChunkDoc
if (this.lineBuffer) {
const neededLength = this.fastaWidth - this.lineBuffer.length
const bufferFiller = sequence.slice(0, neededLength)
sequence = sequence.slice(neededLength)
this.lineBuffer += bufferFiller
if (this.lineBuffer.length === this.fastaWidth) {
this.flushLineBuffer()
} else {
return callback()
}
}
const seqLines = splitStringIntoChunks(sequence, this.fastaWidth)
const lastLine = seqLines.at(-1) ?? ''
if (lastLine.length > 0 && lastLine.length !== this.fastaWidth) {
this.lineBuffer = seqLines.pop() ?? ''
}
if (seqLines.length > 0) {
this.push(`${seqLines.join('\n')}\n`)
}
callback()
}

flushLineBuffer() {
if (this.lineBuffer) {
this.push(`${this.lineBuffer}\n`)
this.lineBuffer = ''
}
}

_flush(callback: TransformCallback): void {
this.flushLineBuffer()
callback()
}
}

function splitStringIntoChunks(input: string, chunkSize: number): string[] {
const chunks: string[] = []
for (let i = 0; i < input.length; i += chunkSize) {
const chunk = input.slice(i, i + chunkSize)
chunks.push(chunk)
}
return chunks
}

function makeGFF3Feature(
featureDocument: Feature,
refSeqs: RefSeqDocument[],
Expand Down Expand Up @@ -201,105 +284,49 @@ export class FeaturesService {
return assemblyDoc.name
}

splitStringIntoChunks(input: string, chunkSize: number): string {
const chunks: string[] = []
for (let i = 0; i < input.length; i += chunkSize) {
// eslint-disable-next-line unicorn/prefer-string-slice
const chunk = input.substring(i, i + chunkSize)
chunks.push(chunk)
}
return chunks.join('\n')
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
async exportGFF3(exportID: string): Promise<any> {
async exportGFF3(
exportID: string,
opts: { fastaWidth?: number },
): Promise<[Readable, string]> {
const exportDoc = await this.exportModel.findById(exportID)
if (!exportDoc) {
throw new NotFoundException()
}
const { fastaWidth } = opts

const { assembly } = exportDoc
const refSeqs = await this.refSeqModel.find({ assembly }).exec()
const refSeqIds = refSeqs.map((refSeq) => refSeq._id)
let printFasta = true
let printSeqName = true
const { SEQ_LINE_LEN } = process.env
if (!SEQ_LINE_LEN) {
throw new NotFoundException()
}
const seqLineLenght = Number(SEQ_LINE_LEN)
const headerStream = new Readable({ objectMode: true })
const sequenceStream = new Readable({ objectMode: true })

// Implement the _read() method for the custom stream. This is just a placeholder; we can leave it empty
// eslint-disable-next-line @typescript-eslint/no-empty-function
headerStream._read = function () {}
// eslint-disable-next-line @typescript-eslint/no-empty-function
sequenceStream._read = function () {}

headerStream.push('##gff-version 3\n')
for (const refSeqDoc of refSeqs) {
headerStream.push(
`##sequence-region ${refSeqDoc.name} 1 ${refSeqDoc.length}\n`,
)
let remainingLastLine = ''
if (printFasta) {
sequenceStream.push('##FASTA\n')
}
for await (const doc of this.refSeqChunksModel
.find({ refSeq: refSeqDoc.id })
.sort({ n: 1 })
.cursor()) {
if (printSeqName) {
refSeqDoc.description
? sequenceStream.push(
`>${refSeqDoc.name} ${refSeqDoc.description}\n`,
)
: sequenceStream.push(`>${refSeqDoc.name}\n`)
}
let seqLine = doc.sequence
// If previous's chunk last line was not "seqLineLenght" characters long then take the first characters from the first line and make one full line
if (remainingLastLine.length > 0) {
const tmp1: string = doc.sequence.slice(
0,
seqLineLenght - remainingLastLine.length,
)
sequenceStream.push(`${remainingLastLine}${tmp1}\n`)
seqLine = doc.sequence.slice(seqLineLenght - remainingLastLine.length)
remainingLastLine = ''
}
const seqData = this.splitStringIntoChunks(seqLine, seqLineLenght)
const lines: string[] = seqData.split('\n')
const lastLine: string = lines.at(-1) ?? ''
if (lastLine.length === seqLineLenght) {
sequenceStream.push(`${seqData}\n`)
} else {
for (let i = 0; i < lines.length - 1; i++) {
sequenceStream.push(`${lines[i]}\n`)
}
remainingLastLine = lastLine

const headerStream = pipeline(
this.refSeqModel.find({ assembly }).cursor(),
new Transform({
objectMode: true,
construct(callback) {
this.push('##gff-version 3\n')
callback()
},
transform(chunk: RefSeqDocument, encoding, callback) {
this.push(`##sequence-region ${chunk.name} 1 ${chunk.length}\n`)
callback()
},
}),
(error) => {
if (error) {
this.logger.error('GFF3 export failed')
this.logger.error(error)
}
printSeqName = false
}
if (remainingLastLine.length > 0) {
sequenceStream.push(`${remainingLastLine}\n`)
remainingLastLine = ''
}
printFasta = false
printSeqName = true
}
headerStream.push(null)
sequenceStream.push(null)
},
)

const query = { refSeq: { $in: refSeqIds } }
const featureStream = pipeline(
// unicorn thinks this is an Array.prototype.find, so we ignore it
// eslint-disable-next-line unicorn/no-array-callback-reference
this.featureModel.find(query).cursor(),
new Transform({
writableObjectMode: true,
readableObjectMode: true,
transform: (chunk, encoding, callback) => {
objectMode: true,
transform: (chunk: FeatureDocument, encoding, callback) => {
try {
const flattened = chunk.toObject({ flattenMaps: true })
const gff3Feature = makeGFF3Feature(flattened, refSeqs)
Expand All @@ -318,12 +345,29 @@ export class FeaturesService {
},
)

const combinedStream = new StreamConcat([
const sequenceStream = pipeline(
this.refSeqChunksModel
// unicorn thinks this is an Array.prototype.find, so we ignore it
// eslint-disable-next-line unicorn/no-array-callback-reference
.find(query)
.sort({ refSeq: 1, n: 1 })
.populate('refSeq')
.cursor(),
new FastaTransform({ fastaWidth }),
(error) => {
if (error) {
this.logger.error('GFF3 export failed')
this.logger.error(error)
}
},
)

const combinedStream: Readable = new StreamConcat([
headerStream,
featureStream,
sequenceStream,
])
return [combinedStream, assembly]
return [combinedStream, assembly.toString()]
}

/**
Expand Down

0 comments on commit 699a5e8

Please sign in to comment.