-
Notifications
You must be signed in to change notification settings - Fork 12
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add ZIO module providing
connect
and transact
mechanism adapted t…
…o ZIO
- Loading branch information
Showing
2 changed files
with
112 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
100 changes: 100 additions & 0 deletions
100
magnum-zio/src/main/scala/com/augustnagro/magnum/zio/zio.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
package com.augustnagro.magnum.zio | ||
|
||
import _root_.zio.{Trace, ZIO} | ||
import com.augustnagro.magnum.* | ||
|
||
import javax.sql.DataSource | ||
import scala.util.control.NonFatal | ||
|
||
/** Executes a given query on a given DataSource | ||
* | ||
* Re-implementation of | ||
* [[com.augustnagro.magnum.connect(dataSource: DataSource)]] for ZIO | ||
* | ||
* Usage: | ||
* {{{ | ||
* connect(datasource) { cn ?=> repo.findById(id) } | ||
* }}} | ||
*/ | ||
def connect[A](dataSource: DataSource)(q: DbCon ?=> A)(implicit | ||
trace: Trace | ||
): ZIO[Any, Throwable, A] = | ||
connect(Transactor(dataSource))(q) | ||
|
||
/** Executes a given query on a given Transactor | ||
* | ||
* Re-implementation of | ||
* [[com.augustnagro.magnum.connect(dataSource: DataSource)]] for ZIO | ||
* | ||
* Usage: | ||
* {{{ | ||
* connect(transactor) { cn ?=> repo.findById(id) } | ||
* }}} | ||
*/ | ||
def connect[A]( | ||
transactor: Transactor | ||
)(q: DbCon ?=> A)(implicit trace: Trace): ZIO[Any, Throwable, A] = | ||
ZIO.blocking { | ||
ZIO.acquireReleaseWith( | ||
acquire = ZIO.attempt(transactor.dataSource.getConnection()) | ||
)(release = | ||
conn => ZIO.attempt(if (conn ne null) conn.close() else ()).orDie | ||
) { cn => | ||
ZIO.attempt { | ||
transactor.connectionConfig(cn) | ||
q(using DbCon(cn, transactor.sqlLogger)) | ||
} | ||
} | ||
} | ||
|
||
/** Executes a given transaction on a given DataSource | ||
* | ||
* Re-implementation of | ||
* [[com.augustnagro.magnum.transact(transactor: Transactor)]] for ZIO | ||
* | ||
* Usage: | ||
* {{{ | ||
* transact(dataSource) { tx ?=> repo.insertReturning(creator) } | ||
* }}} | ||
*/ | ||
def transact[A](dataSource: DataSource)(q: DbTx ?=> A)(implicit | ||
trace: Trace | ||
): ZIO[Any, Throwable, A] = | ||
transact(Transactor(dataSource))(q) | ||
|
||
/** Executes a given transaction on a given Transactor | ||
* | ||
* Re-implementation of | ||
* [[com.augustnagro.magnum.transact(transactor: Transactor)]] for ZIO | ||
* | ||
* Usage: | ||
* {{{ | ||
* transact(transactor) { tx ?=> repo.insertReturning(creator) } | ||
* }}} | ||
*/ | ||
def transact[A]( | ||
transactor: Transactor | ||
)(q: DbTx ?=> A)(implicit trace: Trace): ZIO[Any, Throwable, A] = | ||
ZIO.blocking { | ||
ZIO.acquireReleaseWith( | ||
acquire = ZIO.attempt(transactor.dataSource.getConnection()) | ||
)(release = | ||
conn => ZIO.attempt(if (conn ne null) conn.close() else ()).orDie | ||
) { cn => | ||
ZIO.uninterruptible { | ||
ZIO.attempt { | ||
transactor.connectionConfig(cn) | ||
cn.setAutoCommit(false) | ||
try { | ||
val res = q(using DbTx(cn, transactor.sqlLogger)) | ||
cn.commit() | ||
res | ||
} catch { | ||
case NonFatal(t) => | ||
cn.rollback() | ||
throw t | ||
} | ||
} | ||
} | ||
} | ||
} |