Skip to content

Commit

Permalink
#126: Add end-to-end tests
Browse files Browse the repository at this point in the history
* new module with e2e tests application
  • Loading branch information
benedeki committed Nov 17, 2023
1 parent 994ffba commit a6ce779
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 11 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,4 @@ lazy val e2eTests = (projectMatrix in file("e2e"))
)
.enablePlugins(AssemblyPlugin)
.jvmPlatform(scalaVersions = Seq(Versions.serviceScalaVersion))
.dependsOn(agent,database)
.dependsOn(agent, database, server)
51 changes: 42 additions & 9 deletions e2e/src/main/scala/za/co/absa/atum/e2e/AtumE2eTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,41 @@

package za.co.absa.atum.e2e

import org.apache.spark.sql.{DataFrame, SparkSession}
import za.co.absa.atum.agent.{AtumAgent, AtumContext}
import za.co.absa.atum.agent.AtumContext.DatasetWrapper
import org.apache.commons.logging.LogFactory
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import za.co.absa.atum.database.balta.classes.DBConnection

import java.sql.DriverManager
import java.time.Instant
import java.time.format.DateTimeFormatter
import java.util.Properties

object AtumE2eTests extends Logging {

private val now = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH:mm:ss").format(Instant.now())

object AtumE2eTests {
def main(args: Array[String]): Unit = {
val jobName = "Atum E2E tests"
implicit val spark: SparkSession = obtainSparkSession(jobName)

implicit val context: AtumContext = AtumAgent.getOrCreateAtumContext(???)
implicit val spark: SparkSession = obtainSparkSession(jobName)
implicit val dbConnection: DBConnection = obtainDBConnection
logInfo("DB connection established")

val df: DataFrame = ???
df.createCheckpoint("xxx")
val partitionsValues = List(
("data", jobName),
("when", now)
)
val subpartitionsValues = List(
("foo", "bar")
)

//implicit val subContext: AtumContext = AtumAgent.getOrCreateAtumSubContext(???)
val testSuite = new TestSuite(jobName)
testSuite.test1(partitionsValues)
logInfo("Test 1 passed")
testSuite.test2(partitionsValues, partitionsValues)
logInfo("Test 2 passed")
logInfo("All tests passed")
}

private def obtainSparkSession(jobName: String): SparkSession = {
Expand All @@ -40,4 +60,17 @@ object AtumE2eTests {

spark
}

/**
 * Opens the JDBC connection used by the e2e tests.
 *
 * Reads the connection coordinates (`test.jdbc.url`, `test.jdbc.username`,
 * `test.jdbc.password`) from the classpath resource `/application.conf`.
 * NOTE(review): the file is named like a HOCON config but is parsed as
 * java.util.Properties (plain key=value lines) — confirm the file format matches.
 *
 * @return an open [[DBConnection]] with auto-commit disabled
 * @throws IllegalStateException if the resource is missing from the classpath
 */
private def obtainDBConnection: DBConnection = {
  // getResourceAsStream returns null when the resource is absent; fail with a
  // clear message instead of an opaque NPE inside Properties.load
  val stream = getClass.getResourceAsStream("/application.conf")
  if (stream == null) {
    throw new IllegalStateException("Resource /application.conf not found on classpath")
  }

  val properties = new Properties()
  try {
    properties.load(stream)
  } finally {
    stream.close() // original leaked the stream
  }

  val dbUrl = properties.getProperty("test.jdbc.url")
  val username = properties.getProperty("test.jdbc.username")
  val password = properties.getProperty("test.jdbc.password")

  val conn = DriverManager.getConnection(dbUrl, username, password)
  conn.setAutoCommit(false) // the test harness controls transaction boundaries itself
  new DBConnection(conn)
}
}
55 changes: 55 additions & 0 deletions e2e/src/main/scala/za/co/absa/atum/e2e/TestSuite.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2023 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.e2e

import org.apache.spark.sql.{DataFrame, SparkSession}
import za.co.absa.atum.agent.{AtumAgent, AtumContext}
import za.co.absa.atum.agent.AtumContext.{AtumPartitions, DatasetWrapper}
import za.co.absa.atum.database.balta.classes.{DBConnection, DBTable}

import java.time.Instant
import java.time.format.DateTimeFormatter

/**
 * End-to-end test scenarios exercising Atum checkpoint creation against a live database.
 *
 * @param jobName      name of the running job (supplied by the caller; currently unused inside
 *                     the scenarios themselves — partition values carry the job identity)
 * @param spark        active Spark session used to read the test data sets
 * @param dbConnection open database connection used by the DB-side verification steps
 */
class TestSuite (jobName: String)(implicit spark: SparkSession, dbConnection: DBConnection) {

  /**
   * Scenario 1: create an Atum context for the main partitioning and checkpoint a read of table 1.
   *
   * @param partitionsValues key-value pairs defining the main partitioning
   */
  def test1(partitionsValues: List[(String, String)]): Unit = {
    val mainPartition = AtumPartitions(partitionsValues)

    implicit val context: AtumContext = AtumAgent.getOrCreateAtumContext(mainPartition)
    // Verify DB state after context creation.
    // TODO(review): placeholder — `???` throws NotImplementedError when executed; the table
    // name, filter and check body still need to be implemented.
    DBTable("").where("") {
      ???
    }

    val df1: DataFrame = spark.read.csv(getClass.getResource("/data/table1.csv").getPath)
    df1.createCheckpoint("Read table 1")
    // TODO: verify in the DB that the "Read table 1" checkpoint was recorded
  }

  /**
   * Scenario 2: create a sub-context under the main partitioning and checkpoint a read of table 2.
   *
   * @param partitionsValues    key-value pairs defining the main partitioning
   * @param subpartitionsValues key-value pairs defining the sub-partitioning
   */
  def test2(partitionsValues: List[(String, String)], subpartitionsValues: List[(String, String)]): Unit = {
    val mainPartition = AtumPartitions(partitionsValues)
    val subPartition = AtumPartitions(subpartitionsValues)
    val subContext: AtumContext = AtumAgent.getOrCreateAtumSubContext(subPartition)(AtumAgent.getOrCreateAtumContext(mainPartition))
    // TODO: verify in the DB that the sub-partitioning was registered
    // (removed a stray dead `()` expression that followed the declaration)

    // Fixed: originally read table1.csv, but the checkpoint is named "Read table 2" and
    // table2.csv is shipped with this commit's resources — read the matching data set.
    val df2: DataFrame = spark.read.csv(getClass.getResource("/data/table2.csv").getPath)
    df2.createCheckpoint("Read table 2")(subContext)
    // TODO: verify in the DB that the "Read table 2" checkpoint was recorded
  }
}
4 changes: 4 additions & 0 deletions e2e/src/resources/data/table1.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
1,John
2,Paul
3,George
4,Ringo
7 changes: 7 additions & 0 deletions e2e/src/resources/data/table2.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
1,Monday,Mon
2,Tuesday,Tue
3,Wednesday,Wed
4,Thursday,Thu
5,Friday,Fri
6,Saturday,Sat
7,Sunday,Sun
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package za.co.absa.atum.server.model

import za.co.absa.atum.model.dto.PartitioningDTO

private[server] case class PartitioningForDB private(
case class PartitioningForDB private(
version: Int = 1,
keys: Seq[String],
keysToValuesMap: Map[String, String]
Expand Down

0 comments on commit a6ce779

Please sign in to comment.