diff --git a/build.sbt b/build.sbt index 8da0e4d51..c2770bf8a 100644 --- a/build.sbt +++ b/build.sbt @@ -139,4 +139,4 @@ lazy val e2eTests = (projectMatrix in file("e2e")) ) .enablePlugins(AssemblyPlugin) .jvmPlatform(scalaVersions = Seq(Versions.serviceScalaVersion)) - .dependsOn(agent,database) \ No newline at end of file + .dependsOn(agent, database, server) \ No newline at end of file diff --git a/e2e/src/main/scala/za/co/absa/atum/e2e/AtumE2eTests.scala b/e2e/src/main/scala/za/co/absa/atum/e2e/AtumE2eTests.scala index 9c475196c..2e5d78668 100644 --- a/e2e/src/main/scala/za/co/absa/atum/e2e/AtumE2eTests.scala +++ b/e2e/src/main/scala/za/co/absa/atum/e2e/AtumE2eTests.scala @@ -16,21 +16,41 @@ package za.co.absa.atum.e2e -import org.apache.spark.sql.{DataFrame, SparkSession} -import za.co.absa.atum.agent.{AtumAgent, AtumContext} -import za.co.absa.atum.agent.AtumContext.DatasetWrapper +import org.apache.commons.logging.LogFactory +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import za.co.absa.atum.database.balta.classes.DBConnection + +import java.sql.DriverManager +import java.time.Instant +import java.time.format.DateTimeFormatter +import java.util.Properties + +object AtumE2eTests extends Logging { + + private val now = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH:mm:ss").format(Instant.now()) -object AtumE2eTests { def main(args: Array[String]): Unit = { val jobName = "Atum E2E tests" - implicit val spark: SparkSession = obtainSparkSession(jobName) - implicit val context: AtumContext = AtumAgent.getOrCreateAtumContext(???) + implicit val spark: SparkSession = obtainSparkSession(jobName) + implicit val dbConnection: DBConnection = obtainDBConnection + logInfo("DB connection established") - val df: DataFrame = ??? - df.createCheckpoint("xxx") + val partitionsValues = List( + ("data", jobName), + ("when", now) + ) + val subpartitionsValues = List( + ("foo", "bar") + ) - //implicit val subContext: AtumContext = AtumAgent.getOrCreateAtumSubContext(???) + val testSuite = new TestSuite(jobName) + testSuite.test1(partitionsValues) + logInfo("Test 1 passed") + testSuite.test2(partitionsValues, partitionsValues) + logInfo("Test 2 passed") + logInfo("All tests passed") } private def obtainSparkSession(jobName: String): SparkSession = { @@ -40,4 +60,17 @@ object AtumE2eTests { spark } + + private def obtainDBConnection: DBConnection = { + val properties = new Properties() + properties.load(getClass.getResourceAsStream("/application.conf")) + + val dbUrl = properties.getProperty("test.jdbc.url") + val username = properties.getProperty("test.jdbc.username") + val password = properties.getProperty("test.jdbc.password") + + val conn = DriverManager.getConnection(dbUrl, username, password) + conn.setAutoCommit(false) + new DBConnection(conn) + } } diff --git a/e2e/src/main/scala/za/co/absa/atum/e2e/TestSuite.scala b/e2e/src/main/scala/za/co/absa/atum/e2e/TestSuite.scala new file mode 100644 index 000000000..aaa4a8232 --- /dev/null +++ b/e2e/src/main/scala/za/co/absa/atum/e2e/TestSuite.scala @@ -0,0 +1,55 @@ +/* + * Copyright 2023 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.e2e + +import org.apache.spark.sql.{DataFrame, SparkSession} +import za.co.absa.atum.agent.{AtumAgent, AtumContext} +import za.co.absa.atum.agent.AtumContext.{AtumPartitions, DatasetWrapper} +import za.co.absa.atum.database.balta.classes.{DBConnection, DBTable} + +import java.time.Instant +import java.time.format.DateTimeFormatter + +class TestSuite (jobName: String)(implicit spark: SparkSession, dbConnection: DBConnection) { + + def test1(partitionsValues: List[(String, String)]): Unit = { + val mainPartition = AtumPartitions(partitionsValues) + + implicit val context: AtumContext = AtumAgent.getOrCreateAtumContext(mainPartition) + //check DB + DBTable("").where("") { + ??? + } + + val df1: DataFrame = spark.read.csv(getClass.getResource("/data/table1.csv").getPath) + df1.createCheckpoint("Read table 1") + //check DB + + } + + def test2(partitionsValues: List[(String, String)], subpartitionsValues: List[(String, String)]): Unit = { + val mainPartition = AtumPartitions(partitionsValues) + val subPartition = AtumPartitions(subpartitionsValues) + val subContext: AtumContext = AtumAgent.getOrCreateAtumSubContext(subPartition)(AtumAgent.getOrCreateAtumContext(mainPartition)) + () + //check DB + + val df2: DataFrame = spark.read.csv(getClass.getResource("/data/table1.csv").getPath) + df2.createCheckpoint("Read table 2")(subContext) + //check DB + } +} diff --git a/e2e/src/resources/data/table1.csv b/e2e/src/resources/data/table1.csv new file mode 100644 index 000000000..d710fc733 --- /dev/null +++ b/e2e/src/resources/data/table1.csv @@ -0,0 +1,4 @@ +1,John +2,Paul +3,George +4,Ringo \ No newline at end of file diff --git a/e2e/src/resources/data/table2.csv b/e2e/src/resources/data/table2.csv new file mode 100644 index 000000000..b778e65ec --- /dev/null +++ b/e2e/src/resources/data/table2.csv @@ -0,0 +1,7 @@ +1,Monday,Mon +2,Tuesday,Tue +3,Wednesday,Wed +4,Thursday,Thu +5,Friday,Fri +6,Saturday,Sat +7,Sunday,Sun \ No newline at end of file diff --git a/server/src/main/scala/za/co/absa/atum/server/model/PartitioningForDB.scala b/server/src/main/scala/za/co/absa/atum/server/model/PartitioningForDB.scala index 016ae29fc..8792d597a 100644 --- a/server/src/main/scala/za/co/absa/atum/server/model/PartitioningForDB.scala +++ b/server/src/main/scala/za/co/absa/atum/server/model/PartitioningForDB.scala @@ -18,7 +18,7 @@ package za.co.absa.atum.server.model import za.co.absa.atum.model.dto.PartitioningDTO -private[server] case class PartitioningForDB private( +case class PartitioningForDB private( version: Int = 1, keys: Seq[String], keysToValuesMap: Map[String, String]