Skip to content

Commit

Permalink
Fixes #120 - Changed measures function definitions
Browse files Browse the repository at this point in the history
  • Loading branch information
TebaleloS committed Mar 19, 2024
1 parent b0949ba commit daa1a16
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ CREATE OR REPLACE FUNCTION runs.get_partitioning_additional_data(
$$
-------------------------------------------------------------------------------
--
-- Function: runs.get_partitioning_additional_data(2)
-- Function: runs.get_partitioning_additional_data(1)
-- Returns additional data for the given partitioning
--
-- Parameters:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import za.co.absa.atum.server.api.service.PartitioningService
import za.co.absa.atum.server.model.{ErrorResponse, GeneralErrorResponse, InternalServerErrorResponse}
import zio._
import zio.macros.accessible
import zio.prelude.data.Optional.AllValuesAreNullable

@accessible
trait PartitioningController {
Expand All @@ -39,8 +40,12 @@ class PartitioningControllerImpl(partitioningService: PartitioningService) exten
case Left(statusException) =>
ZIO.fail(GeneralErrorResponse(s"(${statusException.status.statusCode}) ${statusException.status.statusText}"))
case Right(_) =>
// partitioningService.getPartitioningMeasures(partitioning)
// .flatMap { measures => ZIO.succeed(AtumContextDTO) }

ZIO.succeed {
val measures: Set[MeasureDTO] = partitioningService.getPartitioningMeasures(partitioning)
.flatMap(measures => ZIO.succeed(measures)).getOrElse(Set.empty[MeasureDTO])
val additionalData = AdditionalDataDTO(additionalData = Map.empty)
AtumContextDTO(partitioning.partitioning, measures, additionalData)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,22 @@ import doobie.Fragment
import doobie.implicits.toSqlInterpolator
import doobie.util.Read
import play.api.libs.json.Json
import za.co.absa.atum.model.dto.PartitioningSubmitDTO
import za.co.absa.atum.server.api.database.PostgresDatabaseProvider
import za.co.absa.atum.model.dto.{MeasureDTO, PartitioningSubmitDTO}
import za.co.absa.atum.server.model.PartitioningForDB
import za.co.absa.fadb.DBSchema
import za.co.absa.fadb.doobie.{DoobieEngine, StatusWithData}
import za.co.absa.fadb.doobie.DoobieFunction.DoobieMultipleResultFunction
import za.co.absa.fadb.doobie.DoobieFunction.DoobieSingleResultFunctionWithStatus
import za.co.absa.fadb.status.handling.implementations.StandardStatusHandling
import za.co.absa.atum.server.api.database.PostgresDatabaseProvider
import za.co.absa.atum.server.api.database.runs.Runs
import zio._
import zio.interop.catz._

class GetPartitioningMeasures (implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
extends DoobieMultipleResultFunction[PartitioningSubmitDTO, Unit, Task]
extends DoobieSingleResultFunctionWithStatus[PartitioningSubmitDTO, Seq[MeasureDTO], Task]
with StandardStatusHandling {

override def sql(values: PartitioningSubmitDTO)(implicit read: Read[StatusWithData[Unit]]): Fragment = {
override def sql(values: PartitioningSubmitDTO)(implicit read: Read[StatusWithData[Seq[MeasureDTO]]]): Fragment = {
val partitioning = PartitioningForDB.fromSeqPartitionDTO(values.partitioning)
val partitioningJsonString = Json.toJson(partitioning).toString

Expand All @@ -45,7 +45,6 @@ class GetPartitioningMeasures (implicit schema: DBSchema, dbEngine: DoobieEngine
partitioningJsonString
}
) ${Fragment.const(alias)};"""

}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package za.co.absa.atum.server.api.repository

import za.co.absa.atum.model.dto.PartitioningSubmitDTO
import za.co.absa.atum.model.dto.{MeasureDTO, PartitioningSubmitDTO}
import za.co.absa.atum.server.api.database.runs.functions.{CreatePartitioningIfNotExists, GetPartitioningMeasures}
import za.co.absa.atum.server.api.exception.DatabaseError
import za.co.absa.fadb.exceptions.StatusException
Expand All @@ -31,7 +31,7 @@ trait PartitioningRepository {

def getPartitioningMeasures(
partitioning: PartitioningSubmitDTO
): IO[DatabaseError, Either[StatusException, Unit]]
): IO[DatabaseError, Either[StatusException, Seq[MeasureDTO]]]
}

class PartitioningRepositoryImpl(createPartitioningIfNotExistsFn: CreatePartitioningIfNotExists,
Expand All @@ -53,7 +53,7 @@ class PartitioningRepositoryImpl(createPartitioningIfNotExistsFn: CreatePartitio
.tapError(error => ZIO.logError(s"Failed to create or retrieve partitioning in/from database: ${error.message}"))
}

override def getPartitioningMeasures(partitioning: PartitioningSubmitDTO): IO[DatabaseError, Either[StatusException, Unit]] = {
override def getPartitioningMeasures(partitioning: PartitioningSubmitDTO): IO[DatabaseError, Either[StatusException, Seq[MeasureDTO]]] = {
getPartitioningMeasuresFn(partitioning)
.tap {
case Left(statusException) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ trait PartitioningService {

def getPartitioningMeasures(
partitioning: PartitioningSubmitDTO
): IO[ServiceError, Either[StatusException, Unit]]
): IO[ServiceError, Either[StatusException, Seq[MeasureDTO]]]
}

class PartitioningServiceImpl(partitioningRepository: PartitioningRepository) extends PartitioningService {
Expand All @@ -45,13 +45,14 @@ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository) ex
}
}

override def getPartitioningMeasures(partitioning: PartitioningSubmitDTO): Seq[MeasureDTO] = {
override def getPartitioningMeasures(partitioning: PartitioningSubmitDTO): IO[ServiceError, Either[StatusException, Seq[MeasureDTO]]] = {
partitioningRepository
.getPartitioningMeasures(partitioning)
.mapError { case DatabaseError(message) =>
ServiceError(s"Failed to retrieve partitioning measures: $message")
}
}

}

object PartitioningServiceImpl {
Expand Down

0 comments on commit daa1a16

Please sign in to comment.