Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#25: create class for partitions #45

Merged
merged 19 commits into from
Sep 18, 2023
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
# Atum


## Modules

### Agent `agent/`
This module is intended to replace the current [Atum](https://github.com/AbsaOSS/atum) repository. Provides libraries for establishing and pushing them to the API located in `server/`.
This module is intended to replace the current [Atum](https://github.com/AbsaOSS/atum) repository.
It provides libraries for establishing and pushing them to the API located in `server/`.
See `agent/README.md`.

### Server `server/`
An API under construction that communicates with AtumAgent and with the persisting storage. It also provides measure configuration to the `AtumAgent`.
An API under construction that communicates with the Agent and with the persistent storage.
It also provides measure configuration to the `AtumAgent`.
See `server/README.md`.
32 changes: 18 additions & 14 deletions agent/README.md
Original file line number Diff line number Diff line change
@@ -1,32 +1,36 @@
# Atum Agent


`Atum Agent` module has two main features
`AtumAgent`: Retrieves the configurations and reports the measures.
`AtumContext`: Provides a library for calculating control measures over a `Spark` `Dataframe`.

`Atum Agent` module has two main parts:
* `AtumAgent`: Retrieves the configurations and reports the measures.
* `AtumContext`: Provides a library for calculating control measures over a `Spark` `Dataframe`.


## Usage

Include `AtumAgent` as an implicit in the scope for use by the `AtumContext`.
Create multiple `AtumContext` with different control measures to be applied

### Option 1
```scala
import za.co.absa.atum.agent.AtumContext.DatasetWrapper
import za.co.absa.atum.agent.model._
val atumContextInstanceWithRecordCount = AtumContext(processor = processor)
.withMeasureAdded(RecordCount(MockMeasureNames.recordCount1, controlCol = "id"))

val atumContextWithSalaryAbsMeasure = atumContextInstanceWithRecordCount
.withMeasureAdded(AbsSumOfValuesOfColumn(controlCol = "salary"))
```

### Option 2
Use `AtumPartitions` to get an `AtumContext` from the service using the `AtumAgent`.
```scala
implicit val agent: AtumAgent = new AgentImpl
val atumContext1 = AtumAgent.createAtumContext(atumPartition)
```

Create multiple `AtumContext` with different control measures
#### AtumPartitions
A list of key values that maintains the order of arrival of the items, the `AtumService`
is able to deliver the correct `AtumContext` according to the `AtumPartitions` we give it.
```scala
val atumContextInstanceWithRecordCount = AtumContext(processor = processor)
.withMeasureAdded(RecordCount(MockMeasureNames.recordCount1, controlCol = "id"))
val atumPartitions = AtumPartitions().withPartitions(ListMap("name" -> "partition-name", "country" -> "SA", "gender" -> "female" ))

val atumContextWithSalaryAbsMeasure = atumContextInstanceWithRecordCount
.withMeasureAdded(AbsSumOfValuesOfColumn(controlCol = "salary"))
val subPartition = atumPartitions.addPartition("otherKey", "otherValue")
```

Control measures can also be overwritten, added or removed.
Expand Down
71 changes: 65 additions & 6 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala
Original file line number Diff line number Diff line change
@@ -1,17 +1,76 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.agent

import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.model.MeasureResult

/**
* Place holder for the agent that communicate with the API.
*/
object AtumAgent {
class AtumAgent private() {

/**
* Sends a single `MeasureResult` to the AtumService API along with an extra data from a given `AtumContext`.
* @param checkpointKey
* @param atumContext
* @param measureResult
*/
def publish(checkpointKey: String, atumContext: AtumContext, measureResult: MeasureResult): Unit =
println(
s"Enqueued measurement: ${Seq(checkpointKey, atumContext, measureResult).mkString(" || ")}"
)

/**
* Sends a single `MeasureResult` to the AtumService API. It doesn't involve AtumContext.
*
* @param checkpointKey
* @param measureResult
*/
def measurePublish(checkpointKey: String, measureResult: MeasureResult): Unit =
println(s"Enqueued measurement: $checkpointKey || $measureResult")

def measurePublish(checkpointKey: String, measure: MeasureResult): Unit =
println(s"Enqueued measurement: $checkpointKey, " + (measure))
/**
* Provides an AtumContext given a `AtumPartitions` instance. Retrieves the data from AtumService API.
* @param atumPartitions
* @return
*/
def getOrCreateAtumContext(atumPartitions: AtumPartitions): AtumContext = {
contexts.getOrElse(atumPartitions, new AtumContext(atumPartitions, this))
}

def publish(checkpointKey: String, context: AtumContext, measureResult: MeasureResult): Unit = println(
Seq(checkpointKey, context, measureResult).mkString(" || ")
)
def getOrCreateAtumSubContext(subPartitions: AtumPartitions)(implicit atumContext: AtumContext): AtumContext = {
val newPartitions: AtumPartitions = atumContext.atumPartitions ++ subPartitions
getContextOrElse(newPartitions, atumContext.copy(atumPartitions = newPartitions, parentAgent = this))
}

private def getContextOrElse(atumPartitions: AtumPartitions, creationMethod: =>AtumContext): AtumContext = {
synchronized{
contexts.getOrElse(atumPartitions, {
val result = creationMethod
contexts = contexts + (atumPartitions -> result)
result
})
}
}


private[this] var contexts: Map[AtumPartitions, AtumContext] = Map.empty

}

object AtumAgent extends AtumAgent
98 changes: 72 additions & 26 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala
Original file line number Diff line number Diff line change
@@ -1,56 +1,102 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.agent

import org.apache.spark.sql.DataFrame
import za.co.absa.atum.agent.model.{MeasureResult, Measurement}
import za.co.absa.atum.agent.model.{MeasureNew, MeasureResult, Measure}
lsulak marked this conversation as resolved.
Show resolved Hide resolved
import AtumContext.AtumPartitions

import scala.collection.immutable.ListMap

/**
* AtumContext: This class provides the methods to measure Spark `Dataframe`. Also allows to add/edit/remove measures.
* @param measurements: A sequences of measurements.
lsulak marked this conversation as resolved.
Show resolved Hide resolved
*/

case class AtumContext(measurements: Set[Measurement] = Set()) {
class AtumContext private[agent](
val atumPartitions: AtumPartitions,
val parentAgent: AtumAgent,
private var measures: Set[Measure] = Set.empty) {

def withMeasuresReplaced(
byMeasure: Measurement
): AtumContext =
this.copy(measurements = Set(byMeasure))
def currentMeasures: Set[Measure] = measures

def subPartitionContext(subPartitions: AtumPartitions): AtumContext = {
parentAgent.getOrCreateAtumSubContext(atumPartitions ++ subPartitions)(this)
}

def withMeasuresReplaced(
byMeasures: Iterable[Measurement]
): AtumContext =
this.copy(measurements = byMeasures.toSet)
def createCheckpoint(checkpointName: String, dataToMeasure: DataFrame) = {
??? //TODO #26
}

def withMeasuresAdded(
measure: Measurement
): AtumContext =
this.copy(measurements = measurements + measure)
def saveCheckpointMeasurements(checkpointName: String, measurements: Seq[MeasureNew]) = {
??? //TODO #55
}

def withMeasuresAdded(
measures: Iterable[Measurement]
): AtumContext =
this.copy(measurements = measurements ++ measures)
def addAdditionalData(key: String, value: String) = {
??? //TODO #60
}

def withMeasureRemoved(measurement: Measurement): AtumContext =
this.copy(measurements = measurements.filterNot(_ == measurement))
def addMeasure(newMeasure: Measure): AtumContext = {
measures = measures + newMeasure
this
}

def addMeasures(newMeasures: Set[Measure]): AtumContext = {
measures = measures ++ newMeasures
this
}

def removeMeasure(measureToRemove: Measure): AtumContext = {
measures = measures - measureToRemove
this
}

private[agent] def copy(
atumPartitions: AtumPartitions = this.atumPartitions,
parentAgent: AtumAgent = this.parentAgent,
measures: Set[Measure] = this.measures
): AtumContext = {
new AtumContext(atumPartitions, parentAgent, measures)
}
}

object AtumContext {
type AtumPartitions = ListMap[String, String]

object AtumPartitions {
def apply(elems: (String, String)): AtumPartitions = {
ListMap(elems)
}
}

implicit class DatasetWrapper(df: DataFrame) {

/**
* Executes the measure directly with not AtumContext.
* Executes the measure directly (without AtumContext).
* @param measure the measure to be calculated
* @return
*/
def executeMeasure(checkpointName: String, measure: Measurement): DataFrame = {

def executeMeasure(checkpointName: String, measure: Measure): DataFrame = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that execute should do only executing and publishing should do publishing. And it can be all put together in some other operation.

I definitely wouldn't expect from 'executeMeasure' to actually also send data to the server

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these could and should be eventually completely removed. Thee;s no scenario where just particular measure are recorded for a checkpoint.

val result = MeasureResult(measure, measure.function(df))
AtumAgent.measurePublish(checkpointName, result)
df
}

def executeMeasures(checkpointName: String, measures: Iterable[Measurement]): DataFrame = {
def executeMeasures(checkpointName: String, measures: Iterable[Measure]): DataFrame = {
measures.foreach(m => executeMeasure(checkpointName, m))
df
}
Expand All @@ -62,11 +108,11 @@ object AtumContext {
* @return
*/
def createCheckpoint(checkpointName: String)(implicit atumContext: AtumContext): DataFrame = {
atumContext.measurements.foreach { measure =>
atumContext.measures.foreach { measure =>
val result = MeasureResult(measure, measure.function(df))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here you can call executeMeasure function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As it will (probably) go away, can this stay? 😉

AtumAgent.publish(checkpointName, atumContext, result)

executeMeasures(checkpointName, atumContext.measurements)
executeMeasures(checkpointName, atumContext.measures)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this line can be removed, it's doing the same as the whole foreach basically

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will removed, but as said, this needs reword. createCheckpoint and send it as an "atomic" steo.

}

df
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.agent.core

import org.apache.spark.sql.DataFrame
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.agent.model

import za.co.absa.atum.agent.model.AtumPartitionsOld.Partitions

import scala.collection.immutable.ListMap

case class AtumPartitionsOld(partitions: Partitions = ListMap()) {
lsulak marked this conversation as resolved.
Show resolved Hide resolved
lsulak marked this conversation as resolved.
Show resolved Hide resolved

/**
* Creates a new `AtumPartitions` instance with the given metadata
* @param partitions metadata key value map that preserves the order of arrival of the elements.
* @return
*/
def withPartitions(partitions: Partitions): AtumPartitionsOld = this.copy(partitions = partitions)
lsulak marked this conversation as resolved.
Show resolved Hide resolved

/**
* Creates a new `AtumPartitions` instance with new metadata key values added known as a sub-Partition
* @param partitions metadata key value map to be added to the existing metadata
* @return
*/
def addPartitions(partitions: Partitions): AtumPartitionsOld = this.copy(partitions = this.partitions ++ partitions)

/**
* Creates a new Partition instance with new metadata key values added known as a sub-Partition
lsulak marked this conversation as resolved.
Show resolved Hide resolved
* @param partitions key value map to be added to the existing metadata
* @return
*/
def addPartitions(partitions: Map[String, String]): AtumPartitionsOld = {
val typedPartitions: Partitions = ListMap(partitions.toList: _*)
addPartitions(typedPartitions)
}

/**
* Creates a new Partition instance with new partition key values added known as a sub-Partition
lsulak marked this conversation as resolved.
Show resolved Hide resolved
* @param key new partition key
* @param value new partition value
* @return
*/
def addPartition(key: String, value: String): AtumPartitionsOld =
this.copy(partitions = this.partitions + (key -> value))

/**
* An alias for `addPartitions`
*/
def subPartitions(partitions: Partitions): AtumPartitionsOld = addPartitions(partitions)
lsulak marked this conversation as resolved.
Show resolved Hide resolved

}

object AtumPartitionsOld {

// Each element represent a data partition. The order is preserved as a list.
lsulak marked this conversation as resolved.
Show resolved Hide resolved
type Partitions = ListMap[String, String]
}
Loading