Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

get partitioning additional data v2 #252

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

CREATE OR REPLACE FUNCTION runs.get_partitioning_additional_data(
IN i_partitioning_id BIGINT,
OUT status INTEGER,
OUT status_text TEXT,
OUT ad_name TEXT,
OUT ad_value TEXT,
OUT ad_author TEXT
) RETURNS SETOF record AS
$$
-------------------------------------------------------------------------------
--
-- Function: runs.get_partitioning_additional_data(1)
-- Returns additional data for the given partitioning
--
-- Parameters:
-- i_partitioning - partitioning for requested additional data
--
-- Returns:
-- status - Status code
-- status_text - Status message
-- ad_name - Name of the additional data
-- ad_value - Value of the additional data
-- ad_author - Author of the additional data
--
-- Status codes:
-- 11 - OK
-- 16 - No additional data found
-- 41 - Partitioning not found
--
-------------------------------------------------------------------------------

BEGIN
PERFORM 1 FROM runs.partitionings WHERE id_partitioning = i_partitioning_id;
IF NOT FOUND THEN
status := 41;
status_text := 'Partitioning not found';
RETURN NEXT;
RETURN;
END IF;

status = 11;
status_text = 'OK';

RETURN QUERY
SELECT status, status_text, ad.ad_name, ad.ad_value, ad.created_by
FROM runs.additional_data AS ad
WHERE ad.fk_partitioning = i_partitioning_id;

IF NOT FOUND THEN
status := 16;
status_text := 'No additional data found';
RETURN NEXT;
RETURN;
END IF;
END;
$$
LANGUAGE plpgsql VOLATILE SECURITY DEFINER;

ALTER FUNCTION runs.get_partitioning_additional_data(BIGINT) OWNER TO atum_owner;
GRANT EXECUTE ON FUNCTION runs.get_partitioning_additional_data(BIGINT) TO atum_user;
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.database.runs

import za.co.absa.balta.DBTestSuite
import za.co.absa.balta.classes.JsonBString

class GetPartitioningAdditionalDataV2IntegrationTests extends DBTestSuite {

private val fncGetPartitioningAdditionalData = "runs.get_partitioning_additional_data"

private val partitioning1 = JsonBString(
"""
|{
| "version": 1,
| "keys": ["keyX", "keyY", "keyZ"],
| "keysToValues": {
| "keyX": "value1",
| "keyZ": "value3",
| "keyY": "value2"
| }
|}
|""".stripMargin
)

private val partitioning2 = JsonBString(
"""
|{
| "version": 1,
| "keys": ["key1", "key3", "key2", "key4"],
| "keysToValues": {
| "key1": "valueX",
| "key2": "valueY",
| "key3": "valueZ",
| "key4": "valueA"
| }
|}
|""".stripMargin
)

test("Get partitioning additional data returns additional data for partitioning with additional data") {
table("runs.partitionings").insert(
add("partitioning", partitioning1)
.add("created_by", "Joseph")
)

table("runs.partitionings").insert(
add("partitioning", partitioning2)
.add("created_by", "Daniel")
)

val fkPartitioning1: Long = table("runs.partitionings").fieldValue("partitioning", partitioning1, "id_partitioning").get.get
val fkPartitioning2: Long = table("runs.partitionings").fieldValue("partitioning", partitioning2, "id_partitioning").get.get

table("runs.additional_data").insert(
add("fk_partitioning", fkPartitioning1)
.add("created_by", "Joseph")
.add("ad_name", "ad_1")
.add("ad_value", "This is the additional data for Joseph")
)

table("runs.additional_data").insert(
add("fk_partitioning", fkPartitioning1)
.add("created_by", "Joseph")
.add("ad_name", "ad_2")
.add("ad_value", "This is the additional data for Joseph")
)

table("runs.additional_data").insert(
add("fk_partitioning", fkPartitioning2)
.add("created_by", "Daniel")
.add("ad_name", "ad_3")
.add("ad_value", "This is the additional data for Daniel")
)

function(fncGetPartitioningAdditionalData)
.setParam("i_partitioning_id", fkPartitioning1)
.execute { queryResult =>
val results = queryResult.next()
assert(results.getInt("status").contains(11))
assert(results.getString("status_text").contains("OK"))
assert(results.getString("ad_name").contains("ad_1"))
assert(results.getString("ad_value").contains("This is the additional data for Joseph"))
assert(results.getString("ad_author").contains("Joseph"))

val results2 = queryResult.next()
assert(results2.getInt("status").contains(11))
assert(results2.getString("status_text").contains("OK"))
assert(results2.getString("ad_name").contains("ad_2"))
assert(results2.getString("ad_value").contains("This is the additional data for Joseph"))
assert(results2.getString("ad_author").contains("Joseph"))

assert(!queryResult.hasNext)
}

table("runs.additional_data").where(add("fk_partitioning", fkPartitioning1)) { additionalDataResult =>
assert(additionalDataResult.hasNext)
val row = additionalDataResult.next()
assert(row.getString("ad_name").contains("ad_1"))
assert(row.getString("ad_value").contains("This is the additional data for Joseph"))
assert(row.getString("created_by").contains("Joseph"))
}

}

test("Get partitioning additional data should return no records for partitioning without additional data") {
table("runs.partitionings").insert(
add("partitioning", partitioning2)
.add("created_by", "Joseph")
)

val fkPartitioning: Long = table("runs.partitionings").fieldValue("partitioning", partitioning2, "id_partitioning").get.get

function(fncGetPartitioningAdditionalData)
.setParam("i_partitioning_id", fkPartitioning)
.execute { queryResult =>
val result = queryResult.next()
assert(result.getInt("status").contains(16))
assert(result.getString("status_text").contains("No additional data found"))
assert(!queryResult.hasNext)
}

table("runs.additional_data").where(add("fk_partitioning", fkPartitioning)) { additionalDataResult =>
assert(!additionalDataResult.hasNext)
}
}

test("Get partitioning additional data should return error status code on non existing partitioning") {
function(fncGetPartitioningAdditionalData)
.setParam("i_partitioning_id", 0L)
.execute { queryResult =>
val results = queryResult.next()
assert(results.getInt("status").contains(41))
assert(results.getString("status_text").contains("Partitioning not found"))
assert(!queryResult.hasNext)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
import io.circe.{Decoder, Encoder}

case class AdditionalDataItemDTO(
value: String,
value: Option[String],
author: String
)

Expand Down
1 change: 1 addition & 0 deletions server/src/main/scala/za/co/absa/atum/server/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ object Main extends ZIOAppDefault with Server {
CreatePartitioningIfNotExists.layer,
GetPartitioningMeasures.layer,
GetPartitioningAdditionalData.layer,
GetPartitioningAdditionalDataV2.layer,
CreateOrUpdateAdditionalData.layer,
GetPartitioningCheckpoints.layer,
WriteCheckpoint.layer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,7 @@

package za.co.absa.atum.server.api.controller

import za.co.absa.atum.model.dto.{
AdditionalDataSubmitDTO,
AtumContextDTO,
CheckpointDTO,
CheckpointQueryDTO,
PartitioningSubmitDTO
}
import za.co.absa.atum.model.dto._
import za.co.absa.atum.server.model.ErrorResponse
import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse}
import zio.IO
Expand All @@ -38,6 +32,10 @@ trait PartitioningController {
partitioningSubmitDTO: PartitioningSubmitDTO
): IO[ErrorResponse, SingleSuccessResponse[AtumContextDTO]]

def getPartitioningAdditionalDataV2(
partitioningId: Long
): IO[ErrorResponse, SingleSuccessResponse[AdditionalDataDTO]]

def createOrUpdateAdditionalDataV2(
additionalData: AdditionalDataSubmitDTO
): IO[ErrorResponse, SingleSuccessResponse[AdditionalDataSubmitDTO]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,16 @@ class PartitioningControllerImpl(partitioningService: PartitioningService)
)
}

override def getPartitioningAdditionalDataV2(
partitioningId: Long
): IO[ErrorResponse, SingleSuccessResponse[AdditionalDataDTO]] = {
mapToSingleSuccessResponse(
serviceCall[AdditionalDataDTO, AdditionalDataDTO](
partitioningService.getPartitioningAdditionalDataV2(partitioningId),
identity
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just an idea, what about having identity as the default parameter value? 🤔

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, good idea. Applied.

)
)
}
}

object PartitioningControllerImpl {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.server.api.database.runs.functions

import doobie.implicits.toSqlInterpolator
import za.co.absa.atum.server.api.database.PostgresDatabaseProvider
import za.co.absa.atum.server.api.database.runs.Runs
import za.co.absa.atum.server.model.AdditionalDataItemFromDB
import za.co.absa.db.fadb.DBSchema
import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithAggStatus
import za.co.absa.db.fadb.doobie.DoobieEngine
import zio.{Task, URLayer, ZIO, ZLayer}

import za.co.absa.atum.server.api.database.DoobieImplicits.getMapWithOptionStringValues
import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbPut
import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator
import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling

class GetPartitioningAdditionalDataV2(implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
extends DoobieMultipleResultFunctionWithAggStatus[Long, Option[AdditionalDataItemFromDB], Task](input =>
Seq(fr"$input"), Some("get_partitioning_additional_data")
)
with StandardStatusHandling
with ByFirstErrorStatusAggregator {

override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("ad_name", "ad_value", "ad_author")
}

object GetPartitioningAdditionalDataV2 {
val layer: URLayer[PostgresDatabaseProvider, GetPartitioningAdditionalDataV2] = ZLayer {
for {
dbProvider <- ZIO.service[PostgresDatabaseProvider]
} yield new GetPartitioningAdditionalDataV2()(Runs, dbProvider.dbEngine)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ trait Endpoints extends BaseEndpoints {
.out(jsonBody[SingleSuccessResponse[AtumContextDTO]])
}

protected val getPartitioningAdditionalDataEndpointV2
: PublicEndpoint[Long, ErrorResponse, SingleSuccessResponse[AdditionalDataDTO], Any] = {
apiV2.get
.in(V2Paths.Partitionings / path[Long]("partitioningId") / V2Paths.AdditionalData)
.out(statusCode(StatusCode.Ok))
.out(jsonBody[SingleSuccessResponse[AdditionalDataDTO]])
.errorOutVariantPrepend(notFoundErrorOneOfVariant)
}

protected val createOrUpdateAdditionalDataEndpointV2
: PublicEndpoint[AdditionalDataSubmitDTO, ErrorResponse, SingleSuccessResponse[AdditionalDataSubmitDTO], Any] = {
apiV2.post
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ trait Routes extends Endpoints with ServerOptions {
),
createServerEndpoint(createPartitioningEndpointV1, PartitioningController.createPartitioningIfNotExistsV1),
createServerEndpoint(createPartitioningEndpointV2, PartitioningController.createPartitioningIfNotExistsV2),
createServerEndpoint(
getPartitioningAdditionalDataEndpointV2,
PartitioningController.getPartitioningAdditionalDataV2
),
createServerEndpoint(
createOrUpdateAdditionalDataEndpointV2,
PartitioningController.createOrUpdateAdditionalDataV2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,7 @@

package za.co.absa.atum.server.api.repository

import za.co.absa.atum.model.dto.{
InitialAdditionalDataDTO,
AdditionalDataSubmitDTO,
CheckpointQueryDTO,
MeasureDTO,
PartitioningDTO,
PartitioningSubmitDTO
}
import za.co.absa.atum.model.dto._
import za.co.absa.atum.server.api.exception.DatabaseError
import za.co.absa.atum.server.model.CheckpointFromDB
import zio.IO
Expand All @@ -37,6 +30,10 @@ trait PartitioningRepository {

def getPartitioningAdditionalData(partitioning: PartitioningDTO): IO[DatabaseError, InitialAdditionalDataDTO]

def getPartitioningAdditionalDataV2(
partitioningId: Long
): IO[DatabaseError, AdditionalDataDTO]

def createOrUpdateAdditionalData(additionalData: AdditionalDataSubmitDTO): IO[DatabaseError, Unit]

def getPartitioningCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[DatabaseError, Seq[CheckpointFromDB]]
Expand Down
Loading
Loading