Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

219: Suggestion - Doobie multiple results with aggregated status #236

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
c582b79
Update fa-db and associated imports
TebaleloS Jul 10, 2024
da3f9f6
Merge branch 'master' into feature/#219-DoobieMultipleResultFunctionW…
TebaleloS Jul 11, 2024
a85e672
Implicit encoders and decoders for PartitioningDTO
TebaleloS Jul 11, 2024
2de71dd
adding value parameter toFragmentsSeq
TebaleloS Jul 12, 2024
0176678
modifying the dbFunction call classes and PartitioningRepository
TebaleloS Jul 15, 2024
6a3474f
Modifying getFlowCheckpoints
TebaleloS Jul 15, 2024
1f6b20a
Modifying partitioning service function signatures
TebaleloS Jul 16, 2024
b1bbd32
partitioning service
TebaleloS Jul 17, 2024
166f506
Refactoring
TebaleloS Jul 18, 2024
953a99a
Refactoring repository services
TebaleloS Jul 18, 2024
075c621
refactoring serviceCallWithStatus in controller
TebaleloS Jul 18, 2024
933688b
incorporating statusCode dbCallWithStatus
TebaleloS Jul 19, 2024
d70a8c1
refactoring partitioning and test cases
TebaleloS Jul 23, 2024
c5dc61f
Fixing test cases
TebaleloS Jul 24, 2024
b31d9a5
Fixing PartitioningRepositoryUnitTests
TebaleloS Jul 26, 2024
ee594e4
Fixing PartitioningRepositoryUnitTests
TebaleloS Jul 26, 2024
d684420
Fixed PartitioningRepositoryUnitTests
TebaleloS Jul 26, 2024
1908297
Fixed WriteCheckpointRepositoryUnitTests
TebaleloS Jul 26, 2024
fa6e076
Implement defaultErrorHandler method
TebaleloS Jul 26, 2024
d51b81d
optimising partitioningRepository base repository
TebaleloS Jul 26, 2024
bff113b
Fixing CheckpointServiceUnitTests
TebaleloS Jul 26, 2024
dce3de7
Fixing PartitioningServiceUnitTests
TebaleloS Jul 26, 2024
d53a58a
remove Unit and unused imports from PartitioningServiceUnitTests
TebaleloS Jul 26, 2024
1a4bfb1
remove Unit and unused imports from CheckpointServiceUnitTests
TebaleloS Jul 26, 2024
6b90db1
Adding AdditionalDataFromDB, upgrading fa-db to 5, and fixing bugs
TebaleloS Jul 30, 2024
9292faa
Adding AdditionalDataFromDB, upgrading fa-db to 5, and fixing bugs
TebaleloS Jul 30, 2024
e26ce12
Fixing test cases
TebaleloS Jul 30, 2024
bb2b202
Addressing GitHub comments
TebaleloS Aug 1, 2024
f02bf72
Re-addressing GitHub comments
TebaleloS Aug 1, 2024
c96e90a
Adding Todo comment
TebaleloS Aug 1, 2024
3005404
Fixing format comments
TebaleloS Aug 2, 2024
64cbf14
219: Suggestion - Doobie multiple results with aggregated status
benedeki Aug 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@
package za.co.absa.atum.model


import io.circe.generic.semiauto._
import io.circe._

package object dto {
// A partitioning is an ordered sequence of individual partition key/value entries.
type PartitioningDTO = Seq[PartitionDTO]
// Additional (free-form) metadata attached to a partitioning; a value of None
// represents an explicitly unset entry.
type AdditionalDataDTO = Map[String, Option[String]]

// TODO: these implicit codecs should not live in the package object; to be moved as part of Ticket #221.
// Implicit circe encoder/decoder for AdditionalDataDTO, delegating to the
// standard Map[String, Option[String]] codecs so None round-trips as JSON null.
implicit val decodeAdditionalDataDTO: Decoder[AdditionalDataDTO] = Decoder.decodeMap[String, Option[String]]
implicit val encodeAdditionalDataDTO: Encoder[AdditionalDataDTO] = Encoder.encodeMap[String, Option[String]]

// Implicit circe encoder/decoder for PartitioningDTO, derived from the
// element codecs of PartitionDTO via the standard Seq codecs.
implicit val decodePartitioningDTO: Decoder[PartitioningDTO] = Decoder.decodeSeq[PartitionDTO]
implicit val encodePartitioningDTO: Encoder[PartitioningDTO] = Encoder.encodeSeq[PartitionDTO]
}
6 changes: 2 additions & 4 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ object Dependencies {

val postgresql = "42.6.0"

val fadb = "0.3.0"
val fadb = "0.5.0"

val logback = "1.2.3"

Expand Down Expand Up @@ -105,7 +105,7 @@ object Dependencies {
val zioOrg = "dev.zio"
val tapirOrg = "com.softwaremill.sttp.tapir"
val http4sOrg = "org.http4s"
val faDbOrg = "za.co.absa.fa-db"
val faDbOrg = "za.co.absa.db.fa-db"
val sbtOrg = "com.github.sbt"
val logbackOrg = "ch.qos.logback"
val awsSdkOrg = "software.amazon.awssdk"
Expand Down Expand Up @@ -147,7 +147,6 @@ object Dependencies {

// Fa-db
lazy val faDbDoobie = faDbOrg %% "doobie" % Versions.fadb
lazy val pgCirceDoobie = "org.tpolecat" %% "doobie-postgres-circe" % "1.0.0-RC2"

// aws
lazy val awsSecretsManagerSdk = awsSdkOrg % "secretsmanager" % Versions.awssdk
Expand All @@ -160,7 +159,6 @@ object Dependencies {

Seq(
faDbDoobie,
pgCirceDoobie,
zioCore,
zioMacros,
zioLogging,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@
package za.co.absa.atum.server.api.controller

import za.co.absa.atum.server.api.exception.ServiceError
import za.co.absa.atum.server.model.{ErrorResponse, GeneralErrorResponse, InternalServerErrorResponse}
import za.co.absa.atum.server.model.{ErrorResponse, InternalServerErrorResponse}
import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse}
import za.co.absa.fadb.exceptions.StatusException
import zio._

trait BaseController {
Expand All @@ -39,24 +38,6 @@ trait BaseController {

}

def serviceCallWithStatus[A, B](
serviceCall: IO[ServiceError, Either[StatusException, A]],
onSuccessFnc: A => B
): IO[ErrorResponse, B] = {

serviceCall
.mapError { serviceError: ServiceError =>
InternalServerErrorResponse(serviceError.message)
}
.flatMap {
case Left(statusException) =>
ZIO.fail(GeneralErrorResponse(s"(${statusException.status.statusCode}) ${statusException.status.statusText}"))
case Right(result) =>
ZIO.succeed(onSuccessFnc(result))
}

}

protected def mapToSingleSuccessResponse[A](
effect: IO[ErrorResponse, A]
): IO[ErrorResponse, SingleSuccessResponse[A]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class CheckpointControllerImpl(checkpointService: CheckpointService) extends Che
override def createCheckpointV1(
checkpointDTO: CheckpointDTO
): IO[ErrorResponse, CheckpointDTO] = {
serviceCallWithStatus[Unit, CheckpointDTO](
serviceCall[Unit, CheckpointDTO](
checkpointService.saveCheckpoint(checkpointDTO),
_ => checkpointDTO
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class PartitioningControllerImpl(partitioningService: PartitioningService)
additionalData: AdditionalDataSubmitDTO
): IO[ErrorResponse, SingleSuccessResponse[AdditionalDataSubmitDTO]] = {
mapToSingleSuccessResponse(
serviceCallWithStatus[Unit, AdditionalDataSubmitDTO](
serviceCall[Unit, AdditionalDataSubmitDTO](
partitioningService.createOrUpdateAdditionalData(additionalData),
_ => additionalData
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,97 +16,18 @@

package za.co.absa.atum.server.api.database

import cats.Show
import cats.data.NonEmptyList

import doobie.postgres.implicits._
import doobie.{Get, Put}
import io.circe.{Json => CirceJson}
import org.postgresql.jdbc.PgArray
import org.postgresql.util.PGobject
import io.circe.parser._

import scala.util.Try

package object DoobieImplicits {

private implicit val showPgArray: Show[PgArray] = Show.fromToString
object DoobieImplicits {

implicit val getMapWithOptionStringValues: Get[Map[String, Option[String]]] = Get[Map[String, String]]
.tmap(map => map.map { case (k, v) => k -> Option(v) })

private def circeJsonListToPGJsonArrayString(jsonList: List[CirceJson]): String = {
val arrayElements = jsonList.map { x =>
// Convert to compact JSON string and escape inner quotes
val escapedJsonString = x.noSpaces.replace("\"", "\\\"")
// Wrap in double quotes for the array element
s""""$escapedJsonString""""
}

arrayElements.mkString("{", ",", "}")
}

private def pgArrayToListOfCirceJson(pgArray: PgArray): Either[String, List[CirceJson]] = {
Try {
Option(pgArray.getArray) match {
case Some(array: Array[_]) => array.collect {
case str: String => parse(str).toTry.get
case other => parse(other.toString).toTry.get
}.toList
case None => List.empty[CirceJson]
case _ => throw new IllegalArgumentException("Unexpected type encountered.")
}
}
.toEither
.left.map(_.getMessage)
}

object Sequence {

implicit val get: Get[Seq[String]] = Get[List[String]].map(_.toSeq)
implicit val put: Put[Seq[String]] = Put[List[String]].contramap(_.toList)

}

object Json {

implicit val jsonArrayPut: Put[List[CirceJson]] = {
Put.Advanced
.other[PGobject](
NonEmptyList.of("json[]")
)
.tcontramap { a =>
val o = new PGobject
o.setType("json[]")
o.setValue(circeJsonListToPGJsonArrayString(a))
o
}
}

implicit val jsonArrayGet: Get[List[CirceJson]] = {
Get.Advanced
.other[PgArray](
NonEmptyList.of("json[]")
)
.temap(pgArray => pgArrayToListOfCirceJson(pgArray))
}

}

object Jsonb {

implicit val jsonbArrayPut: Put[List[CirceJson]] = {
Put.Advanced
.other[PGobject](
NonEmptyList.of("jsonb[]")
)
.tcontramap { a =>
val o = new PGobject
o.setType("jsonb[]")
o.setValue(circeJsonListToPGJsonArrayString(a))
o
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package za.co.absa.atum.server.api.database

import doobie.Transactor
import za.co.absa.fadb.doobie.DoobieEngine
import za.co.absa.db.fadb.doobie.DoobieEngine
import zio._
import zio.interop.catz._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package za.co.absa.atum.server.api.database.flows

import za.co.absa.fadb.DBSchema
import za.co.absa.fadb.naming.implementations.SnakeCaseNaming.Implicits._
import za.co.absa.db.fadb.DBSchema
import za.co.absa.db.fadb.naming.implementations.SnakeCaseNaming.Implicits._

object Flows extends DBSchema
Original file line number Diff line number Diff line change
Expand Up @@ -16,50 +16,43 @@

package za.co.absa.atum.server.api.database.flows.functions

import doobie.Fragment
import doobie.implicits.toSqlInterpolator
import doobie.util.Read
import za.co.absa.atum.model.dto.CheckpointQueryDTO
import za.co.absa.atum.server.api.database.PostgresDatabaseProvider
import za.co.absa.atum.server.api.database.flows.Flows
import za.co.absa.atum.server.model.{CheckpointFromDB, PartitioningForDB}
import za.co.absa.fadb.DBSchema
import za.co.absa.fadb.doobie.DoobieEngine
import za.co.absa.fadb.doobie.DoobieFunction.DoobieMultipleResultFunction
import za.co.absa.db.fadb.DBSchema
import za.co.absa.db.fadb.doobie.DoobieEngine
import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithAggStatus
import zio._
import zio.interop.catz._

import za.co.absa.atum.server.api.database.DoobieImplicits.Sequence.get

import doobie.postgres.implicits._
import doobie.postgres.circe.jsonb.implicits.jsonbPut
import doobie.postgres.circe.json.implicits.jsonGet
import za.co.absa.db.fadb.doobie.postgres.circe.implicits.{jsonbGet, jsonbPut}
import io.circe.syntax.EncoderOps
import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator
import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling

class GetFlowCheckpoints(implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
extends DoobieMultipleResultFunction[CheckpointQueryDTO, CheckpointFromDB, Task] {

extends DoobieMultipleResultFunctionWithAggStatus[CheckpointQueryDTO, CheckpointFromDB, Task](
values => Seq(
fr"${PartitioningForDB.fromSeqPartitionDTO(values.partitioning).asJson}",
fr"${values.limit}",
fr"${values.checkpointName}"
)
) with StandardStatusHandling with ByFirstErrorStatusAggregator {
override val fieldsToSelect: Seq[String] = Seq(
"status",
"status_text",
"id_checkpoint",
"checkpoint_name",
"author",
"measured_by_atum_agent",
"measure_name", "measured_columns", "measurement_value",
"checkpoint_start_time", "checkpoint_end_time",
"measure_name",
"measured_columns",
"measurement_value",
"checkpoint_start_time",
"checkpoint_end_time"
)

override def sql(values: CheckpointQueryDTO)(implicit read: Read[CheckpointFromDB]): Fragment = {
val partitioning = PartitioningForDB.fromSeqPartitionDTO(values.partitioning)
val partitioningNormalized = partitioning.asJson

sql"""SELECT ${Fragment.const(selectEntry)}
FROM ${Fragment.const(functionName)}(
$partitioningNormalized,
${values.limit},
${values.checkpointName}
) AS ${Fragment.const(alias)};"""
}

}

object GetFlowCheckpoints {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package za.co.absa.atum.server.api.database.runs

import za.co.absa.fadb.DBSchema
import za.co.absa.fadb.naming.implementations.SnakeCaseNaming.Implicits._
import za.co.absa.db.fadb.DBSchema
import za.co.absa.db.fadb.naming.implementations.SnakeCaseNaming.Implicits._

object Runs extends DBSchema
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,29 @@

package za.co.absa.atum.server.api.database.runs.functions

import doobie.Fragment
import doobie.implicits.toSqlInterpolator
import doobie.util.Read
import za.co.absa.atum.model.dto.AdditionalDataSubmitDTO
import za.co.absa.atum.server.api.database.PostgresDatabaseProvider
import za.co.absa.atum.server.api.database.runs.Runs
import za.co.absa.atum.server.model.PartitioningForDB
import za.co.absa.fadb.DBSchema
import za.co.absa.fadb.doobie.DoobieFunction.DoobieSingleResultFunctionWithStatus
import za.co.absa.fadb.doobie.{DoobieEngine, StatusWithData}
import za.co.absa.fadb.status.handling.implementations.StandardStatusHandling
import za.co.absa.db.fadb.DBSchema
import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieSingleResultFunctionWithStatus
import za.co.absa.db.fadb.doobie.DoobieEngine
import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling
import zio._
import zio.interop.catz._
import io.circe.syntax._

import doobie.postgres.implicits._
import doobie.postgres.circe.jsonb.implicits.jsonbPut
import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbPut

class CreateOrUpdateAdditionalData(implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
extends DoobieSingleResultFunctionWithStatus[AdditionalDataSubmitDTO, Unit, Task]
with StandardStatusHandling {

override def sql(values: AdditionalDataSubmitDTO)(implicit read: Read[StatusWithData[Unit]]): Fragment = {
val partitioning = PartitioningForDB.fromSeqPartitionDTO(values.partitioning)
val partitioningJson = partitioning.asJson

// implicits from Doobie can't handle Map[String, Option[String]] -> HStore, so we converted None to null basically
val additionalDataNormalized = values.additionalData.map{ case (k, v) => (k, v.orNull)}

sql"""SELECT ${Fragment.const(selectEntry)} FROM ${Fragment.const(functionName)}(
$partitioningJson,
$additionalDataNormalized,
${values.author}
) ${Fragment.const(alias)};"""
}
}
extends DoobieSingleResultFunctionWithStatus[AdditionalDataSubmitDTO, Unit, Task](
values => Seq(
fr"${PartitioningForDB.fromSeqPartitionDTO(values.partitioning).asJson}",
fr"${values.additionalData.map{ case (k, v) => (k, v.orNull)}}",
fr"${values.author}"
)
) with StandardStatusHandling

object CreateOrUpdateAdditionalData {
val layer: URLayer[PostgresDatabaseProvider, CreateOrUpdateAdditionalData] = ZLayer {
Expand Down
Loading
Loading