Skip to content

Commit

Permalink
sketch: do not hide metadata processing in sequence compression function
Browse files Browse the repository at this point in the history
  • Loading branch information
fengelniederhammer committed Nov 19, 2024
1 parent b8090dc commit 448b0eb
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 32 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.loculus.backend.service.submission

import com.fasterxml.jackson.databind.node.NullNode
import com.github.luben.zstd.Zstd
import org.loculus.backend.api.GeneticSequence
import org.loculus.backend.api.Organism
Expand Down Expand Up @@ -85,40 +84,34 @@ class CompressionService(private val backendConfig: BackendConfig) {
},
)

fun decompressProcessedData(processedData: ProcessedData<CompressedSequence>, organism: Organism) = ProcessedData(
backendConfig
.getInstanceConfig(organism)
.schema
.metadata
.map { it.name }
.associateWith { fieldName ->
processedData.metadata[fieldName] ?: NullNode.instance
},
processedData
.unalignedNucleotideSequences.mapValues { (segmentName, sequenceData) ->
fun decompressSequencesInProcessedData(processedData: ProcessedData<CompressedSequence>, organism: Organism) =
ProcessedData(
processedData.metadata,
processedData
.unalignedNucleotideSequences.mapValues { (segmentName, sequenceData) ->
when (sequenceData) {
null -> null
else -> decompressNucleotideSequence(sequenceData, segmentName, organism)
}
},
processedData.alignedNucleotideSequences.mapValues { (segmentName, sequenceData) ->
when (sequenceData) {
null -> null
else -> decompressNucleotideSequence(sequenceData, segmentName, organism)
}
},
processedData.alignedNucleotideSequences.mapValues { (segmentName, sequenceData) ->
when (sequenceData) {
null -> null
else -> decompressNucleotideSequence(sequenceData, segmentName, organism)
}
},
processedData.nucleotideInsertions,
processedData.alignedAminoAcidSequences.mapValues { (gene, sequenceData) ->
when (sequenceData) {
null -> null
else -> decompressAminoAcidSequence(sequenceData, gene, organism)
}
},
processedData.aminoAcidInsertions,
)
processedData.nucleotideInsertions,
processedData.alignedAminoAcidSequences.mapValues { (gene, sequenceData) ->
when (sequenceData) {
null -> null
else -> decompressAminoAcidSequence(sequenceData, gene, organism)
}
},
processedData.aminoAcidInsertions,
)

fun compressProcessedData(processedData: ProcessedData<String>, organism: Organism) = ProcessedData(
processedData.metadata.filterNot { (_, value) -> value.isNull },
fun compressSequencesInProcessedData(processedData: ProcessedData<String>, organism: Organism) = ProcessedData(
processedData.metadata,
processedData
.unalignedNucleotideSequences.mapValues { (segmentName, sequenceData) ->
when (sequenceData) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.loculus.backend.service.submission

import org.loculus.backend.api.Organism
import org.loculus.backend.api.ProcessedData
import org.springframework.stereotype.Service

@Service
class ProcessedDataPostprocessor(
private val compressionService: CompressionService,
private val processedMetadataPostprocessor: ProcessedMetadataPostprocessor,
) {
fun prepareForStorage(processedData: ProcessedData<String>, organism: Organism) = processedData
.let { compressionService.compressSequencesInProcessedData(it, organism) }
.let { processedMetadataPostprocessor.stripNullValuesFromMetadata(it) }

fun retrieveFromStoredValue(storedValue: ProcessedData<CompressedSequence>, organism: Organism) = storedValue
.let { processedMetadataPostprocessor.addMissingMetadataAsNull(it, organism) }
.let { compressionService.decompressSequencesInProcessedData(it, organism) }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.loculus.backend.service.submission

import com.fasterxml.jackson.databind.node.NullNode
import org.loculus.backend.api.Organism
import org.loculus.backend.api.ProcessedData
import org.loculus.backend.config.BackendConfig
import org.springframework.stereotype.Service

@Service
class ProcessedMetadataPostprocessor(
private val backendConfig: BackendConfig,
) {
fun <SequenceType> stripNullValuesFromMetadata(processedData: ProcessedData<SequenceType>) =
processedData.copy(metadata = processedData.metadata.filterNot { (_, value) -> value.isNull })

fun <SequenceType> addMissingMetadataAsNull(processedData: ProcessedData<SequenceType>, organism: Organism) =
processedData.copy(
metadata = backendConfig
.getInstanceConfig(organism)
.schema
.metadata
.map { it.name }
.associateWith { fieldName ->
processedData.metadata[fieldName] ?: NullNode.instance
},
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class SubmissionDatabaseService(
pool: DataSource,
private val emptyProcessedDataProvider: EmptyProcessedDataProvider,
private val compressionService: CompressionService,
private val processedDataPostprocessor: ProcessedDataPostprocessor,
private val auditLogger: AuditLogger,
private val dateProvider: DateProvider,
@Value("\${${BackendSpringProperty.STREAM_BATCH_SIZE}}") private val streamBatchSize: Int,
Expand Down Expand Up @@ -332,7 +333,7 @@ class SubmissionDatabaseService(
) {
it[processingStatusColumn] = PROCESSED.name
it[processedDataColumn] =
compressionService.compressProcessedData(processedData, organism)
processedDataPostprocessor.prepareForStorage(processedData, organism)
it[errorsColumn] = submittedErrors
it[warningsColumn] = submittedWarnings
it[finishedProcessingAtColumn] = dateProvider.getCurrentDateTime()
Expand Down Expand Up @@ -605,7 +606,7 @@ class SubmissionDatabaseService(
submissionId = it[SequenceEntriesView.submissionIdColumn],
processedData = when (val processedData = it[SequenceEntriesView.jointDataColumn]) {
null -> emptyProcessedDataProvider.provide(organism)
else -> compressionService.decompressProcessedData(processedData, organism)
else -> processedDataPostprocessor.retrieveFromStoredValue(processedData, organism)
},
submittedAtTimestamp = it[SequenceEntriesView.submittedAtTimestampColumn],
releasedAtTimestamp = it[SequenceEntriesView.releasedAtTimestampColumn]!!,
Expand Down Expand Up @@ -979,7 +980,7 @@ class SubmissionDatabaseService(
version = selectedSequenceEntry[SequenceEntriesView.versionColumn],
status = Status.fromString(selectedSequenceEntry[SequenceEntriesView.statusColumn]),
groupId = selectedSequenceEntry[SequenceEntriesView.groupIdColumn],
processedData = compressionService.decompressProcessedData(
processedData = processedDataPostprocessor.retrieveFromStoredValue(
selectedSequenceEntry[SequenceEntriesView.processedDataColumn]!!,
organism,
),
Expand Down

0 comments on commit 448b0eb

Please sign in to comment.