Skip to content

Commit

Permalink
refactor fromItemsToCheckpointWithPartitioningDTO
Browse files Browse the repository at this point in the history
  • Loading branch information
salamonpavel committed Nov 22, 2024
1 parent 30c0766 commit 93a64f0
Showing 1 changed file with 36 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package za.co.absa.atum.server.model

import io.circe.{DecodingFailure, Json}
import io.circe.Json
import za.co.absa.atum.model.dto.{
CheckpointWithPartitioningDTO,
MeasureDTO,
Expand Down Expand Up @@ -50,6 +50,26 @@ object CheckpointItemWithPartitioningFromDB {
private def fromItemsToCheckpointWithPartitioningDTO(
checkpointItems: Seq[CheckpointItemWithPartitioningFromDB]
): Either[Throwable, CheckpointWithPartitioningDTO] = {
for {
measurements <- extractMeasurements(checkpointItems)
partitioning <- extractPartitioning(checkpointItems)
} yield {
CheckpointWithPartitioningDTO(
id = checkpointItems.head.idCheckpoint,
name = checkpointItems.head.checkpointName,
author = checkpointItems.head.author,
measuredByAtumAgent = checkpointItems.head.measuredByAtumAgent,
processStartTime = checkpointItems.head.checkpointStartTime,
processEndTime = checkpointItems.head.checkpointEndTime,
measurements = measurements.toSet,
partitioning
)
}
}

private def extractMeasurements(
checkpointItems: Seq[CheckpointItemWithPartitioningFromDB]
): Either[Throwable, Seq[MeasurementDTO]] = {
val measurementsOrErr = checkpointItems.map { checkpointItem =>
checkpointItem.measurementValue.as[MeasureResultDTO].map { measureResult =>
MeasurementDTO(
Expand All @@ -61,40 +81,22 @@ object CheckpointItemWithPartitioningFromDB {
)
}
}
val partitioningOrErr: Either[DecodingFailure, PartitioningWithIdDTO] = {
val decodingResult = checkpointItems.head.partitioning.as[PartitioningForDB]
decodingResult.map{ partitioningForDB =>
val partitioningDTO = partitioningForDB.keys.map { key =>
PartitionDTO(key, partitioningForDB.keysToValuesMap(key))
}
PartitioningWithIdDTO(
id = checkpointItems.head.idPartitioning,
partitioning = partitioningDTO,
author = checkpointItems.head.partitioningAuthor
)
}
}

}

val measurementsErrors = measurementsOrErr.collect { case Left(err) => err }
val errors = measurementsErrors ++ partitioningOrErr.left.toSeq
measurementsOrErr
.collectFirst { case Left(err) => Left(err) }
.getOrElse(Right(measurementsOrErr.collect { case Right(measurement) => measurement }))
}

if (errors.nonEmpty) {
Left(measurementsErrors.head)
} else {
val measurements = measurementsOrErr.collect { case Right(measurement) => measurement }.toSet
Right(
CheckpointWithPartitioningDTO(
id = checkpointItems.head.idCheckpoint,
name = checkpointItems.head.checkpointName,
author = checkpointItems.head.author,
measuredByAtumAgent = checkpointItems.head.measuredByAtumAgent,
processStartTime = checkpointItems.head.checkpointStartTime,
processEndTime = checkpointItems.head.checkpointEndTime,
measurements = measurements,
partitioningOrErr.toOption.get
)
private def extractPartitioning(
checkpointItems: Seq[CheckpointItemWithPartitioningFromDB]
): Either[Throwable, PartitioningWithIdDTO] = {
checkpointItems.head.partitioning.as[PartitioningForDB].map { partitioningForDB =>
val partitioningDTO = partitioningForDB.keys.map { key =>
PartitionDTO(key, partitioningForDB.keysToValuesMap(key))
}
PartitioningWithIdDTO(
id = checkpointItems.head.idPartitioning,
partitioning = partitioningDTO,
author = checkpointItems.head.partitioningAuthor
)
}
}
Expand Down

0 comments on commit 93a64f0

Please sign in to comment.