From 7ef4dd2c3dfaafad85a5794921f7efe0dbd46f41 Mon Sep 17 00:00:00 2001
From: David Benedeki <14905969+benedeki@users.noreply.github.com>
Date: Thu, 11 Apr 2024 12:45:04 +0200
Subject: [PATCH 1/2] #97: Add possibility to Inject Custom Logger and
 Dispatcher, reasonable defaults otherwise (#182)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* `AtumAgent` methods distilled into a `trait AtumAgent` for testing purposes
* a method to create a common dispatcher based on config values
* Added `CapturingDispatcher` to capture all dispatch calls
* `CapturingDispatcher` allows testing against captured records
* All dispatchers have a unified constructor to allow creation by reflection
  in the future

---------

Co-authored-by: Filip Horňák
Co-authored-by: Ladislav Sulak
---
 .github/workflows/jacoco_check.yml            |   5 +-
 agent/src/main/resources/reference.conf       |   6 +-
 .../za/co/absa/atum/agent/AtumAgent.scala     |  63 ++++---
 .../dispatcher/CapturingDispatcher.scala      | 164 ++++++++++++++++++
 .../agent/dispatcher/ConsoleDispatcher.scala  |   9 +-
 .../atum/agent/dispatcher/Dispatcher.scala    |  13 +-
 .../agent/dispatcher/HttpDispatcher.scala     |  16 +-
 .../za/co/absa/atum/agent/AtumAgentTest.scala |  54 +++++-
 .../co/absa/atum/agent/AtumContextTest.scala  |   2 +-
 .../dispatcher/CapturingDispatcherTest.scala  | 110 ++++++++++++
 project/Dependencies.scala                    |   7 +-
 11 files changed, 401 insertions(+), 48 deletions(-)
 create mode 100644 agent/src/main/scala/za/co/absa/atum/agent/dispatcher/CapturingDispatcher.scala
 create mode 100644 agent/src/test/scala/za/co/absa/atum/agent/dispatcher/CapturingDispatcherTest.scala
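The headline change: `AtumAgent` is now a trait with an abstract `dispatcher` member, so tests (or alternative runtimes) can inject their own `Dispatcher` instead of relying on the config-derived default. A minimal sketch of what this enables — the wiring below is illustrative only, not part of the patch:

    import com.typesafe.config.ConfigFactory
    import za.co.absa.atum.agent.AtumAgent
    import za.co.absa.atum.agent.dispatcher.{CapturingDispatcher, Dispatcher}

    // An agent with an injected capturing dispatcher, e.g. for a unit test;
    // the capture-limit value here is an arbitrary example.
    val testAgent: AtumAgent = new AtumAgent {
      override val dispatcher: Dispatcher = new CapturingDispatcher(
        ConfigFactory.parseString("atum.dispatcher.capture.capture-limit = 100")
      )
    }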
diff --git a/.github/workflows/jacoco_check.yml b/.github/workflows/jacoco_check.yml
index ca56bb4e9..c8db65a33 100644
--- a/.github/workflows/jacoco_check.yml
+++ b/.github/workflows/jacoco_check.yml
@@ -24,6 +24,7 @@ on:
 env:
   scalaLong12: 2.13.11
   scalaShort12: "2.13"
+  jaCocoReportVersion: v1.6.1
   overall: 80.0
   changed: 80.0
 
@@ -49,7 +50,7 @@ jobs:
       - name: Add coverage to PR
         if: steps.jacocorun.outcome == 'success'
         id: jacoco-agent
-        uses: madrapps/jacoco-report@v1.6.1
+        uses: madrapps/jacoco-report@${{ env.jaCocoReportVersion }}
        with:
          name: agent-jacoco-report
          paths: ${{ github.workspace }}/agent/target/jvm-${{ env.scalaShort12 }}/jacoco/report/jacoco.xml
@@ -62,7 +63,7 @@ jobs:
       - name: Add coverage to PR
         if: steps.jacocorun.outcome == 'success'
         id: jacoco-model
-        uses: madrapps/jacoco-report@v1.6.1
+        uses: madrapps/jacoco-report@${{ env.jaCocoReportVersion }}
        with:
          name: model-jacoco-report
          paths: ${{ github.workspace }}/model/target/jvm-${{ env.scalaShort12 }}/jacoco/report/jacoco.xml
diff --git a/agent/src/main/resources/reference.conf b/agent/src/main/resources/reference.conf
index 1b6a657a8..306926bae 100644
--- a/agent/src/main/resources/reference.conf
+++ b/agent/src/main/resources/reference.conf
@@ -11,8 +11,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# dispatcher to be used (http or console)
+# dispatcher to be used (http, console, capture)
 atum.dispatcher.type="http"
 
 # The REST API URI of the atum server
 #atum.dispatcher.http.url=
+
+# Maximum number of dispatch captures to keep in memory; 0 means no limit
+#atum.dispatcher.capture.capture-limit=1000
+
diff --git a/agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala b/agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala
index 0ade73c5c..9e375b9e8 100644
--- a/agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala
+++ b/agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala
@@ -18,55 +18,49 @@ package za.co.absa.atum.agent
 
 import com.typesafe.config.{Config, ConfigFactory}
 import za.co.absa.atum.agent.AtumContext.AtumPartitions
-import za.co.absa.atum.agent.dispatcher.{ConsoleDispatcher, HttpDispatcher}
+import za.co.absa.atum.agent.dispatcher.{CapturingDispatcher, ConsoleDispatcher, Dispatcher, HttpDispatcher}
 import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, CheckpointDTO, PartitioningSubmitDTO}
 
 /**
-  * Entity that communicate with the API, primarily focused on spawning Atum Context(s).
+ * Entity that communicates with the API, primarily focused on spawning Atum Context(s).
  */
-class AtumAgent private[agent] () {
+trait AtumAgent {
 
   private[this] var contexts: Map[AtumPartitions, AtumContext] = Map.empty
 
-  val config: Config = ConfigFactory.load()
-
-  private val dispatcher = config.getString("atum.dispatcher.type") match {
-    case "http" => new HttpDispatcher(config.getConfig("atum.dispatcher.http"))
-    case "console" => new ConsoleDispatcher
-    case dt => throw new UnsupportedOperationException(s"Unsupported dispatcher type: '$dt''")
-  }
+  val dispatcher: Dispatcher
 
   /**
-    * Returns a user under who's security context the JVM is running.
-    * It's purpose is for auditing in author/createdBy fields.
+   * Returns the user under whose security context the JVM is running.
+   * Its purpose is auditing in author/createdBy fields.
    *
-    * Important: It's not supposed to be used for authorization as it can be spoofed!
+   * Important: It's not supposed to be used for authorization as it can be spoofed!
    *
-    * @return Current user.
+   * @return Current user.
    */
  private[agent] def currentUser: String = System.getProperty("user.name") // platform independent

   /**
-    * Sends `CheckpointDTO` to the AtumService API
+   * Sends `CheckpointDTO` to the AtumService API
    *
-    * @param checkpoint Already initialized Checkpoint object to store
+   * @param checkpoint Already initialized Checkpoint object to store
    */
-  private [agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
+  private[agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
     dispatcher.saveCheckpoint(checkpoint)
   }

   /**
-    * Sends the `Metadata` to the Atumservice API
-    * @param additionalData the metadata to be saved to the server.
+   * Sends the `Metadata` to the AtumService API
+   * @param additionalData the metadata to be saved to the server.
    */
-  private [agent] def saveAdditionalData(additionalData: AdditionalDataSubmitDTO): Unit = {
+  private[agent] def saveAdditionalData(additionalData: AdditionalDataSubmitDTO): Unit = {
     dispatcher.saveAdditionalData(additionalData)
   }
   /**
-    * Provides an AtumContext given a `AtumPartitions` instance. Retrieves the data from AtumService API.
+   * Provides an AtumContext given an `AtumPartitions` instance. Retrieves the data from the AtumService API.
    *
-    * Note: if partitioning doesn't exist in the store yet, a new one will be created with the author stored in
+   * Note: if the partitioning doesn't exist in the store yet, a new one will be created with the author stored in
    * `AtumAgent.currentUser`. If partitioning already exists, this attribute will be ignored because there
    * already is an author who previously created the partitioning in the data store. Each Atum Context thus
    * can have different author potentially.
@@ -86,11 +80,11 @@ class AtumAgent private[agent] () {
   }
 
   /**
-    * Provides an AtumContext given a `AtumPartitions` instance for sub partitions.
-    * Retrieves the data from AtumService API.
-    * @param subPartitions Sub partitions based on which an Atum Context will be created or obtained.
-    * @param parentAtumContext Parent AtumContext.
-    * @return Atum context object
+   * Provides an AtumContext given an `AtumPartitions` instance for sub partitions.
+   * Retrieves the data from the AtumService API.
+   * @param subPartitions Sub partitions based on which an Atum Context will be created or obtained.
+   * @param parentAtumContext Parent AtumContext.
+   * @return Atum context object
    */
  def getOrCreateAtumSubContext(subPartitions: AtumPartitions)(implicit parentAtumContext: AtumContext): AtumContext = {
    val authorIfNew = AtumAgent.currentUser
@@ -119,4 +113,17 @@
 
 }
 
-object AtumAgent extends AtumAgent
+object AtumAgent extends AtumAgent {
+
+  override val dispatcher: Dispatcher = dispatcherFromConfig()
+
+  def dispatcherFromConfig(config: Config = ConfigFactory.load()): Dispatcher = {
+    config.getString("atum.dispatcher.type") match {
+      case "http" => new HttpDispatcher(config)
+      case "console" => new ConsoleDispatcher(config)
+      case "capture" => new CapturingDispatcher(config)
+      case dt => throw new UnsupportedOperationException(s"Unsupported dispatcher type: '$dt'")
+    }
+  }
+
+}
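The new `dispatcherFromConfig` factory reads `atum.dispatcher.type` and builds the matching dispatcher; because the `Config` is a parameter (defaulting to `ConfigFactory.load()`), it can be exercised with hand-built configs. A small usage sketch with illustrative values:

    import com.typesafe.config.ConfigFactory
    import za.co.absa.atum.agent.AtumAgent

    // "console" needs no further keys; "http" additionally needs
    // atum.dispatcher.http.url, and "capture" needs
    // atum.dispatcher.capture.capture-limit.
    val dispatcher = AtumAgent.dispatcherFromConfig(
      ConfigFactory.parseString("atum.dispatcher.type = console")
    )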
diff --git a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/CapturingDispatcher.scala b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/CapturingDispatcher.scala
new file mode 100644
index 000000000..6684b7253
--- /dev/null
+++ b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/CapturingDispatcher.scala
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2021 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.atum.agent.dispatcher
+
+import com.typesafe.config.Config
+import za.co.absa.atum.model.dto._
+
+import java.util.concurrent.atomic.AtomicReference
+import java.util.function.UnaryOperator
+import scala.collection.immutable.Queue
+
+/**
+ * This dispatcher captures the data and stores it in memory instead of actually sending anything.
+ * @param config Config to be used to create the dispatcher. Keys:
+ *               capture-limit - maximum number of dispatch captures to store (0 means no limit).
+ */
+class CapturingDispatcher(config: Config) extends Dispatcher(config) {
+  import CapturingDispatcher._
+
+  val captureLimit: Int = config.getInt(CaptureLimitKey)
+
+  /**
+   * This method is used to clear all captured data.
+   */
+  def clear(): Unit = {
+    val updateFunction = new UnaryOperator[Queue[CapturedCall]] {
+      override def apply(queue: Queue[CapturedCall]): Queue[CapturedCall] = Queue.empty
+    }
+    capturesRef.updateAndGet(updateFunction)
+  }
+
+  /**
+   * This method is used to check if the given function call has been captured.
+   *
+   * @param functionName - the function name that was supposed to be dispatched
+   * @return - true if the function was captured, false otherwise
+   */
+  def contains(functionName: String): Boolean = {
+    captures.exists(_.functionName == functionName)
+  }
+
+  /**
+   * This method is used to check if the given function call has been captured.
+   *
+   * @param functionName - the function name that was supposed to be dispatched
+   * @param input - the input parameter of the function
+   * @return - true if the function was captured, false otherwise
+   */
+  def contains[I](functionName: String, input: I): Boolean = {
+    captures.exists(item => (item.functionName == functionName) && (item.input == input))
+  }
+
+  /**
+   * This method is used to check if the given function call has been captured.
+   *
+   * @param functionName - the function name that was supposed to be dispatched
+   * @param input - the input parameter of the function
+   * @param result - the result of the function
+   * @return - true if the function was captured, false otherwise
+   */
+  def contains[I, R](functionName: String, input: I, result: R): Boolean = {
+    captures.contains(CapturedCall(functionName, input, result))
+  }
+
+  /**
+   * This method is used to get the captured data.
+   *
+   * @return the captured data
+   */
+  def captures: Queue[CapturedCall] = capturesRef.get()
+
+  private val capturesRef = new AtomicReference(Queue.empty[CapturedCall])
+
+  private def captureFunctionCall[I, R](input: I, result: R): R = {
+    // stack trace element 2 is the caller of this method, i.e. the dispatch function being captured
+    val functionName = Thread.currentThread().getStackTrace()(2).getMethodName
+    val capture = CapturedCall(functionName, input, result)
+
+    val captureFunctions = new UnaryOperator[Queue[CapturedCall]] {
+      override def apply(queue: Queue[CapturedCall]): Queue[CapturedCall] = {
+        if ((captureLimit > 0) && (queue.size >= captureLimit)) {
+          queue.dequeue._2.enqueue(capture)
+        } else {
+          queue.enqueue(capture)
+        }
+      }
+    }
+
+    capturesRef.updateAndGet(captureFunctions)
+
+    result
+  }
+
+  /**
+   * This method is used to save checkpoint to server.
+   *
+   * @param checkpoint : CheckpointDTO to be saved.
+   */
+  override protected[agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
+    captureFunctionCall(checkpoint, ())
+  }
+
+  /**
+   * This method is used to save the additional data to the server.
+   *
+   * @param additionalData the data to be saved.
+   */
+  override protected[agent] def saveAdditionalData(additionalData: AdditionalDataSubmitDTO): Unit = {
+    captureFunctionCall(additionalData, ())
+  }
+
+  /**
+   * This method is used to ensure the server knows the given partitioning.
+   * As a response the `AtumContext` is fetched from the server.
+   *
+   * @param partitioning : PartitioningSubmitDTO to be used to ensure server knows the given partitioning.
+   * @return AtumContextDTO.
+   */
+  override protected[agent] def createPartitioning(partitioning: PartitioningSubmitDTO): AtumContextDTO = {
+    val result = AtumContextDTO(partitioning.partitioning)
+    captureFunctionCall(partitioning, result)
+  }
+}
+
+object CapturingDispatcher {
+  private val CaptureLimitKey = "atum.dispatcher.capture.capture-limit"
+
+  abstract class CapturedCall {
+    type I
+    type R
+    val functionName: String
+    val input: I
+    val result: R
+  }
+
+  object CapturedCall {
+
+    final case class CapturedCallImpl[IX, RX] private[dispatcher] (functionName: String, input: IX, result: RX)
+      extends CapturedCall {
+      type I = IX
+      type R = RX
+    }
+
+    def apply[I, R](functionName: String, input: I, result: R): CapturedCall = {
+      CapturedCallImpl(functionName, input, result)
+    }
+  }
+}
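Note the bounded-queue semantics of `captureFunctionCall` above: once `capture-limit` is reached, the oldest capture is dequeued before the new one is enqueued, and a limit of 0 disables trimming altogether. The trimming rule in isolation — a sketch mirroring the code above:

    import scala.collection.immutable.Queue

    // Same rule as in captureFunctionCall: drop the head when full.
    def enqueueBounded[A](queue: Queue[A], item: A, limit: Int): Queue[A] =
      if (limit > 0 && queue.size >= limit) queue.dequeue._2.enqueue(item)
      else queue.enqueue(item)

    enqueueBounded(Queue(1, 2, 3), 4, 3) // Queue(2, 3, 4): the oldest element is dropped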
diff --git a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/ConsoleDispatcher.scala b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/ConsoleDispatcher.scala
index 834b4185e..cbffd2ff4 100644
--- a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/ConsoleDispatcher.scala
+++ b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/ConsoleDispatcher.scala
@@ -16,26 +16,27 @@
 
 package za.co.absa.atum.agent.dispatcher
 
+import com.typesafe.config.Config
 import org.apache.spark.internal.Logging
 import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, AtumContextDTO, CheckpointDTO, PartitioningSubmitDTO}
 
 /**
  * dispatcher useful for development, testing and debugging
  */
-class ConsoleDispatcher extends Dispatcher with Logging {
+class ConsoleDispatcher(config: Config) extends Dispatcher(config) with Logging {
 
   logInfo("using console dispatcher")
 
-  override def createPartitioning(partitioning: PartitioningSubmitDTO): AtumContextDTO = {
+  override protected[agent] def createPartitioning(partitioning: PartitioningSubmitDTO): AtumContextDTO = {
     println(s"Fetching AtumContext using ConsoleDispatcher with partitioning $partitioning")
     AtumContextDTO(partitioning = partitioning.partitioning)
   }
 
-  override def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
+  override protected[agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
     println(s"Saving checkpoint to server. $checkpoint")
   }
 
-  override def saveAdditionalData(additionalData: AdditionalDataSubmitDTO): Unit = {
+  override protected[agent] def saveAdditionalData(additionalData: AdditionalDataSubmitDTO): Unit = {
     println(s"Saving the additional data to server. $additionalData")
   }
 
diff --git a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/Dispatcher.scala b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/Dispatcher.scala
index d337e563f..f0ea03812 100644
--- a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/Dispatcher.scala
+++ b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/Dispatcher.scala
@@ -16,29 +16,32 @@
 
 package za.co.absa.atum.agent.dispatcher
 
+import com.typesafe.config.Config
 import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, AtumContextDTO, CheckpointDTO, PartitioningSubmitDTO}
 
 /**
- * This trait provides a contract for different dispatchers
+ * This class provides a contract for different dispatchers. It has a unified constructor to allow
+ * eventual creation via reflection.
+ * @param config: Config to be used to create the dispatcher.
  */
-trait Dispatcher {
+abstract class Dispatcher(config: Config) {
+
   /**
    * This method is used to ensure the server knows the given partitioning.
    * As a response the `AtumContext` is fetched from the server.
    * @param partitioning: PartitioningSubmitDTO to be used to ensure server knows the given partitioning.
    * @return AtumContextDTO.
    */
-  def createPartitioning(partitioning: PartitioningSubmitDTO): AtumContextDTO
+  protected[agent] def createPartitioning(partitioning: PartitioningSubmitDTO): AtumContextDTO
 
   /**
    * This method is used to save checkpoint to server.
    * @param checkpoint: CheckpointDTO to be saved.
    */
-  def saveCheckpoint(checkpoint: CheckpointDTO): Unit
+  protected[agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit
 
   /**
    * This method is used to save the additional data to the server.
    * @param additionalData the data to be saved.
    */
-  def saveAdditionalData(additionalData: AdditionalDataSubmitDTO): Unit
+  protected[agent] def saveAdditionalData(additionalData: AdditionalDataSubmitDTO): Unit
 }
diff --git a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/HttpDispatcher.scala b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/HttpDispatcher.scala
index 47a1f63d4..13548a18c 100644
--- a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/HttpDispatcher.scala
+++ b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/HttpDispatcher.scala
@@ -24,9 +24,11 @@ import za.co.absa.atum.agent.exception.AtumAgentException.HttpException
 import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, AtumContextDTO, CheckpointDTO, PartitioningSubmitDTO}
 import za.co.absa.atum.model.utils.SerializationUtils
 
-class HttpDispatcher(config: Config) extends Dispatcher with Logging {
+class HttpDispatcher(config: Config) extends Dispatcher(config) with Logging {
+  import HttpDispatcher._
+
+  val serverUrl: String = config.getString(UrlKey)
 
-  private val serverUrl = config.getString("url")
   private val currentApiVersion = "/api/v1"
   private val createPartitioningEndpoint = Uri.unsafeParse(s"$serverUrl$currentApiVersion/createPartitioning")
   private val createCheckpointEndpoint = Uri.unsafeParse(s"$serverUrl$currentApiVersion/createCheckpoint")
@@ -41,7 +43,7 @@ class HttpDispatcher(config: Config) extends Dispatcher with Logging {
   logInfo("using http dispatcher")
   logInfo(s"serverUrl $serverUrl")
 
-  override def createPartitioning(partitioning: PartitioningSubmitDTO): AtumContextDTO = {
+  override protected[agent] def createPartitioning(partitioning: PartitioningSubmitDTO): AtumContextDTO = {
     val request = commonAtumRequest
       .post(createPartitioningEndpoint)
       .body(SerializationUtils.asJson(partitioning))
@@ -53,7 +55,7 @@
     )
   }
 
-  override def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
+  override protected[agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
     val request = commonAtumRequest
       .post(createCheckpointEndpoint)
       .body(SerializationUtils.asJson(checkpoint))
 
     val response = backend.send(request)
 
     handleResponseBody(response)
   }
 
-  override def saveAdditionalData(additionalDataSubmitDTO: AdditionalDataSubmitDTO): Unit = {
+  override protected[agent] def saveAdditionalData(additionalDataSubmitDTO: AdditionalDataSubmitDTO): Unit = {
     val request = commonAtumRequest
       .post(createAdditionalDataEndpoint)
       .body(SerializationUtils.asJson(additionalDataSubmitDTO))
@@ -81,3 +83,7 @@
   }
 
 }
+
+object HttpDispatcher {
+  private val UrlKey = "atum.dispatcher.http.url"
+}
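The commit message notes that the unified `(Config)` constructor is meant to allow creating dispatchers by reflection later on. That would look roughly like the following — a hypothetical sketch of the future direction, not part of this patch:

    import com.typesafe.config.Config
    import za.co.absa.atum.agent.dispatcher.Dispatcher

    // Instantiate a dispatcher from a fully qualified class name, relying on
    // every dispatcher exposing the same single-Config constructor.
    def dispatcherByReflection(className: String, config: Config): Dispatcher =
      Class.forName(className)
        .getConstructor(classOf[Config])
        .newInstance(config)
        .asInstanceOf[Dispatcher]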
diff --git a/agent/src/test/scala/za/co/absa/atum/agent/AtumAgentTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/AtumAgentTest.scala
index d358f3de8..6a7636f1d 100644
--- a/agent/src/test/scala/za/co/absa/atum/agent/AtumAgentTest.scala
+++ b/agent/src/test/scala/za/co/absa/atum/agent/AtumAgentTest.scala
@@ -16,13 +16,15 @@
 
 package za.co.absa.atum.agent
 
+import com.typesafe.config.{Config, ConfigException, ConfigFactory, ConfigValueFactory}
 import org.scalatest.funsuite.AnyFunSuiteLike
 import za.co.absa.atum.agent.AtumContext.AtumPartitions
+import za.co.absa.atum.agent.dispatcher.{CapturingDispatcher, ConsoleDispatcher, HttpDispatcher}
 
 class AtumAgentTest extends AnyFunSuiteLike {
 
   test("AtumAgent creates AtumContext(s) as expected") {
-    val atumAgent = new AtumAgent()
+    val atumAgent = AtumAgent
     val atumPartitions = AtumPartitions("abc" -> "def")
     val subPartitions = AtumPartitions("ghi", "jkl")
@@ -41,4 +43,54 @@ class AtumAgentTest extends AnyFunSuiteLike {
     assert(atumSubContext.agent == atumAgent)
   }
 
+  test("AtumAgent creates dispatcher per configuration") {
+    def configForDispatcher(dispatcherType: String): Config = {
+      val emptyConfig = ConfigFactory.empty()
+      val value = ConfigValueFactory.fromAnyRef(dispatcherType)
+      emptyConfig.withValue("atum.dispatcher.type", value)
+    }
+
+    def configOf(configValues: Map[String, Any]): Config = {
+      val emptyConfig = ConfigFactory.empty()
+      configValues.foldLeft(emptyConfig) { case (acc, (configKey, value)) =>
+        val configValue = ConfigValueFactory.fromAnyRef(value)
+        acc.withValue(configKey, configValue)
+      }
+    }
+
+    AtumAgent.dispatcherFromConfig(configOf(Map(
+      "atum.dispatcher.type" -> "http",
+      "atum.dispatcher.http.url" -> "http://localhost:8080"
+    ))) match {
+      case _: HttpDispatcher => info("HttpDispatcher created successfully")
+      case _ => fail("Expected HttpDispatcher")
+    }
+
+    AtumAgent.dispatcherFromConfig(configOf(Map("atum.dispatcher.type" -> "console"))) match {
+      case _: ConsoleDispatcher => info("ConsoleDispatcher created successfully")
+      case _ => fail("Expected ConsoleDispatcher")
+    }
+
+    AtumAgent.dispatcherFromConfig(configOf(Map(
+      "atum.dispatcher.type" -> "capture",
+      "atum.dispatcher.capture.capture-limit" -> 0
+    ))) match {
+      case _: CapturingDispatcher => info("CapturingDispatcher created successfully")
+      case _ => fail("Expected CapturingDispatcher")
+    }
+
+    val eUnknown = intercept[UnsupportedOperationException] {
+      AtumAgent.dispatcherFromConfig(configOf(Map("atum.dispatcher.type" -> "unknown")))
+    }
+    assert(eUnknown.getMessage.contains("Unsupported dispatcher type: 'unknown'"))
+    info("unknown dispatcher type throws exception as expected")
+
+    val eNoConfig = intercept[ConfigException.Missing] {
+      AtumAgent.dispatcherFromConfig(configOf(Map.empty))
+    }
+    assert(eNoConfig.getMessage.contains("No configuration setting found for key"))
+    info("missing dispatcher configuration throws exception as expected")
+  }
+
 }
diff --git a/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala
index b6935f958..f3aba4731 100644
--- a/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala
+++ b/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala
@@ -189,7 +189,7 @@ class AtumContextTest extends AnyFlatSpec with Matchers {
   }
 
   "addAdditionalData" should "add key/value pair to map for additional data" in {
-    val atumAgent = new AtumAgent
+    val atumAgent = AtumAgent
 
     val atumPartitions = AtumPartitions("key" -> "val")
     val atumContext = atumAgent.getOrCreateAtumContext(atumPartitions)
diff --git a/agent/src/test/scala/za/co/absa/atum/agent/dispatcher/CapturingDispatcherTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/dispatcher/CapturingDispatcherTest.scala
new file mode 100644
index 000000000..a4245dd3b
--- /dev/null
+++ b/agent/src/test/scala/za/co/absa/atum/agent/dispatcher/CapturingDispatcherTest.scala
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2021 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.atum.agent.dispatcher
+
+import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+import za.co.absa.atum.model.dto.{AtumContextDTO, CheckpointDTO, PartitionDTO, PartitioningDTO, PartitioningSubmitDTO}
+
+import java.time.ZonedDateTime
+import java.util.UUID
+import com.github.dwickern.macros.NameOf._
+import za.co.absa.atum.agent.dispatcher.CapturingDispatcher.CapturedCall
+
+class CapturingDispatcherTest extends AnyWordSpec with Matchers {
+  private val DefaultProcessStartTime = ZonedDateTime.parse("2024-03-11T00:00:00Z")
+
+  private def config(captureLimit: Int): Config = {
+    val emptyCfg = ConfigFactory.empty()
+    val value = ConfigValueFactory.fromAnyRef(captureLimit)
+    emptyCfg.withValue("atum.dispatcher.capture.capture-limit", value)
+  }
+
+  private def createCheckpoint(partition: PartitioningDTO): CheckpointDTO =
+    CheckpointDTO(
+      id = UUID.randomUUID(),
+      name = "name",
+      author = "author",
+      partitioning = partition,
+      processStartTime = DefaultProcessStartTime,
+      processEndTime = None,
+      measurements = Set.empty
+    )
+
+  private def createPartition(kvs: (String, String)*): PartitioningDTO = {
+    kvs.map { case (k, v) => PartitionDTO(k, v) }
+  }
+
+  private def createPartitionSubmit(partition: PartitioningDTO, parent: Option[PartitioningDTO] = None): PartitioningSubmitDTO = {
+    PartitioningSubmitDTO(
+      partitioning = partition,
+      parentPartitioning = parent,
+      authorIfNew = "another author"
+    )
+  }
+
+  "CapturingDispatcher" should {
+    val dispatcher = new CapturingDispatcher(config(3))
+    val partitioning1 = createPartition("k1" -> "v1")
+    val partitioning2 = createPartition("k1" -> "v1", "k2" -> "v2")
+    val partitioning1Submit = createPartitionSubmit(partitioning1)
+    val partitioning2Submit = createPartitionSubmit(partitioning2, Some(partitioning1))
+    val checkpoint1 = createCheckpoint(partitioning1)
+    val checkpoint2 = createCheckpoint(partitioning2)
+
+    "capture no more than the limit" in {
+      dispatcher.createPartitioning(partitioning1Submit)
+      dispatcher.saveCheckpoint(checkpoint1)
+      dispatcher.createPartitioning(partitioning2Submit)
+      dispatcher.saveCheckpoint(checkpoint2)
+
+      dispatcher.captures.size shouldBe 3
+    }
+
+    "verify presence of captures" in {
+      val expectedResult = AtumContextDTO(partitioning2)
+      dispatcher.contains(nameOf(dispatcher.createPartitioning _), partitioning2Submit, expectedResult) shouldBe true
+      dispatcher.contains(nameOf(dispatcher.createPartitioning _), partitioning2Submit) shouldBe true
+      dispatcher.contains(nameOf(dispatcher.saveCheckpoint _)) shouldBe true
+    }
+
+    "verify absence of captures" in {
+      val unexpectedResult = AtumContextDTO(partitioning1)
+      dispatcher.contains(nameOf(dispatcher.createPartitioning _), partitioning2Submit, unexpectedResult) shouldBe false
+      dispatcher.contains(nameOf(dispatcher.createPartitioning _), partitioning1Submit) shouldBe false
+      dispatcher.contains(nameOf(dispatcher.saveAdditionalData _)) shouldBe false
+    }
+
+    "list all captures" in {
+      val expected = List(
+        CapturedCall(nameOf(dispatcher.saveCheckpoint _), checkpoint1, ()),
+        CapturedCall(nameOf(dispatcher.createPartitioning _), partitioning2Submit, AtumContextDTO(partitioning2)),
+        CapturedCall(nameOf(dispatcher.saveCheckpoint _), checkpoint2, ())
+      )
+      dispatcher.captures shouldBe expected
+    }
+
+    "remove all captures with clear" in {
+      dispatcher.captures.size shouldBe 3
+      dispatcher.clear()
+      dispatcher.captures.size shouldBe 0
+    }
+  }
+
+}
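The test above leans on the `scala-nameof` macro (added to Dependencies.scala below): `nameOf` expands at compile time to the literal method name, so renaming `saveCheckpoint` breaks the test at compile time instead of letting a hard-coded string silently drift out of sync. In isolation:

    import com.github.dwickern.macros.NameOf._

    // Resolved at compile time, not via runtime reflection.
    val name: String = nameOf(dispatcher.saveCheckpoint _) // == "saveCheckpoint"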
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 45d8dd607..ab23dd30c 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -70,6 +70,8 @@ object Dependencies {
     val sttpPlayJson = "3.9.3"
 
     val awssdk = "2.23.15"
+
+    val scalaNameof = "4.0.0"
   }
 
@@ -225,6 +227,8 @@ object Dependencies {
 
     lazy val logback = "ch.qos.logback" % "logback-classic" % Versions.logback
 
+    lazy val nameOf = "com.github.dwickern" %% "scala-nameof" % Versions.scalaNameof % Provided // Provided is enough: it's a macro, needed only at compile time
+
     Seq(
       sparkCore,
       sparkSql,
@@ -232,7 +236,8 @@ object Dependencies {
       sparkCommons,
       sparkCommonsTest,
       sttp,
-      logback
+      logback,
+      nameOf
     )
   }

From 7be4f59c1cc5310932f7b19446249ef2d34d081c Mon Sep 17 00:00:00 2001
From: David Benedeki <14905969+benedeki@users.noreply.github.com>
Date: Mon, 15 Apr 2024 13:45:01 +0200
Subject: [PATCH 2/2] #178: Figure out a more natural way to work and identify
 flows (#186)

* Added `fk_primary_partitioning` column to `flows.flows`
* The column is filled when the flow is created
* Added a unique index on the `flows.flows` column `fk_primary_partitioning`
---
 .../postgres/flows/V1.7.1__flows.alter.ddl    | 17 +++++
 .../postgres/flows/V1.7.2___create_flow.sql   | 68 +++++++++++++++++++
 .../postgres/flows/V1.7.3__flows.update.sql   | 23 +++++++
 .../postgres/flows/V1.7.4__flows.alter.ddl    | 18 +++++
 .../absa/atum/database/flows/CreateFlow.scala | 60 ++++++++++++++++
 5 files changed, 186 insertions(+)
 create mode 100644 database/src/main/postgres/flows/V1.7.1__flows.alter.ddl
 create mode 100644 database/src/main/postgres/flows/V1.7.2___create_flow.sql
 create mode 100644 database/src/main/postgres/flows/V1.7.3__flows.update.sql
 create mode 100644 database/src/main/postgres/flows/V1.7.4__flows.alter.ddl
 create mode 100644 database/src/test/scala/za/co/absa/atum/database/flows/CreateFlow.scala
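With `fk_primary_partitioning` in place, the partitioning a flow was created from can be read directly off `flows.flows` rather than inferred via `flows.partitioning_to_flow`. An illustrative lookup (the id value is a placeholder):

    -- Find the flow primarily associated with partitioning 42
    SELECT id_flow, flow_name
    FROM flows.flows
    WHERE fk_primary_partitioning = 42;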
diff --git a/database/src/main/postgres/flows/V1.7.1__flows.alter.ddl b/database/src/main/postgres/flows/V1.7.1__flows.alter.ddl
new file mode 100644
index 000000000..c239e1f4e
--- /dev/null
+++ b/database/src/main/postgres/flows/V1.7.1__flows.alter.ddl
@@ -0,0 +1,17 @@
+/*
+ * Copyright 2021 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE flows.flows
+    ADD COLUMN IF NOT EXISTS fk_primary_partitioning BIGINT;
diff --git a/database/src/main/postgres/flows/V1.7.2___create_flow.sql b/database/src/main/postgres/flows/V1.7.2___create_flow.sql
new file mode 100644
index 000000000..204b27b15
--- /dev/null
+++ b/database/src/main/postgres/flows/V1.7.2___create_flow.sql
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2021 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE OR REPLACE FUNCTION flows._create_flow(
+    IN  i_fk_partitioning   BIGINT,
+    IN  i_by_user           TEXT,
+    OUT status              INTEGER,
+    OUT status_text         TEXT,
+    OUT id_flow             BIGINT
+) RETURNS record AS
+$$
+-------------------------------------------------------------------------------
+--
+-- Function: flows._create_flow(2)
+--      Creates a flow associated with the provided partitioning
+--
+-- Parameters:
+--      i_fk_partitioning   - id of the partitioning to associate with
+--      i_by_user           - user behind the change
+--
+-- Returns:
+--      status              - Status code
+--      status_text         - Status text
+--      id_flow             - id of the created flow
+--
+-- Status codes:
+--      11 - Flow created
+--
+-------------------------------------------------------------------------------
+DECLARE
+    _id_flow    BIGINT;
+    _flow_name  TEXT;
+BEGIN
+    -- generating the id explicitly so it can be used in the custom flow name
+    _id_flow := global_id();
+    _flow_name := 'Custom flow #' || _id_flow;
+
+    INSERT INTO flows.flows (id_flow, flow_name, flow_description, from_pattern, created_by, fk_primary_partitioning)
+    VALUES (_id_flow, _flow_name, '', false, i_by_user, i_fk_partitioning);
+
+    INSERT INTO flows.partitioning_to_flow(fk_flow, fk_partitioning, created_by)
+    VALUES (_id_flow, i_fk_partitioning, i_by_user);
+
+    status := 11;
+    status_text := 'Flow created';
+    id_flow := _id_flow;
+
+    RETURN;
+END;
+$$
+LANGUAGE plpgsql VOLATILE SECURITY DEFINER;
+
+GRANT EXECUTE ON FUNCTION flows._create_flow(BIGINT, TEXT) TO atum_owner;
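For a sense of the function's contract, an illustrative call (42 and 'john.doe' are placeholder values); it returns the status record together with the generated flow id:

    SELECT status, status_text, id_flow
    FROM flows._create_flow(42, 'john.doe');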
diff --git a/database/src/main/postgres/flows/V1.7.3__flows.update.sql b/database/src/main/postgres/flows/V1.7.3__flows.update.sql
new file mode 100644
index 000000000..456153395
--- /dev/null
+++ b/database/src/main/postgres/flows/V1.7.3__flows.update.sql
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2021 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+UPDATE flows.flows F
+SET fk_primary_partitioning =
+    (SELECT fk_partitioning
+     FROM flows.partitioning_to_flow PTF
+     WHERE PTF.fk_flow = F.id_flow
+     ORDER BY fk_partitioning ASC
+     LIMIT 1)
+WHERE fk_primary_partitioning IS NULL;
diff --git a/database/src/main/postgres/flows/V1.7.4__flows.alter.ddl b/database/src/main/postgres/flows/V1.7.4__flows.alter.ddl
new file mode 100644
index 000000000..a151c0cf0
--- /dev/null
+++ b/database/src/main/postgres/flows/V1.7.4__flows.alter.ddl
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2021 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE flows.flows
+    ALTER COLUMN fk_primary_partitioning SET NOT NULL;
+
+CREATE UNIQUE INDEX IF NOT EXISTS unq_flows ON flows.flows (fk_primary_partitioning);
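The migration sequence above follows the usual add-nullable-column, backfill, then tighten pattern, so existing rows never violate the new constraint. A sanity check one might run between V1.7.3 and V1.7.4 — illustrative only:

    -- Should return 0 before the NOT NULL constraint is applied
    SELECT count(*) FROM flows.flows WHERE fk_primary_partitioning IS NULL;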
diff --git a/database/src/test/scala/za/co/absa/atum/database/flows/CreateFlow.scala b/database/src/test/scala/za/co/absa/atum/database/flows/CreateFlow.scala
new file mode 100644
index 000000000..5444f9cb3
--- /dev/null
+++ b/database/src/test/scala/za/co/absa/atum/database/flows/CreateFlow.scala
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2024 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.atum.database.flows
+
+import za.co.absa.balta.DBTestSuite
+
+import scala.util.Random
+
+
+class CreateFlow extends DBTestSuite {
+  private val fncCreateFlow = "flows._create_flow"
+
+  test("Create flow") {
+    val partitioningId: Long = Random.nextLong()
+    val user = "Geralt of Rivia"
+    val flowID = function(fncCreateFlow)
+      .setParam("i_fk_partitioning", partitioningId)
+      .setParam("i_by_user", user)
+      .execute { queryResult =>
+        assert(queryResult.hasNext)
+        val row = queryResult.next()
+        assert(row.getInt("status").contains(11))
+        assert(row.getString("status_text").contains("Flow created"))
+        val result = row.getLong("id_flow").get
+        assert(!queryResult.hasNext)
+        result
+      }
+
+    table("flows.flows").where(add("id_flow", flowID)) { queryResult =>
+      assert(queryResult.hasNext)
+      val row = queryResult.next()
+      assert(row.getBoolean("from_pattern").contains(false))
+      assert(row.getString("created_by").contains(user))
+      assert(row.getLong("fk_primary_partitioning").contains(partitioningId))
+      assert(!queryResult.hasNext)
+    }
+
+    table("flows.partitioning_to_flow").where(add("fk_flow", flowID)) { queryResult =>
+      assert(queryResult.hasNext)
+      val row = queryResult.next()
+      assert(row.getLong("fk_partitioning").contains(partitioningId))
+      assert(row.getString("created_by").contains(user))
+      assert(!queryResult.hasNext)
+    }
+  }
+}