Skip to content

Commit

Permalink
Custom (Unknown) Measure (#165)
Browse files Browse the repository at this point in the history
UknownMeasure
  • Loading branch information
salamonpavel authored Mar 12, 2024
1 parent a7f3f64 commit 69897b5
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class AtumContext private[agent] (
* @param measurements the measurements to be included in the checkpoint
* @return the AtumContext after the checkpoint has been created
*/
def createCheckpointOnProvidedData(checkpointName: String, measurements: Map[AtumMeasure, MeasureResult]): AtumContext = {
def createCheckpointOnProvidedData(checkpointName: String, measurements: Map[Measure, MeasureResult]): AtumContext = {
val dateTimeNow = ZonedDateTime.now()

val checkpointDTO = CheckpointDTO(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType
sealed trait Measure {
val measureName: String
def measuredColumns: Seq[String]
}

trait AtumMeasure extends Measure with MeasurementProcessor {
val resultValueType: ResultValueType
}

trait AtumMeasure extends Measure with MeasurementProcessor

final case class UnknownMeasure(measureName: String, measuredColumns: Seq[String], resultValueType: ResultValueType)
extends Measure

object AtumMeasure {

val supportedMeasureNames: Seq[String] = Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import za.co.absa.atum.model.dto.{MeasureDTO, MeasureResultDTO, MeasurementDTO}
*/
private [agent] object MeasurementBuilder {

private def validateMeasureAndResultTypeCompatibility(measure: AtumMeasure, result: MeasureResult): Unit = {
private def validateMeasureAndResultTypeCompatibility(measure: Measure, result: MeasureResult): Unit = {
val requiredType = measure.resultValueType
val actualType = result.resultValueType

Expand All @@ -35,7 +35,7 @@ private [agent] object MeasurementBuilder {
)
}

def buildMeasurementDTO(measure: AtumMeasure, measureResult: MeasureResult): MeasurementDTO = {
def buildMeasurementDTO(measure: Measure, measureResult: MeasureResult): MeasurementDTO = {
validateMeasureAndResultTypeCompatibility(measure, measureResult)

val measureDTO = MeasureDTO(measure.measureName, measure.measuredColumns)
Expand All @@ -46,8 +46,8 @@ private [agent] object MeasurementBuilder {
MeasurementDTO(measureDTO, measureResultDTO)
}

def buildAndValidateMeasurementsDTO(measurements: Map[AtumMeasure, MeasureResult]): Set[MeasurementDTO] = {
measurements.toSet[(AtumMeasure, MeasureResult)].map { case (measure: AtumMeasure, measureResult: MeasureResult) =>
def buildAndValidateMeasurementsDTO(measurements: Map[Measure, MeasureResult]): Set[MeasurementDTO] = {
measurements.toSet[(Measure, MeasureResult)].map { case (measure: Measure, measureResult: MeasureResult) =>
validateMeasureAndResultTypeCompatibility(measure, measureResult)
buildMeasurementDTO(measure, measureResult)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import org.mockito.Mockito.{mock, times, verify, when}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.model.{AtumMeasure, MeasureResult, MeasurementBuilder}
import za.co.absa.atum.agent.model.AtumMeasure._
import za.co.absa.atum.agent.model.AtumMeasure.{RecordCount, SumOfValuesOfColumn}
import za.co.absa.atum.agent.model.{Measure, MeasureResult, MeasurementBuilder, UnknownMeasure}
import za.co.absa.atum.model.dto.CheckpointDTO
import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType

Expand Down Expand Up @@ -112,9 +112,10 @@ class AtumContextTest extends AnyFlatSpec with Matchers {
val atumPartitions = AtumPartitions("key" -> "value")
val atumContext: AtumContext = new AtumContext(atumPartitions, mockAgent)

val measurements: Map[AtumMeasure, MeasureResult] = Map(
val measurements: Map[Measure, MeasureResult] = Map(
RecordCount("col") -> MeasureResult(1L),
SumOfValuesOfColumn("col") -> MeasureResult(BigDecimal(1))
SumOfValuesOfColumn("col") -> MeasureResult(BigDecimal(1)),
UnknownMeasure("customMeasureName", Seq("col"), ResultValueType.BigDecimal) -> MeasureResult(BigDecimal(1))
)

atumContext.createCheckpointOnProvidedData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,11 @@ class MeasurementBuilderTest extends AnyFlatSpec {
}

"buildAndValidateMeasurementsDTO" should "build Seq[MeasurementDTO] for multiple measures, all unique" in {
val measurements: Map[AtumMeasure, MeasureResult] = Map(
val measurements: Map[Measure, MeasureResult] = Map(
DistinctRecordCount(Seq("col")) -> MeasureResult("1", ResultValueType.Long),
SumOfValuesOfColumn("col1") -> MeasureResult(BigDecimal(1.2)),
SumOfValuesOfColumn("col2") -> MeasureResult(BigDecimal(1.3))
SumOfValuesOfColumn("col2") -> MeasureResult(BigDecimal(1.3)),
UnknownMeasure("unknownMeasure", Seq("col"), ResultValueType.BigDecimal) -> MeasureResult(BigDecimal(1.1))
)
val measurementDTOs = MeasurementBuilder.buildAndValidateMeasurementsDTO(measurements)

Expand All @@ -103,6 +104,9 @@ class MeasurementBuilderTest extends AnyFlatSpec {
),
MeasurementDTO(
MeasureDTO("aggregatedTotal", Seq("col2")), MeasureResultDTO(TypedValue("1.3", ResultValueType.BigDecimal))
),
MeasurementDTO(
MeasureDTO("unknownMeasure", Seq("col")), MeasureResultDTO(TypedValue("1.1", ResultValueType.BigDecimal))
)
)

Expand Down

0 comments on commit 69897b5

Please sign in to comment.