Skip to content

Commit

Permalink
Fixes #120 - Fixing bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
TebaleloS committed Apr 17, 2024
1 parent 4475aaa commit c4f2aaa
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 12 deletions.
3 changes: 2 additions & 1 deletion database/src/main/postgres/flows/V1.7.4__flows.alter.ddl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* limitations under the License.
*/

ALTER COLUMN fk_primary_partitioning SET NOT NULL;
ALTER TABLE flows.flows
ALTER COLUMN fk_primary_partitioning SET NOT NULL;

CREATE UNIQUE INDEX IF NOT EXISTS unq_flows ON flows.flows (fk_primary_partitioning);
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ $$

-- Status codes:
-- 11 - OK
-- 16 - Record not found for the given partitioning
-- 41 - Partitioning not found
--
-------------------------------------------------------------------------------
Expand All @@ -62,6 +63,12 @@ BEGIN
FROM runs.measure_definitions AS md
WHERE md.fk_partitioning = _fk_partitioning;

IF NOT FOUND THEN
status := 16;
status_text := 'No measures found for the given partitioning.';
RETURN NEXT;
RETURN;
END IF;
END;
$$
LANGUAGE plpgsql VOLATILE SECURITY DEFINER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ CREATE OR REPLACE FUNCTION runs.get_partitioning_additional_data(
OUT ad_name TEXT,
OUT ad_value TEXT
) RETURNS SETOF record AS

$$
-------------------------------------------------------------------------------
--
-- Function: runs.get_partitioning_additional_data(1)
-- Function: runs.get_partitioning_additional_data(2)
-- Returns additional data for the given partitioning
--
-- Parameters:
Expand All @@ -39,6 +38,7 @@ $$
--
-- Status codes:
-- 11 - OK
-- 16 - Record not found for the given partitioning
-- 41 - Partitioning not found
--
-------------------------------------------------------------------------------
Expand All @@ -63,6 +63,12 @@ BEGIN
FROM runs.additional_data AS ad
WHERE ad.fk_partitioning = _fk_partitioning;

IF NOT FOUND THEN
status := 16;
status_text := 'No additional data found for the given partitioning.';
RETURN NEXT;
RETURN;
END IF;
END;
$$
LANGUAGE plpgsql VOLATILE SECURITY DEFINER;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


CREATE OR REPLACE FUNCTION runs.get_partitioning_measures(
IN i_partitioning JSONB,
OUT status INTEGER,
OUT status_text TEXT,
OUT measure_name TEXT,
OUT measured_columns TEXT[]
) RETURNS SETOF record AS
$$
-------------------------------------------------------------------------------
--
-- Function: runs.get_partitioning_measures(1)
-- Returns measures for the given partitioning
--
-- Parameters:
-- i_partitioning - partitioning we are asking the measures for
--
-- Returns:
-- status - Status code
-- status_text - Status message
-- measure_name - Name of the measure
-- measured_columns - Array of columns associated with the measure

-- Status codes:
-- 11 - OK
-- 41 - Partitioning not found
--
-------------------------------------------------------------------------------

DECLARE
_fk_partitioning BIGINT;
BEGIN
_fk_partitioning = runs._get_id_partitioning(i_partitioning);

IF _fk_partitioning IS NULL THEN
status := 41;
status_text := 'The partitioning does not exist.';
RETURN NEXT;
RETURN;
END IF;

status := 11;
status_text := 'OK';

RETURN QUERY
SELECT status, status_text, md.measure_name, md.measured_columns
FROM runs.measure_definitions AS md
WHERE md.fk_partitioning = _fk_partitioning;

END;
$$
LANGUAGE plpgsql VOLATILE SECURITY DEFINER;

ALTER FUNCTION runs.get_partitioning_measures(JSONB) OWNER TO atum_owner;
GRANT EXECUTE ON FUNCTION runs.get_partitioning_measures(JSONB) TO atum_user;
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

CREATE OR REPLACE FUNCTION runs.get_partitioning_additional_data(
IN i_partitioning JSONB,
OUT status INTEGER,
OUT status_text TEXT,
OUT ad_name TEXT,
OUT ad_value TEXT
) RETURNS SETOF record AS

$$
-------------------------------------------------------------------------------
--
-- Function: runs.get_partitioning_additional_data(1)
-- Returns additional data for the given partitioning
--
-- Parameters:
-- i_partitioning - partitioning for requested additional data
--
-- Returns:
-- status - Status code
-- status_text - Status message
-- ad_name - Name of the additional data
-- ad_value - Value of the additional data
--
-- Status codes:
-- 11 - OK
-- 41 - Partitioning not found
--
-------------------------------------------------------------------------------

DECLARE
_fk_partitioning BIGINT;
BEGIN
_fk_partitioning = runs._get_id_partitioning(i_partitioning);

IF _fk_partitioning IS NULL THEN
status := 41;
status_text := 'The partitioning does not exist.';
RETURN NEXT;
RETURN;
END IF;

status = 11;
status_text = 'OK';

RETURN QUERY
SELECT status, status_text, ad.ad_name, ad.ad_value
FROM runs.additional_data AS ad
WHERE ad.fk_partitioning = _fk_partitioning;

END;
$$
LANGUAGE plpgsql VOLATILE SECURITY DEFINER;

ALTER FUNCTION runs.get_partitioning_additional_data(JSONB) OWNER TO atum_owner;
GRANT EXECUTE ON FUNCTION runs.get_partitioning_additional_data(JSONB) TO atum_user;
Original file line number Diff line number Diff line change
Expand Up @@ -72,23 +72,20 @@ class GetPartitioningAdditionalDataTest extends DBTestSuite{
.add("created_by", "Joseph")
.add("ad_name", "ad_1")
.add("ad_value", "This is the additional data for Joseph")
.add("updated_by", "Joseph")
)

table("runs.additional_data").insert(
add("fk_partitioning", fkPartitioning1)
.add("created_by", "Joseph")
.add("ad_name", "ad_2")
.add("ad_value", "This is the additional data for Joseph")
.add("updated_by", "Joseph")
)

table("runs.additional_data").insert(
add("fk_partitioning", fkPartitioning2)
.add("created_by", "Daniel")
.add("ad_name", "ad_3")
.add("ad_value", "This is the additional data for Daniel")
.add("updated_by", "Daniel")
)

function(fncGetPartitioningAdditionalData)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ import zio.interop.catz._
import za.co.absa.atum.server.api.database.DoobieImplicits.Sequence.get

class GetPartitioningMeasures (implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
extends DoobieMultipleResultFunction[PartitioningDTO, Option[MeasureDTO], Task]
extends DoobieMultipleResultFunction[PartitioningDTO, MeasureDTO, Task]
{
import za.co.absa.atum.server.api.database.DoobieImplicits.Jsonb.jsonbPutUsingString

override def sql(values: PartitioningDTO)(implicit read: Read[Option[MeasureDTO]]): Fragment = {
override def sql(values: PartitioningDTO)(implicit read: Read[MeasureDTO]): Fragment = {
val partitioning = PartitioningForDB.fromSeqPartitionDTO(values)
val partitioningJsonString = Json.toJson(partitioning).toString

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@ class PartitioningRepositoryImpl(
dbCallWithStatus(createOrUpdateAdditionalDataFn(additionalData), "createOrUpdateAdditionalData")
}

override def getPartitioningMeasures(partitioning: PartitioningDTO):
IO[DatabaseError, Either[Error, Seq[MeasureDTO]]] = {
getPartitioningMeasuresFn(partitioning).mapLeft(err => DatabaseError(err.getMessage))
override def getPartitioningMeasures(
partitioning: PartitioningDTO
): IO[DatabaseError, Seq[MeasureDTO]] = {
val m = getPartitioningMeasuresFn(partitioning)
m.mapLeft(err => DatabaseError(err.getMessage))
}

override def getPartitioningAdditionalData(partitioning: PartitioningDTO):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class PartitioningRepositorySpec extends ZIOSpecDefault with TestData {
// Get Partitioning Measures Mocks
private val getPartitioningMeasuresMock = mock(classOf[GetPartitioningMeasures])

when(getPartitioningMeasuresMock.apply(partitioningDTO1)).thenReturn(ZIO.succeed(Seq().empty))
when(getPartitioningMeasuresMock.apply(partitioningDTO1)).thenReturn{println("Testing measures");ZIO.succeed(Seq().empty)}
when(getPartitioningMeasuresMock.apply(partitioningDTO2)).thenReturn(ZIO.fail(new Exception("boom!")))
when(getPartitioningMeasuresMock.apply(partitioningDTO3)).thenReturn(ZIO.fail(new Exception("boom!")))

Expand Down

0 comments on commit c4f2aaa

Please sign in to comment.