Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proposal for refactoring (WIP) #117

Open
salamonpavel opened this issue Mar 6, 2024 · 0 comments
Open

Proposal for refactoring (WIP) #117

salamonpavel opened this issue Mar 6, 2024 · 0 comments
Labels
enhancement New feature or request work in progress Work on this item is not yet finished (mainly intended for PRs)

Comments

@salamonpavel
Copy link
Contributor

salamonpavel commented Mar 6, 2024

Background

/**
 * Base for classes that represent a call of a database function, evaluated in effect `F`.
 *
 * Renders a `SELECT <fields> FROM <function>(<args>) <alias>;` statement from the fragments
 * produced for an input and hands the statement to a caller-supplied runner.
 *
 * @param functionNameOverride explicit function name; when `None` the name is derived from the
 *                             runtime class via `schema.objectNameFromClassName`
 * @param schema               schema of the function; a non-empty `schemaName` prefixes the name
 * @tparam I type of the function input
 * @tparam R type of the result (not referenced inside this class itself)
 * @tparam F effect type the query is evaluated in
 */
abstract class BaseF[I, R, F[_]: Async](functionNameOverride: Option[String])(implicit val schema: DBSchema) {

  /** Converts an input value into the fragments passed as the function's arguments, in order. */
  def toFragmentsSeq: I => Seq[Fragment]

  /** Function name used in the statement, qualified by the schema name when that is non-empty. */
  val functionName: String = {
    val fn = functionNameOverride.getOrElse(schema.objectNameFromClassName(getClass))
    if (schema.schemaName.isEmpty) {
      fn
    } else {
      s"${schema.schemaName}.$fn"
    }
  }

  /** Alias given to the function call in the `FROM` clause and used to qualify selected fields. */
  protected val alias = "FNC"

  /** Fields to select; an empty sequence (the default) selects `*`. */
  def fieldsToSelect: Seq[String] = Seq.empty

  /** The select list: `*` when no fields are requested, otherwise the alias-qualified fields. */
  protected def selectEntry: String = {
    // `fieldsToSelect` is an overridable `def`: evaluate it exactly once so the emptiness check
    // and the rendered list are guaranteed to work on the same fields.
    val fieldsSeq = fieldsToSelect
    if (fieldsSeq.isEmpty) {
      "*"
    } else {
      val aliasToUse = if (alias.isEmpty) "" else s"$alias."
      fieldsSeq.map(aliasToUse + _).mkString(",")
    }
  }

  /**
   * Builds the complete statement for the given argument fragments.
   *
   * @param fragments argument fragments; they are joined with commas, none yields an empty list
   * @return the `SELECT ... FROM function(args) alias;` fragment
   */
  protected final def composeFragments(fragments: Seq[Fragment]): Fragment = {
    val args = fragments.toList match {
      case head :: tail => tail.foldLeft(head)((acc, frag) => acc ++ fr"," ++ frag)
      case Nil          => fr""
    }
    sql"SELECT ${Fragment.const(selectEntry)} FROM ${Fragment.const(functionName)}($args) ${Fragment.const(alias)};"
  }

  /**
   * Composes the statement for `input` and passes it to `query`.
   *
   * Non-fatal exceptions thrown while building the fragments are captured in `F` instead of
   * being thrown at the call site.
   *
   * @param input the function input
   * @param query runner turning the composed statement into a result in `F`
   */
  protected def executeQuery[T](input: I, query: Fragment => F[T]): F[T] = {
    MonadError[F, Throwable]
      .catchNonFatal(composeFragments(toFragmentsSeq(input)))
      .flatMap(query)
  }
}

/** Contract for interpreting the status that accompanies the data returned by a function. */
trait StatusSupport {
  /**
   * Checks the status carried by `statusWithData`.
   *
   * @param statusWithData function status together with the returned data
   * @return a `StatusException` on the left or a `FunctionStatusWithData` on the right; which
   *         statuses count as failures is decided by the implementation
   */
  def checkStatus[A](statusWithData: FunctionStatusWithData[A]): Either[StatusException, FunctionStatusWithData[A]]
}

/**
 * Status check that buckets the status code by `statusCode / 10`:
 * bucket 1 is a success, buckets 2 to 9 map to dedicated exceptions and every other
 * bucket is reported as out of range.
 */
trait StandardStatusSupport extends StatusSupport {
  override def checkStatus[A](statusWithData: FunctionStatusWithData[A]): Either[StatusException, FunctionStatusWithData[A]] = {
    val status = statusWithData.functionStatus
    val bucket = status.statusCode / 10

    // `None` means the status is acceptable; otherwise the exception describing the failure.
    val failure: Option[StatusException] = bucket match {
      case 1             => None
      case 2             => Some(ServerMisconfigurationException(status))
      case 3             => Some(DataConflictException(status))
      case 4             => Some(DataNotFoundException(status))
      case 5 | 6 | 7 | 8 => Some(ErrorInDataException(status))
      case 9             => Some(OtherStatusException(status))
      case _             => Some(StatusOutOfRangeException(status))
    }

    // The input is handed back untouched when there is no failure.
    failure.toLeft(statusWithData)
  }
}

/**
 * Database function read with doobie's `unique` and run through a transactor.
 *
 * @param toFragmentsSeq       converts the input into the function's argument fragments
 * @param functionNameOverride explicit function name, derived from the class name when `None`
 */
class SingleF[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[R])
    extends BaseF[I, R, F](functionNameOverride) {

  /** Executes the function for `input` on the given transactor. */
  def apply(input: I)(implicit transactor: Transactor[F]): F[R] = {
    val runUnique: Fragment => F[R] =
      statement => statement.query[R](read).unique.transact(transactor)

    executeQuery(input, runUnique)
  }
}

/**
 * Variant of the single-result function that does not run the query: it returns the
 * `ConnectionIO` (built with doobie's `unique`) so the caller decides how to transact it.
 *
 * @param toFragmentsSeq       converts the input into the function's argument fragments
 * @param functionNameOverride explicit function name, derived from the class name when `None`
 */
class SingleFConnectionIO[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[R])
    extends BaseF[I, R, F](functionNameOverride) {

  /** Builds the not-yet-transacted query for `input`, wrapped in `F`. */
  def apply(input: I): F[ConnectionIO[R]] = {
    val describeUnique: Fragment => F[ConnectionIO[R]] =
      statement => Async[F].delay(statement.query[R](read).unique)

    executeQuery(input, describeUnique)
  }
}

/**
 * Single-result function whose row carries a status next to the data.
 *
 * The class is abstract because `checkStatus` comes from [[StatusSupport]] unimplemented; mix in
 * a concrete status support to obtain a usable function.
 *
 * @param toFragmentsSeq       converts the input into the function's argument fragments
 * @param functionNameOverride explicit function name, derived from the class name when `None`
 */
abstract class SingleFWithStatusSupport[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[StatusWithData[R]])
    extends BaseF[I, R, F](functionNameOverride)
    with StatusSupport {

  /** Executes the function for `input` and passes the returned status through `checkStatus`. */
  def apply(input: I)(implicit transactor: Transactor[F]): F[Either[StatusException, FunctionStatusWithData[R]]] = {
    val fetchRow: Fragment => F[StatusWithData[R]] =
      statement => statement.query[StatusWithData[R]](read).unique.transact(transactor)

    executeQuery(input, fetchRow).map { row =>
      // Repackage the raw row into the status model before validating it.
      val status = FunctionStatus(row.status, row.statusText)
      checkStatus(FunctionStatusWithData(status, row.data))
    }
  }
}

/**
 * Database function whose rows are all collected into a `Seq`, run through a transactor.
 *
 * @param toFragmentsSeq       converts the input into the function's argument fragments
 * @param functionNameOverride explicit function name, derived from the class name when `None`
 */
class MultiF[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[R])
    extends BaseF[I, R, F](functionNameOverride) {

  /** Executes the function for `input` on the given transactor and returns every row. */
  def apply(input: I)(implicit transactor: Transactor[F]): F[Seq[R]] = {
    val runAll: Fragment => F[Seq[R]] =
      statement => statement.query[R](read).to[Seq].transact(transactor)

    executeQuery(input, runAll)
  }
}

/**
 * Variant of the multi-row function that returns the `ConnectionIO` collecting all rows
 * instead of running it, leaving the transaction boundary to the caller.
 *
 * @param toFragmentsSeq       converts the input into the function's argument fragments
 * @param functionNameOverride explicit function name, derived from the class name when `None`
 */
class MultiFConnectionIO[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[R])
    extends BaseF[I, R, F](functionNameOverride) {

  /** Builds the not-yet-transacted query for `input`, wrapped in `F`. */
  def apply(input: I): F[ConnectionIO[Seq[R]]] = {
    val describeAll: Fragment => F[ConnectionIO[Seq[R]]] =
      statement => Async[F].delay(statement.query[R](read).to[Seq])

    executeQuery(input, describeAll)
  }
}

/**
 * Database function whose rows are exposed as an fs2 stream.
 *
 * @param toFragmentsSeq       converts the input into the function's argument fragments
 * @param functionNameOverride explicit function name, derived from the class name when `None`
 */
class MultiFStreaming[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[R])
    extends BaseF[I, R, F](functionNameOverride) {

  /**
   * Streams the rows returned for `input`.
   *
   * @param chunkSize chunk size handed to `streamWithChunkSize`; defaults to 512
   */
  def apply(input: I, chunkSize: Int = 512)(implicit transactor: Transactor[F]): fs2.Stream[F, R] = {
    // Statement composition is captured in F so non-fatal failures surface inside the stream.
    val composedStatement: F[Fragment] =
      MonadError[F, Throwable].catchNonFatal(composeFragments(toFragmentsSeq(input)))

    fs2.Stream.eval(composedStatement).flatMap { statement =>
      statement.query[R](read).streamWithChunkSize(chunkSize).transact(transactor)
    }
  }
}

/**
 * Database function returning the first row, if any, run through a transactor.
 *
 * All rows are read into a `Seq` and only its head is kept, so additional rows are ignored.
 *
 * @param toFragmentsSeq       converts the input into the function's argument fragments
 * @param functionNameOverride explicit function name, derived from the class name when `None`
 */
class OptionF[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[R])
    extends BaseF[I, R, F](functionNameOverride) {

  /** Executes the function for `input` on the given transactor. */
  def apply(input: I)(implicit transactor: Transactor[F]): F[Option[R]] = {
    val runFirst: Fragment => F[Option[R]] = { statement =>
      val firstRow = statement.query[R](read).to[Seq].map(rows => rows.headOption)
      firstRow.transact(transactor)
    }

    executeQuery(input, runFirst)
  }
}

/**
 * Variant of the optional-result function that returns the `ConnectionIO` yielding the first
 * row (if any) instead of running it, leaving the transaction boundary to the caller.
 *
 * @param toFragmentsSeq       converts the input into the function's argument fragments
 * @param functionNameOverride explicit function name, derived from the class name when `None`
 */
class OptionFConnectionIO[I, R, F[_]: Async](
  val toFragmentsSeq: I => Seq[Fragment],
  val functionNameOverride: Option[String] = None
)(implicit dbSchema: DBSchema, read: Read[R])
    extends BaseF[I, R, F](functionNameOverride) {

  /** Builds the not-yet-transacted query for `input`, wrapped in `F`. */
  def apply(input: I): F[ConnectionIO[Option[R]]] = {
    val describeFirst: Fragment => F[ConnectionIO[Option[R]]] = { statement =>
      Async[F].delay(statement.query[R](read).to[Seq].map(rows => rows.headOption))
    }

    executeQuery(input, describeFirst)
  }
}

// Usage
import za.co.absa.fadb.naming.implementations.SnakeCaseNaming.Implicits._
// Schema picked up implicitly by the function classes declared below.
implicit object Runs extends DBSchema

// Row type produced by the example functions.
case class Actor(name: String)

// One example function per execution style; each passes the Int input as the only argument.
class GetActor
    extends SingleF[Int, Actor, IO]((actorId: Int) => Seq(fr"$actorId"))

class GetActorWithStatus
    extends SingleFWithStatusSupport[Int, Actor, IO]((actorId: Int) => Seq(fr"$actorId"))
    with StandardStatusSupport

class GetActorConnectionIO
    extends SingleFConnectionIO[Int, Actor, IO]((actorId: Int) => Seq(fr"$actorId"))

/** Example repository showing the three ways of calling the function classes. */
class Repository(getActorF: GetActor, getActorFWithStatus: GetActorWithStatus, getActorC: GetActorConnectionIO)
  (implicit transactor: Transactor[IO]) {

  /** Runs the function directly on the implicit transactor. */
  def getActor(input: Int): IO[Actor] =
    getActorF.apply(input)

  /** Runs the function and returns its checked status together with the data. */
  def getActorWithStatus(input: Int): IO[Either[StatusException, FunctionStatusWithData[Actor]]] =
    getActorFWithStatus.apply(input)

  /** Obtains the query without running it, then transacts it explicitly. */
  def groupedExecution(input: Int): IO[Actor] = {
    val program: IO[ConnectionIO[Actor]] = for {
      actorQuery <- getActorC(input)
      // here could be another call
    } yield actorQuery

    program.flatMap(actorQuery => actorQuery.transact(transactor))
  }
}

// If you need to return a Future you can always call unsafeToFuture() at the edge of the application.
// NOTE(review): with cats-effect 3 this also needs an implicit IORuntime in scope — confirm which
// cats-effect version the project targets.
class Repository(getActorF: GetActor)(implicit transactor: Transactor[IO]) {
  // unsafeToFuture() is declared with an empty parameter list because it runs the effect; keep the
  // parentheses rather than relying on auto-application (deprecated in 2.13, rejected in Scala 3).
  def getActor(input: Int): Future[Actor] = getActorF(input).unsafeToFuture()
}
@salamonpavel salamonpavel added enhancement New feature or request work in progress Work on this item is not yet finished (mainly intended for PRs) labels Mar 6, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request work in progress Work on this item is not yet finished (mainly intended for PRs)
Projects
Status: 🆕 To groom
Development

No branches or pull requests

1 participant