Skip to content

Commit

Permalink
Fixes #120 - Added class to call partitioning measures function
Browse files Browse the repository at this point in the history
  • Loading branch information
TebaleloS committed Mar 11, 2024
1 parent d89812e commit 74af153
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import zio.macros.accessible
@accessible
trait PartitioningController {
def createPartitioningIfNotExists(partitioning: PartitioningSubmitDTO): IO[ErrorResponse, AtumContextDTO]
}

class PartitioningControllerImpl(partitioningService: PartitioningService) extends PartitioningController {
override def createPartitioningIfNotExists(partitioning: PartitioningSubmitDTO): IO[ErrorResponse, AtumContextDTO] = {
partitioningService
Expand All @@ -40,7 +38,7 @@ class PartitioningControllerImpl(partitioningService: PartitioningService) exten
ZIO.fail(GeneralErrorResponse(s"(${statusException.status.statusCode}) ${statusException.status.statusText}"))
case Right(_) =>
ZIO.succeed {
val measures: Set[MeasureDTO] = Set(MeasureDTO("count", Seq("*")))
val measures: Set[MeasureDTO] = Set(partitioningService.GetPartitioningMeasures(partitioning))
val additionalData = AdditionalDataDTO(additionalData = Map.empty)
AtumContextDTO(partitioning.partitioning, measures, additionalData)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package za.co.absa.atum.server.api.database.runs.functions

import doobie.Fragment
import doobie.implicits.toSqlInterpolator
import doobie.util.Read
import play.api.libs.json.Json
import za.co.absa.atum.model.dto.PartitioningSubmitDTO
import za.co.absa.atum.server.api.database.PostgresDatabaseProvider
import za.co.absa.atum.server.model.PartitioningForDB
import za.co.absa.fadb.DBSchema
import za.co.absa.fadb.doobie.{DoobieEngine, StatusWithData}
import za.co.absa.fadb.doobie.DoobieFunction.DoobieSingleResultFunctionWithStatus
import za.co.absa.fadb.status.handling.implementations.StandardStatusHandling
import za.co.absa.atum.server.api.database.runs.Runs
import zio._
import zio.interop.catz._

class GetPartitioningMeasures (implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
extends DoobieSingleResultFunctionWithStatus[PartitioningSubmitDTO, Unit, Task]
with StandardStatusHandling {

override def sql(values: PartitioningSubmitDTO)(implicit read: Read[StatusWithData[Unit]]): Fragment = {
val partitioning = PartitioningForDB.fromSeqPartitionDTO(values.partitioning)
val partitioningJsonString = Json.toJson(partitioning).toString

sql"""SELECT ${Fragment.const(selectEntry)} FROM ${Fragment.const(functionName)}(
${
import za.co.absa.atum.server.api.database.DoobieImplicits.Jsonb.jsonbPutUsingString
partitioningJsonString
}
) ${Fragment.const(alias)};"""

}

}

object GetPartitioningMeasures {
val layer: URLayer[PostgresDatabaseProvider, GetPartitioningMeasures] = ZLayer {
for {
dbProvider <- ZIO.service[PostgresDatabaseProvider]
} yield new GetPartitioningMeasures()(Runs, dbProvider.dbEngine)
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package za.co.absa.atum.server.api.repository

import za.co.absa.atum.model.dto.PartitioningSubmitDTO
import za.co.absa.atum.server.api.database.runs.functions.CreatePartitioningIfNotExists
import za.co.absa.atum.server.api.database.runs.functions.{CreatePartitioningIfNotExists, GetPartitioningMeasures}
import za.co.absa.atum.server.api.exception.DatabaseError
import za.co.absa.fadb.exceptions.StatusException
import zio._
Expand All @@ -28,6 +28,10 @@ trait PartitioningRepository {
def createPartitioningIfNotExists(
partitioning: PartitioningSubmitDTO
): IO[DatabaseError, Either[StatusException, Unit]]

def getPartitioningMeasures(
partitioning: PartitioningSubmitDTO
): IO[DatabaseError, Either[StatusException, Unit]]
}

class PartitioningRepositoryImpl(createPartitioningIfNotExistsFn: CreatePartitioningIfNotExists)
Expand All @@ -47,6 +51,20 @@ class PartitioningRepositoryImpl(createPartitioningIfNotExistsFn: CreatePartitio
.mapError(error => DatabaseError(error.getMessage))
.tapError(error => ZIO.logError(s"Failed to create or retrieve partitioning in/from database: ${error.message}"))
}

override def getPartitioningMeasures(partitioning: PartitioningSubmitDTO): IO[DatabaseError, Either[StatusException, Unit]] = {
getPartitioningMeasures(partitioning)
.tap {
case Left(statusException) =>
ZIO.logError(
s"Partitioning measures retrieve operation exception: (${statusException.status.statusCode}) ${statusException.status.statusText}"
)
case Right(_) =>
ZIO.logDebug("Partitioning measures successfully retrieved from database.")
}
.mapError(error => DatabaseError(error.getMessage))
.tapError(error => ZIO.logError(s"Failed to retrieve partitioning measures from database: ${error.message}"))
}
}

object PartitioningRepositoryImpl {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ trait PartitioningService {
def createPartitioningIfNotExists(
partitioning: PartitioningSubmitDTO
): IO[ServiceError, Either[StatusException, Unit]]

def GetPartitioningMeasures(
partitioning: PartitioningSubmitDTO
): IO[ServiceError, Either[StatusException, Unit]]
}

class PartitioningServiceImpl(partitioningRepository: PartitioningRepository) extends PartitioningService {
Expand All @@ -40,6 +44,14 @@ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository) ex
ServiceError(s"Failed to create or retrieve partitioning: $message")
}
}

override def GetPartitioningMeasures(partitioning: PartitioningSubmitDTO): IO[ServiceError, Either[StatusException, Unit]] = {
partitioningRepository
.getPartitioningMeasures(partitioning)
.mapError { case DatabaseError(message) =>
ServiceError(s"Failed to retrieve partitioning measures: $message")
}
}
}

object PartitioningServiceImpl {
Expand Down

0 comments on commit 74af153

Please sign in to comment.