Skip to content

Commit

Permalink
Merge branch 'master' into feature/#120-retrieve-Measures-and-Additio…
Browse files Browse the repository at this point in the history
…nalData-for-a-given-partitioning

# Conflicts:
#	server/src/main/scala/za/co/absa/atum/server/Main.scala
#	server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala
#	server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala
#	server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala
  • Loading branch information
TebaleloS committed Mar 20, 2024
2 parents 57a1fd7 + af2a129 commit 6d7145b
Show file tree
Hide file tree
Showing 64 changed files with 1,862 additions and 528 deletions.
36 changes: 36 additions & 0 deletions .github/ISSUE_TEMPLATE/spike_task.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
---
name: Spike
about: Issue template for spikes, research and investigation tasks
labels: 'spike'

---

## Background
A clear and concise description of the problem or a topic we need to understand.

Feel free to add information about why it's needed and what assumptions you have at the moment.

## Questions To Answer

1.
2.
3.

## Desired Outcome

The list of desired outcomes of this spike ticket.

```[tasklist]
### Tasks
- [ ] Questions have been answered or we have a clearer idea of how to get to our goal
- [ ] Discussion with the team
- [ ] Documentation
- [ ] Create recommendations and new implementation tickets
- [ ] item here..
```

## Additional Info/Resources [Optional]

1.
2.
3.
62 changes: 37 additions & 25 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package za.co.absa.atum.agent

import org.apache.spark.sql.DataFrame
import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.AtumContext.{AdditionalData, AtumPartitions}
import za.co.absa.atum.agent.model._
import za.co.absa.atum.model.dto._

Expand All @@ -36,7 +36,7 @@ class AtumContext private[agent] (
val atumPartitions: AtumPartitions,
val agent: AtumAgent,
private var measures: Set[AtumMeasure] = Set.empty,
private var additionalData: Map[String, Option[String]] = Map.empty
private var additionalData: AdditionalData = Map.empty
) {

/**
Expand Down Expand Up @@ -84,9 +84,9 @@ class AtumContext private[agent] (
val checkpointDTO = CheckpointDTO(
id = UUID.randomUUID(),
name = checkpointName,
author = this.agent.currentUser,
author = agent.currentUser,
measuredByAtumAgent = true,
partitioning = AtumPartitions.toSeqPartitionDTO(this.atumPartitions),
partitioning = AtumPartitions.toSeqPartitionDTO(atumPartitions),
processStartTime = startTime,
processEndTime = Some(endTime),
measurements = measurementDTOs
Expand All @@ -103,14 +103,14 @@ class AtumContext private[agent] (
* @param measurements the measurements to be included in the checkpoint
* @return the AtumContext after the checkpoint has been created
*/
def createCheckpointOnProvidedData(checkpointName: String, measurements: Map[AtumMeasure, MeasureResult]): AtumContext = {
def createCheckpointOnProvidedData(checkpointName: String, measurements: Map[Measure, MeasureResult]): AtumContext = {
val dateTimeNow = ZonedDateTime.now()

val checkpointDTO = CheckpointDTO(
id = UUID.randomUUID(),
name = checkpointName,
author = this.agent.currentUser,
partitioning = AtumPartitions.toSeqPartitionDTO(this.atumPartitions),
author = agent.currentUser,
partitioning = AtumPartitions.toSeqPartitionDTO(atumPartitions),
processStartTime = dateTimeNow,
processEndTime = Some(dateTimeNow),
measurements = MeasurementBuilder.buildAndValidateMeasurementsDTO(measurements)
Expand All @@ -121,24 +121,35 @@ class AtumContext private[agent] (
}

/**
* This method creates Additional Data in the agentService.
* Adds additional data to the AtumContext.
*
* @param key the key of the additional data
* @param value the value of the additional data
*
* @return AtumContext
* @return the AtumContext after the additional data has been dispatched and added
*/
def saveAdditionalData(): AtumContext = {
val additionalData = AdditionalDataSubmitDTO(AtumPartitions.toSeqPartitionDTO(this.atumPartitions), this.additionalData)
agent.saveAdditionalData(additionalData)
this
def addAdditionalData(key: String, value: String): AtumContext = {
addAdditionalData(Map(key -> value))
}

/**
* Adds additional data to the AtumContext.
* This method creates Additional Data in the agentService and dispatches them into the data store.
*
* @param key the key of the additional data
* @param value the value of the additional data
* @param newAdditionalDataToAdd additional data that will be added into the data store
*
* @return the AtumContext after the additional data has been dispatched and added
*/
def addAdditionalData(key: String, value: String): AtumContext = {
additionalData += (key -> Some(value))
def addAdditionalData(newAdditionalDataToAdd: Map[String, String]): AtumContext = {
val currAdditionalData = newAdditionalDataToAdd.map{case (k,v) => (k, Some(v))}

val currAdditionalDataSubmit = AdditionalDataSubmitDTO(
AtumPartitions.toSeqPartitionDTO(atumPartitions),
currAdditionalData,
agent.currentUser
)
agent.saveAdditionalData(currAdditionalDataSubmit)

this.additionalData ++= currAdditionalData
this
}

Expand All @@ -147,8 +158,8 @@ class AtumContext private[agent] (
*
* @return the current additional data
*/
def currentAdditionalData: Map[String, Option[String]] = {
this.additionalData
def currentAdditionalData: AdditionalDataDTO = {
additionalData
}

/**
Expand Down Expand Up @@ -182,10 +193,10 @@ class AtumContext private[agent] (
}

private[agent] def copy(
atumPartitions: AtumPartitions = this.atumPartitions,
agent: AtumAgent = this.agent,
measures: Set[AtumMeasure] = this.measures,
additionalData: Map[String, Option[String]] = this.additionalData
atumPartitions: AtumPartitions = atumPartitions,
agent: AtumAgent = agent,
measures: Set[AtumMeasure] = measures,
additionalData: AdditionalDataDTO = additionalData
): AtumContext = {
new AtumContext(atumPartitions, agent, measures, additionalData)
}
Expand All @@ -196,6 +207,7 @@ object AtumContext {
* Type alias for Atum partitions.
*/
type AtumPartitions = ListMap[String, String]
type AdditionalData = AdditionalDataDTO

/**
* Object contains helper methods to work with Atum partitions.
Expand Down Expand Up @@ -223,7 +235,7 @@ object AtumContext {
AtumPartitions.fromPartitioning(atumContextDTO.partitioning),
agent,
MeasuresBuilder.mapToMeasures(atumContextDTO.measures),
atumContextDTO.additionalData.additionalData
atumContextDTO.additionalData
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,6 @@ object AtumAgentException {
*/
case class MeasurementException(message: String) extends AtumAgentException(message)

/**
* This type represents an exception thrown when a measure is not supported by the Atum Agent.
*
* @param message A message describing the exception.
*/
case class MeasureException(message: String) extends AtumAgentException(message)

/**
* This type represents an exception related to HTTP communication.
* @param statusCode A status code of the HTTP response.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType
sealed trait Measure {
val measureName: String
def measuredColumns: Seq[String]
}

trait AtumMeasure extends Measure with MeasurementProcessor {
val resultValueType: ResultValueType
}

/**
 * A [[Measure]] that additionally mixes in `MeasurementProcessor`.
 *
 * NOTE(review): presumably these are the measures the agent can compute itself - confirm against
 * `MeasurementProcessor`.
 */
trait AtumMeasure extends Measure with MeasurementProcessor

/**
 * A [[Measure]] described only by its name, its measured columns and the type of its result value.
 *
 * Unlike [[AtumMeasure]], it does not mix in `MeasurementProcessor`.
 * NOTE(review): presumably intended for measures whose results are supplied by the caller rather than
 * computed by the agent - confirm.
 *
 * @param measureName     the name of the measure
 * @param measuredColumns the columns the measure refers to
 * @param resultValueType the type of the result value for this measure
 */
final case class UnknownMeasure(measureName: String, measuredColumns: Seq[String], resultValueType: ResultValueType)
extends Measure

object AtumMeasure {

val supportedMeasureNames: Seq[String] = Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import za.co.absa.atum.model.dto.{MeasureDTO, MeasureResultDTO, MeasurementDTO}
*/
private [agent] object MeasurementBuilder {

private def validateMeasureAndResultTypeCompatibility(measure: AtumMeasure, result: MeasureResult): Unit = {
private def validateMeasureAndResultTypeCompatibility(measure: Measure, result: MeasureResult): Unit = {
val requiredType = measure.resultValueType
val actualType = result.resultValueType

Expand All @@ -35,7 +35,7 @@ private [agent] object MeasurementBuilder {
)
}

def buildMeasurementDTO(measure: AtumMeasure, measureResult: MeasureResult): MeasurementDTO = {
def buildMeasurementDTO(measure: Measure, measureResult: MeasureResult): MeasurementDTO = {
validateMeasureAndResultTypeCompatibility(measure, measureResult)

val measureDTO = MeasureDTO(measure.measureName, measure.measuredColumns)
Expand All @@ -46,8 +46,8 @@ private [agent] object MeasurementBuilder {
MeasurementDTO(measureDTO, measureResultDTO)
}

def buildAndValidateMeasurementsDTO(measurements: Map[AtumMeasure, MeasureResult]): Set[MeasurementDTO] = {
measurements.toSet[(AtumMeasure, MeasureResult)].map { case (measure: AtumMeasure, measureResult: MeasureResult) =>
def buildAndValidateMeasurementsDTO(measurements: Map[Measure, MeasureResult]): Set[MeasurementDTO] = {
measurements.toSet[(Measure, MeasureResult)].map { case (measure: Measure, measureResult: MeasureResult) =>
validateMeasureAndResultTypeCompatibility(measure, measureResult)
buildMeasurementDTO(measure, measureResult)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,42 @@

package za.co.absa.atum.agent.model

import za.co.absa.atum.agent.exception.AtumAgentException.MeasureException
import za.co.absa.atum.agent.model.AtumMeasure._
import za.co.absa.atum.model.dto.MeasureDTO
import org.apache.spark.internal.Logging

import scala.util.Try

/**
* This object provides a functionality to convert a DTO representation of measures to the Agent's internal
* representation of those objects.
*/
private [agent] object MeasuresBuilder {
private [agent] object MeasuresBuilder extends Logging {

private [agent] def mapToMeasures(measures: Set[MeasureDTO]): Set[za.co.absa.atum.agent.model.AtumMeasure] = {
measures.map(createMeasure)
// A measure that `createMeasure` cannot build (it returns None) is dropped from the result with a
// warning, so one unsupported DTO does not fail the conversion of the whole set.
measures.flatMap { measure =>
createMeasure(measure) match {
case Some(value) =>
Some(value)
case None =>
logWarning(s"Measure not supported or unknown: $measure.")
None
}
}
}

private def createMeasure(measure: MeasureDTO): za.co.absa.atum.agent.model.AtumMeasure = {
private def createMeasure(measure: MeasureDTO): Option[za.co.absa.atum.agent.model.AtumMeasure] = {
val measuredColumns = measure.measuredColumns

measure.measureName match {
case RecordCount.measureName => RecordCount()
case DistinctRecordCount.measureName => DistinctRecordCount(measuredColumns)
case SumOfValuesOfColumn.measureName => SumOfValuesOfColumn(measuredColumns.head)
case AbsSumOfValuesOfColumn.measureName => AbsSumOfValuesOfColumn(measuredColumns.head)
case SumOfHashesOfColumn.measureName => SumOfHashesOfColumn(measuredColumns.head)
case unsupportedMeasure =>
throw MeasureException(
s"Measure not supported: $unsupportedMeasure. Supported measures are: ${AtumMeasure.supportedMeasureNames}"
)
}
// The match below has no default case, and the single-column measures call `.head` on
// `measuredColumns`. An unmatched measure name (MatchError) or an empty column list
// (NoSuchElementException) therefore throws inside the Try, and `.toOption` turns that - like any
// other non-fatal exception raised while constructing the measure - into None.
// NOTE(review): the failure cause is discarded here, so the caller cannot tell an unknown measure
// name apart from a known measure with invalid columns.
Try {
measure.measureName match {
case RecordCount.measureName => RecordCount()
case DistinctRecordCount.measureName => DistinctRecordCount(measuredColumns)
case SumOfValuesOfColumn.measureName => SumOfValuesOfColumn(measuredColumns.head)
case AbsSumOfValuesOfColumn.measureName => AbsSumOfValuesOfColumn(measuredColumns.head)
case SumOfHashesOfColumn.measureName => SumOfHashesOfColumn(measuredColumns.head)
}
}.toOption
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import org.mockito.Mockito.{mock, times, verify, when}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.model.{AtumMeasure, MeasureResult, MeasurementBuilder}
import za.co.absa.atum.agent.model.AtumMeasure._
import za.co.absa.atum.agent.model.AtumMeasure.{RecordCount, SumOfValuesOfColumn}
import za.co.absa.atum.agent.model.{Measure, MeasureResult, MeasurementBuilder, UnknownMeasure}
import za.co.absa.atum.model.dto.CheckpointDTO
import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType

Expand Down Expand Up @@ -112,9 +112,10 @@ class AtumContextTest extends AnyFlatSpec with Matchers {
val atumPartitions = AtumPartitions("key" -> "value")
val atumContext: AtumContext = new AtumContext(atumPartitions, mockAgent)

val measurements: Map[AtumMeasure, MeasureResult] = Map(
val measurements: Map[Measure, MeasureResult] = Map(
RecordCount("col") -> MeasureResult(1L),
SumOfValuesOfColumn("col") -> MeasureResult(BigDecimal(1))
SumOfValuesOfColumn("col") -> MeasureResult(BigDecimal(1)),
UnknownMeasure("customMeasureName", Seq("col"), ResultValueType.BigDecimal) -> MeasureResult(BigDecimal(1))
)

atumContext.createCheckpointOnProvidedData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,11 @@ class MeasurementBuilderTest extends AnyFlatSpec {
}

"buildAndValidateMeasurementsDTO" should "build Seq[MeasurementDTO] for multiple measures, all unique" in {
val measurements: Map[AtumMeasure, MeasureResult] = Map(
val measurements: Map[Measure, MeasureResult] = Map(
DistinctRecordCount(Seq("col")) -> MeasureResult("1", ResultValueType.Long),
SumOfValuesOfColumn("col1") -> MeasureResult(BigDecimal(1.2)),
SumOfValuesOfColumn("col2") -> MeasureResult(BigDecimal(1.3))
SumOfValuesOfColumn("col2") -> MeasureResult(BigDecimal(1.3)),
UnknownMeasure("unknownMeasure", Seq("col"), ResultValueType.BigDecimal) -> MeasureResult(BigDecimal(1.1))
)
val measurementDTOs = MeasurementBuilder.buildAndValidateMeasurementsDTO(measurements)

Expand All @@ -103,6 +104,9 @@ class MeasurementBuilderTest extends AnyFlatSpec {
),
MeasurementDTO(
MeasureDTO("aggregatedTotal", Seq("col2")), MeasureResultDTO(TypedValue("1.3", ResultValueType.BigDecimal))
),
MeasurementDTO(
MeasureDTO("unknownMeasure", Seq("col")), MeasureResultDTO(TypedValue("1.1", ResultValueType.BigDecimal))
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package za.co.absa.atum.agent.model

import org.scalatest.flatspec.AnyFlatSpecLike
import za.co.absa.atum.agent.exception.AtumAgentException.MeasureException
import za.co.absa.atum.agent.model.AtumMeasure._
import za.co.absa.atum.model.dto.MeasureDTO

Expand Down Expand Up @@ -45,12 +44,12 @@ class MeasuresBuilderTest extends AnyFlatSpecLike {
assert(expectedMeasures == actualMeasures)
}

"mapToMeasures" should "throw exception for unsupported measure" in {
"mapToMeasures" should "ignore unsupported or unknown measure" in {
val unsupportedMeasure = Set(
MeasureDTO("unsupportedMeasure", Seq("col"))
)

assertThrows[MeasureException](MeasuresBuilder.mapToMeasures(unsupportedMeasure))
assert(MeasuresBuilder.mapToMeasures(unsupportedMeasure).isEmpty)
}

}
Loading

0 comments on commit 6d7145b

Please sign in to comment.