Skip to content

Commit

Permalink
get partitioning endpoint v2 implementation #226: (#240)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Pavel Salamon <[email protected]>
  • Loading branch information
TebaleloS and salamonpavel authored Aug 30, 2024
1 parent b080a34 commit 48a7a32
Show file tree
Hide file tree
Showing 22 changed files with 557 additions and 13 deletions.
69 changes: 69 additions & 0 deletions database/src/main/postgres/runs/V1.9.4__get_partitioning_by_id.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Function: runs.get_partitioning_by_id(Long)
CREATE OR REPLACE FUNCTION runs.get_partitioning_by_id(
IN i_id BIGINT,
OUT status INTEGER,
OUT status_text TEXT,
OUT id BIGINT,
OUT partitioning JSONB,
OUT author TEXT
) RETURNS RECORD AS
$$
-------------------------------------------------------------------------------
--
-- Function: runs.get_partitioning_by_id(1)
-- Returns partitioning for the given id
--
-- Parameters:
-- i_id - id that we asking the partitioning for
--
-- Returns:
-- status - Status code
-- status_text - Status message
-- partitioning - Partitioning value to be returned
-- author - Author of the partitioning

-- Status codes:
-- 11 - OK
-- 41 - Partitioning not found
--
-------------------------------------------------------------------------------
BEGIN

SELECT P.partitioning,
P.id_partitioning,
P.created_by
FROM runs.partitionings AS P
WHERE P.id_partitioning = i_id
INTO get_partitioning_by_id.partitioning, id, author;

IF FOUND THEN
status := 11;
status_text := 'OK';
ELSE
status := 41;
status_text := 'Partitioning not found';
END IF;

RETURN;
END;
$$
LANGUAGE plpgsql VOLATILE SECURITY DEFINER;

ALTER FUNCTION runs.get_partitioning_by_id(BIGINT) OWNER TO atum_owner;
GRANT EXECUTE ON FUNCTION runs.get_partitioning_by_id(BIGINT) TO atum_user;

Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.database.runs

import za.co.absa.balta.DBTestSuite
import za.co.absa.balta.classes.JsonBString
import io.circe.Json
import io.circe.parser._

class GetPartitioningByIdIntegrationTests extends DBTestSuite {

private val fncGetPartitioningById = "runs.get_partitioning_by_id"

private val partitioning = JsonBString(
"""
|{
| "version": 1,
| "keys": ["key1", "key3", "key2", "key4"],
| "keysToValues": {
| "key1": "valueX",
| "key2": "valueY",
| "key3": "valueZ",
| "key4": "valueA"
| }
|}
|""".stripMargin
)

private val expectedPartitioning = parse(partitioning.value).getOrElse(throw new Exception("Failed to parse JSON"))

test("Partitioning retrieved successfully") {
table("runs.partitionings").insert(
add("partitioning", partitioning)
.add("created_by", "Joseph")
)

val fkPartitioning1: Long = table("runs.partitionings").fieldValue("partitioning", partitioning, "id_partitioning").get.get

function(fncGetPartitioningById)
.setParam("i_id", fkPartitioning1)
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
val returnedPartitioning = row.getJsonB("partitioning").get
val returnedPartitioningParsed = parse(returnedPartitioning.value)
.getOrElse(fail("Failed to parse returned partitioning"))
assert(row.getInt("status").contains(11))
assert(row.getString("status_text").contains("OK"))
assert(row.getLong("id").contains(fkPartitioning1))
assert(returnedPartitioningParsed == expectedPartitioning)
assert(row.getString("author").contains("Joseph"))
}

table("runs.partitionings").where(add("id_partitioning", fkPartitioning1)) { partitioningResult =>
val row = partitioningResult.next()
val returnedPartitioning = row.getJsonB("partitioning").get
val returnedPartitioningParsed = parse(returnedPartitioning.value)
.getOrElse(fail("Failed to parse returned partitioning"))
assert(returnedPartitioningParsed == expectedPartitioning)
assert(row.getString("created_by").contains("Joseph"))
}
}

test("Partitioning not found") {
val nonExistentID = 9999L

function(fncGetPartitioningById)
.setParam("i_id", nonExistentID)
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(41))
assert(row.getString("status_text").contains("Partitioning not found"))
assert(row.getJsonB("partitioning").isEmpty)
assert(row.getString("author").isEmpty)
}
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import io.circe.{Decoder, Encoder}
case class PartitioningWithIdDTO(
id: Long,
partitioning: PartitioningDTO,
parentPartitioning: Option[PartitioningDTO],
author: String
)

Expand Down
8 changes: 6 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,16 @@ object Dependencies {
}

def databaseDependencies: Seq[ModuleID] = {
lazy val scalaTest = "org.scalatest" %% "scalatest" % Versions.scalatest % Test
lazy val balta = "za.co.absa" %% "balta" % Versions.balta % Test
lazy val scalaTest = "org.scalatest" %% "scalatest" % Versions.scalatest % Test
lazy val balta = "za.co.absa" %% "balta" % Versions.balta % Test
lazy val circe = "io.circe" %% "circe-core" % Versions.circeJson % Test
lazy val parser = "io.circe" %% "circe-parser" % Versions.circeJson % Test

Seq(
scalaTest,
balta,
circe,
parser,
)
}

Expand Down
1 change: 1 addition & 0 deletions server/src/main/scala/za/co/absa/atum/server/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ object Main extends ZIOAppDefault with Server {
WriteCheckpointV2.layer,
GetPartitioningCheckpointV2.layer,
GetFlowCheckpoints.layer,
GetPartitioningById.layer,
PostgresDatabaseProvider.layer,
TransactorProvider.layer,
AwsSecretsProviderImpl.layer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package za.co.absa.atum.server.api.controller
import za.co.absa.atum.server.api.exception.ServiceError
import za.co.absa.atum.server.api.exception.ServiceError._
import za.co.absa.atum.server.api.http.ApiPaths
import za.co.absa.atum.server.model.{ConflictErrorResponse, ErrorResponse, InternalServerErrorResponse}
import za.co.absa.atum.server.model.{ConflictErrorResponse, ErrorResponse, InternalServerErrorResponse, NotFoundErrorResponse}
import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse}
import za.co.absa.atum.server.model._
import zio._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,7 @@ trait PartitioningController {
def getPartitioningCheckpointsV2(
checkpointQueryDTO: CheckpointQueryDTO
): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]]

def getPartitioningV2(partitioningId: Long): IO[ErrorResponse, SingleSuccessResponse[PartitioningWithIdDTO]]

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ class PartitioningControllerImpl(partitioningService: PartitioningService)
)
)
}
override def getPartitioningV2(
partitioningId: Long
): IO[ErrorResponse, SingleSuccessResponse[PartitioningWithIdDTO]] = {
mapToSingleSuccessResponse(
serviceCall[PartitioningWithIdDTO, PartitioningWithIdDTO](
partitioningService.getPartitioning(partitioningId)
)
)
}
}

object PartitioningControllerImpl {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.server.api.database.runs.functions

import doobie.implicits.toSqlInterpolator
import za.co.absa.atum.server.api.database.PostgresDatabaseProvider
import za.co.absa.atum.server.api.database.runs.Runs
import za.co.absa.db.fadb.DBSchema
import za.co.absa.db.fadb.doobie.DoobieEngine
import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieSingleResultFunctionWithStatus
import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling
import za.co.absa.atum.server.model.PartitioningFromDB
import zio._
import za.co.absa.atum.server.api.database.DoobieImplicits.Sequence.get
import doobie.postgres.implicits._
import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbGet

class GetPartitioningById(implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
extends DoobieSingleResultFunctionWithStatus[Long, Option[PartitioningFromDB], Task](values =>
Seq(
fr"$values"
)
)
with StandardStatusHandling {

override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("id", "partitioning", "author")
}

object GetPartitioningById {
val layer: URLayer[PostgresDatabaseProvider, GetPartitioningById] = ZLayer {
for {
dbProvider <- ZIO.service[PostgresDatabaseProvider]
} yield new GetPartitioningById()(Runs, dbProvider.dbEngine)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@ trait Endpoints extends BaseEndpoints {
.out(jsonBody[MultiSuccessResponse[CheckpointDTO]])
}

protected val getPartitioningEndpointV2
: PublicEndpoint[Long, ErrorResponse, SingleSuccessResponse[PartitioningWithIdDTO], Any] = {
apiV2.get
.in(V2Paths.Partitionings / path[Long]("partitioningId"))
.out(statusCode(StatusCode.Ok))
.out(jsonBody[SingleSuccessResponse[PartitioningWithIdDTO]])
.errorOutVariantPrepend(notFoundErrorOneOfVariant)
}

protected val zioMetricsEndpoint: PublicEndpoint[Unit, Unit, String, Any] = {
endpoint.get.in(ZioMetrics).out(stringBody)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ trait Routes extends Endpoints with ServerOptions {
),
createServerEndpoint(getPartitioningCheckpointsEndpointV2, PartitioningController.getPartitioningCheckpointsV2),
createServerEndpoint(getFlowCheckpointsEndpointV2, FlowController.getFlowCheckpointsV2),
createServerEndpoint(getPartitioningEndpointV2, PartitioningController.getPartitioningV2),
createServerEndpoint(healthEndpoint, (_: Unit) => ZIO.unit)
)
ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(metricsInterceptorOption)).from(endpoints).toRoutes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,7 @@ trait PartitioningRepository {
def createOrUpdateAdditionalData(additionalData: AdditionalDataSubmitDTO): IO[DatabaseError, Unit]

def getPartitioningCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[DatabaseError, Seq[CheckpointFromDB]]

def getPartitioning(partitioningId: Long): IO[DatabaseError, PartitioningWithIdDTO]

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,20 @@
package za.co.absa.atum.server.api.repository

import za.co.absa.atum.model.dto._
import za.co.absa.atum.server.model.{AdditionalDataFromDB, AdditionalDataItemFromDB, CheckpointFromDB, MeasureFromDB}
import za.co.absa.atum.server.model.{AdditionalDataFromDB, AdditionalDataItemFromDB, CheckpointFromDB, MeasureFromDB, PartitioningFromDB}
import za.co.absa.atum.server.api.database.runs.functions._
import za.co.absa.atum.server.api.exception.DatabaseError
import zio._
import zio.interop.catz.asyncInstance
import za.co.absa.atum.server.api.exception.DatabaseError.GeneralDatabaseError

class PartitioningRepositoryImpl(
createPartitioningIfNotExistsFn: CreatePartitioningIfNotExists,
getPartitioningMeasuresFn: GetPartitioningMeasures,
getPartitioningAdditionalDataFn: GetPartitioningAdditionalData,
createOrUpdateAdditionalDataFn: CreateOrUpdateAdditionalData,
getPartitioningCheckpointsFn: GetPartitioningCheckpoints,
getPartitioningByIdFn: GetPartitioningById,
getPartitioningAdditionalDataV2Fn: GetPartitioningAdditionalDataV2
) extends PartitioningRepository
with BaseRepository {
Expand Down Expand Up @@ -78,6 +80,20 @@ class PartitioningRepositoryImpl(
}.toMap)
.map(AdditionalDataDTO(_))
}

override def getPartitioning(partitioningId: Long): IO[DatabaseError, PartitioningWithIdDTO] = {
dbSingleResultCallWithStatus(getPartitioningByIdFn(partitioningId), "getPartitioningById")
.flatMap {
case Some(PartitioningFromDB(id, partitioning, author)) =>
val decodingResult = partitioning.as[PartitioningDTO]
decodingResult.fold(
error => ZIO.fail(GeneralDatabaseError(s"Failed to decode JSON: $error")),
partitioningDTO => ZIO.succeed(PartitioningWithIdDTO(id, partitioningDTO, author))
)
case None => ZIO.fail(GeneralDatabaseError("Unexpected error."))
}
}

}

object PartitioningRepositoryImpl {
Expand All @@ -87,7 +103,9 @@ object PartitioningRepositoryImpl {
with GetPartitioningAdditionalData
with CreateOrUpdateAdditionalData
with GetPartitioningCheckpoints
with GetPartitioningAdditionalDataV2,
with GetPartitioningAdditionalDataV2
with GetPartitioningCheckpoints
with GetPartitioningById,
PartitioningRepository
] = ZLayer {
for {
Expand All @@ -96,13 +114,15 @@ object PartitioningRepositoryImpl {
getPartitioningAdditionalData <- ZIO.service[GetPartitioningAdditionalData]
createOrUpdateAdditionalData <- ZIO.service[CreateOrUpdateAdditionalData]
getPartitioningCheckpoints <- ZIO.service[GetPartitioningCheckpoints]
getPartitioningById <- ZIO.service[GetPartitioningById]
getPartitioningAdditionalDataV2 <- ZIO.service[GetPartitioningAdditionalDataV2]
} yield new PartitioningRepositoryImpl(
createPartitioningIfNotExists,
getPartitioningMeasures,
getPartitioningAdditionalData,
createOrUpdateAdditionalData,
getPartitioningCheckpoints,
getPartitioningById,
getPartitioningAdditionalDataV2
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,7 @@ trait PartitioningService {
def createOrUpdateAdditionalData(additionalData: AdditionalDataSubmitDTO): IO[ServiceError, Unit]

def getPartitioningCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[ServiceError, Seq[CheckpointDTO]]

def getPartitioning(partitioningId: Long): IO[ServiceError, PartitioningWithIdDTO]

}
Loading

0 comments on commit 48a7a32

Please sign in to comment.