Commit

Merge remote-tracking branch 'remotes/origin/master' into feature/126-add-end-to-end-tests

benedeki committed Nov 17, 2023
2 parents 2c8f566 + a446fb2 commit 994ffba
Showing 26 changed files with 301 additions and 197 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/build.yml
@@ -18,7 +18,8 @@ name: Build

on:
  pull_request:
-    branches: [ master ]
+    branches:
+      - '**'
    types: [ opened, synchronize, reopened ]

jobs:
4 changes: 2 additions & 2 deletions agent/README.md
@@ -12,10 +12,10 @@ Create multiple `AtumContext` with different control measures to be applied
### Option 1
```scala
val atumContextInstanceWithRecordCount = AtumContext(processor = processor)
-  .withMeasureAdded(RecordCount(MockMeasureNames.recordCount1, controlCol = "id"))
+  .withMeasureAdded(RecordCount(MockMeasureNames.recordCount1, measuredColumn = "id"))

val atumContextWithSalaryAbsMeasure = atumContextInstanceWithRecordCount
-  .withMeasureAdded(AbsSumOfValuesOfColumn(controlCol = "salary"))
+  .withMeasureAdded(AbsSumOfValuesOfColumn(measuredColumn = "salary"))
```

### Option 2
43 changes: 32 additions & 11 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala
@@ -18,13 +18,15 @@ package za.co.absa.atum.agent
import com.typesafe.config.{Config, ConfigFactory}
import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.dispatcher.{ConsoleDispatcher, HttpDispatcher}
-import za.co.absa.atum.model.dto.{CheckpointDTO, PartitioningDTO}
+import za.co.absa.atum.model.dto.{CheckpointDTO, PartitioningSubmitDTO}

/**
* Entity that communicates with the API, primarily focused on spawning Atum Context(s).
*/
class AtumAgent private[agent] () {

+  private[this] var contexts: Map[AtumPartitions, AtumContext] = Map.empty

val config: Config = ConfigFactory.load()

private val dispatcher = config.getString("atum.dispatcher.type") match {
@@ -33,6 +35,16 @@ class AtumAgent private[agent] () {
case dt => throw new UnsupportedOperationException(s"Unsupported dispatcher type: '$dt''")
}

+  /**
+   * Returns the user under whose security context the JVM is running.
+   * Its purpose is auditing in author/createdBy fields.
+   *
+   * Important: it's not supposed to be used for authorization as it can be spoofed!
+   *
+   * @return Current user.
+   */
+  private[agent] def currentUser: String = System.getProperty("user.name") // platform independent

/**
* Sends `CheckpointDTO` to the AtumService API
*
@@ -45,25 +57,36 @@
/**
* Provides an AtumContext given an `AtumPartitions` instance. Retrieves the data from the AtumService API.
*
- * @param atumPartitions
- * @return
+ * Note: if the partitioning doesn't exist in the store yet, a new one will be created with the author stored in
+ * `AtumAgent.currentUser`. If the partitioning already exists, this attribute will be ignored because there
+ * already is an author who previously created the partitioning in the data store. Each Atum Context can thus
+ * potentially have a different author.
+ *
+ * @param atumPartitions Partitioning based on which an Atum Context will be created or obtained.
+ * @return Atum context object that's either newly created in the data store, or obtained because the input
+ *         partitioning already existed.
*/
def getOrCreateAtumContext(atumPartitions: AtumPartitions): AtumContext = {
-    val partitioningDTO = PartitioningDTO(AtumPartitions.toSeqPartitionDTO(atumPartitions), None)
-    val atumContextDTO = dispatcher.getOrCreateAtumContext(partitioningDTO)
-    lazy val atumContext = AtumContext.fromDTO(atumContextDTO, this)
+    val authorIfNew = AtumAgent.currentUser
+    val partitioningDTO = PartitioningSubmitDTO(AtumPartitions.toSeqPartitionDTO(atumPartitions), None, authorIfNew)
+
+    val atumContextDTO = dispatcher.createPartitioning(partitioningDTO)
+    val atumContext = AtumContext.fromDTO(atumContextDTO, this)

getExistingOrNewContext(atumPartitions, atumContext)
}

def getOrCreateAtumSubContext(subPartitions: AtumPartitions)(implicit parentAtumContext: AtumContext): AtumContext = {
+    val authorIfNew = AtumAgent.currentUser
val newPartitions: AtumPartitions = parentAtumContext.atumPartitions ++ subPartitions

val newPartitionsDTO = AtumPartitions.toSeqPartitionDTO(newPartitions)
val parentPartitionsDTO = Some(AtumPartitions.toSeqPartitionDTO(parentAtumContext.atumPartitions))
-    val partitioningDTO = PartitioningDTO(newPartitionsDTO, parentPartitionsDTO)
+    val partitioningDTO = PartitioningSubmitDTO(newPartitionsDTO, parentPartitionsDTO, authorIfNew)

-    val atumContextDTO = dispatcher.getOrCreateAtumContext(partitioningDTO)
-    lazy val atumContext = AtumContext.fromDTO(atumContextDTO, this)
+    val atumContextDTO = dispatcher.createPartitioning(partitioningDTO)
+    val atumContext = AtumContext.fromDTO(atumContextDTO, this)
+
getExistingOrNewContext(newPartitions, atumContext)
}

@@ -78,8 +101,6 @@ class AtumAgent private[agent] () {
}
}

-  private[this] var contexts: Map[AtumPartitions, AtumContext] = Map.empty

}

object AtumAgent extends AtumAgent
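
For orientation, here is a minimal call-site sketch of the reworked agent API, assuming only the types and imports shown in this diff; the partition keys and values are made up:

```scala
import za.co.absa.atum.agent.AtumAgent
import za.co.absa.atum.agent.AtumContext.AtumPartitions

// Illustrative partitioning; AtumPartitions now takes a List to preserve ordering.
val partitions = AtumPartitions(List("country" -> "ZA", "year" -> "2023"))

// No author argument: when the partitioning is new, the agent records
// AtumAgent.currentUser (System.getProperty("user.name")) as the author.
val atumContext = AtumAgent.getOrCreateAtumContext(partitions)
```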
28 changes: 14 additions & 14 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala
@@ -28,9 +28,10 @@ import scala.collection.immutable.ListMap

/**
* This class provides the methods to measure Spark `DataFrame`s. It also allows adding and removing measures.
- * @param atumPartitions
- * @param agent
- * @param measures
+ * @param atumPartitions Atum partitions associated with a given Atum Context.
+ * @param agent Reference to an Atum Agent object that will be used within the current Atum Context.
+ * @param measures Variable set of measures associated with the given partitions / context.
+ * @param additionalData Additional metadata or tags associated with a given context.
*/
class AtumContext private[agent] (
val atumPartitions: AtumPartitions,
@@ -52,15 +53,15 @@ class AtumContext private[agent] (
}
}

-  def createCheckpoint(checkpointName: String, author: String, dataToMeasure: DataFrame): AtumContext = {
+  def createCheckpoint(checkpointName: String, dataToMeasure: DataFrame): AtumContext = {
val startTime = OffsetDateTime.now()
val measurements = takeMeasurements(dataToMeasure)
val endTime = OffsetDateTime.now()

val checkpointDTO = CheckpointDTO(
id = UUID.randomUUID(),
name = checkpointName,
-      author = author,
+      author = this.agent.currentUser,
measuredByAtumAgent = true,
partitioning = AtumPartitions.toSeqPartitionDTO(this.atumPartitions),
processStartTime = startTime,
@@ -72,13 +73,13 @@
this
}

-  def createCheckpointOnProvidedData(checkpointName: String, author: String, measurements: Seq[Measurement]): AtumContext = {
+  def createCheckpointOnProvidedData(checkpointName: String, measurements: Seq[Measurement]): AtumContext = {
val offsetDateTimeNow = OffsetDateTime.now()

val checkpointDTO = CheckpointDTO(
id = UUID.randomUUID(),
name = checkpointName,
-      author = author,
+      author = this.agent.currentUser,
partitioning = AtumPartitions.toSeqPartitionDTO(this.atumPartitions),
processStartTime = offsetDateTimeNow,
processEndTime = Some(offsetDateTimeNow),
@@ -130,16 +131,16 @@ object AtumContext {
ListMap(elems)
}

-    def apply(elems: Seq[(String, String)]): AtumPartitions = {
+    def apply(elems: List[(String, String)]): AtumPartitions = {
ListMap(elems:_*)
}

-    private[agent] def toSeqPartitionDTO(atumPartitions: AtumPartitions): Seq[PartitionDTO] = {
+    private[agent] def toSeqPartitionDTO(atumPartitions: AtumPartitions): PartitioningDTO = {
atumPartitions.map { case (key, value) => PartitionDTO(key, value) }.toSeq
}

-    private[agent] def fromPartitioning(partitioning: Seq[PartitionDTO]): AtumPartitions = {
-      AtumPartitions(partitioning.map(partition => partition.key -> partition.value))
+    private[agent] def fromPartitioning(partitioning: PartitioningDTO): AtumPartitions = {
+      AtumPartitions(partitioning.map(partition => Tuple2(partition.key, partition.value)).toList)
}
}
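
A hedged round-trip sketch of the retyped conversions above; the switch from Seq to List keeps the partition ordering explicit. Both helpers remain private[agent], so this would only compile inside that package, and the values are illustrative:

```scala
// Only valid within the za.co.absa.atum.agent package (both helpers are
// private[agent]); partition values are illustrative.
val partitions = AtumPartitions(List("country" -> "ZA", "region" -> "west"))

val dto = AtumPartitions.toSeqPartitionDTO(partitions)  // now typed as PartitioningDTO
val roundTripped = AtumPartitions.fromPartitioning(dto) // List-based, so key order survives

assert(roundTripped == partitions)
```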

@@ -157,12 +158,11 @@
/**
* Set a point in the pipeline to execute calculation and store it.
* @param checkpointName The key assigned to this checkpoint
- * @param author Author of the checkpoint
* @param atumContext Contains the calculations to be done and publish the result
* @return
*/
-    def createCheckpoint(checkpointName: String, author: String)(implicit atumContext: AtumContext): DataFrame = {
-      atumContext.createCheckpoint(checkpointName, author, df)
+    def createCheckpoint(checkpointName: String)(implicit atumContext: AtumContext): DataFrame = {
+      atumContext.createCheckpoint(checkpointName, df)
df
}

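A short sketch of what the slimmed-down DataFrame extension looks like at a call site, assuming the extension is brought into scope with `import AtumContext._`; the column and checkpoint names are invented:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import za.co.absa.atum.agent.AtumContext
import za.co.absa.atum.agent.AtumContext._

// The author is no longer threaded through call sites; the agent resolves it.
def filterWithCheckpoints(df: DataFrame)(implicit atumContext: AtumContext): DataFrame =
  df.createCheckpoint("before-filter") // measures df, publishes a checkpoint, returns df
    .filter(col("salary") > 0)
    .createCheckpoint("after-filter")
```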
agent/src/main/scala/za/co/absa/atum/agent/dispatcher/ConsoleDispatcher.scala
@@ -17,7 +17,7 @@
package za.co.absa.atum.agent.dispatcher

import org.apache.spark.internal.Logging
-import za.co.absa.atum.model.dto.{AtumContextDTO, CheckpointDTO, PartitioningDTO}
+import za.co.absa.atum.model.dto.{AtumContextDTO, CheckpointDTO, PartitioningSubmitDTO}

/**
* dispatcher useful for development, testing and debugging
@@ -26,10 +26,11 @@ class ConsoleDispatcher extends Dispatcher with Logging {

logInfo("using console dispatcher")

-  override def getOrCreateAtumContext(partitioning: PartitioningDTO): AtumContextDTO = {
+  override def createPartitioning(partitioning: PartitioningSubmitDTO): AtumContextDTO = {
println(s"Fetching AtumContext using ConsoleDispatcher with partitioning $partitioning")
AtumContextDTO(partitioning = partitioning.partitioning)
}

override def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
println(s"Saving checkpoint to server. $checkpoint")
}
agent/src/main/scala/za/co/absa/atum/agent/dispatcher/Dispatcher.scala
@@ -16,10 +16,10 @@

package za.co.absa.atum.agent.dispatcher

-import za.co.absa.atum.model.dto.{AtumContextDTO, CheckpointDTO, PartitioningDTO}
+import za.co.absa.atum.model.dto.{AtumContextDTO, CheckpointDTO, PartitioningSubmitDTO}

trait Dispatcher {
-  def getOrCreateAtumContext(partitioning: PartitioningDTO): AtumContextDTO
+  def createPartitioning(partitioning: PartitioningSubmitDTO): AtumContextDTO

def saveCheckpoint(checkpoint: CheckpointDTO): Unit
}
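
To make the renamed contract concrete, here is a hypothetical in-memory implementation (not part of this commit), building its AtumContextDTO the same way ConsoleDispatcher does:

```scala
import za.co.absa.atum.model.dto.{AtumContextDTO, CheckpointDTO, PartitioningSubmitDTO}

// Captures checkpoints instead of sending them anywhere; useful in unit tests.
class CapturingDispatcher extends Dispatcher {
  @volatile var checkpoints: List[CheckpointDTO] = List.empty

  override def createPartitioning(partitioning: PartitioningSubmitDTO): AtumContextDTO =
    AtumContextDTO(partitioning = partitioning.partitioning)

  override def saveCheckpoint(checkpoint: CheckpointDTO): Unit =
    checkpoints = checkpoint :: checkpoints
}
```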
agent/src/main/scala/za/co/absa/atum/agent/dispatcher/HttpDispatcher.scala
@@ -20,29 +20,39 @@ import com.typesafe.config.Config
import org.apache.spark.internal.Logging
import sttp.client3._
import sttp.model.Uri
-import za.co.absa.atum.model.dto.{AtumContextDTO, CheckpointDTO, PartitioningDTO}
+import za.co.absa.atum.model.dto.{AtumContextDTO, CheckpointDTO, PartitioningSubmitDTO}
+import za.co.absa.atum.model.utils.SerializationUtils

class HttpDispatcher(config: Config) extends Dispatcher with Logging {

-  private val serverUri = Uri.unsafeParse(config.getString("url"))
+  private val serverUrl = config.getString("url")
+  private val createPartitioningEndpoint = Uri.unsafeParse(s"$serverUrl/api/v1/createPartitioning")
+  private val createCheckpointEndpoint = Uri.unsafeParse(s"$serverUrl/api/v1/createCheckpoint")

private val backend = HttpURLConnectionBackend()

logInfo("using http dispatcher")
logInfo(s"serverUri $serverUri")
logInfo(s"serverUrl $serverUrl")

-  override def getOrCreateAtumContext(partitioning: PartitioningDTO): AtumContextDTO = {
-    basicRequest
-      .body(s"$partitioning")
-      .post(serverUri)
-      .send(backend)
-
-    AtumContextDTO(partitioning = partitioning.partitioning) // todo: implement request
+  override def createPartitioning(partitioning: PartitioningSubmitDTO): AtumContextDTO = {
+    val request = basicRequest
+      .post(createPartitioningEndpoint)
+      .body(SerializationUtils.asJson(partitioning))
+      .header("Content-Type", "application/json")
+      .response(asString)
+
+    val response = backend.send(request)
+
+    response.body match {
+      case Left(_) => throw new RuntimeException(s"${response.code} ${response.statusText}")
+      case Right(r) => SerializationUtils.fromJson[AtumContextDTO](r)
+    }
}

override def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
basicRequest
.body(s"$checkpoint")
-      .post(serverUri)
+      .post(createCheckpointEndpoint)
.send(backend)
}

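Finally, a sketch of wiring up the HTTP dispatcher from the one config key it reads (`url`); the host is an assumption, and the dispatcher itself is selected through `atum.dispatcher.type` in AtumAgent:

```scala
import com.typesafe.config.ConfigFactory

// Illustrative server address; HttpDispatcher derives both endpoints from it.
val httpConfig = ConfigFactory.parseString("url = \"http://localhost:8080\"")

val dispatcher = new HttpDispatcher(httpConfig)
// Requests then go to the derived endpoints:
//   POST http://localhost:8080/api/v1/createPartitioning
//   POST http://localhost:8080/api/v1/createCheckpoint
```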