Skip to content

Commit

Permalink
Adjust useNewerProcessingPipelineIfPossible to only update table when…
Browse files Browse the repository at this point in the history
… there are changes
  • Loading branch information
anna-parker committed Sep 10, 2024
1 parent c154095 commit da53f02
Showing 1 changed file with 31 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.JoinType
import org.jetbrains.exposed.sql.Op
import org.jetbrains.exposed.sql.SortOrder
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
import org.jetbrains.exposed.sql.SqlExpressionBuilder.less
import org.jetbrains.exposed.sql.SqlExpressionBuilder.plus
import org.jetbrains.exposed.sql.alias
Expand Down Expand Up @@ -66,6 +67,7 @@ import org.loculus.backend.service.datauseterms.DataUseTermsTable
import org.loculus.backend.service.groupmanagement.GroupEntity
import org.loculus.backend.service.groupmanagement.GroupManagementDatabaseService
import org.loculus.backend.service.groupmanagement.GroupManagementPreconditionValidator
import org.loculus.backend.service.submission.SequenceEntriesTable.originalDataColumn
import org.loculus.backend.utils.Accession
import org.loculus.backend.utils.Version
import org.loculus.backend.utils.toTimestamp
Expand Down Expand Up @@ -181,6 +183,7 @@ class SubmissionDatabaseService(

private fun updateStatusToProcessing(sequenceEntries: List<UnprocessedData>, pipelineVersion: Long) {
val now = Clock.System.now().toLocalDateTime(TimeZone.UTC)
log.info { "updating status to processing. Number of sequence entries: ${sequenceEntries.size}" }

SequenceEntriesPreprocessedDataTable.batchInsert(sequenceEntries) {
this[SequenceEntriesPreprocessedDataTable.accessionColumn] = it.accession
Expand Down Expand Up @@ -986,7 +989,6 @@ class SubmissionDatabaseService(

transaction {
// Check if there are any stale sequences before attempting to delete
// Check if there are any stale sequences before issuing a delete
val staleSequencesExist = SequenceEntriesPreprocessedDataTable
.selectAll()
.where {
Expand All @@ -1011,10 +1013,8 @@ class SubmissionDatabaseService(
fun useNewerProcessingPipelineIfPossible(): Long? {
log.info("Checking for newer processing pipeline versions")
val sql = """
update current_processing_pipeline
set
version = newest.version,
started_using_at = now()
select
newest.version as version
from
(
select max(pipeline_version) as version
Expand Down Expand Up @@ -1047,16 +1047,37 @@ class SubmissionDatabaseService(
and this.processing_status = 'FINISHED'
)
)
) as newest
where
current_processing_pipeline.version != newest.version AND newest.version is not null
returning current_processing_pipeline.version;
) as newest;
""".trimIndent()
var newVersion: Long? = null
transaction {
exec(sql, explicitStatementType = StatementType.SELECT) { rs ->
if (rs.next()) {
newVersion = rs.getLong("version")
val version = rs.getLong("version")
// Handle null value correctly
newVersion = if (rs.wasNull()) null else version
}
}
val pipelineNeedsUpdate = if (newVersion != null) {
CurrentProcessingPipelineTable
.selectAll().where { CurrentProcessingPipelineTable.versionColumn neq newVersion!! }
.limit(1)
.empty()
.not()
} else {
false
}
if (pipelineNeedsUpdate) {
log.info { "Updating current processing pipeline to newer version: $newVersion" }
val now = Clock.System.now().toLocalDateTime(TimeZone.UTC)
CurrentProcessingPipelineTable.update(
where = {
CurrentProcessingPipelineTable.versionColumn neq
newVersion!!
},
) {
it[versionColumn] = newVersion ?: 0L
it[startedUsingAtColumn] = now
}
}
}
Expand Down

0 comments on commit da53f02

Please sign in to comment.