Skip to content

Commit

Permalink
Fixes #120 - merging master
Browse files Browse the repository at this point in the history
  • Loading branch information
TebaleloS committed Apr 17, 2024
2 parents a6f09d3 + 7be4f59 commit 4475aaa
Show file tree
Hide file tree
Showing 16 changed files with 587 additions and 48 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/jacoco_check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ on:
env:
scalaLong12: 2.13.11
scalaShort12: "2.13"
jaCocoReportVersion: v1.6.1
overall: 80.0
changed: 80.0

Expand All @@ -49,7 +50,7 @@ jobs:
- name: Add coverage to PR
if: steps.jacocorun.outcome == 'success'
id: jacoco-agent
uses: madrapps/jacoco-report@v1.6.1
uses: madrapps/jacoco-report@${{ env.jaCocoReportVersion }}
with:
name: agent-jacoco-report
paths: ${{ github.workspace }}/agent/target/jvm-${{ env.scalaShort12 }}/jacoco/report/jacoco.xml
Expand All @@ -62,7 +63,7 @@ jobs:
- name: Add coverage to PR
if: steps.jacocorun.outcome == 'success'
id: jacoco-model
uses: madrapps/jacoco-report@v1.6.1
uses: madrapps/jacoco-report@${{ env.jaCocoReportVersion }}
with:
name: model-jacoco-report
paths: ${{ github.workspace }}/model/target/jvm-${{ env.scalaShort12 }}/jacoco/report/jacoco.xml
Expand Down
6 changes: 5 additions & 1 deletion agent/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# dispatcher to be used (http or console)
# dispatcher to be used (http, console, capture)
atum.dispatcher.type="http"

# The REST API URI of the atum server
#atum.dispatcher.http.url=

# Maximum number of dispatch captures to keep in memory
#atum.dispatcher.capture.capture-limit=1000 # 0 means no limit

63 changes: 35 additions & 28 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,55 +18,49 @@ package za.co.absa.atum.agent

import com.typesafe.config.{Config, ConfigFactory}
import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.dispatcher.{ConsoleDispatcher, HttpDispatcher}
import za.co.absa.atum.agent.dispatcher.{CapturingDispatcher, ConsoleDispatcher, Dispatcher, HttpDispatcher}
import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, CheckpointDTO, PartitioningSubmitDTO}

/**
* Entity that communicate with the API, primarily focused on spawning Atum Context(s).
* Entity that communicates with the API, primarily focused on spawning Atum Context(s).
*/
class AtumAgent private[agent] () {
trait AtumAgent {

private[this] var contexts: Map[AtumPartitions, AtumContext] = Map.empty

val config: Config = ConfigFactory.load()

private val dispatcher = config.getString("atum.dispatcher.type") match {
case "http" => new HttpDispatcher(config.getConfig("atum.dispatcher.http"))
case "console" => new ConsoleDispatcher
case dt => throw new UnsupportedOperationException(s"Unsupported dispatcher type: '$dt''")
}
val dispatcher: Dispatcher

/**
* Returns a user under who's security context the JVM is running.
* It's purpose is for auditing in author/createdBy fields.
* Returns the user under whose security context the JVM is running.
* Its purpose is for auditing in author/createdBy fields.
*
* Important: It's not supposed to be used for authorization as it can be spoofed!
* Important: It's not supposed to be used for authorization as it can be spoofed!
*
* @return Current user.
* @return Current user.
*/
private[agent] def currentUser: String = System.getProperty("user.name") // platform independent

/**
* Sends `CheckpointDTO` to the AtumService API
* Sends `CheckpointDTO` to the AtumService API
*
* @param checkpoint Already initialized Checkpoint object to store
* @param checkpoint Already initialized Checkpoint object to store
*/
private [agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
private[agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
dispatcher.saveCheckpoint(checkpoint)
}

/**
* Sends the `Metadata` to the Atumservice API
* @param additionalData the metadata to be saved to the server.
* Sends the `Metadata` to the AtumService API
* @param additionalData the metadata to be saved to the server.
*/
private [agent] def saveAdditionalData(additionalData: AdditionalDataSubmitDTO): Unit = {
private[agent] def saveAdditionalData(additionalData: AdditionalDataSubmitDTO): Unit = {
dispatcher.saveAdditionalData(additionalData)
}

/**
* Provides an AtumContext given a `AtumPartitions` instance. Retrieves the data from AtumService API.
* Provides an AtumContext given a `AtumPartitions` instance. Retrieves the data from AtumService API.
*
* Note: if partitioning doesn't exist in the store yet, a new one will be created with the author stored in
* Note: if partitioning doesn't exist in the store yet, a new one will be created with the author stored in
* `AtumAgent.currentUser`. If partitioning already exists, this attribute will be ignored because there
* already is an author who previously created the partitioning in the data store. Each Atum Context thus
* can have different author potentially.
Expand All @@ -86,11 +80,11 @@ class AtumAgent private[agent] () {
}

/**
* Provides an AtumContext given a `AtumPartitions` instance for sub partitions.
* Retrieves the data from AtumService API.
* @param subPartitions Sub partitions based on which an Atum Context will be created or obtained.
* @param parentAtumContext Parent AtumContext.
* @return Atum context object
* Provides an AtumContext given a `AtumPartitions` instance for sub partitions.
* Retrieves the data from AtumService API.
* @param subPartitions Sub partitions based on which an Atum Context will be created or obtained.
* @param parentAtumContext Parent AtumContext.
* @return Atum context object
*/
def getOrCreateAtumSubContext(subPartitions: AtumPartitions)(implicit parentAtumContext: AtumContext): AtumContext = {
val authorIfNew = AtumAgent.currentUser
Expand Down Expand Up @@ -119,4 +113,17 @@ class AtumAgent private[agent] () {

}

object AtumAgent extends AtumAgent
object AtumAgent extends AtumAgent {

  /**
   * Dispatcher used by the singleton agent; built by `dispatcherFromConfig` with its default configuration.
   */
  override val dispatcher: Dispatcher = dispatcherFromConfig()

  /**
   * Creates the dispatcher selected by the `atum.dispatcher.type` configuration key.
   *
   * Recognized values are `http`, `console` and `capture`; the whole `config` is handed over to the
   * created dispatcher.
   *
   * @param config configuration to read the dispatcher type from; defaults to `ConfigFactory.load()`
   * @return the dispatcher matching the configured type
   * @throws UnsupportedOperationException if the configured dispatcher type is none of the recognized values
   */
  def dispatcherFromConfig(config: Config = ConfigFactory.load()): Dispatcher = {
    config.getString("atum.dispatcher.type") match {
      case "http"    => new HttpDispatcher(config)
      case "console" => new ConsoleDispatcher(config)
      case "capture" => new CapturingDispatcher(config)
      // the offending value is wrapped in a single pair of apostrophes
      case dt        => throw new UnsupportedOperationException(s"Unsupported dispatcher type: '$dt'")
    }
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.agent.dispatcher

import com.typesafe.config.Config
import za.co.absa.atum.model.dto._

import java.util.concurrent.atomic.AtomicReference
import java.util.function.UnaryOperator
import scala.collection.immutable.Queue

/**
* This dispatcher captures the data and stores them in memory instead of actually sending anything.
* @param config: Config to be used to create the dispatcher. Keys:
* capture-limit - maximal amount of dispatch captures to store.
*/
/**
 * Dispatcher that records every dispatch call in memory instead of sending anything.
 *
 * Each call is stored as a `CapturedCall` (function name, input, result) in an immutable queue held in an
 * `AtomicReference`; the queue is replaced atomically on every capture.
 *
 * @param config configuration used to create the dispatcher; the key `atum.dispatcher.capture.capture-limit`
 *               is read with `Config.getInt`, so it has to be present and numeric in the supplied configuration
 */
class CapturingDispatcher(config: Config) extends Dispatcher(config) {
  import CapturingDispatcher._

  /**
   * Maximal number of captures kept in memory. Once the limit is reached the oldest capture is dropped for
   * each new one. A value of 0 (or less) means no limit.
   */
  val captureLimit: Int = config.getInt(CheckpointLimitKey)

  // Immutable queue swapped atomically, so readers always observe a consistent snapshot.
  private val capturesRef = new AtomicReference(Queue.empty[CapturedCall])

  /**
   * Removes all captured data.
   */
  def clear(): Unit = capturesRef.set(Queue.empty[CapturedCall])

  /**
   * Checks whether a call of the given function has been captured.
   *
   * @param functionName - the function name that was supposed to be dispatched
   * @return - true if the function was captured, false otherwise
   */
  def contains(functionName: String): Boolean = {
    captures.exists(_.functionName == functionName)
  }

  /**
   * Checks whether a call of the given function with the given input has been captured.
   *
   * @param functionName - the function name that was supposed to be dispatched
   * @param input        - the input parameter of the function
   * @return - true if the function was captured, false otherwise
   */
  def contains[I](functionName: String, input: I): Boolean = {
    captures.exists(item => (item.functionName == functionName) && (item.input == input))
  }

  /**
   * Checks whether a call of the given function with the given input and result has been captured.
   *
   * @param functionName - the function name that was supposed to be dispatched
   * @param input        - the input parameter of the function
   * @param result       - the result of the function
   * @return - true if the function was captured, false otherwise
   */
  def contains[I, R](functionName: String, input: I, result: R): Boolean = {
    captures.contains(CapturedCall(functionName, input, result))
  }

  /**
   * Returns a snapshot of the captured data, oldest capture first.
   *
   * @return the captured data
   */
  def captures: Queue[CapturedCall] = capturesRef.get()

  /**
   * Stores one dispatch call in the capture queue and hands the result back to the caller.
   *
   * The function name is passed in explicitly rather than being looked up at a fixed position of the
   * current stack trace, so the recorded name does not depend on the shape of the call stack.
   *
   * @param functionName name of the dispatcher function being captured
   * @param input        the input the function was called with
   * @param result       the result the function returns
   * @return `result`, unchanged
   */
  private def captureFunctionCall[I, R](functionName: String, input: I, result: R): R = {
    val capture = CapturedCall(functionName, input, result)

    val appendCapture = new UnaryOperator[Queue[CapturedCall]] {
      override def apply(queue: Queue[CapturedCall]): Queue[CapturedCall] = {
        // the queue is guaranteed non-empty here: the limit is positive and the size has reached it
        val limitReached = (captureLimit > 0) && (queue.size >= captureLimit)
        if (limitReached) queue.dequeue._2.enqueue(capture) else queue.enqueue(capture)
      }
    }

    capturesRef.updateAndGet(appendCapture)

    result
  }

  /**
   * Captures the checkpoint under the function name `saveCheckpoint`; nothing is sent anywhere.
   *
   * @param checkpoint : CheckpointDTO to be captured.
   */
  override protected[agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
    captureFunctionCall("saveCheckpoint", checkpoint, ())
  }

  /**
   * Captures the additional data under the function name `saveAdditionalData`; nothing is sent anywhere.
   *
   * @param additionalData the data to be captured.
   */
  override protected[agent] def saveAdditionalData(additionalData: AdditionalDataSubmitDTO): Unit = {
    captureFunctionCall("saveAdditionalData", additionalData, ())
  }

  /**
   * Captures the partitioning submission under the function name `createPartitioning` and answers with an
   * `AtumContextDTO` built from the submitted partitioning.
   *
   * @param partitioning : PartitioningSubmitDTO to be captured.
   * @return AtumContextDTO created from `partitioning.partitioning`.
   */
  override protected[agent] def createPartitioning(partitioning: PartitioningSubmitDTO): AtumContextDTO = {
    val result = AtumContextDTO(partitioning.partitioning)
    captureFunctionCall("createPartitioning", partitioning, result)
  }
}

object CapturingDispatcher {
  // Configuration path holding the maximal number of captures kept in memory.
  private val CheckpointLimitKey = "atum.dispatcher.capture.capture-limit"

  /**
   * Record of a single captured dispatch call: the name of the function, the input it received and the
   * result it produced. The input and result types are exposed as the type members `I` and `R`.
   */
  abstract class CapturedCall {
    type I
    type R
    val functionName: String
    val input: I
    val result: R
  }

  object CapturedCall {

    /**
     * Builds a captured call record.
     *
     * @param functionName name of the captured function
     * @param input        input the function was called with
     * @param result       result the function returned
     * @return the record, typed as the general `CapturedCall`
     */
    def apply[I, R](functionName: String, input: I, result: R): CapturedCall =
      CapturedCallImpl[I, R](functionName, input, result)

    /**
     * Case-class implementation of `CapturedCall`; it binds the type members to its type parameters and
     * supplies structural equality for the records.
     */
    final case class CapturedCallImpl[IX, RX] private[dispatcher](functionName: String, input: IX, result: RX)
      extends CapturedCall {
      type I = IX
      type R = RX
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,27 @@

package za.co.absa.atum.agent.dispatcher

import com.typesafe.config.Config
import org.apache.spark.internal.Logging
import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, AtumContextDTO, CheckpointDTO, PartitioningSubmitDTO}

/**
* dispatcher useful for development, testing and debugging
*/
class ConsoleDispatcher extends Dispatcher with Logging {
class ConsoleDispatcher(config: Config) extends Dispatcher(config: Config) with Logging {

logInfo("using console dispatcher")

override def createPartitioning(partitioning: PartitioningSubmitDTO): AtumContextDTO = {
override protected[agent] def createPartitioning(partitioning: PartitioningSubmitDTO): AtumContextDTO = {
println(s"Fetching AtumContext using ConsoleDispatcher with partitioning $partitioning")
AtumContextDTO(partitioning = partitioning.partitioning)
}

override def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
override protected[agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
println(s"Saving checkpoint to server. $checkpoint")
}

override def saveAdditionalData(additionalData: AdditionalDataSubmitDTO): Unit = {
override protected[agent] def saveAdditionalData(additionalData: AdditionalDataSubmitDTO): Unit = {
println(s"Saving the additional data to server. $additionalData")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,32 @@

package za.co.absa.atum.agent.dispatcher

import com.typesafe.config.Config
import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, AtumContextDTO, CheckpointDTO, PartitioningSubmitDTO}

/**
* This trait provides a contract for different dispatchers
* This class provides a contract for different dispatchers. It has a constructor for eventual creation via reflection.
* @param config: Config to be used to create the dispatcher.
*/
trait Dispatcher {
abstract class Dispatcher(config: Config) {

/**
* This method is used to ensure the server knows the given partitioning.
* As a response the `AtumContext` is fetched from the server.
* @param partitioning: PartitioningSubmitDTO to be used to ensure server knows the given partitioning.
* @return AtumContextDTO.
*/
def createPartitioning(partitioning: PartitioningSubmitDTO): AtumContextDTO
protected[agent] def createPartitioning(partitioning: PartitioningSubmitDTO): AtumContextDTO

/**
* This method is used to save checkpoint to server.
* @param checkpoint: CheckpointDTO to be saved.
*/
def saveCheckpoint(checkpoint: CheckpointDTO): Unit
protected[agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit

/**
* This method is used to save the additional data to the server.
* @param additionalData the data to be saved.
*/
def saveAdditionalData(additionalData: AdditionalDataSubmitDTO): Unit
protected[agent] def saveAdditionalData(additionalData: AdditionalDataSubmitDTO): Unit
}
Loading

0 comments on commit 4475aaa

Please sign in to comment.