#116: controlCol -> measuredCol #118

Merged (3 commits) on Nov 12, 2023
4 changes: 2 additions & 2 deletions agent/README.md
@@ -12,10 +12,10 @@ Create multiple `AtumContext` with different control measures to be applied
### Option 1
```scala
val atumContextInstanceWithRecordCount = AtumContext(processor = processor)
-  .withMeasureAdded(RecordCount(MockMeasureNames.recordCount1, controlCol = "id"))
+  .withMeasureAdded(RecordCount(MockMeasureNames.recordCount1, measuredColumn = "id"))

val atumContextWithSalaryAbsMeasure = atumContextInstanceWithRecordCount
-  .withMeasureAdded(AbsSumOfValuesOfColumn(controlCol = "salary"))
+  .withMeasureAdded(AbsSumOfValuesOfColumn(measuredColumn = "salary"))
```

### Option 2
62 changes: 31 additions & 31 deletions agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala
@@ -28,7 +28,7 @@ import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancem
* Type of different measures to be applied to the columns.
*/
sealed trait Measure extends MeasurementProcessor with MeasureType {
-  val controlCol: String
+  val measuredColumn: String
}

trait MeasureType {
@@ -50,107 +50,107 @@ object Measure {
val supportedMeasureNames: Seq[String] = supportedMeasures.map(_.measureName)

case class RecordCount private (
-  controlCol: String,
-  measureName: String,
-  resultValueType: ResultValueType.ResultValueType
+  measuredColumn: String,
+  measureName: String,
+  resultValueType: ResultValueType.ResultValueType
) extends Measure {

override def function: MeasurementFunction =
(ds: DataFrame) => {
-  val resultValue = ds.select(col(controlCol)).count().toString
+  val resultValue = ds.select(col(measuredColumn)).count().toString
ResultOfMeasurement(resultValue, resultValueType)
}
}
object RecordCount extends MeasureType {
-  def apply(controlCol: String): RecordCount = RecordCount(controlCol, measureName, resultValueType)
+  def apply(measuredColumn: String): RecordCount = RecordCount(measuredColumn, measureName, resultValueType)

override val measureName: String = "count"
override val resultValueType: ResultValueType.ResultValueType = ResultValueType.Long
}

case class DistinctRecordCount private (
-  controlCol: String,
-  measureName: String,
-  resultValueType: ResultValueType.ResultValueType
+  measuredColumn: String,
+  measureName: String,
+  resultValueType: ResultValueType.ResultValueType
) extends Measure {

override def function: MeasurementFunction =
(ds: DataFrame) => {
-  val resultValue = ds.select(col(controlCol)).distinct().count().toString
+  val resultValue = ds.select(col(measuredColumn)).distinct().count().toString
ResultOfMeasurement(resultValue, resultValueType)
}
}
object DistinctRecordCount extends MeasureType {
-  def apply(controlCol: String): DistinctRecordCount = {
-    DistinctRecordCount(controlCol, measureName, resultValueType)
+  def apply(measuredColumn: String): DistinctRecordCount = {
+    DistinctRecordCount(measuredColumn, measureName, resultValueType)
}

override val measureName: String = "distinctCount"
override val resultValueType: ResultValueType.ResultValueType = ResultValueType.Long
}

case class SumOfValuesOfColumn private (
-  controlCol: String,
-  measureName: String,
-  resultValueType: ResultValueType.ResultValueType
+  measuredColumn: String,
+  measureName: String,
+  resultValueType: ResultValueType.ResultValueType
) extends Measure {

override def function: MeasurementFunction = (ds: DataFrame) => {
val aggCol = sum(col(valueColumnName))
-  val resultValue = aggregateColumn(ds, controlCol, aggCol)
+  val resultValue = aggregateColumn(ds, measuredColumn, aggCol)
ResultOfMeasurement(resultValue, resultValueType)
}
}
object SumOfValuesOfColumn extends MeasureType {
-  def apply(controlCol: String): SumOfValuesOfColumn = {
-    SumOfValuesOfColumn(controlCol, measureName, resultValueType)
+  def apply(measuredColumn: String): SumOfValuesOfColumn = {
+    SumOfValuesOfColumn(measuredColumn, measureName, resultValueType)
}

override val measureName: String = "aggregatedTotal"
override val resultValueType: ResultValueType.ResultValueType = ResultValueType.BigDecimal
}

case class AbsSumOfValuesOfColumn private (
-  controlCol: String,
-  measureName: String,
-  resultValueType: ResultValueType.ResultValueType
+  measuredColumn: String,
+  measureName: String,
+  resultValueType: ResultValueType.ResultValueType
) extends Measure {

override def function: MeasurementFunction = (ds: DataFrame) => {
val aggCol = sum(abs(col(valueColumnName)))
-  val resultValue = aggregateColumn(ds, controlCol, aggCol)
+  val resultValue = aggregateColumn(ds, measuredColumn, aggCol)
ResultOfMeasurement(resultValue, resultValueType)
}
}
object AbsSumOfValuesOfColumn extends MeasureType {
-  def apply(controlCol: String): AbsSumOfValuesOfColumn = {
-    AbsSumOfValuesOfColumn(controlCol, measureName, resultValueType)
+  def apply(measuredColumn: String): AbsSumOfValuesOfColumn = {
+    AbsSumOfValuesOfColumn(measuredColumn, measureName, resultValueType)
}

override val measureName: String = "absAggregatedTotal"
override val resultValueType: ResultValueType.ResultValueType = ResultValueType.Double
}

case class SumOfHashesOfColumn private (
-  controlCol: String,
-  measureName: String,
-  resultValueType: ResultValueType.ResultValueType
+  measuredColumn: String,
+  measureName: String,
+  resultValueType: ResultValueType.ResultValueType
) extends Measure {

override def function: MeasurementFunction = (ds: DataFrame) => {

val aggregatedColumnName = ds.schema.getClosestUniqueName("sum_of_hashes")
val value = ds
-  .withColumn(aggregatedColumnName, crc32(col(controlCol).cast("String")))
+  .withColumn(aggregatedColumnName, crc32(col(measuredColumn).cast("String")))
.agg(sum(col(aggregatedColumnName)))
.collect()(0)(0)
val resultValue = if (value == null) "" else value.toString
ResultOfMeasurement(resultValue, ResultValueType.String)
}
}
object SumOfHashesOfColumn extends MeasureType {
-  def apply(controlCol: String): SumOfHashesOfColumn = {
-    SumOfHashesOfColumn(controlCol, measureName, resultValueType)
+  def apply(measuredColumn: String): SumOfHashesOfColumn = {
+    SumOfHashesOfColumn(measuredColumn, measureName, resultValueType)
}

override val measureName: String = "hashCrc32"
@@ -169,7 +169,7 @@ object Measure {
// scala> sc.parallelize(List(Long.MaxValue, 1)).toDF.agg(sum("value")).take(1)(0)(0)
// res11: Any = -9223372036854775808
// Converting to BigDecimal fixes the issue
-  // val ds2 = ds.select(col(measurement.controlCol).cast(DecimalType(38, 0)).as("value"))
+  // val ds2 = ds.select(col(measurement.measuredColumn).cast(DecimalType(38, 0)).as("value"))
// ds2.agg(sum(abs($"value"))).collect()(0)(0)
val ds2 = ds.select(
col(measureColumn).cast(DecimalType(38, 0)).as(valueColumnName)
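Note: after this rename, callers build measures through the companion `apply` methods using the new parameter name. Below is a minimal sketch, not part of this diff, that uses only the constructors and constants visible in `Measure.scala` above; the import path is assumed from the file's location.

```scala
import za.co.absa.atum.agent.model.Measure._

// Each companion apply fills in measureName and resultValueType for the caller:
val count    = RecordCount(measuredColumn = "id")                 // measureName "count", Long result
val distinct = DistinctRecordCount(measuredColumn = "id")         // measureName "distinctCount", Long result
val total    = SumOfValuesOfColumn(measuredColumn = "salary")     // measureName "aggregatedTotal", BigDecimal result
val absTotal = AbsSumOfValuesOfColumn(measuredColumn = "salary")  // measureName "absAggregatedTotal", Double result
val hashes   = SumOfHashesOfColumn(measuredColumn = "id")         // measureName "hashCrc32", String-valued result
```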
@@ -23,8 +23,8 @@ private [agent] object MeasurementBuilder {

private [agent] def buildMeasurementDTO(measurement: Measurement): MeasurementDTO = {
val measureName = measurement.measure.measureName
-  val controlCols = Seq(measurement.measure.controlCol)
-  val measureDTO = MeasureDTO(measureName, controlCols)
+  val measuredColumns = Seq(measurement.measure.measuredColumn)
+  val measureDTO = MeasureDTO(measureName, measuredColumns)

val measureResultDTO = MeasureResultDTO(TypedValue(measurement.resultValue.toString, measurement.resultType))

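For orientation, a rough sketch, not part of this PR, of the mapping `buildMeasurementDTO` performs after the rename; the field accesses mirror the code above, while the concrete values are invented.

```scala
// Hypothetical input: a Measurement whose measure is RecordCount(measuredColumn = "id"),
// whose resultValue is 42 and whose resultType is ResultValueType.Long.
// buildMeasurementDTO would then return, in effect:
//   MeasurementDTO(
//     MeasureDTO("count", Seq("id")),                            // measuredColumns replaces controlColumns
//     MeasureResultDTO(TypedValue("42", ResultValueType.Long))
//   )
```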
@@ -27,14 +27,14 @@ private [agent] object MeasuresBuilder {
}

private def createMeasure(measure: dto.MeasureDTO): za.co.absa.atum.agent.model.Measure = {
-  val controlColumn = measure.controlColumns.head
+  val measuredColumn = measure.measuredColumns.head

measure.measureName match {
-  case RecordCount.measureName => RecordCount(controlColumn)
-  case DistinctRecordCount.measureName => DistinctRecordCount(controlColumn)
-  case SumOfValuesOfColumn.measureName => SumOfValuesOfColumn(controlColumn)
-  case AbsSumOfValuesOfColumn.measureName => AbsSumOfValuesOfColumn(controlColumn)
-  case SumOfHashesOfColumn.measureName => SumOfHashesOfColumn(controlColumn)
+  case RecordCount.measureName => RecordCount(measuredColumn)
+  case DistinctRecordCount.measureName => DistinctRecordCount(measuredColumn)
+  case SumOfValuesOfColumn.measureName => SumOfValuesOfColumn(measuredColumn)
+  case AbsSumOfValuesOfColumn.measureName => AbsSumOfValuesOfColumn(measuredColumn)
+  case SumOfHashesOfColumn.measureName => SumOfHashesOfColumn(measuredColumn)
case unsupportedMeasure =>
throw MeasureException(
s"Measure not supported: $unsupportedMeasure. Supported measures are: ${Measure.supportedMeasureNames}"
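A short summary, not part of this change, of the DTO-to-measure mapping `createMeasure` performs once `measuredColumns` replaces `controlColumns`; measure names are taken from the companions in `Measure.scala`.

```scala
// MeasureDTO("count", Seq("id"))                  -> RecordCount(measuredColumn = "id")
// MeasureDTO("distinctCount", Seq("id"))          -> DistinctRecordCount(measuredColumn = "id")
// MeasureDTO("aggregatedTotal", Seq("salary"))    -> SumOfValuesOfColumn(measuredColumn = "salary")
// MeasureDTO("absAggregatedTotal", Seq("salary")) -> AbsSumOfValuesOfColumn(measuredColumn = "salary")
// MeasureDTO("hashCrc32", Seq("id"))              -> SumOfHashesOfColumn(measuredColumn = "id")
// Any other measureName fails with MeasureException listing Measure.supportedMeasureNames.
```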
@@ -29,12 +29,11 @@ class MeasureTest extends AnyFlatSpec with Matchers with SparkTestBase { self =>
"Measure" should "be based on the dataframe" in {

// Measures
-  val measureIds: Measure = RecordCount(controlCol = "id")
-  val salaryAbsSum: Measure = AbsSumOfValuesOfColumn(
-    controlCol = "salary"
-  )
-  val salarySum = SumOfValuesOfColumn(controlCol = "salary")
-  val sumOfHashes: Measure = SumOfHashesOfColumn(controlCol = "id")
+  val measureIds: Measure = RecordCount(measuredColumn = "id")
+  val salaryAbsSum: Measure = AbsSumOfValuesOfColumn(measuredColumn = "salary")
+
+  val salarySum = SumOfValuesOfColumn(measuredColumn = "salary")
+  val sumOfHashes: Measure = SumOfHashesOfColumn(measuredColumn = "id")

// AtumContext contains `Measurement`
val atumContextInstanceWithRecordCount = AtumAgent
@@ -18,5 +18,5 @@ package za.co.absa.atum.model.dto

case class MeasureDTO(
measureName: String,
-  controlColumns: Seq[String]
+  measuredColumns: Seq[String]
)
@@ -82,14 +82,14 @@ class SerializationUtilsTest extends AnyFlatSpecLike {
measures = seqMeasureDTO
)

val expectedAdditionalDataJson = "{\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"measures\":[{\"measureName\":\"count\",\"controlColumns\":[\"col\"]}],\"additionalData\":{\"additionalData\":{}}}"
val expectedAdditionalDataJson = "{\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"measures\":[{\"measureName\":\"count\",\"measuredColumns\":[\"col\"]}],\"additionalData\":{\"additionalData\":{}}}"
val actualAdditionalDataJson = SerializationUtils.asJson(atumContextDTO)

assert(expectedAdditionalDataJson == actualAdditionalDataJson)
}

"fromJson" should "deserialize AtumContextDTO from json string" in {
val atumContextDTOJson = "{\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"measures\":[{\"measureName\":\"count\",\"controlColumns\":[\"col\"]}],\"additionalData\":{\"additionalData\":{}}}"
val atumContextDTOJson = "{\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"measures\":[{\"measureName\":\"count\",\"measuredColumns\":[\"col\"]}],\"additionalData\":{\"additionalData\":{}}}"

val seqPartitionDTO = Seq(PartitionDTO("key", "val"))
val seqMeasureDTO = Set(MeasureDTO("count", Seq("col")))
@@ -150,7 +150,7 @@ class SerializationUtilsTest extends AnyFlatSpecLike {
measurements = seqMeasurementDTO
)

val expectedCheckpointDTOJson = "{\"id\":\"" + uuid + "\",\"name\":\"checkpoint\",\"author\":\"author\",\"measuredByAtumAgent\":true,\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"processStartTime\":\"2023-10-24 10:20:59.005000+02\",\"processEndTime\":\"2023-10-24 10:20:59.005000+02\",\"measurements\":[{\"measure\":{\"measureName\":\"count\",\"controlColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}]}"
val expectedCheckpointDTOJson = "{\"id\":\"" + uuid + "\",\"name\":\"checkpoint\",\"author\":\"author\",\"measuredByAtumAgent\":true,\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"processStartTime\":\"2023-10-24 10:20:59.005000+02\",\"processEndTime\":\"2023-10-24 10:20:59.005000+02\",\"measurements\":[{\"measure\":{\"measureName\":\"count\",\"measuredColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}]}"
val actualCheckpointDTOJson = SerializationUtils.asJson(checkpointDTO)

assert(expectedCheckpointDTOJson == actualCheckpointDTOJson)
@@ -161,7 +161,7 @@ class SerializationUtilsTest extends AnyFlatSpecLike {
val seqPartitionDTO = Seq(PartitionDTO("key", "val"))
val timeWithZone = OffsetDateTime.of(2023, 10, 24, 10, 20, 59, 5000000, ZoneOffset.ofHours(2))

val checkpointDTOJson = "{\"id\":\"" + uuid + "\",\"name\":\"checkpoint\",\"author\":\"author\",\"measuredByAtumAgent\":true,\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"processStartTime\":\"2023-10-24 10:20:59.005000+02\",\"processEndTime\":\"2023-10-24 10:20:59.005000+02\",\"measurements\":[{\"measure\":{\"measureName\":\"count\",\"controlColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}]}"
val checkpointDTOJson = "{\"id\":\"" + uuid + "\",\"name\":\"checkpoint\",\"author\":\"author\",\"measuredByAtumAgent\":true,\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"processStartTime\":\"2023-10-24 10:20:59.005000+02\",\"processEndTime\":\"2023-10-24 10:20:59.005000+02\",\"measurements\":[{\"measure\":{\"measureName\":\"count\",\"measuredColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}]}"

val seqMeasurementDTO = Seq(
MeasurementDTO(
@@ -191,14 +191,14 @@ class SerializationUtilsTest extends AnyFlatSpecLike {
"asJson" should "serialize MeasureDTO into json string" in {
val measureDTO = MeasureDTO("count", Seq("col"))

val expectedMeasureDTOJson = "{\"measureName\":\"count\",\"controlColumns\":[\"col\"]}"
val expectedMeasureDTOJson = "{\"measureName\":\"count\",\"measuredColumns\":[\"col\"]}"
val actualMeasureDTOJson = SerializationUtils.asJson(measureDTO)

assert(expectedMeasureDTOJson == actualMeasureDTOJson)
}

"fromJson" should "deserialize MeasureDTO from json string" in {
val measureDTOJson = "{\"measureName\":\"count\",\"controlColumns\":[\"col\"]}"
val measureDTOJson = "{\"measureName\":\"count\",\"measuredColumns\":[\"col\"]}"

val expectedMeasureDTO = MeasureDTO("count", Seq("col"))
val actualMeasureDTO = SerializationUtils.fromJson[MeasureDTO](measureDTOJson)
@@ -213,14 +213,14 @@ class SerializationUtilsTest extends AnyFlatSpecLike {

val measurementDTO = MeasurementDTO(measureDTO, measureResultDTO)

val expectedMeasurementDTOJson = "{\"measure\":{\"measureName\":\"count\",\"controlColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}"
val expectedMeasurementDTOJson = "{\"measure\":{\"measureName\":\"count\",\"measuredColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}"
val actualMeasurementDTOJson = SerializationUtils.asJson(measurementDTO)

assert(expectedMeasurementDTOJson == actualMeasurementDTOJson)
}

"fromJson" should "deserialize MeasurementDTO from json string" in {
val measurementDTOJson = "{\"measure\":{\"measureName\":\"count\",\"controlColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}"
val measurementDTOJson = "{\"measure\":{\"measureName\":\"count\",\"measuredColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}"

val measureDTO = MeasureDTO("count", Seq("col"))
val measureResultDTO = MeasureResultDTO(mainValue = TypedValue("1", ResultValueType.Long))
@@ -62,8 +62,8 @@ object Runs {
val measureNames = values.measurements.map(_.measure.measureName).toSeq
val measureNamesNormalized = scalaSeqToPgArray(measureNames)

-  val controlColumns = values.measurements.map(_.measure.controlColumns).toSeq
-  val controlColumnsNormalized = nestedScalaSeqToPgArray(controlColumns)
+  val measuredColumns = values.measurements.map(_.measure.measuredColumns).toSeq
+  val measuredColumnsNormalized = nestedScalaSeqToPgArray(measuredColumns)

val measureResults = values.measurements.map(_.result).toSeq
val measureResultsNormalized = measureResults.map(SerializationUtils.asJson)
@@ -76,7 +76,7 @@ object Runs {
${values.processStartTime}::TIMESTAMPTZ,
${values.processEndTime}::TIMESTAMPTZ,
$measureNamesNormalized::TEXT[],
-  $controlColumnsNormalized::TEXT[][],
+  $measuredColumnsNormalized::TEXT[][],
$measureResultsNormalized::JSONB[],
${values.author}
) #$alias;"""