From 1fca547f7383a35977957e34568d7db1042c9ed8 Mon Sep 17 00:00:00 2001 From: augustnagro Date: Thu, 5 Dec 2024 05:06:07 -0800 Subject: [PATCH] wip zio module changes --- build.sbt | 17 +- .../magnum/magzio/streams/zio.scala | 9 + .../magnum/magzio/ErrorContext.scala | 10 + .../magnum/magzio/Transactor.scala | 200 ++++++++++++++++++ .../augustnagro/magnum/magzio/exports.scala | 21 ++ .../com/augustnagro/magnum/magzio/zio.scala | 128 ----------- .../magnum/magzio/ImmutableRepoZioTests.scala | 97 +++++---- .../magnum/magzio/PgZioTests.scala | 7 +- 8 files changed, 310 insertions(+), 179 deletions(-) create mode 100644 magnum-zio-streams/src/main/scala/com/augustnagro/magnum/magzio/streams/zio.scala create mode 100644 magnum-zio/src/main/scala/com/augustnagro/magnum/magzio/ErrorContext.scala create mode 100644 magnum-zio/src/main/scala/com/augustnagro/magnum/magzio/Transactor.scala create mode 100644 magnum-zio/src/main/scala/com/augustnagro/magnum/magzio/exports.scala delete mode 100644 magnum-zio/src/main/scala/com/augustnagro/magnum/magzio/zio.scala diff --git a/build.sbt b/build.sbt index fd786be..f7c3de1 100644 --- a/build.sbt +++ b/build.sbt @@ -41,9 +41,9 @@ addCommandAlias("fmt", "scalafmtAll") val testcontainersVersion = "0.41.4" val circeVersion = "0.14.10" -lazy val root = project +lazy val magnumRoot = project .in(file(".")) - .aggregate(magnum, magnumPg, magnumZio) + .aggregate(magnum, magnumPg, magnumZio, magnumZioStreams) lazy val magnum = project .in(file("magnum")) @@ -90,6 +90,17 @@ lazy val magnumZio = project Test / fork := true, publish / skip := false, libraryDependencies ++= Seq( - "dev.zio" %% "zio" % "2.1.12" % Provided + "dev.zio" %% "zio" % "2.1.13" % Provided + ) + ) + +lazy val magnumZioStreams = project + .in(file("magnum-zio-streams")) + .dependsOn(magnum) + .settings( + Test / fork := true, + publish / skip := false, + libraryDependencies ++= Seq( + "dev.zio" %% "zio-streams" % "2.1.13" % Provided ) ) diff --git a/magnum-zio-streams/src/main/scala/com/augustnagro/magnum/magzio/streams/zio.scala b/magnum-zio-streams/src/main/scala/com/augustnagro/magnum/magzio/streams/zio.scala new file mode 100644 index 0000000..6194a18 --- /dev/null +++ b/magnum-zio-streams/src/main/scala/com/augustnagro/magnum/magzio/streams/zio.scala @@ -0,0 +1,9 @@ +package com.augustnagro.magnum.magzio.streams + +import com.augustnagro.magnum.Query +import zio.stream.ZStream + +extension [E](query: Query[e]) + def stream(fetchSize: Int = 0): ZStream + +def stream(): Unit = ZStream.acquireReleaseWith(acquire = ) diff --git a/magnum-zio/src/main/scala/com/augustnagro/magnum/magzio/ErrorContext.scala b/magnum-zio/src/main/scala/com/augustnagro/magnum/magzio/ErrorContext.scala new file mode 100644 index 0000000..e6ac715 --- /dev/null +++ b/magnum-zio/src/main/scala/com/augustnagro/magnum/magzio/ErrorContext.scala @@ -0,0 +1,10 @@ +package com.augustnagro.magnum.magzio + +import com.augustnagro.magnum.{Transactor as _, *} + +import scala.util.control.NoStackTrace + +private class Break[E](val value: E) extends NoStackTrace + +/** Abort this SQL Transaction or Connection and become a failed ZIO */ +def fail[E](error: E)(using DbCon): Nothing = throw Break(error) diff --git a/magnum-zio/src/main/scala/com/augustnagro/magnum/magzio/Transactor.scala b/magnum-zio/src/main/scala/com/augustnagro/magnum/magzio/Transactor.scala new file mode 100644 index 0000000..4bdeb70 --- /dev/null +++ b/magnum-zio/src/main/scala/com/augustnagro/magnum/magzio/Transactor.scala @@ -0,0 +1,200 @@ +package com.augustnagro.magnum.magzio + +import com.augustnagro.magnum.{DbCon, DbTx, SqlLogger} +import zio.{IO, Semaphore, Task, Trace, UIO, ZIO} + +import java.sql.Connection +import javax.sql.DataSource +import scala.reflect.ClassTag +import scala.util.control.{ControlThrowable, NonFatal} + +/** Transactor lets you customize a transaction or connection's behavior. It is + * a parameter to the [[transact]] and [[connect]] methods. + */ +class Transactor private ( + /** Datasource to be used */ + private val dataSource: DataSource, + /** Logging configuration */ + private val sqlLogger: SqlLogger, + /** Customize the underlying JDBC Connections */ + private val connectionConfig: Connection => Unit, + /** Number of threads in your connection pool. This helps magzio be more + * memory efficient by limiting the number of blocking pool threads used. + * Not needed if using the ZIO virtual-thread based blocking executor + */ + private val semaphore: Option[Semaphore] +): + + def withSqlLogger(sqlLogger: SqlLogger): Transactor = + new Transactor( + dataSource, + sqlLogger, + connectionConfig, + semaphore + ) + + def withConnectionConfig(connectionConfig: Connection => Unit): Transactor = + new Transactor( + dataSource, + sqlLogger, + connectionConfig, + semaphore + ) + + /** Executes a given query on a given Transactor, returning UIO. + * + * Re-implementation for ZIO of + * [[com.augustnagro.magnum.connect(dataSource: DataSource)]] + * + * Usage: + * {{{ + * import com.augustnagro.magnum.magzio.* + * + * connectUIO(transactor): + * repo.findById(id) + * }}} + */ + def connectUIO[A](q: DbCon ?=> A)(using Trace): UIO[A] = connectTask(q).orDie + + /** Executes a given query on a given Transactor, returning IO. + * + * Re-implementation for ZIO of + * [[com.augustnagro.magnum.connect(dataSource: DataSource)]] + * + * Usage: + * {{{ + * import com.augustnagro.magnum.magzio.* + * + * connectIO(transactor): + * repo.findById(id) + * }}} + */ + def connectIO[E: ClassTag, A](q: DbCon ?=> A)(using Trace): IO[E, A] = + connectTask(q).catchAll: + case break: Break[?] => ZIO.fail(break.value.asInstanceOf[E]) + case error: E => ZIO.fail(error) + case NonFatal(defect) => ZIO.die(defect) + + private def connectTask[A](q: DbCon ?=> A)(using Trace): Task[A] = + val task = ZIO.blocking( + ZIO.acquireReleaseWith( + acquire = ZIO.attempt(dataSource.getConnection()) + )(release = conn => ZIO.attempt(conn.close()).orDie): cn => + ZIO.attempt: + connectionConfig(cn) + q(using DbCon(cn, sqlLogger)) + ) + semaphore match + case Some(sem) => sem.withPermit(task) + case None => task + + /** Executes a given transaction on a given Transactor + * + * Re-implementation for ZIO of + * [[com.augustnagro.magnum.transact(transactor: Transactor)]] + * + * Usage: + * {{{ + * import com.augustnagro.magnum.magzio.* + * + * transact(transactor) { tx ?=> repo.insertReturning(creator) } + * }}} + */ + def transactUIO[A](q: DbTx ?=> A)(using Trace): UIO[A] = + transactTask(q).orDie + + def transactIO[E: ClassTag, A](q: DbTx ?=> A)(using Trace): IO[E, A] = + transactTask(q).catchAll: + case break: Break[?] => ZIO.fail(break.value.asInstanceOf[E]) + case error: E => ZIO.fail(error) + case NonFatal(defect) => ZIO.die(defect) + + private def transactTask[A](q: DbTx ?=> A)(using Trace): Task[A] = + val task = ZIO.blocking { + ZIO.acquireReleaseWith( + acquire = ZIO.attempt(dataSource.getConnection()) + )(release = + conn => if (conn ne null) ZIO.attempt(conn.close()).orDie else ZIO.unit + ) { cn => + ZIO.uninterruptible { + ZIO.attempt { + connectionConfig(cn) + cn.setAutoCommit(false) + try { + val res = q(using DbTx(cn, sqlLogger)) + cn.commit() + res + } catch { + case NonFatal(t) => + cn.rollback() + throw t + } + } + } + } + } + semaphore match + case Some(sem) => sem.withPermit(task) + case None => task + end transactTask +end Transactor + +object Transactor: + def apply( + dataSource: DataSource, + sqlLogger: SqlLogger, + connectionConfig: Connection => Unit, + maxBlockingThreads: Option[Int] + ): UIO[Transactor] = + ZIO + .fromOption(maxBlockingThreads) + .flatMap(threads => Semaphore.make(threads)) + .unsome + .map(semaphoreOpt => + new Transactor( + dataSource, + sqlLogger, + connectionConfig, + semaphoreOpt + ) + ) + + def apply( + dataSource: DataSource, + sqlLogger: SqlLogger, + connectionConfig: Connection => Unit + ): UIO[Transactor] = + apply( + dataSource, + sqlLogger, + connectionConfig, + None + ) + + def apply(dataSource: DataSource, sqlLogger: SqlLogger): UIO[Transactor] = + apply( + dataSource, + sqlLogger, + _ => (), + None + ) + + def apply(dataSource: DataSource): UIO[Transactor] = + apply( + dataSource, + SqlLogger.Default, + _ => (), + None + ) + + def apply( + dataSource: DataSource, + connectionConfig: Connection => Unit + ): UIO[Transactor] = + apply( + dataSource, + SqlLogger.Default, + connectionConfig, + None + ) +end Transactor diff --git a/magnum-zio/src/main/scala/com/augustnagro/magnum/magzio/exports.scala b/magnum-zio/src/main/scala/com/augustnagro/magnum/magzio/exports.scala new file mode 100644 index 0000000..f852c09 --- /dev/null +++ b/magnum-zio/src/main/scala/com/augustnagro/magnum/magzio/exports.scala @@ -0,0 +1,21 @@ +package com.augustnagro.magnum.magzio + +export com.augustnagro.magnum.{ + sql, + batchUpdate, + DbCon, + DbTx, + DbType, + Id, + ImmutableRepo, + NullOrder, + Repo, + SeekDir, + SortOrder, + Spec, + SqlName, + SqlNameMapper, + Table, + TableInfo, + DbCodec +} diff --git a/magnum-zio/src/main/scala/com/augustnagro/magnum/magzio/zio.scala b/magnum-zio/src/main/scala/com/augustnagro/magnum/magzio/zio.scala deleted file mode 100644 index 3b1a701..0000000 --- a/magnum-zio/src/main/scala/com/augustnagro/magnum/magzio/zio.scala +++ /dev/null @@ -1,128 +0,0 @@ -package com.augustnagro.magnum.magzio - -import zio.{Trace, ZIO} -import com.augustnagro.magnum.* - -import java.sql.Connection -import javax.sql.DataSource -import scala.util.control.NonFatal - -/** Executes a given query on a given DataSource - * - * Re-implementation for ZIO of - * [[com.augustnagro.magnum.connect(dataSource: DataSource)]] - * - * Usage: - * {{{ - * import com.augustnagro.magnum.magzio.* - * - * connect(datasource) { cn ?=> repo.findById(id) } - * }}} - */ -def connect[A](dataSource: DataSource)(q: DbCon ?=> A)(implicit - trace: Trace -): ZIO[Any, Throwable, A] = - connect(Transactor(dataSource))(q) - -/** Executes a given query on a given Transactor - * - * Re-implementation for ZIO of - * [[com.augustnagro.magnum.connect(dataSource: DataSource)]] - * - * Usage: - * {{{ - * import com.augustnagro.magnum.magzio.* - * - * connect(transactor) { cn ?=> repo.findById(id) } - * }}} - */ -def connect[A]( - transactor: Transactor -)(q: DbCon ?=> A)(implicit trace: Trace): ZIO[Any, Throwable, A] = - ZIO.blocking { - ZIO.acquireReleaseWith( - acquire = ZIO.attempt(transactor.dataSource.getConnection()) - )(release = - conn => if (conn ne null) ZIO.attempt(conn.close()).orDie else ZIO.unit - ) { cn => - ZIO.attempt { - transactor.connectionConfig(cn) - q(using DbCon(cn, transactor.sqlLogger)) - } - } - } - -/** Executes a given transaction on a given DataSource - * - * Re-implementation for ZIO of - * [[com.augustnagro.magnum.transact(transactor: Transactor)]] - * - * Usage: - * {{{ - * import com.augustnagro.magnum.magzio.* - * - * transact(dataSource) { tx ?=> repo.insertReturning(creator) } - * }}} - */ -def transact[A](dataSource: DataSource)(q: DbTx ?=> A)(implicit - trace: Trace -): ZIO[Any, Throwable, A] = - transact(Transactor(dataSource))(q) - -/** Executes a given transaction on a given DataSource - * - * Re-implementation for ZIO of - * [[com.augustnagro.magnum.transact(transactor: Transactor, connectionConfig: Connection => Unit)]] - * - * Usage: - * {{{ - * import com.augustnagro.magnum.magzio.* - * - * transact(dataSource, ...) { tx ?=> repo.insertReturning(creator) } - * }}} - */ -def transact[A](dataSource: DataSource, connectionConfig: Connection => Unit)( - q: DbTx ?=> A -)(implicit trace: Trace): ZIO[Any, Throwable, A] = - val transactor = - Transactor(dataSource = dataSource, connectionConfig = connectionConfig) - transact(transactor)(q) - -/** Executes a given transaction on a given Transactor - * - * Re-implementation for ZIO of - * [[com.augustnagro.magnum.transact(transactor: Transactor)]] - * - * Usage: - * {{{ - * import com.augustnagro.magnum.magzio.* - * - * transact(transactor) { tx ?=> repo.insertReturning(creator) } - * }}} - */ -def transact[A]( - transactor: Transactor -)(q: DbTx ?=> A)(implicit trace: Trace): ZIO[Any, Throwable, A] = - ZIO.blocking { - ZIO.acquireReleaseWith( - acquire = ZIO.attempt(transactor.dataSource.getConnection()) - )(release = - conn => if (conn ne null) ZIO.attempt(conn.close()).orDie else ZIO.unit - ) { cn => - ZIO.uninterruptible { - ZIO.attempt { - transactor.connectionConfig(cn) - cn.setAutoCommit(false) - try { - val res = q(using DbTx(cn, transactor.sqlLogger)) - cn.commit() - res - } catch { - case NonFatal(t) => - cn.rollback() - throw t - } - } - } - } - } diff --git a/magnum-zio/src/test/scala/com/augustnagro/magnum/magzio/ImmutableRepoZioTests.scala b/magnum-zio/src/test/scala/com/augustnagro/magnum/magzio/ImmutableRepoZioTests.scala index 12aa7e9..6b6e699 100644 --- a/magnum-zio/src/test/scala/com/augustnagro/magnum/magzio/ImmutableRepoZioTests.scala +++ b/magnum-zio/src/test/scala/com/augustnagro/magnum/magzio/ImmutableRepoZioTests.scala @@ -1,6 +1,5 @@ package com.augustnagro.magnum.magzio -import com.augustnagro.magnum.* import munit.FunSuite import shared.Color import zio.* @@ -12,7 +11,7 @@ import scala.util.{Success, Using} def immutableRepoZioTests( suite: FunSuite, dbType: DbType, - xa: () => Transactor + xa: UIO[Transactor] )(using munit.Location, DbCodec[OffsetDateTime] @@ -66,33 +65,46 @@ def immutableRepoZioTests( ) ) +// test("error handling"): +// enum Errors: +// case RateLimit +// case ExistingColor +// +// runIO: +// magzio.transact(xa()): +// val now = OffsetDateTime.parse("2024-11-24T22:17:33.000000000Z") +// val latestCar = sql"SELECT ${car.all} FROM $car ORDER BY created DESC" +// .query[Car] +// .run() +// .headOption +// if latestCar.exists(_.created.isAfter(now.minusSeconds(4))) then + test("count"): - val count = - runIO: - magzio.connect(xa()): + val count = runIO( + xa.flatMap: + _.connectUIO: carRepo.count - assert(count == 3L) + ) + assert(count == 3) + /* test("existsById"): val (exists3, exists4) = - runIO: - magzio.connect(xa()): - carRepo.existsById(3L) -> carRepo.existsById(4L) + xa().connectUIO: + carRepo.existsById(3L) -> carRepo.existsById(4L) assert(exists3) assert(!exists4) test("findAll"): val cars = - runIO: - magzio.connect(xa()): - carRepo.findAll + xa().connectUIO: + carRepo.findAll assert(cars == allCars) test("findById"): val (exists3, exists4) = - runIO: - magzio.connect(xa()): - carRepo.findById(3L) -> carRepo.findById(4L) + xa().connectUIO: + carRepo.findById(3L) -> carRepo.findById(4L) assert(exists3.get == allCars.last) assert(exists4 == None) @@ -102,16 +114,14 @@ def immutableRepoZioTests( assume(dbType != OracleDbType) assume(dbType != SqliteDbType) val ids = - runIO: - magzio.connect(xa()): - carRepo.findAllById(Vector(1L, 3L)).map(_.id) + xa().connectUIO: + carRepo.findAllById(Vector(1L, 3L)).map(_.id) assert(ids == Vector(1L, 3L)) test("serializable transaction"): val count = - runIO: - magzio.transact(xa().copy(connectionConfig = withSerializable)): - carRepo.count + magzio.transact(xa().copy(connectionConfig = withSerializable)): + carRepo.count assert(count == 3L) def withSerializable(con: Connection): Unit = @@ -124,7 +134,7 @@ def immutableRepoZioTests( .query[Car] val result = runIO: - magzio.connect(xa()): + xa().connectUIO: query.run() assertNoDiff( query.frag.sqlString, @@ -141,7 +151,7 @@ def immutableRepoZioTests( .query[Car] val result = runIO: - magzio.connect(xa()): + xa().connectUIO: query.run() assertNoDiff( query.frag.sqlString, @@ -153,44 +163,41 @@ def immutableRepoZioTests( test("select via option"): val vin = Some(124) val cars = - runIO: - magzio.connect(xa()): - sql"select * from car where vin = $vin" - .query[Car] - .run() + xa().connectUIO: + sql"select * from car where vin = $vin" + .query[Car] + .run() assert(cars == allCars.filter(_.vinNumber == vin)) test("tuple select"): val tuples = - runIO: - magzio.connect(xa()): - sql"select model, color from car where id = 2" - .query[(String, Color)] - .run() + xa().connectUIO: + sql"select model, color from car where id = 2" + .query[(String, Color)] + .run() assert(tuples == Vector(allCars(1).model -> allCars(1).color)) test("reads null int as None and not Some(0)"): val maybeCar = - runIO: - magzio.connect(xa()): - carRepo.findById(3L) + xa().connectUIO: + carRepo.findById(3L) assert(maybeCar.get.vinNumber == None) test("created timestamps should match"): val allCars = - runIO: - magzio.connect(xa()): - carRepo.findAll + xa().connectUIO: + carRepo.findAll assert(allCars.map(_.created) == allCars.map(_.created)) test(".query iterator"): val carsCount = - runIO: - magzio.connect(xa()): - Using.Manager(implicit use => - val it = sql"SELECT * FROM car".query[Car].iterator() - it.map(_.id).size - ) + xa().connectUIO: + Using.Manager(implicit use => + val it = sql"SELECT * FROM car".query[Car].iterator() + it.map(_.id).size + ) assert(carsCount == Success(3)) + */ + end immutableRepoZioTests diff --git a/magnum-zio/src/test/scala/com/augustnagro/magnum/magzio/PgZioTests.scala b/magnum-zio/src/test/scala/com/augustnagro/magnum/magzio/PgZioTests.scala index 61406f3..b116775 100644 --- a/magnum-zio/src/test/scala/com/augustnagro/magnum/magzio/PgZioTests.scala +++ b/magnum-zio/src/test/scala/com/augustnagro/magnum/magzio/PgZioTests.scala @@ -1,11 +1,12 @@ package com.augustnagro.magnum.magzio -import com.augustnagro.magnum.* +import com.augustnagro.magnum.PostgresDbType import com.dimafeng.testcontainers.PostgreSQLContainer import com.dimafeng.testcontainers.munit.fixtures.TestContainersFixtures import munit.{AnyFixture, FunSuite, Location} import org.postgresql.ds.PGSimpleDataSource import org.testcontainers.utility.DockerImageName +import zio.* import java.nio.file.{Files, Path} import scala.util.Using @@ -13,7 +14,7 @@ import scala.util.Using.Manager class PgZioTests extends FunSuite, TestContainersFixtures: - immutableRepoZioTests(this, PostgresDbType, xa) + immutableRepoZioTests(this, PostgresDbType, xa()) val pgContainer = ForAllContainerFixture( PostgreSQLContainer @@ -24,7 +25,7 @@ class PgZioTests extends FunSuite, TestContainersFixtures: override def munitFixtures: Seq[AnyFixture[_]] = super.munitFixtures :+ pgContainer - def xa(): Transactor = + def xa(): UIO[Transactor] = val ds = PGSimpleDataSource() val pg = pgContainer() ds.setUrl(pg.jdbcUrl)