We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
/**
 * Common base of the DB-function wrappers below. It composes the SQL statement that calls a
 * single database function and passes the resulting fragment to a caller-supplied runner.
 *
 * @param functionNameOverride explicit function name; when `None` the name is obtained from
 *                             `schema.objectNameFromClassName` applied to the runtime class
 * @param schema               schema used to derive the function name and its optional prefix
 * @tparam I input type of the function
 * @tparam R type of a result row
 * @tparam F effect type in which the query is run
 */
abstract class BaseF[I, R, F[_]: Async](functionNameOverride: Option[String])(implicit val schema: DBSchema) {

  /** Turns the input into the SQL fragments passed as the function's arguments, in order. */
  def toFragmentsSeq: I => Seq[Fragment]

  /** Name of the DB function, prefixed with `schemaName.` when the schema name is non-empty. */
  val functionName: String = {
    val fn = functionNameOverride.getOrElse(schema.objectNameFromClassName(getClass))
    if (schema.schemaName.isEmpty) {
      fn
    } else {
      s"${schema.schemaName}.$fn"
    }
  }

  /** Alias given to the function call in the generated `FROM` clause. */
  protected val alias = "FNC"

  /** Columns to select; an empty sequence means `*`. */
  def fieldsToSelect: Seq[String] = Seq.empty

  /** Builds the select list: `*`, or the chosen fields, each prefixed with the alias if one is set. */
  protected def selectEntry: String = {
    // Evaluate the overridable member once so the emptiness check and the mapping see the same value.
    val fieldsSeq = fieldsToSelect
    if (fieldsSeq.isEmpty) {
      "*"
    } else {
      val aliasToUse = if (alias.isEmpty) "" else s"$alias."
      fieldsSeq.map(aliasToUse + _).mkString(",")
    }
  }

  /**
   * Joins the argument fragments with commas and wraps them in
   * `SELECT <fields> FROM <function>(<args>) <alias>;`.
   *
   * @param fragments argument fragments, in call order; may be empty
   * @return the complete query fragment
   */
  protected final def composeFragments(fragments: Seq[Fragment]): Fragment = {
    val args = fragments
      .reduceLeftOption((acc, frag) => acc ++ fr"," ++ frag)
      .getOrElse(fr"")
    sql"SELECT ${Fragment.const(selectEntry)} FROM ${Fragment.const(functionName)}($args) ${Fragment.const(alias)};"
  }

  /**
   * Composes the query for `input` and runs it with `query`.
   * Non-fatal exceptions thrown while building the fragment are captured in `F`.
   *
   * @param input function input
   * @param query runner turning the composed fragment into an effect
   * @tparam T result type produced by the runner
   */
  protected def executeQuery[T](input: I, query: Fragment => F[T]): F[T] = {
    MonadError[F, Throwable]
      .catchNonFatal(composeFragments(toFragmentsSeq(input)))
      .flatMap(query)
  }
}

/** Strategy for interpreting the status returned by a DB function. */
trait StatusSupport {

  /** Returns the data unchanged on success, or the exception matching the status. */
  def checkStatus[A](statusWithData: FunctionStatusWithData[A]): Either[StatusException, FunctionStatusWithData[A]]
}

/** Status interpretation keyed on the tens digit of the status code (`statusCode / 10`). */
trait StandardStatusSupport extends StatusSupport {

  /**
   * Maps the status code to a result by its value divided by ten:
   * 1 is success, 2 server misconfiguration, 3 data conflict, 4 data not found,
   * 5 to 8 error in data, 9 other; anything else is reported as out of range.
   */
  override def checkStatus[A](statusWithData: FunctionStatusWithData[A]): Either[StatusException, FunctionStatusWithData[A]] = {
    val functionStatus = statusWithData.functionStatus
    functionStatus.statusCode / 10 match {
      case 1             => Right(statusWithData)
      case 2             => Left(ServerMisconfigurationException(functionStatus))
      case 3             => Left(DataConflictException(functionStatus))
      case 4             => Left(DataNotFoundException(functionStatus))
      case 5 | 6 | 7 | 8 => Left(ErrorInDataException(functionStatus))
      case 9             => Left(OtherStatusException(functionStatus))
      case _             => Left(StatusOutOfRangeException(functionStatus))
    }
  }
}

/** DB function read with `unique`, executed through the implicit transactor. */
class SingleF[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[R])
    extends BaseF[I, R, F](functionNameOverride) {

  /** Runs the function for `input` and returns its single row. */
  def apply(input: I)(implicit transactor: Transactor[F]): F[R] = {
    executeQuery(input, fragment => fragment.query[R](read).unique.transact(transactor))
  }
}

/** Like [[SingleF]], but returns the un-transacted `ConnectionIO` so the caller decides how to run it. */
class SingleFConnectionIO[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[R])
    extends BaseF[I, R, F](functionNameOverride) {

  /** Builds the `ConnectionIO` reading the single row for `input`; nothing is executed here. */
  def apply(input: I): F[ConnectionIO[R]] = {
    executeQuery(input, fragment => Async[F].delay(fragment.query[R](read).unique))
  }
}

/** Single-row DB function whose row carries a status; the status is checked via [[StatusSupport]]. */
abstract class SingleFWithStatusSupport[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[StatusWithData[R]])
    extends BaseF[I, R, F](functionNameOverride)
    with StatusSupport {

  /** Runs the function, converts the row into a `FunctionStatusWithData` and applies `checkStatus`. */
  def apply(input: I)(implicit transactor: Transactor[F]): F[Either[StatusException, FunctionStatusWithData[R]]] = {
    executeQuery(input, fragment => fragment.query[StatusWithData[R]](read).unique.transact(transactor))
      .map(statusWithData =>
        FunctionStatusWithData(FunctionStatus(statusWithData.status, statusWithData.statusText), statusWithData.data)
      )
      .map(functionStatusWithData => checkStatus(functionStatusWithData))
  }
}

/** DB function returning any number of rows, executed through the implicit transactor. */
class MultiF[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[R])
    extends BaseF[I, R, F](functionNameOverride) {

  /** Runs the function for `input` and collects all rows. */
  def apply(input: I)(implicit transactor: Transactor[F]): F[Seq[R]] = {
    executeQuery(input, fragment => fragment.query[R](read).to[Seq].transact(transactor))
  }
}

/** Like [[MultiF]], but returns the un-transacted `ConnectionIO`. */
class MultiFConnectionIO[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[R])
    extends BaseF[I, R, F](functionNameOverride) {

  /** Builds the `ConnectionIO` collecting all rows for `input`; nothing is executed here. */
  def apply(input: I): F[ConnectionIO[Seq[R]]] = {
    executeQuery(input, fragment => Async[F].delay(fragment.query[R](read).to[Seq]))
  }
}

/** DB function whose rows are exposed as an fs2 stream. */
class MultiFStreaming[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[R])
    extends BaseF[I, R, F](functionNameOverride) {

  /**
   * Streams the rows returned for `input`.
   *
   * @param input     function input
   * @param chunkSize chunk size passed to `streamWithChunkSize`
   */
  def apply(input: I, chunkSize: Int = 512)(implicit transactor: Transactor[F]): fs2.Stream[F, R] = {
    fs2.Stream
      .eval(MonadError[F, Throwable].catchNonFatal(composeFragments(toFragmentsSeq(input))))
      .flatMap(fragment => fragment.query[R](read).streamWithChunkSize(chunkSize).transact(transactor))
  }
}

/** DB function returning at most one row of interest, executed through the implicit transactor. */
class OptionF[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[R])
    extends BaseF[I, R, F](functionNameOverride) {

  /** Runs the function and returns the first row, if any; further rows are ignored. */
  def apply(input: I)(implicit transactor: Transactor[F]): F[Option[R]] = {
    executeQuery(input, fragment => fragment.query[R](read).to[Seq].map(_.headOption).transact(transactor))
  }
}

/** Like [[OptionF]], but returns the un-transacted `ConnectionIO`. */
class OptionFConnectionIO[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[R])
    extends BaseF[I, R, F](functionNameOverride) {

  /** Builds the `ConnectionIO` yielding the first row for `input`, if any; nothing is executed here. */
  def apply(input: I): F[ConnectionIO[Option[R]]] = {
    executeQuery(input, fragment => Async[F].delay(fragment.query[R](read).to[Seq].map(_.headOption)))
  }
}

// Usage
import za.co.absa.fadb.naming.implementations.SnakeCaseNaming.Implicits._

implicit object Runs extends DBSchema

case class Actor(name: String)

class GetActor extends SingleF[Int, Actor, IO]((i: Int) => Seq(fr"$i"))

class GetActorWithStatus
    extends SingleFWithStatusSupport[Int, Actor, IO]((i: Int) => Seq(fr"$i"))
    with StandardStatusSupport

class GetActorConnectionIO extends SingleFConnectionIO[Int, Actor, IO]((i: Int) => Seq(fr"$i"))

class Repository(getActorF: GetActor, getActorFWithStatus: GetActorWithStatus, getActorC: GetActorConnectionIO)
                (implicit transactor: Transactor[IO]) {

  def getActor(input: Int): IO[Actor] = getActorF(input)

  def getActorWithStatus(input: Int): IO[Either[StatusException, FunctionStatusWithData[Actor]]] =
    getActorFWithStatus(input)

  def groupedExecution(input: Int): IO[Actor] = {
    val connectionIO = for {
      actorConnectionIO <- getActorC(input)
      // here could be another call
    } yield actorConnectionIO
    // Run the composed ConnectionIO through the transactor.
    connectionIO.flatMap(_.transact(transactor))
  }
}

// If you need to return Future you can always call unsafeToFuture
// (alternative variant of Repository, shown for illustration; it cannot share a scope with the one above)
class Repository(getActorF: GetActor)(implicit transactor: Transactor[IO]) {
  // NOTE(review): with cats-effect 3 an implicit IORuntime must be in scope here; confirm against the project version.
  def getActor(input: Int): Future[Actor] = getActorF(input).unsafeToFuture()
}
The text was updated successfully, but these errors were encountered:
No branches or pull requests
Background
The text was updated successfully, but these errors were encountered: