Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get Atum Context, by getting existing partitioning from DB - retrieve Measures and AdditionalData for a given partitioning #120 #167

Merged
Show file tree
Hide file tree
Changes from 42 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
74af153
Fixes #120 - Added class to call partitioning measures function
TebaleloS Mar 11, 2024
74993c0
Merge branch 'master' into feature/#120-retrieve-Measures-and-Additio…
TebaleloS Mar 11, 2024
9b8dc52
Fixes #120 - Removed status 16 option from the DB function associated…
TebaleloS Mar 12, 2024
b0949ba
Merge remote-tracking branch 'origin/feature/#120-retrieve-Measures-a…
TebaleloS Mar 12, 2024
daa1a16
Fixes #120 - Changed measures function definitions
TebaleloS Mar 19, 2024
9680a1b
Fixes #120 - defined the class to communicate with get_partitioning_a…
TebaleloS Mar 19, 2024
ff6edb5
Fixes #120 - implementing the repository of get_partitioning_addition…
TebaleloS Mar 19, 2024
086525e
Fixes #120 - Calling getPartioningAdditionalData of the repo in the s…
TebaleloS Mar 19, 2024
52449d3
Fixes #120 - Calling getPartioningAdditionalData of the repo in the s…
TebaleloS Mar 19, 2024
57a1fd7
Fixes #120 - added value parameters
TebaleloS Mar 19, 2024
6d7145b
Merge branch 'master' into feature/#120-retrieve-Measures-and-Additio…
TebaleloS Mar 20, 2024
1896eba
Fixes #120 - Merged master
TebaleloS Mar 20, 2024
ac6aa3a
Fixes #120 - refactored the controller added computation method in th…
TebaleloS Mar 25, 2024
94703b8
Fixes #120 - refactored the controller added computation method in th…
TebaleloS Mar 26, 2024
3b05824
Fixes #120 - changed type from error response to service error
TebaleloS Mar 26, 2024
c063480
Fixes #120 - Changed inheritance from service trat to implementation
TebaleloS Mar 26, 2024
c5c5a28
Fixes #120 - Fixing bugs
TebaleloS Mar 27, 2024
e6e48c9
Fixes #120 - Fixing type mismatch
TebaleloS Apr 2, 2024
f43f67f
Fixes #120 - removed unused imports
TebaleloS Apr 2, 2024
baf89a2
Fixes #120 - Adding implicit conversion
TebaleloS Apr 4, 2024
c86914b
Fixes #120 - Added package object
TebaleloS Apr 4, 2024
5b3ceb5
implicits needed for Read instance derivation
salamonpavel Apr 4, 2024
9100092
Fixes #120 - Fixing bugs in PartitioningRepositorySpec
TebaleloS Apr 4, 2024
1212cd6
Fixes #120 - removed dead code in PartitioningRepositorySpec
TebaleloS Apr 4, 2024
a1c6cca
Fixes #120 - fixing type mismatch in PartitioningRepositorySpec
TebaleloS Apr 8, 2024
1301c24
Fixes #120 - Adding test cases in PartitioningRepositorySpec
TebaleloS Apr 8, 2024
1fabc96
Fixes #120 - Adding test cases
TebaleloS Apr 9, 2024
9fd6069
Fixes #120 - Adding test cases for db function
TebaleloS Apr 10, 2024
5de083a
Fixes #120 - Adding and fixing test cases for db function
TebaleloS Apr 11, 2024
e440533
Fixes #120 - removing dead code
TebaleloS Apr 11, 2024
ae68970
Fixes #120 - refactoring function name
TebaleloS Apr 11, 2024
d75b5dd
Fixes #120 - Applying GitHub comments
TebaleloS Apr 15, 2024
a6f09d3
Fixes #120 - Applying GitHub comments
TebaleloS Apr 17, 2024
4475aaa
Fixes #120 - merging master
TebaleloS Apr 17, 2024
c4f2aaa
Fixes #120 - Fixing bugs
TebaleloS Apr 17, 2024
5bfe34b
Fixes #120 - Fixing repository test cases bugs
TebaleloS Apr 17, 2024
975648b
Fixes #120 - removing println
TebaleloS Apr 17, 2024
ed67783
Fixes #120 - Fixing bugs from ControllerSpec
TebaleloS Apr 18, 2024
a8348a6
Fixes #120 - Adding more items to measure sequence
TebaleloS Apr 18, 2024
604d518
Fixes #120 - Removing dead code
TebaleloS Apr 18, 2024
1d31880
Closes #120
TebaleloS Apr 18, 2024
9c5bb99
Close #120 - Applying GitHub comments
TebaleloS Apr 19, 2024
44c3664
Close #120 - Addressing GitHub comments
TebaleloS Apr 19, 2024
7e30a62
Testing #120 db integration
TebaleloS Apr 24, 2024
d1b6f7c
Closes #120 - Cleaning unwanted code lines
TebaleloS Apr 26, 2024
42e9604
Closes #120 - Fixing the build
TebaleloS Apr 26, 2024
66af419
Closes #120 - Fixing the build
TebaleloS Apr 26, 2024
31dea31
Closes #120 - Fixing the tests
TebaleloS Apr 26, 2024
ef537a5
Closes #120 - Fixing the tests
TebaleloS Apr 26, 2024
30c5f3f
Closes #120 - Fixing the tests cases
TebaleloS Apr 26, 2024
c41a50f
Merge remote-tracking branch 'refs/remotes/origin/master' into featur…
lsulak Apr 26, 2024
14d8244
fixing the balta tests
lsulak Apr 26, 2024
d62f574
* Fixing jacoco_check.yml setup (hopefully)
benedeki Apr 29, 2024
57ffa95
* Decreasing overall coverage req
benedeki Apr 29, 2024
7ab2215
jacoco excludes
salamonpavel Apr 30, 2024
f6ef5df
added tests, decreased coverage level, added jacoco exclusions
salamonpavel Apr 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion database/src/main/postgres/flows/V1.7.4__flows.alter.ddl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* limitations under the License.
*/

ALTER COLUMN fk_primary_partitioning SET NOT NULL;
ALTER TABLE flows.flows
ALTER COLUMN fk_primary_partitioning SET NOT NULL;

CREATE UNIQUE INDEX IF NOT EXISTS unq_flows ON flows.flows (fk_primary_partitioning);
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


CREATE OR REPLACE FUNCTION runs.get_partitioning_measures(
IN i_partitioning JSONB,
OUT status INTEGER,
OUT status_text TEXT,
OUT measure_name TEXT,
OUT measured_columns TEXT[]
) RETURNS SETOF record AS
$$
-------------------------------------------------------------------------------
--
-- Function: runs.get_partitioning_measures(1)
-- Returns measures for the given partitioning
--
-- Parameters:
-- i_partitioning - partitioning we are asking the measures for
--
-- Returns:
-- status - Status code
-- status_text - Status message
-- measure_name - Name of the measure
-- measured_columns - Array of columns associated with the measure

-- Status codes:
-- 11 - OK
-- 41 - Partitioning not found
--
-------------------------------------------------------------------------------

DECLARE
_fk_partitioning BIGINT;
BEGIN
_fk_partitioning = runs._get_id_partitioning(i_partitioning);

IF _fk_partitioning IS NULL THEN
status := 41;
status_text := 'The partitioning does not exist.';
RETURN NEXT;
RETURN;
END IF;

status := 11;
status_text := 'OK';

RETURN QUERY
SELECT status, status_text, md.measure_name, md.measured_columns
FROM runs.measure_definitions AS md
WHERE md.fk_partitioning = _fk_partitioning;

END;
$$
LANGUAGE plpgsql VOLATILE SECURITY DEFINER;

ALTER FUNCTION runs.get_partitioning_measures(JSONB) OWNER TO atum_owner;
GRANT EXECUTE ON FUNCTION runs.get_partitioning_measures(JSONB) TO atum_user;
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

CREATE OR REPLACE FUNCTION runs.get_partitioning_additional_data(
IN i_partitioning JSONB,
OUT status INTEGER,
OUT status_text TEXT,
OUT ad_name TEXT,
OUT ad_value TEXT
) RETURNS SETOF record AS

$$
-------------------------------------------------------------------------------
--
-- Function: runs.get_partitioning_additional_data(1)
-- Returns additional data for the given partitioning
--
-- Parameters:
-- i_partitioning - partitioning for requested additional data
--
-- Returns:
-- status - Status code
-- status_text - Status message
-- ad_name - Name of the additional data
-- ad_value - Value of the additional data
--
-- Status codes:
-- 11 - OK
-- 41 - Partitioning not found
--
-------------------------------------------------------------------------------

DECLARE
_fk_partitioning BIGINT;
BEGIN
_fk_partitioning = runs._get_id_partitioning(i_partitioning);

IF _fk_partitioning IS NULL THEN
status := 41;
status_text := 'The partitioning does not exist.';
RETURN NEXT;
RETURN;
END IF;

status = 11;
status_text = 'OK';

RETURN QUERY
SELECT status, status_text, ad.ad_name, ad.ad_value
FROM runs.additional_data AS ad
WHERE ad.fk_partitioning = _fk_partitioning;

END;
$$
LANGUAGE plpgsql VOLATILE SECURITY DEFINER;

ALTER FUNCTION runs.get_partitioning_additional_data(JSONB) OWNER TO atum_owner;
GRANT EXECUTE ON FUNCTION runs.get_partitioning_additional_data(JSONB) TO atum_user;
Original file line number Diff line number Diff line change
Expand Up @@ -72,23 +72,20 @@ class GetPartitioningAdditionalDataTest extends DBTestSuite{
.add("created_by", "Joseph")
.add("ad_name", "ad_1")
.add("ad_value", "This is the additional data for Joseph")
.add("updated_by", "Joseph")
)

table("runs.additional_data").insert(
add("fk_partitioning", fkPartitioning1)
.add("created_by", "Joseph")
.add("ad_name", "ad_2")
.add("ad_value", "This is the additional data for Joseph")
.add("updated_by", "Joseph")
)

table("runs.additional_data").insert(
add("fk_partitioning", fkPartitioning2)
.add("created_by", "Daniel")
.add("ad_name", "ad_3")
.add("ad_value", "This is the additional data for Daniel")
.add("updated_by", "Daniel")
)

function(fncGetPartitioningAdditionalData)
Expand Down Expand Up @@ -130,10 +127,6 @@ class GetPartitioningAdditionalDataTest extends DBTestSuite{
function(fncGetPartitioningAdditionalData)
.setParam("i_partitioning", partitioning2)
.execute { queryResult =>
val results = queryResult.next()
assert(results.getInt("status").contains(16))
assert(results.getString("status_text").contains("No additional data found for the given partitioning."))
assert(results.getString("ad_name").isEmpty)
assert(!queryResult.hasNext)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,6 @@ class GetPartitioningMeasuresTest extends DBTestSuite {
function(fncGetPartitioningMeasures)
.setParam("i_partitioning", partitioning)
.execute { queryResult =>
val results = queryResult.next()
assert(results.getInt("status").contains(16))
assert(results.getString("status_text").contains("No measures found for the given partitioning."))
assert(!queryResult.hasNext)
}
}
Expand Down
2 changes: 2 additions & 0 deletions server/src/main/scala/za/co/absa/atum/server/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ object Main extends ZIOAppDefault with Server {
PartitioningRepositoryImpl.layer,
CheckpointRepositoryImpl.layer,
CreatePartitioningIfNotExists.layer,
GetPartitioningMeasures.layer,
GetPartitioningAdditionalData.layer,
CreateOrUpdateAdditionalData.layer,
WriteCheckpoint.layer,
PostgresDatabaseProvider.layer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ import zio.macros.accessible

@accessible
trait PartitioningController {
def createPartitioningIfNotExists(partitioning: PartitioningSubmitDTO): IO[ErrorResponse, AtumContextDTO]
def createPartitioningIfNotExists(partitioningSubmitDTO: PartitioningSubmitDTO): IO[ErrorResponse, AtumContextDTO]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tiny:
Why the parameter rename?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I wanted to distinguish between the partitioning parameter for this method and for partitioningMeasure's and partitioningAdditionalData methods

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, good reason. It's just it's not the best idea to put the type name into the parameter/variable name (only with rare exceptions). Next time I would for example use submittedPartitioning: PartitioningSubmitDTO.

But don't change it anymore! It's OK, good enough 😉

def createOrUpdateAdditionalData(additionalData: AdditionalDataSubmitDTO): IO[ErrorResponse, AdditionalDataSubmitDTO]
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,29 @@

package za.co.absa.atum.server.api.controller

import za.co.absa.atum.model.dto._

import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, AtumContextDTO, PartitioningSubmitDTO}
import za.co.absa.atum.server.api.exception.ServiceError
import za.co.absa.atum.server.api.service.PartitioningService
import za.co.absa.atum.server.model.ErrorResponse
import za.co.absa.atum.server.model.{ErrorResponse, InternalServerErrorResponse}
import zio._

class PartitioningControllerImpl(partitioningService: PartitioningService)
extends PartitioningController with BaseController {

override def createPartitioningIfNotExists(partitioning: PartitioningSubmitDTO): IO[ErrorResponse, AtumContextDTO] = {
serviceCallWithStatus[Unit, AtumContextDTO](
partitioningService.createPartitioningIfNotExists(partitioning),
_ => {
val measures: Set[MeasureDTO] = Set(MeasureDTO("count", Seq("*")))
val additionalData: AdditionalDataDTO = Map.empty
AtumContextDTO(partitioning.partitioning, measures, additionalData)
override def createPartitioningIfNotExists(partitioningSubmitDTO: PartitioningSubmitDTO): IO[ErrorResponse, AtumContextDTO] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same tiny as above:

for {
_ <- partitioningService.createPartitioningIfNotExists(partitioningSubmitDTO)
.mapError(serviceError => InternalServerErrorResponse(serviceError.message))
measures <- partitioningService.getPartitioningMeasures(partitioningSubmitDTO.partitioning)
.mapError {
serviceError: ServiceError => InternalServerErrorResponse(serviceError.message)
}
)
additionalData <- partitioningService.getPartitioningAdditionalData(partitioningSubmitDTO.partitioning)
.mapError {
serviceError: ServiceError => InternalServerErrorResponse(serviceError.message)
}
} yield AtumContextDTO(partitioningSubmitDTO.partitioning, measures.toSet, additionalData)
}

override def createOrUpdateAdditionalData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ object DoobieImplicits {
private implicit val showPGobject: Show[PGobject] = Show.show(_.getValue.take(250))
private implicit val showPgArray: Show[PgArray] = Show.fromToString

implicit val getMapWithOptionStringValues: Get[Map[String, Option[String]]] = Get[Map[String, String]]
.tmap(map => map.map { case (k, v) => k -> Option(v) })

object Sequence {

implicit val get: Get[Seq[String]] = Get[List[String]].map(_.toSeq)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.server.api.database.runs.functions

import doobie.Fragment
import doobie.implicits.toSqlInterpolator
import doobie.util.Read
import play.api.libs.json.Json
import za.co.absa.atum.model.dto.PartitioningDTO
import za.co.absa.atum.server.api.database.PostgresDatabaseProvider
import za.co.absa.atum.server.api.database.runs.Runs
import za.co.absa.atum.server.model.PartitioningForDB
import za.co.absa.fadb.DBSchema
import za.co.absa.fadb.doobie.DoobieFunction.DoobieMultipleResultFunction
import za.co.absa.fadb.doobie.DoobieEngine
import zio.interop.catz.asyncInstance
import zio.{Task, URLayer, ZIO, ZLayer}
import za.co.absa.atum.server.api.database.DoobieImplicits.getMapWithOptionStringValues

class GetPartitioningAdditionalData (implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
extends DoobieMultipleResultFunction[PartitioningDTO, (String, Option[String]), Task]
{
import za.co.absa.atum.server.api.database.DoobieImplicits.Jsonb.jsonbPutUsingString
override def sql(values: PartitioningDTO)(implicit read: Read[(String, Option[String])]): Fragment = {
val partitioning: PartitioningForDB = PartitioningForDB.fromSeqPartitionDTO(values)
val partitioningJsonString = Json.toJson(partitioning).toString

sql"""SELECT ${Fragment.const(selectEntry)} FROM ${Fragment.const(functionName)}(
${
partitioningJsonString
}
) ${Fragment.const(alias)};"""
}

}

object GetPartitioningAdditionalData {
val layer: URLayer[PostgresDatabaseProvider, GetPartitioningAdditionalData] = ZLayer {
for {
dbProvider <- ZIO.service[PostgresDatabaseProvider]
} yield new GetPartitioningAdditionalData()(Runs, dbProvider.dbEngine)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.server.api.database.runs.functions

import doobie.Fragment
import doobie.implicits.toSqlInterpolator
import doobie.util.Read
import play.api.libs.json.Json
import za.co.absa.atum.model.dto.{MeasureDTO, PartitioningDTO}
import za.co.absa.atum.server.model.PartitioningForDB
import za.co.absa.fadb.DBSchema
import za.co.absa.fadb.doobie.DoobieEngine
import za.co.absa.fadb.doobie.DoobieFunction.DoobieMultipleResultFunction
import za.co.absa.atum.server.api.database.PostgresDatabaseProvider
import za.co.absa.atum.server.api.database.runs.Runs
import zio._
import zio.interop.catz._
import za.co.absa.atum.server.api.database.DoobieImplicits.Sequence.get

class GetPartitioningMeasures (implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
extends DoobieMultipleResultFunction[PartitioningDTO, MeasureDTO, Task]
{
import za.co.absa.atum.server.api.database.DoobieImplicits.Jsonb.jsonbPutUsingString

override def sql(values: PartitioningDTO)(implicit read: Read[MeasureDTO]): Fragment = {
val partitioning = PartitioningForDB.fromSeqPartitionDTO(values)
val partitioningJsonString = Json.toJson(partitioning).toString

sql"""SELECT ${Fragment.const(selectEntry)} FROM ${Fragment.const(functionName)}(
${
partitioningJsonString
}
) ${Fragment.const(alias)};"""
}

}

object GetPartitioningMeasures {
val layer: URLayer[PostgresDatabaseProvider, GetPartitioningMeasures] = ZLayer {
for {
dbProvider <- ZIO.service[PostgresDatabaseProvider]
} yield new GetPartitioningMeasures()(Runs, dbProvider.dbEngine)
}
}

Loading
Loading