#116: controlCol -> measuredCol (#118)
* #116: controlCol -> measuredCol

* #116: measuredCol -> measuredColumn, for the sake of consistency
lsulak authored Nov 12, 2023
1 parent 355feb7 commit f09fc4d
Showing 8 changed files with 58 additions and 59 deletions.
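At a glance, the rename this commit applies across the agent API, shown as a minimal before/after sketch adapted from the README diff below:

```scala
// Before this commit (issue #116):
RecordCount(controlCol = "id")

// After this commit: the parameter now names the column being measured
RecordCount(measuredColumn = "id")
```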
4 changes: 2 additions & 2 deletions agent/README.md
@@ -12,10 +12,10 @@ Create multiple `AtumContext` with different control measures to be applied
### Option 1
```scala
val atumContextInstanceWithRecordCount = AtumContext(processor = processor)
.withMeasureAdded(RecordCount(MockMeasureNames.recordCount1, controlCol = "id"))
.withMeasureAdded(RecordCount(MockMeasureNames.recordCount1, measuredColumn = "id"))

val atumContextWithSalaryAbsMeasure = atumContextInstanceWithRecordCount
.withMeasureAdded(AbsSumOfValuesOfColumn(controlCol = "salary"))
.withMeasureAdded(AbsSumOfValuesOfColumn(measuredColumn = "salary"))
```

### Option 2
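Beyond the README excerpt above, the renamed parameter can also be passed positionally through the single-argument `apply` methods defined in `Measure.scala` below; a small sketch:

```scala
import za.co.absa.atum.agent.model.Measure.{DistinctRecordCount, RecordCount}

// Both call styles bind to the same, now consistently named, parameter:
val byName       = RecordCount(measuredColumn = "id")
val positionally = DistinctRecordCount("id")
```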
62 changes: 31 additions & 31 deletions agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala
@@ -28,7 +28,7 @@ import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancem
* Type of different measures to be applied to the columns.
*/
sealed trait Measure extends MeasurementProcessor with MeasureType {
val controlCol: String
val measuredColumn: String
}

trait MeasureType {
@@ -50,107 +50,107 @@ object Measure {
val supportedMeasureNames: Seq[String] = supportedMeasures.map(_.measureName)

case class RecordCount private (
controlCol: String,
measureName: String,
resultValueType: ResultValueType.ResultValueType
measuredColumn: String,
measureName: String,
resultValueType: ResultValueType.ResultValueType
) extends Measure {

override def function: MeasurementFunction =
(ds: DataFrame) => {
val resultValue = ds.select(col(controlCol)).count().toString
val resultValue = ds.select(col(measuredColumn)).count().toString
ResultOfMeasurement(resultValue, resultValueType)
}
}
object RecordCount extends MeasureType {
def apply(controlCol: String): RecordCount = RecordCount(controlCol, measureName, resultValueType)
def apply(measuredColumn: String): RecordCount = RecordCount(measuredColumn, measureName, resultValueType)

override val measureName: String = "count"
override val resultValueType: ResultValueType.ResultValueType = ResultValueType.Long
}

case class DistinctRecordCount private (
controlCol: String,
measureName: String,
resultValueType: ResultValueType.ResultValueType
measuredColumn: String,
measureName: String,
resultValueType: ResultValueType.ResultValueType
) extends Measure {

override def function: MeasurementFunction =
(ds: DataFrame) => {
val resultValue = ds.select(col(controlCol)).distinct().count().toString
val resultValue = ds.select(col(measuredColumn)).distinct().count().toString
ResultOfMeasurement(resultValue, resultValueType)
}
}
object DistinctRecordCount extends MeasureType {
def apply(controlCol: String): DistinctRecordCount = {
DistinctRecordCount(controlCol, measureName, resultValueType)
def apply(measuredColumn: String): DistinctRecordCount = {
DistinctRecordCount(measuredColumn, measureName, resultValueType)
}

override val measureName: String = "distinctCount"
override val resultValueType: ResultValueType.ResultValueType = ResultValueType.Long
}

case class SumOfValuesOfColumn private (
controlCol: String,
measureName: String,
resultValueType: ResultValueType.ResultValueType
measuredColumn: String,
measureName: String,
resultValueType: ResultValueType.ResultValueType
) extends Measure {

override def function: MeasurementFunction = (ds: DataFrame) => {
val aggCol = sum(col(valueColumnName))
val resultValue = aggregateColumn(ds, controlCol, aggCol)
val resultValue = aggregateColumn(ds, measuredColumn, aggCol)
ResultOfMeasurement(resultValue, resultValueType)
}
}
object SumOfValuesOfColumn extends MeasureType {
def apply(controlCol: String): SumOfValuesOfColumn = {
SumOfValuesOfColumn(controlCol, measureName, resultValueType)
def apply(measuredColumn: String): SumOfValuesOfColumn = {
SumOfValuesOfColumn(measuredColumn, measureName, resultValueType)
}

override val measureName: String = "aggregatedTotal"
override val resultValueType: ResultValueType.ResultValueType = ResultValueType.BigDecimal
}

case class AbsSumOfValuesOfColumn private (
controlCol: String,
measureName: String,
resultValueType: ResultValueType.ResultValueType
measuredColumn: String,
measureName: String,
resultValueType: ResultValueType.ResultValueType
) extends Measure {

override def function: MeasurementFunction = (ds: DataFrame) => {
val aggCol = sum(abs(col(valueColumnName)))
val resultValue = aggregateColumn(ds, controlCol, aggCol)
val resultValue = aggregateColumn(ds, measuredColumn, aggCol)
ResultOfMeasurement(resultValue, resultValueType)
}
}
object AbsSumOfValuesOfColumn extends MeasureType {
def apply(controlCol: String): AbsSumOfValuesOfColumn = {
AbsSumOfValuesOfColumn(controlCol, measureName, resultValueType)
def apply(measuredColumn: String): AbsSumOfValuesOfColumn = {
AbsSumOfValuesOfColumn(measuredColumn, measureName, resultValueType)
}

override val measureName: String = "absAggregatedTotal"
override val resultValueType: ResultValueType.ResultValueType = ResultValueType.Double
}

case class SumOfHashesOfColumn private (
controlCol: String,
measureName: String,
resultValueType: ResultValueType.ResultValueType
measuredColumn: String,
measureName: String,
resultValueType: ResultValueType.ResultValueType
) extends Measure {

override def function: MeasurementFunction = (ds: DataFrame) => {

val aggregatedColumnName = ds.schema.getClosestUniqueName("sum_of_hashes")
val value = ds
.withColumn(aggregatedColumnName, crc32(col(controlCol).cast("String")))
.withColumn(aggregatedColumnName, crc32(col(measuredColumn).cast("String")))
.agg(sum(col(aggregatedColumnName)))
.collect()(0)(0)
val resultValue = if (value == null) "" else value.toString
ResultOfMeasurement(resultValue, ResultValueType.String)
}
}
object SumOfHashesOfColumn extends MeasureType {
def apply(controlCol: String): SumOfHashesOfColumn = {
SumOfHashesOfColumn(controlCol, measureName, resultValueType)
def apply(measuredColumn: String): SumOfHashesOfColumn = {
SumOfHashesOfColumn(measuredColumn, measureName, resultValueType)
}

override val measureName: String = "hashCrc32"
@@ -169,7 +169,7 @@ object Measure {
// scala> sc.parallelize(List(Long.MaxValue, 1)).toDF.agg(sum("value")).take(1)(0)(0)
// res11: Any = -9223372036854775808
// Converting to BigDecimal fixes the issue
// val ds2 = ds.select(col(measurement.controlCol).cast(DecimalType(38, 0)).as("value"))
// val ds2 = ds.select(col(measurement.measuredColumn).cast(DecimalType(38, 0)).as("value"))
// ds2.agg(sum(abs($"value"))).collect()(0)(0)
val ds2 = ds.select(
col(measureColumn).cast(DecimalType(38, 0)).as(valueColumnName)
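The overflow noted in the comment above is easy to reproduce in isolation. A standalone sketch, not part of the diff, showing the wrap-around and the DecimalType fix (assumes a local SparkSession and Spark's default non-ANSI arithmetic):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{abs, col, sum}
import org.apache.spark.sql.types.DecimalType

val spark = SparkSession.builder().master("local[*]").appName("sum-overflow-sketch").getOrCreate()
import spark.implicits._

val ds = Seq(Long.MaxValue, 1L).toDF("value")

// Summing LongType silently wraps around on overflow:
ds.agg(sum(col("value"))).show()   // -9223372036854775808

// Casting to DecimalType(38, 0) before aggregating avoids the wrap-around:
ds.select(col("value").cast(DecimalType(38, 0)).as("value"))
  .agg(sum(abs(col("value"))))
  .show()                          // 9223372036854775808
```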
@@ -23,8 +23,8 @@ private [agent] object MeasurementBuilder {

private [agent] def buildMeasurementDTO(measurement: Measurement): MeasurementDTO = {
val measureName = measurement.measure.measureName
val controlCols = Seq(measurement.measure.controlCol)
val measureDTO = MeasureDTO(measureName, controlCols)
val measuredColumns = Seq(measurement.measure.measuredColumn)
val measureDTO = MeasureDTO(measureName, measuredColumns)

val measureResultDTO = MeasureResultDTO(TypedValue(measurement.resultValue.toString, measurement.resultType))

@@ -27,14 +27,14 @@ private [agent] object MeasuresBuilder {
}

private def createMeasure(measure: dto.MeasureDTO): za.co.absa.atum.agent.model.Measure = {
val controlColumn = measure.controlColumns.head
val measuredColumn = measure.measuredColumns.head

measure.measureName match {
case RecordCount.measureName => RecordCount(controlColumn)
case DistinctRecordCount.measureName => DistinctRecordCount(controlColumn)
case SumOfValuesOfColumn.measureName => SumOfValuesOfColumn(controlColumn)
case AbsSumOfValuesOfColumn.measureName => AbsSumOfValuesOfColumn(controlColumn)
case SumOfHashesOfColumn.measureName => SumOfHashesOfColumn(controlColumn)
case RecordCount.measureName => RecordCount(measuredColumn)
case DistinctRecordCount.measureName => DistinctRecordCount(measuredColumn)
case SumOfValuesOfColumn.measureName => SumOfValuesOfColumn(measuredColumn)
case AbsSumOfValuesOfColumn.measureName => AbsSumOfValuesOfColumn(measuredColumn)
case SumOfHashesOfColumn.measureName => SumOfHashesOfColumn(measuredColumn)
case unsupportedMeasure =>
throw MeasureException(
s"Measure not supported: $unsupportedMeasure. Supported measures are: ${Measure.supportedMeasureNames}"
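A caller-side view of this dispatch, as a hypothetical sketch (createMeasure itself is private, and the public entry point wrapping it is not shown in this hunk):

```scala
import za.co.absa.atum.model.dto.MeasureDTO
import za.co.absa.atum.agent.model.Measure.RecordCount

// A MeasureDTO named "count" maps to a RecordCount over the first measured column:
val dto      = MeasureDTO(measureName = "count", measuredColumns = Seq("id"))
val expected = RecordCount(measuredColumn = dto.measuredColumns.head)
```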
@@ -29,12 +29,11 @@ class MeasureTest extends AnyFlatSpec with Matchers with SparkTestBase { self =>
"Measure" should "be based on the dataframe" in {

// Measures
val measureIds: Measure = RecordCount(controlCol = "id")
val salaryAbsSum: Measure = AbsSumOfValuesOfColumn(
controlCol = "salary"
)
val salarySum = SumOfValuesOfColumn(controlCol = "salary")
val sumOfHashes: Measure = SumOfHashesOfColumn(controlCol = "id")
val measureIds: Measure = RecordCount(measuredColumn = "id")
val salaryAbsSum: Measure = AbsSumOfValuesOfColumn(measuredColumn = "salary")

val salarySum = SumOfValuesOfColumn(measuredColumn = "salary")
val sumOfHashes: Measure = SumOfHashesOfColumn(measuredColumn = "id")

// AtumContext contains `Measurement`
val atumContextInstanceWithRecordCount = AtumAgent
@@ -18,5 +18,5 @@ package za.co.absa.atum.model.dto

case class MeasureDTO(
measureName: String,
controlColumns: Seq[String]
measuredColumns: Seq[String]
)
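The rename also changes the wire format of this DTO. A minimal sketch of the serialized shape (the import path of SerializationUtils is assumed; the expected JSON matches the tests below):

```scala
import za.co.absa.atum.model.dto.MeasureDTO
import za.co.absa.atum.model.utils.SerializationUtils // assumed location of the helper exercised below

val measureDTO = MeasureDTO("count", Seq("col"))

// Serializes with the renamed field:
// {"measureName":"count","measuredColumns":["col"]}
println(SerializationUtils.asJson(measureDTO))
```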
@@ -82,14 +82,14 @@ class SerializationUtilsTest extends AnyFlatSpecLike {
measures = seqMeasureDTO
)

val expectedAdditionalDataJson = "{\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"measures\":[{\"measureName\":\"count\",\"controlColumns\":[\"col\"]}],\"additionalData\":{\"additionalData\":{}}}"
val expectedAdditionalDataJson = "{\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"measures\":[{\"measureName\":\"count\",\"measuredColumns\":[\"col\"]}],\"additionalData\":{\"additionalData\":{}}}"
val actualAdditionalDataJson = SerializationUtils.asJson(atumContextDTO)

assert(expectedAdditionalDataJson == actualAdditionalDataJson)
}

"fromJson" should "deserialize AtumContextDTO from json string" in {
val atumContextDTOJson = "{\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"measures\":[{\"measureName\":\"count\",\"controlColumns\":[\"col\"]}],\"additionalData\":{\"additionalData\":{}}}"
val atumContextDTOJson = "{\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"measures\":[{\"measureName\":\"count\",\"measuredColumns\":[\"col\"]}],\"additionalData\":{\"additionalData\":{}}}"

val seqPartitionDTO = Seq(PartitionDTO("key", "val"))
val seqMeasureDTO = Set(MeasureDTO("count", Seq("col")))
@@ -150,7 +150,7 @@ class SerializationUtilsTest extends AnyFlatSpecLike {
measurements = seqMeasurementDTO
)

val expectedCheckpointDTOJson = "{\"id\":\"" + uuid + "\",\"name\":\"checkpoint\",\"author\":\"author\",\"measuredByAtumAgent\":true,\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"processStartTime\":\"2023-10-24 10:20:59.005000+02\",\"processEndTime\":\"2023-10-24 10:20:59.005000+02\",\"measurements\":[{\"measure\":{\"measureName\":\"count\",\"controlColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}]}"
val expectedCheckpointDTOJson = "{\"id\":\"" + uuid + "\",\"name\":\"checkpoint\",\"author\":\"author\",\"measuredByAtumAgent\":true,\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"processStartTime\":\"2023-10-24 10:20:59.005000+02\",\"processEndTime\":\"2023-10-24 10:20:59.005000+02\",\"measurements\":[{\"measure\":{\"measureName\":\"count\",\"measuredColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}]}"
val actualCheckpointDTOJson = SerializationUtils.asJson(checkpointDTO)

assert(expectedCheckpointDTOJson == actualCheckpointDTOJson)
@@ -161,7 +161,7 @@ class SerializationUtilsTest extends AnyFlatSpecLike {
val seqPartitionDTO = Seq(PartitionDTO("key", "val"))
val timeWithZone = OffsetDateTime.of(2023, 10, 24, 10, 20, 59, 5000000, ZoneOffset.ofHours(2))

val checkpointDTOJson = "{\"id\":\"" + uuid + "\",\"name\":\"checkpoint\",\"author\":\"author\",\"measuredByAtumAgent\":true,\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"processStartTime\":\"2023-10-24 10:20:59.005000+02\",\"processEndTime\":\"2023-10-24 10:20:59.005000+02\",\"measurements\":[{\"measure\":{\"measureName\":\"count\",\"controlColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}]}"
val checkpointDTOJson = "{\"id\":\"" + uuid + "\",\"name\":\"checkpoint\",\"author\":\"author\",\"measuredByAtumAgent\":true,\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"processStartTime\":\"2023-10-24 10:20:59.005000+02\",\"processEndTime\":\"2023-10-24 10:20:59.005000+02\",\"measurements\":[{\"measure\":{\"measureName\":\"count\",\"measuredColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}]}"

val seqMeasurementDTO = Seq(
MeasurementDTO(
@@ -191,14 +191,14 @@ class SerializationUtilsTest extends AnyFlatSpecLike {
"asJson" should "serialize MeasureDTO into json string" in {
val measureDTO = MeasureDTO("count", Seq("col"))

val expectedMeasureDTOJson = "{\"measureName\":\"count\",\"controlColumns\":[\"col\"]}"
val expectedMeasureDTOJson = "{\"measureName\":\"count\",\"measuredColumns\":[\"col\"]}"
val actualMeasureDTOJson = SerializationUtils.asJson(measureDTO)

assert(expectedMeasureDTOJson == actualMeasureDTOJson)
}

"fromJson" should "deserialize MeasureDTO from json string" in {
val measureDTOJson = "{\"measureName\":\"count\",\"controlColumns\":[\"col\"]}"
val measureDTOJson = "{\"measureName\":\"count\",\"measuredColumns\":[\"col\"]}"

val expectedMeasureDTO = MeasureDTO("count", Seq("col"))
val actualMeasureDTO = SerializationUtils.fromJson[MeasureDTO](measureDTOJson)
@@ -213,14 +213,14 @@ class SerializationUtilsTest extends AnyFlatSpecLike {

val measurementDTO = MeasurementDTO(measureDTO, measureResultDTO)

val expectedMeasurementDTOJson = "{\"measure\":{\"measureName\":\"count\",\"controlColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}"
val expectedMeasurementDTOJson = "{\"measure\":{\"measureName\":\"count\",\"measuredColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}"
val actualMeasurementDTOJson = SerializationUtils.asJson(measurementDTO)

assert(expectedMeasurementDTOJson == actualMeasurementDTOJson)
}

"fromJson" should "deserialize MeasurementDTO from json string" in {
val measurementDTOJson = "{\"measure\":{\"measureName\":\"count\",\"controlColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}"
val measurementDTOJson = "{\"measure\":{\"measureName\":\"count\",\"measuredColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}"

val measureDTO = MeasureDTO("count", Seq("col"))
val measureResultDTO = MeasureResultDTO(mainValue = TypedValue("1", ResultValueType.Long))
@@ -62,8 +62,8 @@ object Runs {
val measureNames = values.measurements.map(_.measure.measureName).toSeq
val measureNamesNormalized = scalaSeqToPgArray(measureNames)

val controlColumns = values.measurements.map(_.measure.controlColumns).toSeq
val controlColumnsNormalized = nestedScalaSeqToPgArray(controlColumns)
val measuredColumns = values.measurements.map(_.measure.measuredColumns).toSeq
val measuredColumnsNormalized = nestedScalaSeqToPgArray(measuredColumns)

val measureResults = values.measurements.map(_.result).toSeq
val measureResultsNormalized = measureResults.map(SerializationUtils.asJson)
@@ -76,7 +76,7 @@ object Runs {
${values.processStartTime}::TIMESTAMPTZ,
${values.processEndTime}::TIMESTAMPTZ,
$measureNamesNormalized::TEXT[],
$controlColumnsNormalized::TEXT[][],
$measuredColumnsNormalized::TEXT[][],
$measureResultsNormalized::JSONB[],
${values.author}
) #$alias;"""
