-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
#245 Add the ability to query REST endpoints from Reader module #297
base: master
Are you sure you want to change the base?
#245 Add the ability to query REST endpoints from Reader module #297
Conversation
* created new module Info * the new modul added to JaCoco and CI routines
* JaCoCo exclusion for model
* created Provider to query the data from server * support for Future, IO, and ZIO based providers * work in progress
* fixed license headers
JaCoCo model module code coverage report - scala 2.13.11
|
JaCoCo agent module code coverage report - scala 2.13.11
|
JaCoCo reader module code coverage report - scala 2.13.11
|
JaCoCo server module code coverage report - scala 2.13.11
|
|
||
protected def executeRequest[R](request: RequestT[Identity, RequestResult[R], Any]): F[Response[RequestResult[R]]] | ||
|
||
def getQuery[R: Decoder](endpointUri: String, params: Map[String, String] = Map.empty): F[RequestResult[R]] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure about the return type of this key function. Particularly the error case. But it can be relatively easily adjusted in future.
This item depends on: |
…endpoints-from-info-module
|
||
### Option 1 | ||
```scala | ||
val atumContextInstanceWithRecordCount = AtumContext(processor = processor) | ||
.withMeasureAdded(RecordCount(MockMeasureNames.recordCount1, measuredColumn = "id")) | ||
.withMeasureAdded(RecordCount(MockMeasureNames.recordCount1)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am curious why you changed it to MockMeasureNames
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't change the code. But I believe the example in the README was outdated, so I fixed it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Possibly IntelliJ refactoring in the past, should be changed though
import za.co.absa.atum.model.dto.{PartitionDTO, PartitioningDTO} | ||
|
||
|
||
object basic { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea to move this into its dedicated object instead of the AtumContext. The Ops are also useful.
} | ||
} | ||
|
||
def asSafe[T: Decoder]: Either[io.circe.Error, T] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could make this method private.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I originally used it in Reader, but seems dropped it since. Will make it private 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it has only 1 usecase right now? And it's a 1 liner, so why to have it as a separate method?
* limitations under the License. | ||
*/ | ||
|
||
package za.co.absa.atum.model.testing.implicits |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The package is incorrect. It's not inside model
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you. Imperfect refactoring 😉
import sttp.client3.impl.zio.RIOMonadAsyncError | ||
|
||
object zio { | ||
implicit val ZIOMonad: RIOMonadAsyncError[Any] = new RIOMonadAsyncError[Any] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please rename this. It's confusing. I would personally stick to its name rioMonadAsyncError
.
import sttp.client3.impl.cats.CatsMonadAsyncError | ||
|
||
object io { | ||
implicit val CatsIOMonad: CatsMonadAsyncError[IO] = new CatsMonadAsyncError[IO] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as with the ZIO counterpart, please rename this.
// This test is disabled as is breaks on JaCoCo execution | ||
// Once the problem is figured out or how to cirmvent it, this can be re-enabled | ||
// | ||
//object Reader_ZIOUnitTests extends ZIOSpecDefault { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test doesn't work. Please address this already as part of this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error message indicates that there is a NoSuchMethodError related to the zio.package$.RIO() method. This typically happens when there is a version mismatch between the ZIO library and the code that depends on it.
assert(result == Right(partitionDTO)) | ||
|
||
|
||
// .map { result => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clean up needed here :)
} | ||
} | ||
|
||
object Reader { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
empty object could be removed
* bounds, as it make the imports easier to follow | ||
* @tparam F - the effect type (e.g. Future, IO, Task, etc.) | ||
*/ | ||
class FlowReader[F[_]](val mainFlowPartitioning: AtumPartitions) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the point of the public val?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's the identifier of the FlowReader
. So having a collection of FlowReaders would allow a distinction among them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, that I understand, but I wonder why it needs to be public val? It would be reasonable if you were going to use the trait PartitioningIdProvider
and by using public val partitioning
in the constructor you provided the requirement for def partitioning: AtumPartitions
.
import za.co.absa.atum.reader.basic.RequestResult.RequestResult | ||
import za.co.absa.atum.reader.server.ServerConfig | ||
|
||
abstract class ReaderWithPartitioningId[F[_]: MonadError](implicit serverConfig: ServerConfig, backend: SttpBackend[F, Any]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
trait PartitioningIdProvider[F[_]] { self: Reader[F] =>
def partitioning: AtumPartitions
def partitioningId()(implicit monad: MonadError[F]): F[RequestResult[Long]] = {
val encodedPartitioning = partitioning.toPartitioningDTO.asBase64EncodedJsonString
val queryResult = getQuery[SingleSuccessResponse[PartitioningWithIdDTO]]("/api/v2/partitionings", Map("partitioning" -> encodedPartitioning))
queryResult.map { result =>
result.map(_.data.id)
}
}
}
By defining PartitioningIdProvider
as a trait, you can mix this functionality into any class that needs it without being constrained by inheritance. This makes the code more flexible and reusable. The self-type constraint (self: Reader[F] =>) ensures that PartitioningIdProvider can only be mixed into classes that also mix in Reader[F]. This guarantees that the getQuery method from Reader[F] is available in PartitioningIdProvider, eliminating the need for abstract methods and making the code more cohesive.
case class PartitioningReader[F[_]](partitioning: AtumPartitions)
(implicit serverConfig: ServerConfig, backend: SttpBackend[F, Any], ev: MonadError[F])
extends Reader[F] with PartitioningIdProvider[F] {
def foo(): String = {
// just to have some testable content
"bar"
}
}
class FlowReader[F[_]](val mainFlowPartitioning: AtumPartitions)
(implicit serverConfig: ServerConfig, backend: SttpBackend[F, Any], ev: MonadError[F])
extends Reader[F] with PartitioningIdProvider[F] {
override def partitioning: AtumPartitions = mainFlowPartitioning
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While I don't see the inheritance that much limiting here (not expecting a wide variety of successors, it's a cool code. Will switch to it. 👍
echo "Total 'model' module coverage ${{ steps.jacoco-model.outputs.coverage-overall }}" | ||
echo "Changed files of 'model' module coverage ${{ steps.jacoco-model.outputs.coverage-changed-files }}" | ||
echo "Total 'server' module coverage ${{ steps.jacoco-server.outputs.coverage-overall }}" | ||
echo "Changed files of'server' module coverage ${{ steps.jacoco-server.outputs.coverage-changed-files }}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
echo "Changed files of'server' module coverage ${{ steps.jacoco-server.outputs.coverage-changed-files }}" | |
echo "Changed files of 'server' module coverage ${{ steps.jacoco-server.outputs.coverage-changed-files }}" |
@@ -247,6 +246,7 @@ Code coverage wil be generated on path: | |||
To make this project runnable via IntelliJ, do the following: | |||
- Make sure that your configuration in `server/src/main/resources/reference.conf` | |||
is configured according to your needs | |||
- When building within the UI be sure to have the option `-language:higherKinds` on in the compiler options |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand - what UI do you mean?
case Right(value) => value | ||
case Left(error) => throw new RuntimeException(s"Failed to decode JSON: $error") | ||
case Left(error) => throw error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why removing the prefix of the exception msg? I think it was quite useful, we would exactly know what this is coming from
Reader
andReaderWithPartitioningId
base classes for all readers - they are to bringAtum server connectivity/querying abilityRequestResult[R]
represents an Atum server query response.MonadError
type class needed forReader
andReaderWithPartitioningId
- there areFuture
, CatsIO
and ZioTask
ones (the last one available only in Scala 2.13)AtumPartitions
andAdditionalData
moved from Agent to ModuleErrorResponse
received a method to decode from Json based on http status codeCloses #245
Depends on #300
Release Notes: