Skip to content

Commit

Permalink
Testing #120 db integration
Browse files Browse the repository at this point in the history
  • Loading branch information
TebaleloS committed Apr 25, 2024
1 parent e2c61d1 commit 388492a
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,31 @@ CREATE OR REPLACE FUNCTION runs.get_partitioning_checkpoints(
OUT measure_name TEXT,
OUT measure_columns TEXT[],
OUT measurement_value JSONB,
OUT i_checkpoint_start_time TIMESTAMP WITH TIME ZONE
OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE,
OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE
)
RETURNS SETOF record AS
$$
-------------------------------------------------------------------------------
--
-- Function: runs.get_partitioning_checkpoints(JSONB, INT, TEXT)
-- Returns all the checkpoint for the given partitioning
-- Returns all the checkpoint for the given partitioning and checkpoint name
--
-- Parameters:
-- i_partitioning - partitioning for requested checkpoints
-- i_limit - limit of the number of checkpoints to return
-- i_checkpoint_name - name of the checkpoint to filter by
--
-- Returns:
-- status - Status code
-- status_text - Status message
-- id_checkpoint - ID of the checkpoint
-- checkpoint_name - Name of the checkpoint
-- measure_name - Name of the measure
-- measure_columns - Columns of the measure
-- measurement_value - Value of the measurement
-- checkpoint_time - Time of the checkpoint
-- status - Status code
-- status_text - Status message
-- id_checkpoint - ID of the checkpoint
-- checkpoint_name - Name of the checkpoint
-- measure_name - Name of the measure
-- measure_columns - Columns of the measure
-- measurement_value - Value of the measurement
-- checkpoint_start_time - Time of the checkpoint
-- checkpoint_end_time - End time of the checkpoint computation
--
-- Status codes:
-- 11 - OK
Expand Down Expand Up @@ -76,7 +78,8 @@ BEGIN
md.measure_name,
md.measured_columns,
m.measurement_value,
c.process_start_time
c.process_start_time AS checkpoint_start_time,
c.process_end_time AS checkpoint_end_time
FROM
runs.checkpoints c
JOIN
Expand All @@ -86,11 +89,11 @@ BEGIN
WHERE
c.fk_partitioning = _fk_partitioning
AND
c.checkpoint_name = i_checkpoint_name
(i_checkpoint_name IS NULL OR c.checkpoint_name = i_checkpoint_name)
ORDER BY
c.process_start_time,
c.id_checkpoint
LIMIT i_limit;
LIMIT nullif(i_limit, 0);

IF NOT FOUND THEN
status := 16;
Expand All @@ -102,5 +105,8 @@ END;
$$

LANGUAGE plpgsql VOLATILE SECURITY DEFINER;

ALTER FUNCTION runs.get_partitioning_checkpoints(JSONB, INT, TEXT) OWNER TO atum_owner;

GRANT EXECUTE ON FUNCTION runs.get_partitioning_checkpoints(JSONB, INT, TEXT) TO atum_owner;

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package za.co.absa.atum.database.runs

import za.co.absa.balta.DBTestSuite
import za.co.absa.balta.classes.JsonBString
import za.co.absa.balta.classes.setter.CustomDBType

import java.time.OffsetDateTime
import java.util.UUID
Expand Down Expand Up @@ -39,32 +40,21 @@ class GetPartitioningCheckpoints extends DBTestSuite{
|""".stripMargin
)

private val parentPartitioning = JsonBString(
"""
|{
| "version": 1,
| "keys": ["key1", "key3"],
| "keysToValues": {
| "key1": "valueX",
| "key3": "valueZ"
| }
|}
|""".stripMargin
)

private val i_limit = 10
private val i_checkpoint_name = "checkpoint_1"

private val measurement1 = JsonBString("""1""".stripMargin)

private val measured_columns = "[col_1, col_2, col_3]"
private val measured_columns = CustomDBType("""{"col2"}""", "TEXT[]")

test("Get partitioning checkpoints returns checkpoints for partitioning with checkpoints") {

val uuid = UUID.randomUUID
val startTime = OffsetDateTime.parse("1992-08-03T10:00:00Z")
val endTime = OffsetDateTime.parse("2022-11-05T08:00:00Z")

val id_measure_definition: Long = 1

table("runs.partitionings").insert(
add("partitioning", partitioning1)
.add("created_by", "Daniel")
Expand All @@ -83,35 +73,171 @@ class GetPartitioningCheckpoints extends DBTestSuite{
.add("created_by", "Daniel")
)

table("runs.measurements").insert(
add("fk_checkpoint", fkPartitioning1)
.add("fk_measure_definition", 1)
.add("measurement_value", measurement1)
)

table("runs.measure_definitions").insert(
add("fk_partitioning", fkPartitioning1)
add("id_measure_definition", id_measure_definition)
.add("fk_partitioning", fkPartitioning1)
.add("created_by", "Daniel")
.add("measure_name", "measure_1")
.add("measured_columns", measured_columns)
)

table("runs.measurements").insert(
add("fk_checkpoint", uuid)
.add("fk_measure_definition", id_measure_definition)
.add("measurement_value", measurement1)
)


function(fncGetPartitioningCheckpoints)
.setParam("i_partitioning", partitioning1)
.setParam("i_limit", i_limit)
.setParam("i_checkpoint_name", i_checkpoint_name)
.execute { queryResult =>
val results = queryResult.next()
assert(results.getInt("status").contains(11))
assert(results.getString("status_text").contains("OK"))
assert(results.getString("status_text").contains("Ok"))
assert(results.getString("checkpoint_name").contains("checkpoint_1"))
assert(results.getUUID("id_checkpoint").contains(uuid))
assert(results.getOffsetDateTime("checkpoint_start_time").contains(startTime))
assert(results.getOffsetDateTime("checkpoint_end_time").contains(endTime))
assert(results.getBoolean("measured_by_atum_agent").contains(true))
assert(results.getString("created_by").contains("Daniel"))
assert(results.getInt("measurement_value").contains(1))
assert(results.getString("measure_name").contains("measurement_1"))
assert(results.getString("measure_name").contains("measure_1"))
assert(!queryResult.hasNext)
}

function(fncGetPartitioningCheckpoints)
.setParam("i_partitioning", partitioning1)
.setParam("i_limit", i_limit)
.execute { queryResult =>
val results = queryResult.next()
assert(results.getInt("status").contains(11))
assert(results.getString("status_text").contains("Ok"))
assert(results.getString("checkpoint_name").contains("checkpoint_1"))
assert(results.getUUID("id_checkpoint").contains(uuid))
assert(results.getOffsetDateTime("checkpoint_start_time").contains(startTime))
assert(results.getOffsetDateTime("checkpoint_end_time").contains(endTime))
assert(results.getInt("measurement_value").contains(1))
assert(!queryResult.hasNext)
}

}

test("Get partitioning checkpoints return multiple checkpoints on a partitioning with checkpoints") {

val uuid1 = UUID.randomUUID
val uuid2 = UUID.randomUUID
val startTime1 = OffsetDateTime.parse("1992-08-03T10:00:00Z")
val endTime1 = OffsetDateTime.parse("2022-11-05T08:00:00Z")
val startTime2 = OffsetDateTime.parse("1992-08-03T10:00:00Z")
val endTime2 = OffsetDateTime.parse("2022-11-05T08:00:00Z")

val id_measure_definition1: Long = 1
val id_measure_definition2: Long = 2

val measured_columns2 = CustomDBType("""{"col3"}""", "TEXT[]")

table("runs.partitionings").insert(
add("partitioning", partitioning1)
.add("created_by", "Daniel")
)

val fkPartitioning1: Long = table("runs.partitionings")
.fieldValue("partitioning", partitioning1, "id_partitioning").get.get

table("runs.checkpoints").insert(
add("id_checkpoint", uuid1)
.add("fk_partitioning", fkPartitioning1)
.add("checkpoint_name", "checkpoint_1")
.add("process_start_time", startTime1)
.add("process_end_time", endTime1)
.add("measured_by_atum_agent", true)
.add("created_by", "Daniel")
)

table("runs.checkpoints").insert(
add("id_checkpoint", uuid2)
.add("fk_partitioning", fkPartitioning1)
.add("checkpoint_name", "checkpoint_2")
.add("process_start_time", startTime2)
.add("process_end_time", endTime2)
.add("measured_by_atum_agent", true)
.add("created_by", "Daniel")
)

table("runs.measure_definitions").insert(
add("id_measure_definition", id_measure_definition1)
.add("fk_partitioning", fkPartitioning1)
.add("created_by", "Daniel")
.add("measure_name", "measure_1")
.add("measured_columns", measured_columns)
)

table("runs.measure_definitions").insert(
add("id_measure_definition", id_measure_definition2)
.add("fk_partitioning", fkPartitioning1)
.add("created_by", "Daniel")
.add("measure_name", "measure_2")
.add("measured_columns", measured_columns2)
)

table("runs.measurements").insert(
add("fk_checkpoint", uuid1)
.add("fk_measure_definition", id_measure_definition1)
.add("measurement_value", measurement1)
)

table("runs.measurements").insert(
add("fk_checkpoint", uuid2)
.add("fk_measure_definition", id_measure_definition2)
.add("measurement_value", measurement1)
)

function(fncGetPartitioningCheckpoints)
.setParam("i_partitioning", partitioning1)
.setParam("i_limit", i_limit)
.setParam("i_checkpoint_name", i_checkpoint_name)
.execute { queryResult =>
val results1 = queryResult.next()
assert(results1.getInt("status").contains(11))
assert(results1.getString("status_text").contains("Ok"))
assert(results1.getString("checkpoint_name").contains("checkpoint_1"))
assert(results1.getUUID("id_checkpoint").contains(uuid1))
assert(results1.getOffsetDateTime("checkpoint_start_time").contains(startTime1))
assert(results1.getOffsetDateTime("checkpoint_end_time").contains(endTime1))
assert(results1.getInt("measurement_value").contains(1))
assert(results1.getString("measure_name").contains("measure_1"))

if (queryResult.hasNext) {
// Fetching the next checkpoint
val results2 = queryResult.next()
assert(results2.getString("checkpoint_name").contains("checkpoint_2"))
assert(results2.getUUID("id_checkpoint").contains(uuid2))
assert(results2.getOffsetDateTime("checkpoint_start_time").contains(startTime2))
assert(results2.getOffsetDateTime("checkpoint_end_time").contains(endTime2))
assert(results2.getInt("measurement_value").contains(1))
assert(results2.getString("measure_name").contains("measure_2"))
} else {
fail("Expected two checkpoints, but got only one.")
}
}

}

test("Get partitioning checkpoints returns no checkpoints for partitioning without checkpoints") {

table("runs.partitionings").insert(
add("partitioning", partitioning2)
.add("created_by", "Daniel")
)

function(fncGetPartitioningCheckpoints)
.setParam("i_partitioning", partitioning2)
.setParam("i_limit", i_limit)
.setParam("i_checkpoint_name", i_checkpoint_name)
.execute { queryResult =>
val results = queryResult.next()
assert(results.getInt("status").contains(16))
assert(results.getString("status_text").contains("No checkpoints were found for the given partitioning."))
assert(!queryResult.hasNext)
}

Expand Down

0 comments on commit 388492a

Please sign in to comment.