Server endpoints v2 returning the checkpoints data of a partitioning: #190 (#194)

* Fixes #189 - Defined and implemented a db function to return the checkpoint

* Fixes #189 - Modifying a db function

* Fixes #189 - Modifying a db function

* Fixes #189 - Adding test cases

* Fixes #189 - Added two input fields to the db function

* Fixes #189 - Fixing type mismatch

* Testing #120 db integration

* Fixes #189 - Adding test cases

* Fixes #189 - Changing back the details of flows.flows

* Fixes #189 - Implementing GitHub comments

* Fixes #190 - defined the dto to submit

* Update V1.7.1__flows.alter.ddl

* Delete database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoint.sql

* Update GetPartitioningMeasuresTest.scala

* Delete database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningCheckpoints.scala

* Fixes #190 - Added checkpoint query resultDTO, GetPartitioningCheckpoints class and its object

* Fixes #190 - implementing get checkpoints functionality in the repository

* Fixes #190 - renamed CheckpointDTO to CheckpointSubmitDTO

* Fixes #190 - Implementing partitioningCheckpoint service

* Fixes #190 - Implementing partitioningCheckpoint in the partitioning related classes.

* Fixes #190 - Implementing partitioningCheckpoint in the partitioning related classes.

* Fixes #190 - fixing controller

* Fixes #190 - Adding layers

* Fixes #190 - Adding doobie implicit

* Fixes #190 - Fixing the build

* Fixes #190 - removing unwanted layer from checkpoint repository

* Fixes #190 - adding mock layer from PartitioningRepositorySpec

* Fixes #190 - adding the implicits for the added DTOs and defining the endpoint.

* Fixes #190 - defining current version

* Fixes #190 - Fixing type mismatch

* Fixes #190 - Implementing DTO instances

* Fixes #190 - Implementing DTO instances

* Fixes #190 - adding CheckpointMeasurements model

* Fixes #190 - adding CheckpointMeasurements test

* Read[CheckpointMeasurements]

* Saving changes

* Fixes #190 - Read[CheckpointMeasurements] test

* Fixes #190 - transforming CheckpointMeasurements to CheckpointDTO

* removing default values on output

* partitioning checkpoint test

* partitioning checkpoint test repository

* partitioning checkpoint test for service

* partitioning checkpoint test controller

* partitioning checkpoint test controller

* removed the mapError from getPartitioningCheckpoints

* Fixing getPartitioningCheckpoints test and implementing GitHub suggestions

* Fixing merge conflicts

* Removing unused import

* implementing GitHub comments

* implementing GitHub comments in service

* implementing GitHub comments

* Fixing the build

* Fixing V1.8.3__get_partitioning_checkpoints.sql fields

* Fixing service test cases

* Renaming measure_columns to measured_columns

* Changing from object CheckpointFromDBObject to CheckpointFromDB

* Changing from object CheckpointFromDBObject to CheckpointFromDB

* Defining partitioningJson in a common way

* Implementing GitHub comments

---------

Co-authored-by: Pavel Salamon <[email protected]>
TebaleloS and salamonpavel authored Jun 13, 2024
1 parent e4e0420 commit 5ab72ea
Showing 34 changed files with 744 additions and 271 deletions.
1 change: 1 addition & 0 deletions database/src/main/postgres/flows/V1.7.1__flows.alter.ddl
@@ -15,3 +15,4 @@
 
 ALTER TABLE flows.flows
 ADD COLUMN IF NOT EXISTS fk_primary_partitioning BIGINT;
+
@@ -23,7 +23,7 @@ CREATE OR REPLACE FUNCTION flows.get_flow_checkpoints(
     OUT id_checkpoint UUID,
     OUT checkpoint_name TEXT,
     OUT measure_name TEXT,
-    OUT measure_columns TEXT[],
+    OUT measured_columns TEXT[],
     OUT measurement_value JSONB,
     OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE,
     OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE
@@ -58,7 +58,7 @@ $$
 --      id_checkpoint       - id of retrieved checkpoint
 --      checkpoint_name     - name of retrieved checkpoint
 --      measure_name        - measure name associated with a given checkpoint
---      measure_columns     - measure columns associated with a given checkpoint
+--      measured_columns    - measure columns associated with a given checkpoint
 --      measurement_value   - measurement details associated with a given checkpoint
 --      checkpoint_time     - time
 --
@@ -22,8 +22,10 @@ CREATE OR REPLACE FUNCTION runs.get_partitioning_checkpoints(
     OUT status_text TEXT,
     OUT id_checkpoint UUID,
     OUT checkpoint_name TEXT,
+    OUT author TEXT,
+    OUT measured_by_atum_agent BOOLEAN,
     OUT measure_name TEXT,
-    OUT measure_columns TEXT[],
+    OUT measured_columns TEXT[],
     OUT measurement_value JSONB,
     OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE,
     OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE
@@ -37,16 +39,18 @@ $$
 -- given partitioning (and checkpoint name, if specified).
 --
 -- Parameters:
---      i_partitioning      - partitioning of requested checkpoints
+--      i_partitioning          - partitioning of requested checkpoints
 --      i_limit             - (optional) maximum number of checkpoint's measurements to return
 --                            if 0 specified, all data will be returned, i.e. no limit will be applied
+--      i_checkpoint_name   - (optional) if specified, returns data related to particular checkpoint's name
 --
 -- Returns:
---      i_checkpoint_name   - (optional) if specified, returns data related to particular checkpoint's name
 --      status              - Status code
 --      status_text         - Status message
 --      id_checkpoint       - ID of the checkpoint
 --      checkpoint_name     - Name of the checkpoint
+--      author              - Author of the checkpoint
+--      measuredByAtumAgent - Flag indicating whether the checkpoint was measured by ATUM agent
 --      measure_name        - Name of the measure
 --      measure_columns     - Columns of the measure
 --      measurement_value   - Value of the measurement
@@ -74,26 +78,28 @@ BEGIN
     SELECT
         11 AS status,
         'Ok' AS status_text,
-        c.id_checkpoint,
-        c.checkpoint_name,
+        C.id_checkpoint,
+        C.checkpoint_name,
+        C.created_by AS author,
+        C.measured_by_atum_agent,
         md.measure_name,
         md.measured_columns,
-        m.measurement_value,
-        c.process_start_time AS checkpoint_start_time,
-        c.process_end_time AS checkpoint_end_time
+        M.measurement_value,
+        C.process_start_time AS checkpoint_start_time,
+        C.process_end_time AS checkpoint_end_time
     FROM
-        runs.checkpoints c
+        runs.checkpoints C
     JOIN
-        runs.measurements m ON c.id_checkpoint = m.fk_checkpoint
+        runs.measurements M ON C.id_checkpoint = M.fk_checkpoint
     JOIN
-        runs.measure_definitions md ON m.fk_measure_definition = md.id_measure_definition
+        runs.measure_definitions MD ON M.fk_measure_definition = MD.id_measure_definition
     WHERE
-        c.fk_partitioning = _fk_partitioning
+        C.fk_partitioning = _fk_partitioning
     AND
-        (i_checkpoint_name IS NULL OR c.checkpoint_name = i_checkpoint_name)
+        (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name)
     ORDER BY
-        c.process_start_time,
-        c.id_checkpoint
+        C.process_start_time,
+        C.id_checkpoint
     LIMIT nullif(i_limit, 0);
 
 END;
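Note: a minimal invocation sketch of the updated function follows. Only the OUT columns, the parameter docs, and the nullif-based LIMIT are taken from the diff above; the input parameter types (i_partitioning JSONB, i_limit INT, i_checkpoint_name TEXT) and the shape of the partitioning JSON are assumptions, since the function's IN parameters fall outside the shown hunks.

-- Hypothetical call; input signature and JSON shape are assumed, not from the diff.
SELECT status, status_text, id_checkpoint, checkpoint_name, author,
       measured_by_atum_agent, measure_name, measured_columns,
       measurement_value, checkpoint_start_time, checkpoint_end_time
FROM runs.get_partitioning_checkpoints(
    '{"keys": ["country"], "keysToValues": {"country": "ZA"}}'::jsonb,  -- assumed JSON shape
    0,     -- i_limit = 0: LIMIT nullif(0, 0) evaluates to LIMIT NULL, i.e. no limit applied
    NULL   -- i_checkpoint_name: NULL disables the checkpoint-name filter in the WHERE clause
);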
@@ -234,7 +234,7 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite {
 
     val measure1 = MeasuredDetails(
       row1.getString("measure_name").get,
-      row1.getArray[String]("measure_columns").map(_.toList).get,
+      row1.getArray[String]("measured_columns").map(_.toList).get,
       row1.getJsonB("measurement_value").get
     )

@@ -248,7 +248,7 @@
 
     val measure2 = MeasuredDetails(
       row2.getString("measure_name").get,
-      row2.getArray[String]("measure_columns").map(_.toList).get,
+      row2.getArray[String]("measured_columns").map(_.toList).get,
       row2.getJsonB("measurement_value").get
     )

@@ -151,21 +151,21 @@ class CreateOrUpdateAdditionalDataIntegrationTests extends DBTestSuite{
     assert(table("runs.additional_data").count(add("fk_partitioning", fkPartitioning)) == 5)
     assert(table("runs.additional_data_history").count(add("fk_partitioning", fkPartitioning)) == 0)
 
-      val expectedDataInAdTable = Seq(
-        ("PrimaryOwner", "TechnicalManagerX", "Bot"),
-        ("SecondaryOwner", "AnalystY", "Bot"),
-        ("SomeNewKey", "SomeNewValue", "MikeRusty"),
-        ("IsDatasetInHDFS", "true", "MikeRusty"),
-        ("DatasetContentSensitivityLevel", "1", "MikeRusty"),
-      )
-      expectedDataInAdTable.foreach { case (adNameExp, adValExp, adCreatedByExp) =>
-        table("runs.additional_data").where(add("ad_name", adNameExp)) {
-          resultSet =>
-            val row = resultSet.next()
-            assert(row.getString("ad_value").contains(adValExp))
-            assert(row.getString("created_by").contains(adCreatedByExp))
-        }
-      }
+    val expectedDataInAdTable = Seq(
+      ("PrimaryOwner", "TechnicalManagerX", "Bot"),
+      ("SecondaryOwner", "AnalystY", "Bot"),
+      ("SomeNewKey", "SomeNewValue", "MikeRusty"),
+      ("IsDatasetInHDFS", "true", "MikeRusty"),
+      ("DatasetContentSensitivityLevel", "1", "MikeRusty"),
+    )
+    expectedDataInAdTable.foreach { case (adNameExp, adValExp, adCreatedByExp) =>
+      table("runs.additional_data").where(add("ad_name", adNameExp)) {
+        resultSet =>
+          val row = resultSet.next()
+          assert(row.getString("ad_value").contains(adValExp))
+          assert(row.getString("created_by").contains(adCreatedByExp))
+      }
+    }
   }
 
   test("Partitioning and AD present, but no new AD records were backed-up or inserted, no changes detected") {
