Skip to content

Commit

Permalink
Merge branch 'master' into feature/#219-DoobieMultipleResultFunctionW…
Browse files Browse the repository at this point in the history
…ithAggStatus

# Conflicts:
#	project/Dependencies.scala
#	server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerUnitTests.scala
  • Loading branch information
TebaleloS committed Jul 11, 2024
2 parents c582b79 + 663e2a3 commit da3f9f6
Show file tree
Hide file tree
Showing 54 changed files with 450 additions and 643 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import sttp.client3._
import sttp.model.Uri
import za.co.absa.atum.agent.exception.AtumAgentException.HttpException
import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, AtumContextDTO, CheckpointDTO, PartitioningSubmitDTO}
import za.co.absa.atum.model.utils.SerializationUtils
import za.co.absa.atum.model.utils.JsonSyntaxExtensions._

class HttpDispatcher(config: Config) extends Dispatcher(config: Config) with Logging {
import HttpDispatcher._
Expand All @@ -47,19 +47,17 @@ class HttpDispatcher(config: Config) extends Dispatcher(config: Config) with Log
override protected[agent] def createPartitioning(partitioning: PartitioningSubmitDTO): AtumContextDTO = {
val request = commonAtumRequest
.post(createPartitioningEndpoint)
.body(SerializationUtils.asJson(partitioning))
.body(partitioning.asJsonString)

val response = backend.send(request)

SerializationUtils.fromJson[AtumContextDTO](
handleResponseBody(response)
)
handleResponseBody(response).as[AtumContextDTO]
}

override protected[agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
val request = commonAtumRequest
.post(createCheckpointEndpoint)
.body(SerializationUtils.asJson(checkpoint))
.body(checkpoint.asJsonString)

val response = backend.send(request)

Expand All @@ -69,7 +67,7 @@ class HttpDispatcher(config: Config) extends Dispatcher(config: Config) with Log
override protected[agent] def saveAdditionalData(additionalDataSubmitDTO: AdditionalDataSubmitDTO): Unit = {
val request = commonAtumRequest
.post(createAdditionalDataEndpoint)
.body(SerializationUtils.asJson(additionalDataSubmitDTO))
.body(additionalDataSubmitDTO.asJsonString)

val response = backend.send(request)

Expand Down
12 changes: 6 additions & 6 deletions agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.spark.sql.types.{DataType, DecimalType, LongType, StringType}
import org.apache.spark.sql.{Column, DataFrame}
import za.co.absa.atum.agent.core.MeasurementProcessor
import za.co.absa.atum.agent.core.MeasurementProcessor.MeasurementFunction
import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType
import za.co.absa.atum.model.ResultValueType

/**
* Type of different measures to be applied to the columns.
Expand Down Expand Up @@ -57,7 +57,7 @@ object AtumMeasure {
}

override def measuredColumns: Seq[String] = Seq.empty
override val resultValueType: ResultValueType = ResultValueType.Long
override val resultValueType: ResultValueType = ResultValueType.LongValue
}
object RecordCount {
private[agent] val measureName: String = "count"
Expand All @@ -76,7 +76,7 @@ object AtumMeasure {
}

override def measuredColumns: Seq[String] = measuredCols
override val resultValueType: ResultValueType = ResultValueType.Long
override val resultValueType: ResultValueType = ResultValueType.LongValue
}
object DistinctRecordCount {
private[agent] val measureName: String = "distinctCount"
Expand All @@ -93,7 +93,7 @@ object AtumMeasure {
}

override def measuredColumns: Seq[String] = Seq(measuredCol)
override val resultValueType: ResultValueType = ResultValueType.BigDecimal
override val resultValueType: ResultValueType = ResultValueType.BigDecimalValue
}
object SumOfValuesOfColumn {
private[agent] val measureName: String = "aggregatedTotal"
Expand All @@ -110,7 +110,7 @@ object AtumMeasure {
}

override def measuredColumns: Seq[String] = Seq(measuredCol)
override val resultValueType: ResultValueType = ResultValueType.BigDecimal
override val resultValueType: ResultValueType = ResultValueType.BigDecimalValue
}
object AbsSumOfValuesOfColumn {
private[agent] val measureName: String = "absAggregatedTotal"
Expand All @@ -125,7 +125,7 @@ object AtumMeasure {
}

override def measuredColumns: Seq[String] = Seq(measuredCol)
override val resultValueType: ResultValueType = ResultValueType.String
override val resultValueType: ResultValueType = ResultValueType.StringValue
}
object SumOfHashesOfColumn {
private[agent] val measureName: String = "hashCrc32"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package za.co.absa.atum.agent.model

import za.co.absa.atum.agent.exception.AtumAgentException.MeasurementException
import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType
import za.co.absa.atum.model.ResultValueType

/**
* This trait defines a contract for a measure result.
Expand Down Expand Up @@ -80,13 +80,13 @@ object MeasureResult {
resultValue match {

case l: Long =>
MeasureResultProvided[Long](l, ResultValueType.Long)
MeasureResultProvided[Long](l, ResultValueType.LongValue)
case d: Double =>
MeasureResultProvided[Double](d, ResultValueType.Double)
MeasureResultProvided[Double](d, ResultValueType.DoubleValue)
case bd: BigDecimal =>
MeasureResultProvided[BigDecimal](bd, ResultValueType.BigDecimal)
MeasureResultProvided[BigDecimal](bd, ResultValueType.BigDecimalValue)
case s: String =>
MeasureResultProvided[String](s, ResultValueType.String)
MeasureResultProvided[String](s, ResultValueType.StringValue)

case unsupportedType =>
val className = unsupportedType.getClass.getSimpleName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package za.co.absa.atum.agent.model

import za.co.absa.atum.agent.exception.AtumAgentException.MeasurementException
import za.co.absa.atum.model.dto.MeasureResultDTO.TypedValue
import za.co.absa.atum.model.dto.{MeasureDTO, MeasureResultDTO, MeasurementDTO}

/**
Expand All @@ -41,7 +42,7 @@ private [agent] object MeasurementBuilder {
val measureDTO = MeasureDTO(measure.measureName, measure.measuredColumns)

val measureResultDTO = MeasureResultDTO(
MeasureResultDTO.TypedValue(measureResult.resultValue.toString, measureResult.resultValueType)
TypedValue(measureResult.resultValue.toString, measureResult.resultValueType)
)
MeasurementDTO(measureDTO, measureResultDTO)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import org.scalatest.matchers.should.Matchers
import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.model.AtumMeasure.{RecordCount, SumOfValuesOfColumn}
import za.co.absa.atum.agent.model.{Measure, MeasureResult, MeasurementBuilder, UnknownMeasure}
import za.co.absa.atum.model.ResultValueType
import za.co.absa.atum.model.dto.CheckpointDTO
import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType

class AtumContextUnitTests extends AnyFlatSpec with Matchers {

Expand Down Expand Up @@ -100,7 +100,7 @@ class AtumContextUnitTests extends AnyFlatSpec with Matchers {
assert(argument.getValue.author == authorTest)
assert(argument.getValue.partitioning == AtumPartitions.toSeqPartitionDTO(atumPartitions))
assert(argument.getValue.measurements.head.result.mainValue.value == "3")
assert(argument.getValue.measurements.head.result.mainValue.valueType == ResultValueType.Long)
assert(argument.getValue.measurements.head.result.mainValue.valueType == ResultValueType.LongValue)
}

"createCheckpointOnProvidedData" should "create a Checkpoint on provided data" in {
Expand All @@ -115,7 +115,7 @@ class AtumContextUnitTests extends AnyFlatSpec with Matchers {
val measurements: Map[Measure, MeasureResult] = Map(
RecordCount("col") -> MeasureResult(1L),
SumOfValuesOfColumn("col") -> MeasureResult(BigDecimal(1)),
UnknownMeasure("customMeasureName", Seq("col"), ResultValueType.BigDecimal) -> MeasureResult(BigDecimal(1))
UnknownMeasure("customMeasureName", Seq("col"), ResultValueType.BigDecimalValue) -> MeasureResult(BigDecimal(1))
)

atumContext.createCheckpointOnProvidedData(
Expand Down Expand Up @@ -172,7 +172,7 @@ class AtumContextUnitTests extends AnyFlatSpec with Matchers {
assert(argumentFirst.getValue.author == authorTest)
assert(argumentFirst.getValue.partitioning == AtumPartitions.toSeqPartitionDTO(atumPartitions))
assert(argumentFirst.getValue.measurements.head.result.mainValue.value == "4")
assert(argumentFirst.getValue.measurements.head.result.mainValue.valueType == ResultValueType.Long)
assert(argumentFirst.getValue.measurements.head.result.mainValue.valueType == ResultValueType.LongValue)

atumContext.addMeasure(SumOfValuesOfColumn("columnForSum"))
when(mockAgent.currentUser).thenReturn(authorTest + "Another") // maybe a process changed the author / current user
Expand All @@ -185,7 +185,7 @@ class AtumContextUnitTests extends AnyFlatSpec with Matchers {
assert(argumentSecond.getValue.author == authorTest + "Another")
assert(argumentSecond.getValue.partitioning == AtumPartitions.toSeqPartitionDTO(atumPartitions))
assert(argumentSecond.getValue.measurements.tail.head.result.mainValue.value == "22.5")
assert(argumentSecond.getValue.measurements.tail.head.result.mainValue.valueType == ResultValueType.BigDecimal)
assert(argumentSecond.getValue.measurements.tail.head.result.mainValue.valueType == ResultValueType.BigDecimalValue)
}

"addAdditionalData" should "add key/value pair to map for additional data" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.scalatest.matchers.should.Matchers
import za.co.absa.atum.agent.AtumAgent
import za.co.absa.atum.agent.AtumContext.{AtumPartitions, DatasetWrapper}
import za.co.absa.atum.agent.model.AtumMeasure._
import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType
import za.co.absa.atum.model.ResultValueType
import za.co.absa.spark.commons.test.SparkTestBase

class AtumMeasureUnitTests extends AnyFlatSpec with Matchers with SparkTestBase { self =>
Expand Down Expand Up @@ -94,17 +94,17 @@ class AtumMeasureUnitTests extends AnyFlatSpec with Matchers with SparkTestBase

// Assertions
assert(dfPersonCntResult.resultValue == "1000")
assert(dfPersonCntResult.resultValueType == ResultValueType.Long)
assert(dfPersonCntResult.resultValueType == ResultValueType.LongValue)
assert(dfFullCntResult.resultValue == "1000")
assert(dfFullCntResult.resultValueType == ResultValueType.Long)
assert(dfFullCntResult.resultValueType == ResultValueType.LongValue)
assert(dfFullSalaryAbsSumResult.resultValue == "2987144")
assert(dfFullSalaryAbsSumResult.resultValueType == ResultValueType.BigDecimal)
assert(dfFullSalaryAbsSumResult.resultValueType == ResultValueType.BigDecimalValue)
assert(dfFullHashResult.resultValue == "2044144307532")
assert(dfFullHashResult.resultValueType == ResultValueType.String)
assert(dfFullHashResult.resultValueType == ResultValueType.StringValue)
assert(dfExtraPersonSalarySumResult.resultValue == "2986144")
assert(dfExtraPersonSalarySumResult.resultValueType == ResultValueType.BigDecimal)
assert(dfExtraPersonSalarySumResult.resultValueType == ResultValueType.BigDecimalValue)
assert(dfFullSalarySumResult.resultValue == "2987144")
assert(dfFullSalarySumResult.resultValueType == ResultValueType.BigDecimal)
assert(dfFullSalarySumResult.resultValueType == ResultValueType.BigDecimalValue)
}

"AbsSumOfValuesOfColumn" should "return expected value" in {
Expand All @@ -119,7 +119,7 @@ class AtumMeasureUnitTests extends AnyFlatSpec with Matchers with SparkTestBase
val result = salaryAbsSum.function(df)

assert(result.resultValue == "300.3")
assert(result.resultValueType == ResultValueType.BigDecimal)
assert(result.resultValueType == ResultValueType.BigDecimalValue)
}

"AbsSumOfValuesOfColumn" should "return expected value for null result" in {
Expand All @@ -134,7 +134,7 @@ class AtumMeasureUnitTests extends AnyFlatSpec with Matchers with SparkTestBase
val result = salaryAbsSum.function(df)

assert(result.resultValue == "0")
assert(result.resultValueType == ResultValueType.BigDecimal)
assert(result.resultValueType == ResultValueType.BigDecimalValue)
}

"RecordCount" should "return expected value" in {
Expand All @@ -149,7 +149,7 @@ class AtumMeasureUnitTests extends AnyFlatSpec with Matchers with SparkTestBase
val result = distinctCount.function(df)

assert(result.resultValue == "4")
assert(result.resultValueType == ResultValueType.Long)
assert(result.resultValueType == ResultValueType.LongValue)
}

"DistinctRecordCount" should "return expected value for multiple columns" in {
Expand All @@ -164,7 +164,7 @@ class AtumMeasureUnitTests extends AnyFlatSpec with Matchers with SparkTestBase
val result = distinctCount.function(df)

assert(result.resultValue == "3")
assert(result.resultValueType == ResultValueType.Long)
assert(result.resultValueType == ResultValueType.LongValue)
}

"DistinctRecordCount" should "fail requirements when no control columns given" in {
Expand All @@ -183,7 +183,7 @@ class AtumMeasureUnitTests extends AnyFlatSpec with Matchers with SparkTestBase
val result = distinctCount.function(df)

assert(result.resultValue == "4")
assert(result.resultValueType == ResultValueType.BigDecimal)
assert(result.resultValueType == ResultValueType.BigDecimalValue)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import org.scalatest.matchers.should.Matchers
import za.co.absa.atum.agent.AtumAgent
import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.model.AtumMeasure.{AbsSumOfValuesOfColumn, RecordCount, SumOfHashesOfColumn, SumOfValuesOfColumn}
import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType
import za.co.absa.spark.commons.test.SparkTestBase
import za.co.absa.atum.agent.AtumContext._
import za.co.absa.atum.model.ResultValueType

class MeasureUnitTests extends AnyFlatSpec with Matchers with SparkTestBase { self =>

Expand Down Expand Up @@ -92,17 +92,17 @@ class MeasureUnitTests extends AnyFlatSpec with Matchers with SparkTestBase { se

// Assertions
assert(dfPersonCntResult.resultValue == "1000")
assert(dfPersonCntResult.resultValueType == ResultValueType.Long)
assert(dfPersonCntResult.resultValueType == ResultValueType.LongValue)
assert(dfFullCntResult.resultValue == "1000")
assert(dfFullCntResult.resultValueType == ResultValueType.Long)
assert(dfFullCntResult.resultValueType == ResultValueType.LongValue)
assert(dfFullSalaryAbsSumResult.resultValue == "2987144")
assert(dfFullSalaryAbsSumResult.resultValueType == ResultValueType.BigDecimal)
assert(dfFullSalaryAbsSumResult.resultValueType == ResultValueType.BigDecimalValue)
assert(dfFullHashResult.resultValue == "2044144307532")
assert(dfFullHashResult.resultValueType == ResultValueType.String)
assert(dfFullHashResult.resultValueType == ResultValueType.StringValue)
assert(dfExtraPersonSalarySumResult.resultValue == "2986144")
assert(dfExtraPersonSalarySumResult.resultValueType == ResultValueType.BigDecimal)
assert(dfExtraPersonSalarySumResult.resultValueType == ResultValueType.BigDecimalValue)
assert(dfFullSalarySumResult.resultValue == "2987144")
assert(dfFullSalarySumResult.resultValueType == ResultValueType.BigDecimal)
assert(dfFullSalarySumResult.resultValueType == ResultValueType.BigDecimalValue)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import org.scalatest.flatspec.AnyFlatSpec
import za.co.absa.atum.agent.exception.AtumAgentException.MeasurementException
import za.co.absa.atum.model.dto.{MeasureDTO, MeasureResultDTO, MeasurementDTO}
import za.co.absa.atum.agent.model.AtumMeasure._
import za.co.absa.atum.model.dto.MeasureResultDTO.{ResultValueType, TypedValue}
import za.co.absa.atum.model.ResultValueType
import za.co.absa.atum.model.dto.MeasureResultDTO.TypedValue

class MeasurementBuilderUnitTests extends AnyFlatSpec {

Expand All @@ -35,7 +36,7 @@ class MeasurementBuilderUnitTests extends AnyFlatSpec {
val expectedMeasureDTO = MeasureDTO("aggregatedTotal", Seq("col"))

val expectedMeasureResultDTO = MeasureResultDTO(
TypedValue("1", ResultValueType.BigDecimal)
TypedValue("1", ResultValueType.BigDecimalValue)
)

assert(measurementDTO.measure == expectedMeasureDTO)
Expand All @@ -49,7 +50,7 @@ class MeasurementBuilderUnitTests extends AnyFlatSpec {
val measureResult = MeasureResult(BigDecimal(3.14))
val measurementDTO = MeasurementBuilder.buildMeasurementDTO(measure, measureResult)

val expectedTypedValue = TypedValue("3.14", ResultValueType.BigDecimal)
val expectedTypedValue = TypedValue("3.14", ResultValueType.BigDecimalValue)

assert(measurementDTO.result.mainValue == expectedTypedValue)
}
Expand All @@ -59,11 +60,11 @@ class MeasurementBuilderUnitTests extends AnyFlatSpec {
"when Measurement provided" in {

val measure = SumOfValuesOfColumn("col")
val measureResult = MeasureResult("stringValue", ResultValueType.BigDecimal)
val measureResult = MeasureResult("string", ResultValueType.BigDecimalValue)

val measurementDTO = MeasurementBuilder.buildMeasurementDTO(measure, measureResult)

val expectedTypedValue = TypedValue("stringValue", ResultValueType.BigDecimal)
val expectedTypedValue = TypedValue("string", ResultValueType.BigDecimalValue)

assert(measurementDTO.result.mainValue == expectedTypedValue)
}
Expand All @@ -72,14 +73,14 @@ class MeasurementBuilderUnitTests extends AnyFlatSpec {
"build MeasurementDTO for BigDecimal type of result value when measured by Agent" in {

val measure = SumOfValuesOfColumn("col")
val measureResult = MeasureResult("1", ResultValueType.BigDecimal)
val measureResult = MeasureResult("1", ResultValueType.BigDecimalValue)

val measurementDTO = MeasurementBuilder.buildMeasurementDTO(measure, measureResult)

val expectedMeasureDTO = MeasureDTO("aggregatedTotal", Seq("col"))

val expectedMeasureResultDTO = MeasureResultDTO(
TypedValue("1", ResultValueType.BigDecimal)
TypedValue("1", ResultValueType.BigDecimalValue)
)

assert(measurementDTO.measure == expectedMeasureDTO)
Expand All @@ -88,25 +89,25 @@ class MeasurementBuilderUnitTests extends AnyFlatSpec {

"buildAndValidateMeasurementsDTO" should "build Seq[MeasurementDTO] for multiple measures, all unique" in {
val measurements: Map[Measure, MeasureResult] = Map(
DistinctRecordCount(Seq("col")) -> MeasureResult("1", ResultValueType.Long),
DistinctRecordCount(Seq("col")) -> MeasureResult("1", ResultValueType.LongValue),
SumOfValuesOfColumn("col1") -> MeasureResult(BigDecimal(1.2)),
SumOfValuesOfColumn("col2") -> MeasureResult(BigDecimal(1.3)),
UnknownMeasure("unknownMeasure", Seq("col"), ResultValueType.BigDecimal) -> MeasureResult(BigDecimal(1.1))
UnknownMeasure("unknownMeasure", Seq("col"), ResultValueType.BigDecimalValue) -> MeasureResult(BigDecimal(1.1))
)
val measurementDTOs = MeasurementBuilder.buildAndValidateMeasurementsDTO(measurements)

val expectedMeasurementDTO = Set(
MeasurementDTO(
MeasureDTO("distinctCount", Seq("col")), MeasureResultDTO(TypedValue("1", ResultValueType.Long))
MeasureDTO("distinctCount", Seq("col")), MeasureResultDTO(TypedValue("1", ResultValueType.LongValue))
),
MeasurementDTO(
MeasureDTO("aggregatedTotal", Seq("col1")), MeasureResultDTO(TypedValue("1.2", ResultValueType.BigDecimal))
MeasureDTO("aggregatedTotal", Seq("col1")), MeasureResultDTO(TypedValue("1.2", ResultValueType.BigDecimalValue))
),
MeasurementDTO(
MeasureDTO("aggregatedTotal", Seq("col2")), MeasureResultDTO(TypedValue("1.3", ResultValueType.BigDecimal))
MeasureDTO("aggregatedTotal", Seq("col2")), MeasureResultDTO(TypedValue("1.3", ResultValueType.BigDecimalValue))
),
MeasurementDTO(
MeasureDTO("unknownMeasure", Seq("col")), MeasureResultDTO(TypedValue("1.1", ResultValueType.BigDecimal))
MeasureDTO("unknownMeasure", Seq("col")), MeasureResultDTO(TypedValue("1.1", ResultValueType.BigDecimalValue))
)
)

Expand Down Expand Up @@ -149,7 +150,7 @@ class MeasurementBuilderUnitTests extends AnyFlatSpec {
val measure = SumOfValuesOfColumn("col")

assertThrows[MeasurementException](
MeasurementBuilder.buildAndValidateMeasurementsDTO(Map(measure -> MeasureResult("stringValue", ResultValueType.String)))
MeasurementBuilder.buildAndValidateMeasurementsDTO(Map(measure -> MeasureResult("string", ResultValueType.StringValue)))
)
}
}
Loading

0 comments on commit da3f9f6

Please sign in to comment.