diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 113e0e2c..3d91d546 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -51,8 +51,10 @@ jobs: with: path: | ${{ github.workspace }}/core/target/scala-${{ matrix.scalaShort }}/jacoco/report/jacoco.xml + ${{ github.workspace }}/streaming/target/scala-${{ matrix.scalaShort }}/jacoco/report/jacoco.xml ${{ github.workspace }}/examples/target/scala-${{ matrix.scalaShort }}/jacoco/report/jacoco.xml ${{ github.workspace }}/slick/target/scala-${{ matrix.scalaShort }}/jacoco/report/jacoco.xml + ${{ github.workspace }}/slick-streaming/target/scala-${{ matrix.scalaShort }}/jacoco/report/jacoco.xml ${{ github.workspace }}/doobie/target/scala-${{ matrix.scalaShort }}/jacoco/report/jacoco.xml key: ${{ runner.os }}-${{ matrix.scalaShort }}-${{ hashFiles('**/jacoco.xml') }} @@ -79,8 +81,10 @@ jobs: with: path: | ${{ github.workspace }}/core/target/scala-${{ matrix.scalaShort }}/jacoco/report/jacoco.xml + ${{ github.workspace }}/streaming/target/scala-${{ matrix.scalaShort }}/jacoco/report/jacoco.xml ${{ github.workspace }}/examples/target/scala-${{ matrix.scalaShort }}/jacoco/report/jacoco.xml ${{ github.workspace }}/slick/target/scala-${{ matrix.scalaShort }}/jacoco/report/jacoco.xml + ${{ github.workspace }}/slick-streaming/target/scala-${{ matrix.scalaShort }}/jacoco/report/jacoco.xml ${{ github.workspace }}/doobie/target/scala-${{ matrix.scalaShort }}/jacoco/report/jacoco.xml key: ${{ runner.os }}-${{ matrix.scalaShort }}-${{ hashFiles('**/jacoco.xml') }} - name: Setup Scala @@ -93,7 +97,9 @@ jobs: with: paths: > ${{ github.workspace }}/core/target/scala-${{ matrix.scalaShort }}/jacoco/report/jacoco.xml, + ${{ github.workspace }}/streaming/target/scala-${{ matrix.scalaShort }}/jacoco/report/jacoco.xml, ${{ github.workspace }}/slick/target/scala-${{ matrix.scalaShort }}/jacoco/report/jacoco.xml + ${{ github.workspace }}/slick-streaming/target/scala-${{ matrix.scalaShort }}/jacoco/report/jacoco.xml ${{ github.workspace }}/doobie/target/scala-${{ matrix.scalaShort }}/jacoco/report/jacoco.xml # examples don't need code coverage - at least not now token: ${{ secrets.GITHUB_TOKEN }} diff --git a/README.md b/README.md index f3622d12..92542a6f 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,8 @@ ___ - [Usage](#usage) - [Concepts](#concepts) - [Slick module](#slick-module) +- [Slick streaming module](#slick-streaming-module) +- [Doobie module](#doobie-module) - [Testing](#testing) - [How to Release](#how-to-release) @@ -54,10 +56,12 @@ Currently, the library is developed with Postgres as the target DB. But the appr #### Sbt -Import one of the two available module at the moment. Slick module works with Scala Futures. Doobie module works with any effect type (typically IO or ZIO) provided cats effect's Async instance is available. +Slick module works with Scala Futures. Slick-streaming provides also streaming support for Slick with fs2 streams and any effect type provided cats effect's Async instance is available. +Doobie module provides both non-streaming and streaming functions and works with any effect type (typically IO or ZIO) provided cats effect's Async instance is available. ```scala libraryDependencies *= "za.co.absa.fa-db" %% "slick" % "X.Y.Z" +libraryDependencies *= "za.co.absa.fa-db" %% "slick-streaming" % "X.Y.Z" libraryDependencies *= "za.co.absa.fa-db" %% "doobie" % "X.Y.Z" ``` @@ -67,7 +71,9 @@ libraryDependencies *= "za.co.absa.fa-db" %% "doobie" % "X.Y.Z" Modules: * Core [![Maven Central](https://maven-badges.herokuapp.com/maven-central/za.co.absa.fa-db/core_2.12/badge.svg)](https://maven-badges.herokuapp.com/maven-central/za.co.absa.fa-db/core_2.12) +* Streaming [![Maven Central](https://maven-badges.herokuapp.com/maven-central/za.co.absa.fa-db/streaming_2.12/badge.svg)](https://maven-badges.herokuapp.com/maven-central/za.co.absa.fa-db/streaming_2.12) * Slick [![Maven Central](https://maven-badges.herokuapp.com/maven-central/za.co.absa.fa-db/slick_2.12/badge.svg)](https://maven-badges.herokuapp.com/maven-central/za.co.absa.fa-db/slick_2.12) +* Slick-streaming [![Maven Central](https://maven-badges.herokuapp.com/maven-central/za.co.absa.fa-db/slick-streaming_2.12/badge.svg)](https://maven-badges.herokuapp.com/maven-central/za.co.absa.fa-db/slick-streaming_2.12) * Doobie [![Maven Central](https://maven-badges.herokuapp.com/maven-central/za.co.absa.fa-db/doobie_2.12/badge.svg)](https://maven-badges.herokuapp.com/maven-central/za.co.absa.fa-db/doobie_2.12) ```xml @@ -76,6 +82,11 @@ Modules: slick_2.12 ${latest_version} + + za.co.absa.fa-db + slick-streaming_2.12 + ${latest_version} + za.co.absa.fa-db doobie_2.12 @@ -86,7 +97,9 @@ Modules: ### Scala 2.13 Modules: * Core [![Maven Central](https://maven-badges.herokuapp.com/maven-central/za.co.absa.fa-db/core_2.13/badge.svg)](https://maven-badges.herokuapp.com/maven-central/za.co.absa.fa-db/core_2.13) +* Streaming [![Maven Central](https://maven-badges.herokuapp.com/maven-central/za.co.absa.fa-db/streaming_2.13/badge.svg)](https://maven-badges.herokuapp.com/maven-central/za.co.absa.fa-db/streaming_2.13) * Slick [![Maven Central](https://maven-badges.herokuapp.com/maven-central/za.co.absa.fa-db/slick_2.13/badge.svg)](https://maven-badges.herokuapp.com/maven-central/za.co.absa.fa-db/slick_2.13) +* Slick-streaming [![Maven Central](https://maven-badges.herokuapp.com/maven-central/za.co.absa.fa-db/slick-streaming_2.13/badge.svg)](https://maven-badges.herokuapp.com/maven-central/za.co.absa.fa-db/slick-streaming_2.13) * Doobie [![Maven Central](https://maven-badges.herokuapp.com/maven-central/za.co.absa.fa-db/doobie_2.13/badge.svg)](https://maven-badges.herokuapp.com/maven-central/za.co.absa.fa-db/doobie_2.13) ```xml @@ -95,6 +108,11 @@ Modules: slick_2.13 ${latest_version} + + za.co.absa.fa-db + slick-streaming_2.13 + ${latest_version} + za.co.absa.fa-db doobie_2.13 @@ -141,6 +159,13 @@ val hStore: Option[Map[String, String]] = pr.nextHStoreOption val macAddr: Option[MacAddrString] = pr.nextMacAddrOption ``` +## Slick streaming module + +It additionally brings: + +* `class SlickPgStreamingEngine` - implementation of _Streaming_'s `DBStreamingEngine` executing the queries via Slick in a streaming fashion +* `class SlickStreamingResultFunction` - abstract class for DB functions returning sequence of results (fs2.Stream) in a streaming fashion + ## Doobie module As the name suggests it runs on [Doobie library](https://tpolecat.github.io/doobie/). The main benefit of the module is that it allows to use any effect type (typically IO or ZIO) therefore is more suitable for functional programming. It also brings in the [Doobie-Postgres library](https://tpolecat.github.io/doobie/docs/14-PostgreSQL.html) for extended Postgres type support. @@ -148,11 +173,13 @@ As the name suggests it runs on [Doobie library](https://tpolecat.github.io/doob It brings: * `class DoobieEngine` - implementation of _Core_'s `DBEngine` executing the queries via Doobie. The class is type parameterized with the effect type. +* `class DoobieStreamingEngine` - implementation of _Streamings_'s `DBStreamingEngine` executing the queries via Doobie in a streaming fashion. The class is type parameterized with the effect type. * `class DoobieSingleResultFunction` - abstract class for DB functions returning single result * `class DoobieMultipleResultFunction` - abstract class for DB functions returning sequence of results * `class DoobieOptionalResultFunction` - abstract class for DB functions returning optional result * `class DoobieSingleResultFunctionWithStatus` - abstract class for DB functions with status handling; it requires an implementation of `StatusHandling` to be mixed-in (`StandardStatusHandling` available out-of-the-box) - +* `class DoobieStreamingResultFunction` - abstract class for DB functions returning sequence of results (fs2.Stream) in a streaming fashion +* Since Doobie also interoperates with ZIO, there is an example of how a database connection can be properly established within a ZIO application. Please see [this file](doobie/zio-setup.md) for more details. ## Testing diff --git a/build.sbt b/build.sbt index 494eb38e..88cb3763 100644 --- a/build.sbt +++ b/build.sbt @@ -49,7 +49,7 @@ lazy val commonJacocoExcludes: Seq[String] = Seq( ) lazy val parent = (project in file(".")) - .aggregate(faDbCore, faDBSlick, faDBDoobie, faDBExamples) + .aggregate(faDbCore, faDBStreaming, faDBSlick, faDBSlickStreaming, faDBDoobie, faDBExamples) .settings( name := "root", libraryDependencies ++= rootDependencies(scalaVersion.value), @@ -73,6 +73,21 @@ lazy val faDbCore = (project in file("core")) jacocoExcludes := commonJacocoExcludes ) +lazy val faDBStreaming = (project in file("streaming")) + .configs(IntegrationTest) + .settings( + name := "streaming", + libraryDependencies ++= streamingDependencies(scalaVersion.value), + javacOptions ++= commonJavacOptions, + scalacOptions ++= commonScalacOptions, + (Compile / compile) := ((Compile / compile) dependsOn printScalaVersion).value, + Defaults.itSettings, + ).dependsOn(faDbCore) + .settings( + jacocoReportSettings := commonJacocoReportSettings.withTitle(s"fa-db:streaming Jacoco Report - scala:${scalaVersion.value}"), + jacocoExcludes := commonJacocoExcludes + ) + lazy val faDBSlick = (project in file("slick")) .configs(IntegrationTest) .settings( @@ -80,7 +95,7 @@ lazy val faDBSlick = (project in file("slick")) libraryDependencies ++= slickDependencies(scalaVersion.value), javacOptions ++= commonJavacOptions, scalacOptions ++= commonScalacOptions, - (Compile / compile) := ((Compile / compile) dependsOn printScalaVersion).value, // printScalaVersion is run with compile + (Compile / compile) := ((Compile / compile) dependsOn printScalaVersion).value, Defaults.itSettings, ).dependsOn(faDbCore) .settings( @@ -88,6 +103,21 @@ lazy val faDBSlick = (project in file("slick")) jacocoExcludes := commonJacocoExcludes ) +lazy val faDBSlickStreaming = (project in file("slick-streaming")) + .configs(IntegrationTest) + .settings( + name := "slick-streaming", + libraryDependencies ++= slickStreamingDependencies(scalaVersion.value), + javacOptions ++= commonJavacOptions, + scalacOptions ++= commonScalacOptions, + (Compile / compile) := ((Compile / compile) dependsOn printScalaVersion).value, + Defaults.itSettings, + ).dependsOn(faDbCore, faDBStreaming, faDBSlick) + .settings( + jacocoReportSettings := commonJacocoReportSettings.withTitle(s"fa-db:slick-streaming Jacoco Report - scala:${scalaVersion.value}"), + jacocoExcludes := commonJacocoExcludes + ) + lazy val faDBDoobie = (project in file("doobie")) .configs(IntegrationTest) .settings( @@ -96,7 +126,7 @@ lazy val faDBDoobie = (project in file("doobie")) javacOptions ++= commonJavacOptions, scalacOptions ++= commonScalacOptions, Defaults.itSettings, - ).dependsOn(faDbCore) + ).dependsOn(faDbCore, faDBStreaming) .settings( jacocoReportSettings := commonJacocoReportSettings.withTitle(s"fa-db:doobie Jacoco Report - scala:${scalaVersion.value}"), jacocoExcludes := commonJacocoExcludes @@ -108,7 +138,7 @@ lazy val faDBExamples = (project in file("examples")) name := "examples", libraryDependencies ++= examplesDependencies(scalaVersion.value), Test / parallelExecution := false, - (Compile / compile) := ((Compile / compile) dependsOn printScalaVersion).value, // printScalaVersion is run with compile + (Compile / compile) := ((Compile / compile) dependsOn printScalaVersion).value, publish / skip := true ).dependsOn(faDbCore, faDBSlick) .settings( diff --git a/core/src/main/scala/za/co/absa/fadb/DBEngine.scala b/core/src/main/scala/za/co/absa/fadb/DBEngine.scala index d282749d..a81eb9a7 100644 --- a/core/src/main/scala/za/co/absa/fadb/DBEngine.scala +++ b/core/src/main/scala/za/co/absa/fadb/DBEngine.scala @@ -23,7 +23,7 @@ import za.co.absa.fadb.exceptions.StatusException import scala.language.higherKinds /** - * `DBEngine` is an abstract class that represents a database engine. + * [[DBEngine]] is an abstract class that represents a database engine. * It provides methods to execute queries and fetch results from a database. * @tparam F - The type of the context in which the database queries are executed. */ diff --git a/core/src/main/scala/za/co/absa/fadb/DBFunction.scala b/core/src/main/scala/za/co/absa/fadb/DBFunction.scala index ad8da2df..7dafb018 100644 --- a/core/src/main/scala/za/co/absa/fadb/DBFunction.scala +++ b/core/src/main/scala/za/co/absa/fadb/DBFunction.scala @@ -22,7 +22,7 @@ import za.co.absa.fadb.status.handling.StatusHandling import scala.language.higherKinds /** - * `DBFunction` is an abstract class that represents a database function. + * [[DBFunction]] is an abstract class that represents a database function. * @param functionNameOverride - Optional parameter to override the class name if it does not match the database function name. * @param schema - The schema the function belongs to. * @param dBEngine - The database engine that is supposed to execute the function (contains connection to the database). @@ -73,7 +73,7 @@ abstract class DBFunction[I, R, E <: DBEngine[F], F[_]](functionNameOverride: Op } /** - * `DBFunctionWithStatus` is an abstract class that represents a database function with a status. + * [[DBFunctionWithStatus]] is an abstract class that represents a database function with a status. * It extends the [[DBFunction]] class and adds handling for the status of the function invocation. * @param functionNameOverride - Optional parameter to override the class name if it does not match the database function name. * @param schema - The schema the function belongs to. @@ -100,7 +100,7 @@ abstract class DBFunctionWithStatus[I, R, E <: DBEngine[F], F[_]](functionNameOv /** * Executes the database function and returns multiple results. - * @param values + * @param values the values to pass over to the database function. * @return A sequence of results from the database function. */ def apply(values: I): F[Either[StatusException, R]] = dBEngine.runWithStatus(query(values)) @@ -131,7 +131,7 @@ abstract class DBFunctionWithStatus[I, R, E <: DBEngine[F], F[_]](functionNameOv object DBFunction { /** - * `DBMultipleResultFunction` is an abstract class that represents a database function returning multiple results. + * [[DBMultipleResultFunction]] is an abstract class that represents a database function returning multiple results. * It extends the [[DBFunction]] class and overrides the apply method to return a sequence of results. */ abstract class DBMultipleResultFunction[I, R, E <: DBEngine[F], F[_]]( @@ -155,7 +155,7 @@ object DBFunction { } /** - * `DBSingleResultFunction` is an abstract class that represents a database function returning a single result. + * [[DBSingleResultFunction]] is an abstract class that represents a database function returning a single result. * It extends the [[DBFunction]] class and overrides the apply method to return a single result. */ abstract class DBSingleResultFunction[I, R, E <: DBEngine[F], F[_]]( @@ -178,7 +178,7 @@ object DBFunction { } /** - * `DBOptionalResultFunction` is an abstract class that represents a database function returning an optional result. + * [[DBOptionalResultFunction]] is an abstract class that represents a database function returning an optional result. * It extends the [[DBFunction]] class and overrides the apply method to return an optional result. */ abstract class DBOptionalResultFunction[I, R, E <: DBEngine[F], F[_]]( diff --git a/core/src/main/scala/za/co/absa/fadb/DBSchema.scala b/core/src/main/scala/za/co/absa/fadb/DBSchema.scala index 8fadd6d2..b23bb613 100644 --- a/core/src/main/scala/za/co/absa/fadb/DBSchema.scala +++ b/core/src/main/scala/za/co/absa/fadb/DBSchema.scala @@ -22,7 +22,7 @@ import za.co.absa.fadb.naming.NamingConvention * An abstract class, an ancestor to represent a database schema * The database name of the schema is derived from the class name based on the provided naming convention * @param schemaNameOverride - in case the class name would not match the database schema name, this gives the - * @param namingConvention - the [[za.co.absa.fadb.naming.NamingConvention NamingConvention]] + * @param namingConvention - the [[za.co.absa.fadb.naming.NamingConvention]] * prescribing how to convert a class name into a db object name */ abstract class DBSchema(schemaNameOverride: Option[String] = None)(implicit val namingConvention: NamingConvention) { @@ -46,7 +46,7 @@ abstract class DBSchema(schemaNameOverride: Option[String] = None)(implicit val /** * Function to convert a class to the associated DB object name, based on the class' name. For transformation from the - * class name to usual db name the schema's [[za.co.absa.fadb.naming.NamingConvention NamingConvention]] is used. + * class name to usual db name the schema's [[za.co.absa.fadb.naming.NamingConvention]] is used. * @param c - class which name to use to get the DB object name * @return - the db object name */ diff --git a/core/src/main/scala/za/co/absa/fadb/naming/ExplicitNamingRequired.scala b/core/src/main/scala/za/co/absa/fadb/naming/ExplicitNamingRequired.scala index b0fbde12..b136e321 100644 --- a/core/src/main/scala/za/co/absa/fadb/naming/ExplicitNamingRequired.scala +++ b/core/src/main/scala/za/co/absa/fadb/naming/ExplicitNamingRequired.scala @@ -19,15 +19,15 @@ package za.co.absa.fadb.naming import za.co.absa.fadb.exceptions.NamingException /** - * `ExplicitNamingRequired` is a `NamingConvention` that throws a `NamingConvention` for any string. + * [[ExplicitNamingRequired]] is a [[NamingConvention]] that throws a [[za.co.absa.fadb.exceptions.NamingException]] for any string. * This is used when explicit naming is required and no other naming convention should be applied. */ class ExplicitNamingRequired extends NamingConvention { /** - * Throws a `NamingConvention` with a message indicating that explicit naming is required. + * Throws a [[za.co.absa.fadb.exceptions.NamingException]] with a message indicating that explicit naming is required. * @param original - The original string. - * @return Nothing, as a `NamingException` is always thrown. + * @return Nothing, as a [[za.co.absa.fadb.exceptions.NamingException]] is always thrown. */ override def stringPerConvention(original: String): String = { val message = s"No convention for '$original', explicit naming required." @@ -36,14 +36,14 @@ class ExplicitNamingRequired extends NamingConvention { } /** - * `ExplicitNamingRequired.Implicits` provides an implicit `NamingConvention` instance that - * throws a `NamingException` for any string. + * [[ExplicitNamingRequired.Implicits]] provides an implicit [[NamingConvention]] instance that + * throws a [[za.co.absa.fadb.exceptions.NamingException]] for any string. */ object ExplicitNamingRequired { object Implicits { /** - * An implicit `NamingConvention` instance that throws a `NamingException` for any string. + * An implicit [[NamingConvention]] instance that throws a [[za.co.absa.fadb.exceptions.NamingException]] for any string. */ implicit val namingConvention: NamingConvention = new ExplicitNamingRequired() } diff --git a/core/src/main/scala/za/co/absa/fadb/naming/LettersCase.scala b/core/src/main/scala/za/co/absa/fadb/naming/LettersCase.scala index 471dc1c1..6d194077 100644 --- a/core/src/main/scala/za/co/absa/fadb/naming/LettersCase.scala +++ b/core/src/main/scala/za/co/absa/fadb/naming/LettersCase.scala @@ -17,7 +17,7 @@ package za.co.absa.fadb.naming /** - * `LettersCase` is a sealed trait that represents different cases of letters. + * [[LettersCase]] is a sealed trait that represents different cases of letters. * It provides a method to convert a string to the specific case. */ sealed trait LettersCase { @@ -33,21 +33,21 @@ sealed trait LettersCase { object LettersCase { /** - * `AsIs` is a [[LettersCase]] that leaves strings as they are. + * [[AsIs]] is a [[LettersCase]] that leaves strings as they are. */ case object AsIs extends LettersCase { override def convert(s: String): String = s } /** - * `LowerCase` is a [[LettersCase]] that converts strings to lower case. + * [[LowerCase]] is a [[LettersCase]] that converts strings to lower case. */ case object LowerCase extends LettersCase { override def convert(s: String): String = s.toLowerCase } /** - * `UpperCase` is a [[LettersCase]] that converts strings to upper case. + * [[UpperCase]] is a [[LettersCase]] that converts strings to upper case. */ case object UpperCase extends LettersCase { override def convert(s: String): String = s.toUpperCase diff --git a/core/src/main/scala/za/co/absa/fadb/naming/NamingConvention.scala b/core/src/main/scala/za/co/absa/fadb/naming/NamingConvention.scala index b5adadfa..e082a0c9 100644 --- a/core/src/main/scala/za/co/absa/fadb/naming/NamingConvention.scala +++ b/core/src/main/scala/za/co/absa/fadb/naming/NamingConvention.scala @@ -17,7 +17,7 @@ package za.co.absa.fadb.naming /** - * `NamingConvention` is a base trait that defines the interface for different naming conventions. + * [[NamingConvention]] is a base trait that defines the interface for different naming conventions. * It provides methods to convert a class name according to given naming convention. */ trait NamingConvention { diff --git a/core/src/main/scala/za/co/absa/fadb/naming/implementations/AsIsNaming.scala b/core/src/main/scala/za/co/absa/fadb/naming/implementations/AsIsNaming.scala index 926cd00c..3890cac7 100644 --- a/core/src/main/scala/za/co/absa/fadb/naming/implementations/AsIsNaming.scala +++ b/core/src/main/scala/za/co/absa/fadb/naming/implementations/AsIsNaming.scala @@ -20,8 +20,9 @@ import za.co.absa.fadb.naming.{LettersCase, NamingConvention} import LettersCase.AsIs /** - * `AsIsNaming` provides a naming convention that leaves strings as they are. + * [[AsIsNaming]] provides a naming convention that leaves strings as they are. * It implements the [[NamingConvention]] trait. + * * @param lettersCase - The case of the letters in the string. */ class AsIsNaming(lettersCase: LettersCase) extends NamingConvention { @@ -37,7 +38,7 @@ class AsIsNaming(lettersCase: LettersCase) extends NamingConvention { } /** - * `AsIsNaming.Implicits` provides an implicit [[NamingConvention]] instance that leaves strings as they are. + * [[AsIsNaming.Implicits]] provides an implicit [[NamingConvention]] instance that leaves strings as they are. */ object AsIsNaming { object Implicits { diff --git a/core/src/main/scala/za/co/absa/fadb/naming/implementations/SnakeCaseNaming.scala b/core/src/main/scala/za/co/absa/fadb/naming/implementations/SnakeCaseNaming.scala index 7362ea2c..caa96fda 100644 --- a/core/src/main/scala/za/co/absa/fadb/naming/implementations/SnakeCaseNaming.scala +++ b/core/src/main/scala/za/co/absa/fadb/naming/implementations/SnakeCaseNaming.scala @@ -20,7 +20,7 @@ import za.co.absa.fadb.naming.{LettersCase, NamingConvention} import LettersCase.LowerCase /** - * `SnakeCaseNaming` provides a naming convention that converts camel case strings to snake case. + * [[SnakeCaseNaming]] provides a naming convention that converts camel case strings to snake case. * It implements the [[NamingConvention]] trait. * @param lettersCase - The case of the letters in the string. */ @@ -51,7 +51,7 @@ class SnakeCaseNaming(lettersCase: LettersCase) extends NamingConvention { } /** - * `SnakeCaseNaming.Implicits` provides an implicit [[NamingConvention]] instance that converts camel case strings to snake case. + * [[SnakeCaseNaming.Implicits]] provides an implicit [[NamingConvention]] instance that converts camel case strings to snake case. */ object SnakeCaseNaming { object Implicits { diff --git a/core/src/main/scala/za/co/absa/fadb/status/handling/StatusHandling.scala b/core/src/main/scala/za/co/absa/fadb/status/handling/StatusHandling.scala index 82768ac5..75d784bd 100644 --- a/core/src/main/scala/za/co/absa/fadb/status/handling/StatusHandling.scala +++ b/core/src/main/scala/za/co/absa/fadb/status/handling/StatusHandling.scala @@ -20,7 +20,7 @@ import za.co.absa.fadb.FunctionStatusWithData import za.co.absa.fadb.exceptions.StatusException /** - * `StatusHandling` is a base trait that defines the interface for handling the status of a function invocation. + * [[StatusHandling]] is a base trait that defines the interface for handling the status of a function invocation. * It provides a method to check the status of a function invocation with data. */ trait StatusHandling { @@ -28,7 +28,7 @@ trait StatusHandling { /** * Checks the status of a function invocation. * @param statusWithData - The status of the function invocation with data. - * @return Either a `StatusException` if the status code indicates an error, or the data if the status code is successful. + * @return Either a [[za.co.absa.fadb.exceptions.StatusException]] if the status code indicates an error, or the data if the status code is successful. */ def checkStatus[A](statusWithData: FunctionStatusWithData[A]): Either[StatusException, A] } diff --git a/core/src/main/scala/za/co/absa/fadb/status/handling/implementations/StandardStatusHandling.scala b/core/src/main/scala/za/co/absa/fadb/status/handling/implementations/StandardStatusHandling.scala index 2e491082..2fa0176e 100644 --- a/core/src/main/scala/za/co/absa/fadb/status/handling/implementations/StandardStatusHandling.scala +++ b/core/src/main/scala/za/co/absa/fadb/status/handling/implementations/StandardStatusHandling.scala @@ -21,7 +21,7 @@ import za.co.absa.fadb.exceptions._ import za.co.absa.fadb.status.handling.StatusHandling /** - * `StandardStatusHandling` is a trait that extends the `StatusHandling` interface. + * [[StandardStatusHandling]] is a trait that implements the [[StatusHandling]] interface. * It provides a standard implementation for checking the status of a function invocation. */ trait StandardStatusHandling extends StatusHandling { diff --git a/doobie/src/it/scala/za/co/absa/fadb/doobie/DoobieStreamingResultFunctionTest.scala b/doobie/src/it/scala/za/co/absa/fadb/doobie/DoobieStreamingResultFunctionTest.scala new file mode 100644 index 00000000..a3e7f80e --- /dev/null +++ b/doobie/src/it/scala/za/co/absa/fadb/doobie/DoobieStreamingResultFunctionTest.scala @@ -0,0 +1,46 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.fadb.doobie + +import cats.effect.IO +import cats.effect.unsafe.implicits.global +import doobie.implicits.toSqlInterpolator +import doobie.util.Read +import doobie.util.fragment.Fragment +import org.scalatest.funsuite.AnyFunSuite +import za.co.absa.fadb.DBSchema +import za.co.absa.fadb.doobie.DoobieFunction.DoobieStreamingResultFunction + +class DoobieStreamingResultFunctionTest extends AnyFunSuite with DoobieTest { + + class GetActors(implicit schema: DBSchema, dbEngine: DoobieStreamingEngine[IO]) + extends DoobieStreamingResultFunction[GetActorsQueryParameters, Actor, IO] { + + override def sql(values: GetActorsQueryParameters)(implicit read: Read[Actor]): Fragment = + sql"SELECT actor_id, first_name, last_name FROM ${Fragment.const(functionName)}(${values.firstName}, ${values.lastName})" + } + + private val getActors = new GetActors()(Runs, new DoobieStreamingEngine(transactor)) + + test("Retrieving actor from database") { + val expectedResultElem = Actor(49, "Pavel", "Marek") + val results = + getActors(GetActorsQueryParameters(Some("Pavel"), Some("Marek"))).take(10).compile.toList.unsafeRunSync() + assert(results.contains(expectedResultElem)) + } + +} diff --git a/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieEngine.scala b/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieEngine.scala index d33585b1..dfb8e3d2 100644 --- a/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieEngine.scala +++ b/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieEngine.scala @@ -27,13 +27,13 @@ import za.co.absa.fadb.exceptions.StatusException import scala.language.higherKinds /** - * `DoobieEngine` is a class that extends `DBEngine` with `F` as the effect type. - * It uses Doobie's `Transactor[F]` to execute SQL queries. + * [[DoobieEngine]] is a class that extends [[za.co.absa.fadb.DBEngine]] with `F` as the effect type. + * It uses [[doobie.Transactor]] to execute SQL queries. * - * `Async` is needed because Doobie requires it for non-blocking database operations. + * [[cats.effect.Async]] is needed because Doobie requires it for non-blocking database operations. * - * @param transactor the Doobie transactor for executing SQL queries - * @tparam F the effect type, which must have an `Async` instance + * @param transactor the [[doobie.Transactor]] for executing SQL queries + * @tparam F the effect type, which must have an [[cats.effect.Async]] instance */ class DoobieEngine[F[_]: Async](val transactor: Transactor[F]) extends DBEngine[F] { @@ -45,7 +45,7 @@ class DoobieEngine[F[_]: Async](val transactor: Transactor[F]) extends DBEngine[ * Executes a Doobie query and returns the result as an `F[Seq[R]]`. * * @param query the Doobie query to execute - * @param readR the `Read[R]` instance used to read the query result into `R` + * @param readR the [[doobie.Read]] instance used to read the query result into `R` * @return the query result as an `F[Seq[R]]` */ private def executeQuery[R](query: QueryType[R])(implicit readR: Read[R]): F[Seq[R]] = { @@ -56,7 +56,7 @@ class DoobieEngine[F[_]: Async](val transactor: Transactor[F]) extends DBEngine[ * Executes a Doobie query and returns the result as an `F[Either[StatusException, R]]`. * * @param query the Doobie query to execute - * @param readStatusWithDataR the `Read[StatusWithData[R]]` instance used to read the query result into `StatusWithData[R]` + * @param readStatusWithDataR the [[doobie.Read]] instance used to read the query result into `StatusWithData[R]` * @return the query result as an `F[Either[StatusException, R]]` */ private def executeQueryWithStatus[R]( diff --git a/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieFunction.scala b/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieFunction.scala index 14be3b82..eb338d4f 100644 --- a/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieFunction.scala +++ b/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieFunction.scala @@ -16,68 +16,68 @@ package za.co.absa.fadb.doobie -import cats.Monad import cats.effect.kernel.Async import doobie.util.Read import doobie.util.fragment.Fragment import za.co.absa.fadb.DBFunction._ import za.co.absa.fadb.exceptions.StatusException +import za.co.absa.fadb.streaming.DBStreamingFunction import za.co.absa.fadb.{DBFunctionWithStatus, DBSchema, FunctionStatusWithData} import scala.language.higherKinds trait DoobieFunctionBase[R] { /** - * The `Read[R]` instance used to read the query result into `R`. + * The [[doobie.Read]] instance used to read the query result into `R`. */ implicit val readR: Read[R] } /** - * `DoobieFunction` provides support for executing database functions using Doobie. + * [[DoobieFunction]] provides support for executing database functions using Doobie. * * @tparam I the input type of the function * @tparam R the result type of the function */ private[doobie] trait DoobieFunction[I, R] extends DoobieFunctionBase[R] { /** - * Generates a Doobie `Fragment` representing the SQL query for the function. + * Generates a [[doobie.Fragment]] representing the SQL query for the function. * * @param values the input values for the function - * @return the Doobie `Fragment` representing the SQL query + * @return the [[doobie.Fragment]] representing the SQL query */ def sql(values: I)(implicit read: Read[R]): Fragment /** - * Generates a `DoobieQuery[R]` representing the SQL query for the function. + * Generates a [[DoobieQuery]] for type `R` representing the SQL query for the function. * * @param values the input values for the function - * @return the `DoobieQuery[R]` representing the SQL query + * @return the [[DoobieQuery]] for type `R` representing the SQL query */ protected def query(values: I): DoobieQuery[R] = new DoobieQuery[R](sql(values)) } private[doobie] trait DoobieFunctionWithStatus[I, R] extends DoobieFunctionBase[R] { /** - * The `Read[StatusWithData[R]]` instance used to read the query result with status into `StatusWithData[R]`. + * The [[doobie.Read]] instance used to read the query result with status into `StatusWithData[R]`. */ implicit def readStatusWithDataR(implicit readR: Read[R]): Read[StatusWithData[R]] = Read[(Int, String, R)].map { case (status, status_text, data) => StatusWithData(status, status_text, data) } /** - * Generates a Doobie `Fragment` representing the SQL query for the function. + * Generates a [[doobie.Fragment]] representing the SQL query for the function. * * @param values the input values for the function - * @return the Doobie `Fragment` representing the SQL query + * @return the [[doobie.Fragment]] representing the SQL query */ def sql(values: I)(implicit read: Read[StatusWithData[R]]): Fragment /** - * Generates a `DoobieQueryWithStatus[R]` representing the SQL query for the function. + * Generates a [[DoobieQueryWithStatus]] for type `R` representing the SQL query for the function. * * @param values the input values for the function - * @return the `DoobieQueryWithStatus[R]` representing the SQL query + * @return the [[DoobieQueryWithStatus]] for type `R` representing the SQL query */ protected def query(values: I): DoobieQueryWithStatus[R] = new DoobieQueryWithStatus[R](sql(values), checkStatus) @@ -86,20 +86,21 @@ private[doobie] trait DoobieFunctionWithStatus[I, R] extends DoobieFunctionBase[ } /** - * `DoobieFunction` is an object that contains several abstract classes extending different types of database functions. - * These classes use Doobie's `Fragment` to represent SQL queries and `DoobieEngine` to execute them. + * An object that contains several abstract classes extending different types of database functions. + * These classes use [[doobie.Fragment]] to represent SQL queries and [[DoobieEngine]] to execute them. */ object DoobieFunction { /** - * `DoobieSingleResultFunctionWithStatus` is an abstract class that extends `DBSingleResultFunctionWithStatus` with `DoobiePgEngine` as the engine type. + * [[DoobieSingleResultFunctionWithStatus]] is an abstract class that extends + * [[za.co.absa.fadb.DBFunctionWithStatus]] with [[DoobieEngine]] as the engine type. * It represents a database function that returns a single result with status. * * @param functionNameOverride the optional override for the function name * @param schema the database schema - * @param dbEngine the `DoobieEngine` instance used to execute SQL queries - * @param readR the `Read[R]` instance used to read the query result into `R` - * @param readSelectWithStatus the `Read[StatusWithData[R]]` instance used to read the query result with status into `StatusWithData[R]` - * @tparam F the effect type, which must have an `Async` and a `Monad` instance + * @param dbEngine the [[DoobieEngine]] instance used to execute SQL queries + * @param readR the [[doobie.Read]] instance used to read the query result into `R` + * @param readSelectWithStatus the [[doobie.Read]] instance used to read the query result into `StatusWithData[R]` + * @tparam F the effect type, which must have an [[cats.effect.Async]] instance */ abstract class DoobieSingleResultFunctionWithStatus[I, R, F[_]: Async]( functionNameOverride: Option[String] = None @@ -111,14 +112,15 @@ object DoobieFunction { with DoobieFunctionWithStatus[I, R] /** - * `DoobieSingleResultFunction` is an abstract class that extends `DBSingleResultFunction` with `DoobiePgEngine` as the engine type. + * [[DoobieSingleResultFunction]] is an abstract class that extends + * [[za.co.absa.fadb.DBFunction.DBSingleResultFunction]] with [[DoobieEngine]] as the engine type. * It represents a database function that returns a single result. * * @param functionNameOverride the optional override for the function name * @param schema the database schema - * @param dbEngine the `DoobieEngine` instance used to execute SQL queries - * @param readR the `Read[R]` instance used to read the query result into `R` - * @tparam F the effect type, which must have an `Async` and a `Monad` instance + * @param dbEngine the [[DoobieEngine]] instance used to execute SQL queries + * @param readR the [[doobie.Read]] instance used to read the query result into `R` + * @tparam F the effect type, which must have an [[cats.effect.Async]] instance */ abstract class DoobieSingleResultFunction[I, R, F[_]: Async](functionNameOverride: Option[String] = None)( implicit override val schema: DBSchema, @@ -128,7 +130,8 @@ object DoobieFunction { with DoobieFunction[I, R] /** - * `DoobieMultipleResultFunction` is an abstract class that extends `DBMultipleResultFunction` with `DoobiePgEngine` as the engine type. + * [[DoobieMultipleResultFunction]] is an abstract class that extends + * [[za.co.absa.fadb.DBFunction.DBMultipleResultFunction]] with [[DoobieEngine]] as the engine type. * It represents a database function that returns multiple results. */ abstract class DoobieMultipleResultFunction[I, R, F[_]: Async](functionNameOverride: Option[String] = None)( @@ -139,7 +142,20 @@ object DoobieFunction { with DoobieFunction[I, R] /** - * `DoobieOptionalResultFunction` is an abstract class that extends `DBOptionalResultFunction` with `DoobiePgEngine` as the engine type. + * [[DoobieStreamingResultFunction]] is an abstract class that extends + * [[za.co.absa.fadb.streaming.DBStreamingFunction]] with [[DoobieStreamingEngine]] as the engine type. + * It represents a database function that returns a stream of results. + */ + abstract class DoobieStreamingResultFunction[I, R, F[_]: Async](functionNameOverride: Option[String] = None)( + implicit override val schema: DBSchema, + val dbEngine: DoobieStreamingEngine[F], + val readR: Read[R] + ) extends DBStreamingFunction[I, R, DoobieStreamingEngine[F], F](functionNameOverride) + with DoobieFunction[I, R] + + /** + * [[DoobieOptionalResultFunction]] is an abstract class that extends + * [[za.co.absa.fadb.DBFunction.DBOptionalResultFunction]] with [[DoobieEngine]] as the engine type. * It represents a database function that returns an optional result. */ abstract class DoobieOptionalResultFunction[I, R, F[_]: Async](functionNameOverride: Option[String] = None)( diff --git a/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieQuery.scala b/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieQuery.scala index 0909d363..f670ad6b 100644 --- a/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieQuery.scala +++ b/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieQuery.scala @@ -23,21 +23,21 @@ import za.co.absa.fadb.status.FunctionStatus import za.co.absa.fadb.{FunctionStatusWithData, Query, QueryWithStatus} /** - * `DoobieQuery` is a class that extends `Query` with `R` as the result type. - * It uses Doobie's `Fragment` to represent SQL queries. + * [[DoobieQuery]] is a class that extends [[za.co.absa.fadb.Query]] with `R` as the result type. + * It uses [[doobie.Fragment]] to represent SQL queries. * - * @param fragment the Doobie fragment representing the SQL query - * @param readR the `Read[R]` instance used to read the query result into `R` + * @param fragment the [[doobie.Fragment]] representing the SQL query + * @param readR the [[doobie.Read]] instance used to read the query result into `R` */ class DoobieQuery[R](val fragment: Fragment)(implicit val readR: Read[R]) extends Query[R] /** - * `DoobieQueryWithStatus` is a class that extends `QueryWithStatus` with `R` as the result type. - * It uses Doobie's `Fragment` to represent SQL queries. + * [[DoobieQueryWithStatus]] is a class that extends [[za.co.absa.fadb.QueryWithStatus]] with `R` as the result type. + * It uses [[doobie.Fragment]] to represent SQL queries. * - * @param fragment the Doobie fragment representing the SQL query + * @param fragment the [[doobie.Fragment]] representing the SQL query * @param checkStatus the function to check the status of the query - * @param readStatusWithDataR the `Read[StatusWithData[R]]` instance used to read the query result into `StatusWithData[R]` + * @param readStatusWithDataR the [[doobie.Read]] instance used to read the query result into `StatusWithData[R]` */ class DoobieQueryWithStatus[R]( val fragment: Fragment, @@ -45,7 +45,7 @@ class DoobieQueryWithStatus[R]( )(implicit val readStatusWithDataR: Read[StatusWithData[R]]) extends QueryWithStatus[StatusWithData[R], R, R] { - /* + /** * Processes the status of the query and returns the status with data * @param initialResult - the initial result of the query * @return the status with data @@ -53,7 +53,7 @@ class DoobieQueryWithStatus[R]( override def processStatus(initialResult: StatusWithData[R]): FunctionStatusWithData[R] = FunctionStatusWithData(FunctionStatus(initialResult.status, initialResult.statusText), initialResult.data) - /* + /** * Converts the status with data to either a status exception or the data * @param statusWithData - the status with data * @return either a status exception or the data diff --git a/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieStreamingEngine.scala b/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieStreamingEngine.scala new file mode 100644 index 00000000..18addffc --- /dev/null +++ b/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieStreamingEngine.scala @@ -0,0 +1,69 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.fadb.doobie + +import cats.effect.Async +import doobie.Transactor +import doobie.implicits.toDoobieStreamOps +import doobie.util.Read +import za.co.absa.fadb.streaming.DBStreamingEngine + +import scala.language.higherKinds + +/** + * [[DoobieStreamingEngine]] is a class that represents a database engine. + * It provides methods to execute streaming queries from a database. + * @tparam F - The type of the context in which the database queries are executed. + */ +class DoobieStreamingEngine[F[_]: Async](val transactor: Transactor[F], defaultChunkSize: Int = 512) + extends DBStreamingEngine[F] { + + /** The type of Doobie queries that produce `R` */ + type QueryType[R] = DoobieQuery[R] + + /** + * Executes a Doobie query and returns the result as an [[fs2.Stream]] for effect type `F` and value type `R`. + * + * @param query the [[DoobieQuery]] to execute + * @return the query result as an [[fs2.Stream]] for effect type `F` and value type `R` + */ + override def runStreaming[R](query: QueryType[R]): fs2.Stream[F, R] = + executeStreamingQuery(query, defaultChunkSize)(query.readR) + + /** + * Executes a Doobie query and returns the result as an [[fs2.Stream]] for effect type `F` and value type `R`. + * + * @param query the Doobie query to execute + * @param chunkSize the chunk size to use when streaming the query result + * @return the query result as an [[fs2.Stream]] for effect type `F` and value type `R` + */ + override def runStreamingWithChunkSize[R](query: QueryType[R], chunkSize: Int): fs2.Stream[F, R] = + executeStreamingQuery(query, chunkSize)(query.readR) + + /** + * Executes a Doobie query and returns the result as an [[fs2.Stream]] for effect type `F` and value type `R`. + * + * @param query the [[DoobieQuery]] to execute + * @param chunkSize the chunk size to use when streaming the query result + * @param readR the [[doobie.Read]] instance used to read the query result into `R` + * @return the query result as an [[fs2.Stream]] for effect type `F` and value type `R` + */ + private def executeStreamingQuery[R](query: QueryType[R], chunkSize: Int)(implicit readR: Read[R]): fs2.Stream[F, R] = { + query.fragment.query[R].streamWithChunkSize(chunkSize).transact(transactor) + } + +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index c1e9ca12..a2008c2e 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -19,8 +19,7 @@ import sbt._ object Dependencies { private def commonDependencies(scalaVersion: String): Seq[ModuleID] = Seq( - "org.typelevel" %% "cats-core" % "2.9.0", - "org.typelevel" %% "cats-effect" % "3.5.0", + "org.typelevel" %% "cats-core" % "2.9.0", "org.scalatest" %% "scalatest" % "3.1.0" % "test,it", "org.scalatest" %% "scalatest-flatspec" % "3.2.0" % "test,it", "org.scalatestplus" %% "mockito-1-10" % "3.1.0.0" % "test,it" @@ -33,20 +32,32 @@ object Dependencies { ) } + def streamingDependencies(scalaVersion: String): Seq[ModuleID] = { + commonDependencies(scalaVersion) ++ Seq( + "co.fs2" %% "fs2-core" % "3.7.0", + ) + } + def slickDependencies(scalaVersion: String): Seq[ModuleID] = { commonDependencies(scalaVersion) ++ Seq( "com.typesafe.slick" %% "slick" % "3.3.3", "org.slf4j" % "slf4j-nop" % "1.7.26", "com.typesafe.slick" %% "slick-hikaricp" % "3.3.3", "org.postgresql" % "postgresql" % "42.6.0", - "com.github.tminglei" %% "slick-pg" % "0.20.4" % Optional + "com.github.tminglei" %% "slick-pg" % "0.20.4" % Optional, + ) + } + + def slickStreamingDependencies(scalaVersion: String): Seq[ModuleID] = { + commonDependencies(scalaVersion) ++ slickDependencies(scalaVersion) ++ streamingDependencies(scalaVersion) ++ Seq( + "co.fs2" %% "fs2-reactive-streams" % "3.9.3" ) } def doobieDependencies(scalaVersion: String): Seq[ModuleID] = { commonDependencies(scalaVersion) ++ Seq( - "org.tpolecat" %% "doobie-core" % "1.0.0-RC2", - "org.tpolecat" %% "doobie-hikari" % "1.0.0-RC2", + "org.tpolecat" %% "doobie-core" % "1.0.0-RC2", + "org.tpolecat" %% "doobie-hikari" % "1.0.0-RC2", "org.tpolecat" %% "doobie-postgres" % "1.0.0-RC2" ) } diff --git a/slick-streaming/src/it/scala/za/co/absa/fadb/slick/streaming/SlickStreamingResultFunctionTest.scala b/slick-streaming/src/it/scala/za/co/absa/fadb/slick/streaming/SlickStreamingResultFunctionTest.scala new file mode 100644 index 00000000..676a13a8 --- /dev/null +++ b/slick-streaming/src/it/scala/za/co/absa/fadb/slick/streaming/SlickStreamingResultFunctionTest.scala @@ -0,0 +1,49 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.fadb.slick.streaming + +import cats.effect.IO +import cats.effect.unsafe.implicits.global +import org.scalatest.funsuite.AnyFunSuite +import slick.jdbc.SQLActionBuilder +import za.co.absa.fadb.DBSchema +import za.co.absa.fadb.slick.FaDbPostgresProfile.api._ +import za.co.absa.fadb.slick.{Actor, ActorSlickConverter, SlickTest} + +class SlickStreamingResultFunctionTest extends AnyFunSuite with SlickTest { + + class GetActors(implicit override val schema: DBSchema, val dbEngine: SlickPgStreamingEngine[IO]) + extends SlickStreamingResultFunction[GetActorsQueryParameters, Actor, IO] + with ActorSlickConverter { + + override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("actor_id", "first_name", "last_name") + + override protected def sql(values: GetActorsQueryParameters): SQLActionBuilder = { + sql"""SELECT #$selectEntry FROM #$functionName(${values.firstName},${values.lastName}) #$alias;""" + } + } + + private val getActors = new GetActors()(Runs, new SlickPgStreamingEngine[IO](db)) + + test("Retrieving actors from database") { + val expectedResultElem = Actor(49, "Pavel", "Marek") + val results = + getActors(GetActorsQueryParameters(Some("Pavel"), Some("Marek")), 1024).take(10).compile.toList.unsafeRunSync() + assert(results.contains(expectedResultElem)) + } + +} diff --git a/slick-streaming/src/main/scala/za/co/absa/fadb/slick/streaming/SlickPgStreamingEngine.scala b/slick-streaming/src/main/scala/za/co/absa/fadb/slick/streaming/SlickPgStreamingEngine.scala new file mode 100644 index 00000000..472d3f8a --- /dev/null +++ b/slick-streaming/src/main/scala/za/co/absa/fadb/slick/streaming/SlickPgStreamingEngine.scala @@ -0,0 +1,60 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.fadb.slick.streaming + +import cats.effect.Async +import fs2.interop.reactivestreams.PublisherOps +import slick.jdbc.JdbcBackend.Database +import za.co.absa.fadb.slick.SlickQuery +import za.co.absa.fadb.streaming.DBStreamingEngine + +import scala.language.higherKinds + +/** + * [[SlickPgStreamingEngine]] is a class that represents a database engine. + * It provides methods to execute streaming queries from a database. + * @tparam F - The type of the context in which the database queries are executed. + */ +class SlickPgStreamingEngine[F[_]: Async](val db: Database, defaultChunkSize: Int = 512) extends DBStreamingEngine[F] { + + /** The type of Slick queries that produce `R` */ + type QueryType[R] = SlickQuery[R] + + /** + * Executes a Slick query and returns the result as an [[fs2.Stream]] for effect type `F` and value type `R`. + * + * @param query the Slick query to execute + * @return the query result as an [[fs2.Stream]] for effect type `F` and value type `R` + */ + def runStreaming[R](query: QueryType[R]): fs2.Stream[F, R] = { + val slickPublisher = db.stream(query.sql.as[R](query.getResult)) + slickPublisher.toStreamBuffered[F](defaultChunkSize) + } + + /** + * Executes a Slick query and returns the result as an [[fs2.Stream]] for effect type `F` and value type `R`. + * + * @param query the Slick query to execute + * @param chunkSize the chunk size to use when streaming the query result + * @return the query result as an [[fs2.Stream]] for effect type `F` and value type `R` + */ + def runStreamingWithChunkSize[R](query: QueryType[R], chunkSize: Int): fs2.Stream[F, R] = { + val slickPublisher = db.stream(query.sql.as[R](query.getResult)) + slickPublisher.toStreamBuffered[F](chunkSize) + } + +} diff --git a/slick-streaming/src/main/scala/za/co/absa/fadb/slick/streaming/SlickStreamingResultFunction.scala b/slick-streaming/src/main/scala/za/co/absa/fadb/slick/streaming/SlickStreamingResultFunction.scala new file mode 100644 index 00000000..d081d66e --- /dev/null +++ b/slick-streaming/src/main/scala/za/co/absa/fadb/slick/streaming/SlickStreamingResultFunction.scala @@ -0,0 +1,33 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.fadb.slick.streaming + +import cats.effect.Async +import za.co.absa.fadb.DBSchema +import za.co.absa.fadb.slick.SlickFunction +import za.co.absa.fadb.streaming.DBStreamingFunction + +import scala.language.higherKinds + +/** + * Class for Slick DB functions with streaming results. + */ +abstract class SlickStreamingResultFunction[I, R, F[_]: Async](functionNameOverride: Option[String] = None)(implicit + override val schema: DBSchema, + dBEngine: SlickPgStreamingEngine[F] +) extends DBStreamingFunction[I, R, SlickPgStreamingEngine[F], F](functionNameOverride) + with SlickFunction[I, R] diff --git a/slick/src/main/scala/za/co/absa/fadb/slick/FaDbPostgresProfile.scala b/slick/src/main/scala/za/co/absa/fadb/slick/FaDbPostgresProfile.scala index b0044d15..c90058f2 100644 --- a/slick/src/main/scala/za/co/absa/fadb/slick/FaDbPostgresProfile.scala +++ b/slick/src/main/scala/za/co/absa/fadb/slick/FaDbPostgresProfile.scala @@ -22,7 +22,7 @@ import za.co.absa.fadb.slick.support.PgUUIDSupport /** * DB profile recommended to use with SlickPgEngine to offer support for all extended Postgres types. * JSON is not included, as they are multiple JSON implementations. Choose the one of your liking and extend - * `FaDbPostgresProfile` with it. More on [SlickPG](https://github.com/tminglei/slick-pg/tree/master) page. + * [[FaDbPostgresProfile]] with it. More on [SlickPG](https://github.com/tminglei/slick-pg/tree/master) page. */ trait FaDbPostgresProfile extends ExPostgresProfile diff --git a/slick/src/main/scala/za/co/absa/fadb/slick/SlickFunction.scala b/slick/src/main/scala/za/co/absa/fadb/slick/SlickFunction.scala index 07000ad4..eabad68a 100644 --- a/slick/src/main/scala/za/co/absa/fadb/slick/SlickFunction.scala +++ b/slick/src/main/scala/za/co/absa/fadb/slick/SlickFunction.scala @@ -32,28 +32,28 @@ import scala.concurrent.Future private[slick] trait SlickFunctionBase[I, R] { /** - * The `GetResult[R]` instance used to read the query result into `R`. + * The [[slick.jdbc.GetResult]] instance used to read the query result into `R`. */ protected def slickConverter: GetResult[R] /** - * Generates a Slick `SQLActionBuilder` representing the SQL query for the function. + * Generates a Slick [[slick.jdbc.SQLActionBuilder]] representing the SQL query for the function. * * @param values the input values for the function - * @return the Slick `SQLActionBuilder` representing the SQL query + * @return the Slick [[slick.jdbc.SQLActionBuilder]] representing the SQL query */ protected def sql(values: I): SQLActionBuilder def fieldsToSelect: Seq[String] } -private[slick] trait SlickFunction[I, R] extends SlickFunctionBase[I, R] { +trait SlickFunction[I, R] extends SlickFunctionBase[I, R] { /** - * Generates a `SlickQuery[R]` representing the SQL query for the function. + * Generates a [[SlickQuery]] for type `R` representing the SQL query for the function. * * @param values the input values for the function - * @return the `SlickQuery[R]` representing the SQL query + * @return the [[SlickQuery]] for type `R` representing the SQL query */ protected def query(values: I): SlickQuery[R] = new SlickQuery(sql(values), slickConverter) } @@ -61,10 +61,10 @@ private[slick] trait SlickFunction[I, R] extends SlickFunctionBase[I, R] { private[slick] trait SlickFunctionWithStatus[I, R] extends SlickFunctionBase[I, R] { /** - * Generates a `SlickQueryWithStatus[R]` representing the SQL query for the function with status support. + * Generates a [[SlickQueryWithStatus]] for type `R` representing the SQL query for the function with status support. * * @param values the input values for the function - * @return the `SlickQueryWithStatus[R]` representing the SQL query + * @return the [[SlickQueryWithStatus]] for type `R` representing the SQL query */ protected def query(values: I): SlickQueryWithStatus[R] = new SlickQueryWithStatus[R](sql(values), slickConverter, checkStatus) diff --git a/slick/src/main/scala/za/co/absa/fadb/slick/SlickPgEngine.scala b/slick/src/main/scala/za/co/absa/fadb/slick/SlickPgEngine.scala index 61eba4c3..2b32a4f1 100644 --- a/slick/src/main/scala/za/co/absa/fadb/slick/SlickPgEngine.scala +++ b/slick/src/main/scala/za/co/absa/fadb/slick/SlickPgEngine.scala @@ -25,7 +25,7 @@ import scala.concurrent.{ExecutionContext, Future} import scala.language.higherKinds /** - * [[DBEngine]] based on the Slick library in the Postgres flavor + * [[za.co.absa.fadb.DBEngine]] based on the Slick library in the Postgres flavor * * @param db - the Slick database */ diff --git a/slick/src/main/scala/za/co/absa/fadb/slick/SlickQuery.scala b/slick/src/main/scala/za/co/absa/fadb/slick/SlickQuery.scala index 13b9f5bb..7adf1da4 100644 --- a/slick/src/main/scala/za/co/absa/fadb/slick/SlickQuery.scala +++ b/slick/src/main/scala/za/co/absa/fadb/slick/SlickQuery.scala @@ -24,7 +24,7 @@ import za.co.absa.fadb.{FunctionStatusWithData, Query, QueryWithStatus} /** * SQL query representation for Slick * @param sql - the SQL query in Slick format - * @param getResult - function that converts the [[slick.jdbc.PositionedResult slick.PositionedResult]] + * @param getResult - function that converts the [[slick.jdbc.PositionedResult]] * (the result of Slick execution) into the desired `R` type * @tparam R - the return type of the query */ @@ -33,7 +33,7 @@ class SlickQuery[R](val sql: SQLActionBuilder, val getResult: GetResult[R]) exte /** * SQL query representation for Slick with status * @param sql - the SQL query in Slick format - * @param getResult - function that converts the [[slick.jdbc.PositionedResult slick.PositionedResult]] + * @param getResult - function that converts the [[slick.jdbc.PositionedResult]] * (the result of Slick execution) into the desired `R` type * @tparam R - the return type of the query */ diff --git a/streaming/src/main/scala/za/co/absa/fadb/streaming/DBStreamingEngine.scala b/streaming/src/main/scala/za/co/absa/fadb/streaming/DBStreamingEngine.scala new file mode 100644 index 00000000..c1bf9e4f --- /dev/null +++ b/streaming/src/main/scala/za/co/absa/fadb/streaming/DBStreamingEngine.scala @@ -0,0 +1,53 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.fadb.streaming + +import za.co.absa.fadb.Query + +import scala.language.higherKinds + +/** + * [[DBStreamingEngine]] is an abstract class that represents a database engine. + * It provides methods to execute streaming queries from a database. + * @tparam F - The type of the context in which the database queries are executed. + */ +abstract class DBStreamingEngine[F[_]] { + + /** + * A type representing the (SQL) query within the engine + * @tparam R - the return type of the query + */ + type QueryType[R] <: Query[R] + + /** + * Executes a query and returns the result as an [[fs2.Stream]] for effect type `F` and value type `R`. + * + * @param query the query to execute + * @return the query result as an [[fs2.Stream]] for effect type `F` and value type `R` + */ + def runStreaming[R](query: QueryType[R]): fs2.Stream[F, R] + + /** + * Executes a query and returns the result as an [[fs2.Stream]] for effect type `F` and value type `R`. + * + * @param query the query to execute + * @param chunkSize the chunk size to use when streaming the query result + * @return the query result as an [[fs2.Stream]] for effect type `F` and value type `R` + */ + def runStreamingWithChunkSize[R](query: QueryType[R], chunkSize: Int): fs2.Stream[F, R] + +} diff --git a/streaming/src/main/scala/za/co/absa/fadb/streaming/DBStreamingFunction.scala b/streaming/src/main/scala/za/co/absa/fadb/streaming/DBStreamingFunction.scala new file mode 100644 index 00000000..e0445215 --- /dev/null +++ b/streaming/src/main/scala/za/co/absa/fadb/streaming/DBStreamingFunction.scala @@ -0,0 +1,69 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.fadb.streaming + +import za.co.absa.fadb.{DBEngine, DBFunctionFabric, DBSchema} + +import scala.language.higherKinds + +/** + * [[DBStreamingFunction]] is an abstract class that represents a database function returning a stream of results. + * + * @param functionNameOverride - Optional parameter to override the class name if it does not match the database function name. + * @param schema - The schema the function belongs to. + * @param dbStreamingEngine - The database engine that is supposed to execute the function (contains connection to the database). + * @tparam I - The type covering the input fields of the database function. + * @tparam R - The type covering the returned fields from the database function. + * @tparam E - The type of the [[DBStreamingEngine]] engine. + * @tparam F - The type of the context in which the database function is executed. + */ +abstract class DBStreamingFunction[I, R, E <: DBStreamingEngine[F], F[_]](functionNameOverride: Option[String] = None)( + implicit override val schema: DBSchema, + val dbStreamingEngine: E +) extends DBFunctionFabric(functionNameOverride) { + + // A constructor that takes only the mandatory parameters and uses default values for the optional ones + def this()(implicit schema: DBSchema, dBEngine: E) = this(None) + + // A constructor that allows specifying the function name as a string, but not as an option + def this(functionName: String)(implicit schema: DBSchema, dBEngine: E) = this(Some(functionName)) + + /** + * Function to create the DB function call specific to the provided [[DBEngine]]. + * Expected to be implemented by the DBEngine specific mix-in. + * @param values - The values to pass over to the database function. + * @return - The SQL query in the format specific to the provided [[DBEngine]]. + */ + protected def query(values: I): dbStreamingEngine.QueryType[R] + + /** + * Executes the database function and returns stream of results + * @param values The values to pass over to the database function + * @return A stream of results from the database function + */ + def apply(values: I): fs2.Stream[F, R] = dbStreamingEngine.runStreaming(query(values)) + + /** + * Executes the database function and returns stream of results. Allows to specify chunk size. + * @param values The values to pass over to the database function + * @param chunkSize The chunk size to use for the stream + * @return A stream of results from the database function + */ + def apply(values: I, chunkSize: Int): fs2.Stream[F, R] = { + dbStreamingEngine.runStreamingWithChunkSize(query(values), chunkSize) + } +}