Skip to content

Commit

Permalink
* the first working commit
Browse files Browse the repository at this point in the history
  • Loading branch information
benedeki committed Nov 1, 2024
1 parent 38fde1c commit 1ac2233
Show file tree
Hide file tree
Showing 18 changed files with 459 additions and 109 deletions.
49 changes: 49 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
- [Measurement](#measurement)
- [Checkpoint](#checkpoint)
- [Data Flow](#data-flow)
- [Usage](#usage)
- [Reader](#reader-usage)
- [How to generate Code coverage report](#how-to-generate-code-coverage-report)
- [How to Run in IntelliJ](#how-to-run-in-intellij)
- [How to Run Tests](#how-to-run-tests)
Expand Down Expand Up @@ -156,6 +158,52 @@ We can even say, that `Checkpoint` is a result of particular `Measurements` (ver
The journey of a dataset throughout various data transformations and pipelines. It captures the whole journey,
even if it involves multiple applications or ETL pipelines.

## Usage

### Reader usage
Reader module support several asynchronous http clients. The dependencies used for these clients are set as _optional_,
so the user of the module can decide which client to use and include only the necessary dependencies.

The clients are:
#### Future based `HttpClientServerConnection`
Uses `java.net.http.HttpClient` to send requests to the server, therefore requires no additional dependencies. But works
only with Java 11 or higher.

#### Future based `ArmeririaServerConnection`
Add
```scala
"com.softwaremill.sttp.client3" %% "armeria-backend" % "[version]"
```
to your dependencies.

#### Cats IO based `ArmeririaServerConnection`
Add
```scala
"org.typelevel." %% "cats-effect" % "[version]"
"com.softwaremill.sttp.client3" %% "armeria-backend-cats" % "[version]" // for cats-effect 3.x
// or
"com.softwaremill.sttp.client3" %% "armeria-backend-cats-ce2" % "[version]" // for cats-effect 2.x
```
"
to your dependencies.

#### ZIO based `HttpClientServerConnection`
Add
```scala
"com.softwaremill.sttp.client3" %% "zio" % "[version]" // for ZIO 2.x
"com.softwaremill.sttp.client3" %% "zio1" % "[version]" // for ZIO 1.x
```
to your dependencies.

#### ZIO based `ArmeririaServerConnection`
Add
```scala
"com.softwaremill.sttp.client3" %% "armeria-backend-zio" % "[version]" // for ZIO 2.x
"com.softwaremill.sttp.client3" %% "armeria-backend-zio1" % "[version]" // for ZIO 1.x
```
to your dependencies.



## How to generate Code coverage report
```sbt
Expand All @@ -172,6 +220,7 @@ Code coverage wil be generated on path:
To make this project runnable via IntelliJ, do the following:
- Make sure that your configuration in `server/src/main/resources/reference.conf`
is configured according to your needs
- When building within the UI be sure to have the option `-language:higherKinds` on in the compiler options

## How to Run Tests

Expand Down
61 changes: 52 additions & 9 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ object Dependencies {

val sparkCommons = "0.6.1"

val sttpClient = "3.5.2"
val sttpCirceJson = "3.9.7"
val sttpClient = "3.10.1"
val sttpCirceJson = "3.10.1"

val postgresql = "42.6.0"

Expand All @@ -64,6 +64,8 @@ object Dependencies {

val absaCommons = "2.0.0"

val catsEffect = "3.3.14"

def truncateVersion(version: String, parts: Int): String = {
version.split("\\.").take(parts).mkString(".")
}
Expand Down Expand Up @@ -236,17 +238,58 @@ object Dependencies {
}

def readerDependencies(scalaVersion: Version): Seq[ModuleID] = {
val zioOrg = "dev.zio"
val sbtOrg = "com.github.sbt"
val sttpClient3Org = "com.softwaremill.sttp.client3"
val typeLevelOrg = "org.typelevel"

// STTP core and Circe integration
lazy val sttpCore = sttpClient3Org %% "core" % Versions.sttpClient
lazy val sttpCirce = sttpClient3Org %% "circe" % Versions.sttpClient

// Armeria Future backend
lazy val sttpArmeririaFutureBackend = sttpClient3Org %% "armeria-backend" % Versions.sttpClient % Optional
// Armeria Cats backend
lazy val sttpArmeririaCatsBackend = sttpClient3Org %% "armeria-backend-cats" % Versions.sttpClient % Optional
lazy val catsEffect = typeLevelOrg %% "cats-effect" % Versions.catsEffect % Optional
// Armeria Zio backend
lazy val sttpArmeririaZioBackend = sttpClient3Org %% "armeria-backend-zio" % Versions.sttpClient % Optional
// HttpClient Zio backend
lazy val sttpHttpClientZioBackend = sttpClient3Org %% "zio" % Versions.sttpClient % Optional

// testing
lazy val zioTest = zioOrg %% "zio-test" % Versions.zio % Test
lazy val zioTestSbt = zioOrg %% "zio-test-sbt" % Versions.zio % Test
lazy val zioTestJunit = zioOrg %% "zio-test-junit" % Versions.zio % Test
lazy val sbtJunitInterface = sbtOrg % "junit-interface" % Versions.sbtJunitInterface % Test

Seq(
"com.softwaremill.sttp.client3" %% "core" % "3.9.7",
"com.softwaremill.sttp.client3" %% "async-http-client-backend-future" % "3.9.6",
"com.softwaremill.sttp.client3" %% "armeria-backend-cats" % "3.9.8",
"com.softwaremill.sttp.client3" %% "zio" % "3.9.8",
"com.softwaremill.sttp.client3" %% "armeria-backend-zio" % "3.9.8",
"org.typelevel" %% "cats-effect" % "3.3.14",
"dev.zio" %% "zio" % "2.1.4"
sttpCore,
sttpCirce,
sttpArmeririaFutureBackend,
sttpArmeririaCatsBackend,
catsEffect,
sttpArmeririaZioBackend,
sttpHttpClientZioBackend,
zioTest,
zioTestSbt,
zioTestJunit,
sbtJunitInterface
) ++
testDependencies ++
jsonSerdeDependencies
// "com.softwaremill.sttp.client3" %% "core" % "3.9.7",
// "com.softwaremill.sttp.client3" %% "async-http-client-backend-future" % "3.9.6",
// "com.softwaremill.sttp.client3" %% "armeria-backend-cats" % "3.9.8",
// "com.softwaremill.sttp.client3" %% "armeria-backend" % "3.9.8",
// "com.softwaremill.sttp.client3" %% "zio" % "3.9.8",
// "com.softwaremill.sttp.client3" %% "armeria-backend-zio" % "3.9.8",
// "org.typelevel" %% "cats-effect" % "3.3.14",
// "dev.zio" %% "zio" % "2.1.4",
// "dev.zio" %% "zio-interop-cats" % "23.1.0.1",
// "dev.zio" %% "zio-macros" % "2.1.4",
// "com.softwaremill.sttp.client3" %% "circe" % Versions.sttpCirceJson

}

def databaseDependencies: Seq[ModuleID] = {
Expand Down
30 changes: 25 additions & 5 deletions project/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,37 @@ object Setup {
val serverAndDbScalaVersion: Version = scala213 //covers REST server and database modules
val clientSupportedScalaVersions: Seq[Version] = Seq(scala212, scala213)

val commonScalacOptions: Seq[String] = Seq("-unchecked", "-deprecation", "-feature", "-Xfatal-warnings")
val commonScalacOptions: Seq[String] = Seq(
"-unchecked",
"-deprecation",
"-feature",
"-Xfatal-warnings"
)

val serverAndDbJavacOptions: Seq[String] = Seq("-source", "11", "-target", "11", "-Xlint")
val serverAndDbScalacOptions: Seq[String] = Seq("-Ymacro-annotations")
val serverAndDbJavacOptions: Seq[String] = Seq(
"-source", "11",
"-target", "11",
"-Xlint"
)
val serverAndDbScalacOptions: Seq[String] = Seq(
"-language:higherKinds",
"-Ymacro-annotations"
)

val clientJavacOptions: Seq[String] = Seq("-source", "1.8", "-target", "1.8", "-Xlint")
def clientScalacOptions(scalaVersion: Version): Seq[String] = {
if (scalaVersion >= scala213) {
Seq("-release", "8", "-Ymacro-annotations")
Seq(
"-release", "8",
"-language:higherKinds",
"-Ymacro-annotations"
)
} else {
Seq("-release", "8", "-target:8")
Seq(
"-release", "8",
"-language:higherKinds",
"-target:8"
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package za.co.absa.atum.reader
import za.co.absa.atum.reader.basic.Reader
import za.co.absa.atum.reader.server.GenericServerConnection

class FlowReader[F[_]]()(override implicit val serverConnection: GenericServerConnection[F[_]]) extends Reader[F]{
class FlowReader[F[_]]()(override implicit val serverConnection: GenericServerConnection[F]) extends Reader[F]{
def foo(): String = {
// just to have some testable content
"bar"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package za.co.absa.atum.reader
import za.co.absa.atum.reader.basic.Reader
import za.co.absa.atum.reader.server.GenericServerConnection

class PartitioningReader[F[_]]()(override implicit val serverConnection: GenericServerConnection[F[_]]) extends Reader[F] {
class PartitioningReader[F[_]]()(override implicit val serverConnection: GenericServerConnection[F]) extends Reader[F] {
def foo(): String = {
// just to have some testable content
"bar"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ package za.co.absa.atum.reader.basic

import za.co.absa.atum.reader.server.GenericServerConnection

abstract class Reader[F[_]](implicit val serverConnection: GenericServerConnection[F[_]])
abstract class Reader[F[_]](implicit val serverConnection: GenericServerConnection[F])
Original file line number Diff line number Diff line change
Expand Up @@ -16,45 +16,37 @@

package za.co.absa.atum.reader.server

import _root_.io.circe.parser.decode
import _root_.io.circe.Decoder
import cats.Monad
import cats.implicits.toFunctorOps
import _root_.io.circe.{Error => circeError}
import com.typesafe.config.Config
import sttp.client3.{Identity, RequestT, Response, UriContext, basicRequest}
import za.co.absa.atum.reader.exceptions.RequestException
import za.co.absa.atum.reader.server.GenericServerConnection.ReaderResponse
import sttp.client3.{Identity, RequestT, ResponseException, basicRequest}
import sttp.model.Uri
import sttp.client3.circe._

import scala.util.{Failure, Try}
import za.co.absa.atum.model.envelopes.ErrorResponse
import za.co.absa.atum.reader.server.GenericServerConnection.RequestResult

/**
* A HttpProvider is a component that is responsible for providing teh data to readers using REST API
* @tparam F
*/
abstract class GenericServerConnection[F[_]: Monad](val serverUrl: String) {
abstract class GenericServerConnection[F[_]](val serverUrl: String) {

protected def executeRequest(request: RequestT[Identity, Either[String, String], Any]): F[ReaderResponse]
protected def executeRequest[R](request: RequestT[Identity, RequestResult[R], Any]): F[RequestResult[R]]

def query[R: Decoder](endpointUri: String): F[Try[R]] = {
def getQuery[R: Decoder](endpointUri: String, params: Map[String, String] = Map.empty): F[RequestResult[R]] = {
val endpointToQuery = serverUrl + endpointUri
val request = basicRequest
.get(uri"$endpointToQuery")
val response = executeRequest(request)
// using map instead of Circe's `asJson` to have own exception from a failed response
response.map { responseData =>
responseData.body match {
case Left(error) => Failure(RequestException(responseData.statusText, error, responseData.code, responseData.request))
case Right(body) => decode[R](body).toTry
}
}
val uri = Uri.unsafeParse(endpointToQuery).addParams(params)
val request: RequestT[Identity, RequestResult[R], Any] = basicRequest
.get(uri)
.response(asJsonEither[ErrorResponse, R])
executeRequest(request)
}

def close(): F[Unit]

}

object GenericServerConnection {
final val UrlKey = "atum.server.url"

type ReaderResponse = Response[Either[String, String]]
type RequestResult[R] = Either[ResponseException[ErrorResponse, circeError], R]

def atumServerUrl(config: Config): String = {
config.getString(UrlKey)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.reader.server.future

import com.typesafe.config.{Config, ConfigFactory}
import scala.concurrent.{ExecutionContext, Future}
import sttp.client3.armeria.future.ArmeriaFutureBackend
import sttp.client3.SttpBackend

import za.co.absa.atum.reader.server.GenericServerConnection

class ArmeriaServerConnection private(serverUrl: String, closeable: Boolean)(implicit executor: ExecutionContext)
extends FutureServerConnection(serverUrl, closeable) {

def this(serverUrl: String)(implicit executor: ExecutionContext) = {
this(serverUrl, true)(executor)
}

def this(config: Config = ConfigFactory.load())(implicit executor: ExecutionContext) = {
this(GenericServerConnection.atumServerUrl(config))(executor)
}

override protected val backend: SttpBackend[Future, Any] = ArmeriaFutureBackend()

}

object ArmeriaServerConnection {
lazy implicit val serverConnection: ArmeriaServerConnection = new ArmeriaServerConnection()(ExecutionContext.Implicits.global)

def use[R](serverUrl: String)(fnc: ArmeriaServerConnection => Future[R])
(implicit executor: ExecutionContext): Future[R] = {
val serverConnection = new ArmeriaServerConnection(serverUrl, false)
try {
fnc(serverConnection)
} finally {
serverConnection.backend.close()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.reader.server.future

import scala.concurrent.{ExecutionContext, Future}
import sttp.client3.{Identity, RequestT, SttpBackend}

import za.co.absa.atum.reader.server.GenericServerConnection
import za.co.absa.atum.reader.server.GenericServerConnection.RequestResult


abstract class FutureServerConnection(serverUrl: String, closeable: Boolean)(implicit executor: ExecutionContext)
extends GenericServerConnection[Future](serverUrl) {

protected val backend: SttpBackend[Future, Any]

override protected def executeRequest[R](request: RequestT[Identity, RequestResult[R], Any]): Future[RequestResult[R]] = {
request.send(backend).map(_.body)
}

override def close(): Future[Unit] = {
if (closeable) {
backend.close()
} else {
Future.successful(())
}
}

}

Loading

0 comments on commit 1ac2233

Please sign in to comment.