Skip to content

Commit

Permalink
Fixes #120 - Applying GitHub comments
Browse files Browse the repository at this point in the history
  • Loading branch information
TebaleloS committed Apr 15, 2024
1 parent ae68970 commit d75b5dd
Show file tree
Hide file tree
Showing 16 changed files with 101 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ $$

-- Status codes:
-- 11 - OK
-- 16 - Record not found for the given partitioning
-- 41 - Partitioning not found
--
-------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ $$
--
-- Status codes:
-- 11 - OK
-- 16 - Record not found for the given partitioning
-- 41 - Partitioning not found
--
-------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ import zio.macros.accessible

@accessible
trait PartitioningController {
def createPartitioningIfNotExists(partitioning: PartitioningSubmitDTO): IO[ErrorResponse, AtumContextDTO]
def createPartitioningIfNotExists(partitioningSubmitDTO: PartitioningSubmitDTO): IO[ErrorResponse, AtumContextDTO]
def createOrUpdateAdditionalData(additionalData: AdditionalDataSubmitDTO): IO[ErrorResponse, AdditionalDataSubmitDTO]
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package za.co.absa.atum.server.api.controller


import za.co.absa.atum.model.dto._
import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, AtumContextDTO, PartitioningSubmitDTO}
import za.co.absa.atum.server.api.exception.ServiceError
import za.co.absa.atum.server.api.service.PartitioningService
import za.co.absa.atum.server.model.{ErrorResponse, InternalServerErrorResponse}
Expand All @@ -26,11 +26,17 @@ import zio._
class PartitioningControllerImpl(partitioningService: PartitioningService)
extends PartitioningController with BaseController {

override def createPartitioningIfNotExists(partitioning: PartitioningSubmitDTO): IO[ErrorResponse, AtumContextDTO] = {
partitioningService.returnAtumContext(partitioning)
.mapError { serviceError: ServiceError =>
override def createPartitioningIfNotExists(partitioningSubmitDTO: PartitioningSubmitDTO): IO[ErrorResponse, AtumContextDTO] = {
for {
_ <- partitioningService.createPartitioningIfNotExists(partitioningSubmitDTO)
.mapError(error => InternalServerErrorResponse(error.message))
measures <- partitioningService.getPartitioningMeasures(partitioningSubmitDTO.partitioning).mapError { serviceError: ServiceError =>
InternalServerErrorResponse(serviceError.message)
}
additionalData <- partitioningService.getPartitioningAdditionalData(partitioningSubmitDTO.partitioning).mapError { serviceError: ServiceError =>
InternalServerErrorResponse(serviceError.message)
}
} yield AtumContextDTO(partitioningSubmitDTO.partitioning, measures.toSet, additionalData)
}

override def createOrUpdateAdditionalData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import doobie.Fragment
import doobie.implicits.toSqlInterpolator
import doobie.util.Read
import play.api.libs.json.Json
import za.co.absa.atum.model.dto.PartitioningSubmitDTO
import za.co.absa.atum.model.dto.PartitioningDTO
import za.co.absa.atum.server.api.database.PostgresDatabaseProvider
import za.co.absa.atum.server.api.database.runs.Runs
import za.co.absa.atum.server.model.PartitioningForDB
Expand All @@ -29,15 +29,14 @@ import za.co.absa.fadb.doobie.DoobieFunction.DoobieMultipleResultFunction
import za.co.absa.fadb.doobie.DoobieEngine
import zio.interop.catz.asyncInstance
import zio.{Task, URLayer, ZIO, ZLayer}

import za.co.absa.atum.server.api.database.DoobieImplicits.getMapWithOptionStringValues

class GetPartitioningAdditionalData (implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
extends DoobieMultipleResultFunction[PartitioningSubmitDTO, (String, Option[String]), Task]
extends DoobieMultipleResultFunction[PartitioningDTO, (String, Option[String]), Task]
{

override def sql(values: PartitioningSubmitDTO)(implicit read: Read[(String, Option[String])]): Fragment = {
val partitioning = PartitioningForDB.fromSeqPartitionDTO(values.partitioning)
override def sql(values: PartitioningDTO)(implicit read: Read[(String, Option[String])]): Fragment = {
val partitioning: PartitioningForDB = PartitioningForDB.fromSeqPartitionDTO(values)
val partitioningJsonString = Json.toJson(partitioning).toString

sql"""SELECT ${Fragment.const(selectEntry)} FROM ${Fragment.const(functionName)}(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import doobie.Fragment
import doobie.implicits.toSqlInterpolator
import doobie.util.Read
import play.api.libs.json.Json
import za.co.absa.atum.model.dto.{MeasureDTO, PartitioningSubmitDTO}
import za.co.absa.atum.model.dto.{MeasureDTO, PartitioningDTO}
import za.co.absa.atum.server.model.PartitioningForDB
import za.co.absa.fadb.DBSchema
import za.co.absa.fadb.doobie.DoobieEngine
Expand All @@ -29,15 +29,14 @@ import za.co.absa.atum.server.api.database.PostgresDatabaseProvider
import za.co.absa.atum.server.api.database.runs.Runs
import zio._
import zio.interop.catz._

import za.co.absa.atum.server.api.database.DoobieImplicits.Sequence.get

class GetPartitioningMeasures (implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
extends DoobieMultipleResultFunction[PartitioningSubmitDTO, MeasureDTO, Task]
extends DoobieMultipleResultFunction[PartitioningDTO, MeasureDTO, Task]
{

override def sql(values: PartitioningSubmitDTO)(implicit read: Read[MeasureDTO]): Fragment = {
val partitioning = PartitioningForDB.fromSeqPartitionDTO(values.partitioning)
override def sql(values: PartitioningDTO)(implicit read: Read[MeasureDTO]): Fragment = {
val partitioning = PartitioningForDB.fromSeqPartitionDTO(values)
val partitioningJsonString = Json.toJson(partitioning).toString

sql"""SELECT ${Fragment.const(selectEntry)} FROM ${Fragment.const(functionName)}(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package za.co.absa.atum.server.api.repository

import za.co.absa.atum.model.dto.{AdditionalDataDTO, MeasureDTO, PartitioningSubmitDTO, AdditionalDataSubmitDTO}
import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataSubmitDTO, MeasureDTO, PartitioningDTO, PartitioningSubmitDTO}
import za.co.absa.atum.server.api.exception.DatabaseError
import za.co.absa.fadb.exceptions.StatusException
import zio.IO
Expand All @@ -25,15 +25,15 @@ import zio.macros.accessible
@accessible
trait PartitioningRepository {
def createPartitioningIfNotExists(
partitioning: PartitioningSubmitDTO
partitioningSubmitDTO: PartitioningSubmitDTO
): IO[DatabaseError, Either[StatusException, Unit]]

def getPartitioningMeasures(
partitioning: PartitioningSubmitDTO
partitioning: PartitioningDTO
): IO[DatabaseError, Seq[MeasureDTO]]

def getPartitioningAdditionalData(
partitioning: PartitioningSubmitDTO
partitioning: PartitioningDTO
): IO[DatabaseError, AdditionalDataDTO]

def createOrUpdateAdditionalData(additionalData: AdditionalDataSubmitDTO):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,12 @@

package za.co.absa.atum.server.api.repository

import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataSubmitDTO, MeasureDTO, PartitioningSubmitDTO}
import za.co.absa.atum.server.api.database.runs.functions.{
CreateOrUpdateAdditionalData,
CreatePartitioningIfNotExists,
GetPartitioningAdditionalData,
GetPartitioningMeasures}
import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataSubmitDTO, MeasureDTO, PartitioningDTO, PartitioningSubmitDTO}
import za.co.absa.atum.server.api.database.runs.functions.{CreateOrUpdateAdditionalData, CreatePartitioningIfNotExists, GetPartitioningAdditionalData, GetPartitioningMeasures}
import za.co.absa.atum.server.api.exception.DatabaseError
import za.co.absa.fadb.exceptions.StatusException
import zio._
import zio.prelude.ZivariantOps

class PartitioningRepositoryImpl(
createPartitioningIfNotExistsFn: CreatePartitioningIfNotExists,
Expand All @@ -34,9 +31,9 @@ class PartitioningRepositoryImpl(
) extends PartitioningRepository with BaseRepository {

override def createPartitioningIfNotExists(
partitioning: PartitioningSubmitDTO
partitioningSubmitDTO: PartitioningSubmitDTO
): IO[DatabaseError, Either[StatusException, Unit]] = {
dbCallWithStatus(createPartitioningIfNotExistsFn(partitioning), "createPartitioningIfNotExists")
dbCallWithStatus(createPartitioningIfNotExistsFn(partitioningSubmitDTO), "createPartitioningIfNotExists")
}

override def createOrUpdateAdditionalData(
Expand All @@ -45,21 +42,25 @@ class PartitioningRepositoryImpl(
dbCallWithStatus(createOrUpdateAdditionalDataFn(additionalData), "createOrUpdateAdditionalData")
}

override def getPartitioningMeasures(partitioning: PartitioningSubmitDTO):
override def getPartitioningMeasures(partitioning: PartitioningDTO):
IO[DatabaseError, Seq[MeasureDTO]] = {
getPartitioningMeasuresFn(partitioning)
.mapError(err => DatabaseError(err.getMessage))
getPartitioningMeasuresFn(partitioning).mapLeft(err => DatabaseError(err.getMessage))
}

override def getPartitioningAdditionalData(partitioning: PartitioningSubmitDTO):
override def getPartitioningAdditionalData(partitioning: PartitioningDTO):
IO[DatabaseError, AdditionalDataDTO] = {
getPartitioningAdditionalDataFn(partitioning).mapBoth(err => DatabaseError(err.getMessage), _.toMap)
}
}

object PartitioningRepositoryImpl {
val layer: URLayer[CreatePartitioningIfNotExists with GetPartitioningMeasures with
GetPartitioningAdditionalData with CreateOrUpdateAdditionalData, PartitioningRepository] = ZLayer {
val layer: URLayer[
CreatePartitioningIfNotExists
with GetPartitioningMeasures
with GetPartitioningAdditionalData
with CreateOrUpdateAdditionalData,
PartitioningRepository
] = ZLayer {
for {
createPartitioningIfNotExists <- ZIO.service[CreatePartitioningIfNotExists]
getPartitioningMeasures <- ZIO.service[GetPartitioningMeasures]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,24 @@

package za.co.absa.atum.server.api.service

import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataSubmitDTO, AtumContextDTO, MeasureDTO, PartitioningSubmitDTO}
import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataSubmitDTO, MeasureDTO, PartitioningDTO, PartitioningSubmitDTO}
import za.co.absa.atum.server.api.exception.ServiceError
import za.co.absa.fadb.exceptions.StatusException
import zio.IO
import zio.macros.accessible

@accessible
trait PartitioningService {
def createPartitioningIfNotExists(partitioning: PartitioningSubmitDTO):
def createPartitioningIfNotExists(partitioningSubmitDTO: PartitioningSubmitDTO):
IO[ServiceError, Either[StatusException, Unit]]

def getPartitioningMeasures(partitioning: PartitioningSubmitDTO):
def getPartitioningMeasures(partitioning: PartitioningDTO):
IO[ServiceError, Seq[MeasureDTO]]

def getPartitioningAdditionalData(partitioning: PartitioningSubmitDTO):
def getPartitioningAdditionalData(partitioning: PartitioningDTO):
IO[ServiceError, AdditionalDataDTO]

def createOrUpdateAdditionalData(additionalData: AdditionalDataSubmitDTO):
IO[ServiceError, Either[StatusException, Unit]]

def returnAtumContext(partitioning: PartitioningSubmitDTO): IO[ServiceError, AtumContextDTO]

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package za.co.absa.atum.server.api.service

import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataSubmitDTO, AtumContextDTO, MeasureDTO, PartitioningSubmitDTO}
import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataSubmitDTO, MeasureDTO, PartitioningDTO, PartitioningSubmitDTO}
import za.co.absa.atum.server.api.exception.ServiceError
import za.co.absa.atum.server.api.repository.PartitioningRepository
import za.co.absa.fadb.exceptions.StatusException
Expand All @@ -26,12 +26,11 @@ import zio._
class PartitioningServiceImpl(partitioningRepository: PartitioningRepository)
extends PartitioningService with BaseService {

override def createPartitioningIfNotExists(
partitioning: PartitioningSubmitDTO
): IO[ServiceError, Either[StatusException, Unit]] = {
override def createPartitioningIfNotExists(partitioningSubmitDTO: PartitioningSubmitDTO):
IO[ServiceError, Either[StatusException, Unit]] = {
repositoryCallWithStatus(
partitioningRepository.createPartitioningIfNotExists(partitioning), "createPartitioningIfNotExists"
)
partitioningRepository.createPartitioningIfNotExists(partitioningSubmitDTO), "createPartitioningIfNotExists"
).mapError(error => ServiceError(error.message))
}

override def createOrUpdateAdditionalData(
Expand All @@ -42,35 +41,20 @@ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository)
).mapError(error => ServiceError(error.message))
}

override def getPartitioningMeasures(partitioning: PartitioningSubmitDTO): IO[ServiceError, Seq[MeasureDTO]] = {
override def getPartitioningMeasures(partitioning: PartitioningDTO): IO[ServiceError, Seq[MeasureDTO]] = {
partitioningRepository.getPartitioningMeasures(partitioning)
.mapError { case DatabaseError(message) =>
ServiceError(s"Failed to retrieve partitioning measures': $message")
}
}

override def getPartitioningAdditionalData(partitioning: PartitioningSubmitDTO): IO[ServiceError, AdditionalDataDTO] = {
override def getPartitioningAdditionalData(partitioning: PartitioningDTO): IO[ServiceError, AdditionalDataDTO] = {
partitioningRepository.getPartitioningAdditionalData(partitioning)
.mapError { case DatabaseError(message) =>
ServiceError(s"Failed to retrieve partitioning additional data': $message")
}
}

override def returnAtumContext(partitioningSubmitDTO: PartitioningSubmitDTO): IO[ServiceError, AtumContextDTO] = {
for {
partitioning <- createPartitioningIfNotExists(partitioningSubmitDTO)
.flatMap {
case Left(_) => ZIO.fail(ServiceError("Failed to create or retrieve partitioning"))
case Right(_) => ZIO.succeed(partitioningSubmitDTO)
}

additionalData <- getPartitioningAdditionalData(partitioning)

measures <- getPartitioningMeasures(partitioning)

} yield AtumContextDTO(partitioning.partitioning, measures.toSet, additionalData)
}

}

object PartitioningServiceImpl {
Expand Down
12 changes: 11 additions & 1 deletion server/src/test/scala/za/co/absa/atum/server/api/TestData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package za.co.absa.atum.server.api

import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, AdditionalDataDTO, AtumContextDTO, CheckpointDTO, MeasureDTO, PartitioningSubmitDTO}
import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataSubmitDTO, AtumContextDTO, CheckpointDTO, MeasureDTO, PartitioningDTO, PartitioningSubmitDTO}

import java.time.ZonedDateTime
import java.util.UUID
Expand All @@ -35,6 +35,16 @@ trait TestData {
protected val partitioningSubmitDTO3: PartitioningSubmitDTO =
partitioningSubmitDTO1.copy(authorIfNew = "yetAnotherAuthor")

val additionalDataDTO: AdditionalDataDTO = Map(
"key1" -> Some("value1"),
"key2" -> None,
"key3" -> Some("value3")
)
// Partitioning DTO
protected val partitioningDTO1: PartitioningDTO = Seq.empty
protected val partitioningDTO2: PartitioningDTO = Seq.empty
protected val partitioningDTO3: PartitioningDTO = Seq.empty

// Measure
protected val measureDTO1: MeasureDTO = MeasureDTO("count", Seq("1"))
protected val measureDTO2: MeasureDTO = MeasureDTO("count", Seq("*"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,14 @@ class PartitioningControllerSpec extends ZIOSpecDefault with TestData {

private val partitioningServiceMock = mock(classOf[PartitioningService])

when(partitioningServiceMock.returnAtumContext(partitioningSubmitDTO1))
.thenReturn(ZIO.succeed(atumContextDTO1))
when(partitioningServiceMock.returnAtumContext(partitioningSubmitDTO2))
when(partitioningServiceMock.createPartitioningIfNotExists(partitioningSubmitDTO1))
.thenReturn(ZIO.right(()))
when(partitioningServiceMock.getPartitioningMeasures(partitioningDTO1))
.thenReturn(ZIO.succeed(Seq(measureDTO2)))
when(partitioningServiceMock.getPartitioningAdditionalData(partitioningDTO2))
.thenReturn(ZIO.succeed(Map.empty))

when(partitioningServiceMock.createPartitioningIfNotExists(partitioningSubmitDTO2))
.thenReturn(ZIO.fail(ServiceError("boom!")))

private val partitioningServiceMockLayer = ZLayer.succeed(partitioningServiceMock)
Expand All @@ -52,7 +57,10 @@ class PartitioningControllerSpec extends ZIOSpecDefault with TestData {
)
for {
result <- PartitioningController.createPartitioningIfNotExists(partitioningSubmitDTO1)
} yield assertTrue(result == expectedAtumContextDTO)
} yield assertTrue {
println(result)
result == expectedAtumContextDTO
}
},
test("Returns expected InternalServerErrorResponse") {
assertZIO(PartitioningController.createPartitioningIfNotExists(partitioningSubmitDTO2).exit)(
Expand All @@ -67,4 +75,5 @@ class PartitioningControllerSpec extends ZIOSpecDefault with TestData {

}


}
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,10 @@ class GetPartitioningAdditionalDataSpec extends ConfigProviderSpec {

suite("GetPartitioningAdditionalDataSuite")(
test("Returns expected sequence of (String, Option[String])") {
val partitioningSubmitDTO = PartitioningSubmitDTO(
partitioning = Seq(PartitionDTO("key1", "val1"), PartitionDTO("key2", "val2")),
parentPartitioning = Some(Seq(PartitionDTO("pKey1", "pVal1"), PartitionDTO("pKey2", "pVal2"))),
authorIfNew = "newAuthor"
)
val partitioningDTO = Seq(PartitionDTO("key1", "val1"), PartitionDTO("key2", "val2"))
for {
getPartitioningAdditionalData <- ZIO.service[GetPartitioningAdditionalData]
result <- getPartitioningAdditionalData(partitioningSubmitDTO)
result <- getPartitioningAdditionalData(partitioningDTO)
} yield assertTrue(result.nonEmpty)
}
).provide(
Expand Down
Loading

0 comments on commit d75b5dd

Please sign in to comment.