Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#245 Add the ability to query REST endpoints from Reader module #297

Open
wants to merge 32 commits into
base: master
Choose a base branch
from

Conversation

benedeki
Copy link
Contributor

@benedeki benedeki commented Nov 1, 2024

  • Implemented Reader and ReaderWithPartitioningId base classes for all readers - they are to bringAtum server connectivity/querying ability
  • Created 'ServerConfig' case class representing values needed to connect to an Atum Server
  • RequestResult[R] represents an Atum server query response.
  • Offered implicits for MonadError type class needed for Reader and ReaderWithPartitioningId - there are Future, Cats IO and Zio Task ones (the last one available only in Scala 2.13)
  • AtumPartitions and AdditionalData moved from Agent to Module
  • ErrorResponse received a method to decode from Json based on http status code

Closes #245

Depends on #300

Release Notes:

  • Reader module now supports connection to Atum server.

@benedeki benedeki added the work in progress Work on this item is not yet finished (mainly intended for PRs) label Nov 1, 2024
@benedeki benedeki self-assigned this Nov 1, 2024
Copy link

github-actions bot commented Nov 1, 2024

JaCoCo model module code coverage report - scala 2.13.11

Overall Project 61.27% -35.88% 🍏
Files changed 66.75%

File Coverage
JsonSyntaxExtensions.scala 94.34% -5.66% 🍏
ErrorResponse.scala 89.66%
basic.scala 47.17%

Copy link

github-actions bot commented Nov 1, 2024

JaCoCo agent module code coverage report - scala 2.13.11

Overall Project 78.2% 🍏
Files changed 100% 🍏

File Coverage
AtumAgent.scala 97.13% 🍏
AtumContext.scala 91.79% 🍏

Copy link

github-actions bot commented Nov 1, 2024

JaCoCo reader module code coverage report - scala 2.13.11

Overall Project 88.71% 🍏
Files changed 40.19%

File Coverage
PartitioningReader.scala 100%
Reader.scala 100% -14.75% 🍏
ReaderWithPartitioningId.scala 100% 🍏
io.scala 100% 🍏
future.scala 100% 🍏
ServerConfig.scala 88.24%
FlowReader.scala 80% -20% 🍏
RequestResult.scala 12.5%
zio.scala 0%

Copy link

github-actions bot commented Nov 1, 2024

JaCoCo server module code coverage report - scala 2.13.11

Overall Project 68.89% 🍏

There is no coverage information present for the Files changed


protected def executeRequest[R](request: RequestT[Identity, RequestResult[R], Any]): F[Response[RequestResult[R]]]

def getQuery[R: Decoder](endpointUri: String, params: Map[String, String] = Map.empty): F[RequestResult[R]] = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about the return type of this key function. Particularly the error case. But it can be relatively easily adjusted in future.

@benedeki benedeki marked this pull request as ready for review November 6, 2024 12:12
@benedeki benedeki added the dependent The item depends on some other open item (Issue or PR) label Nov 21, 2024
@benedeki benedeki removed the dependent The item depends on some other open item (Issue or PR) label Nov 22, 2024
@benedeki
Copy link
Contributor Author

@benedeki benedeki removed the work in progress Work on this item is not yet finished (mainly intended for PRs) label Nov 24, 2024

### Option 1
```scala
val atumContextInstanceWithRecordCount = AtumContext(processor = processor)
.withMeasureAdded(RecordCount(MockMeasureNames.recordCount1, measuredColumn = "id"))
.withMeasureAdded(RecordCount(MockMeasureNames.recordCount1))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious why you changed it to MockMeasureNames.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't change the code. But I believe the example in the README was outdated, so I fixed it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly IntelliJ refactoring in the past, should be changed though

import za.co.absa.atum.model.dto.{PartitionDTO, PartitioningDTO}


object basic {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea to move this into its dedicated object instead of the AtumContext. The Ops are also useful.

}
}

def asSafe[T: Decoder]: Either[io.circe.Error, T] = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could make this method private.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I originally used it in Reader, but seems dropped it since. Will make it private 👍

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it has only 1 usecase right now? And it's a 1 liner, so why to have it as a separate method?

* limitations under the License.
*/

package za.co.absa.atum.model.testing.implicits
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The package is incorrect. It's not inside model.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you. Imperfect refactoring 😉

import sttp.client3.impl.zio.RIOMonadAsyncError

object zio {
implicit val ZIOMonad: RIOMonadAsyncError[Any] = new RIOMonadAsyncError[Any]
Copy link
Collaborator

@salamonpavel salamonpavel Nov 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rename this. It's confusing. I would personally stick to its name rioMonadAsyncError.

import sttp.client3.impl.cats.CatsMonadAsyncError

object io {
implicit val CatsIOMonad: CatsMonadAsyncError[IO] = new CatsMonadAsyncError[IO]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as with the ZIO counterpart, please rename this.

// This test is disabled as is breaks on JaCoCo execution
// Once the problem is figured out or how to cirmvent it, this can be re-enabled
//
//object Reader_ZIOUnitTests extends ZIOSpecDefault {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test doesn't work. Please address this already as part of this PR.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message indicates that there is a NoSuchMethodError related to the zio.package$.RIO() method. This typically happens when there is a version mismatch between the ZIO library and the code that depends on it.

assert(result == Right(partitionDTO))


// .map { result =>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clean up needed here :)

}
}

object Reader {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

empty object could be removed

* bounds, as it make the imports easier to follow
* @tparam F - the effect type (e.g. Future, IO, Task, etc.)
*/
class FlowReader[F[_]](val mainFlowPartitioning: AtumPartitions)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the point of the public val?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the identifier of the FlowReader. So having a collection of FlowReaders would allow a distinction among them.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, that I understand, but I wonder why it needs to be public val? It would be reasonable if you were going to use the trait PartitioningIdProvider and by using public val partitioning in the constructor you provided the requirement for def partitioning: AtumPartitions.

import za.co.absa.atum.reader.basic.RequestResult.RequestResult
import za.co.absa.atum.reader.server.ServerConfig

abstract class ReaderWithPartitioningId[F[_]: MonadError](implicit serverConfig: ServerConfig, backend: SttpBackend[F, Any])
Copy link
Collaborator

@salamonpavel salamonpavel Nov 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trait PartitioningIdProvider[F[_]] { self: Reader[F] =>
  def partitioning: AtumPartitions

  def partitioningId()(implicit monad: MonadError[F]): F[RequestResult[Long]] = {
    val encodedPartitioning = partitioning.toPartitioningDTO.asBase64EncodedJsonString
    val queryResult = getQuery[SingleSuccessResponse[PartitioningWithIdDTO]]("/api/v2/partitionings", Map("partitioning" -> encodedPartitioning))
    queryResult.map { result =>
      result.map(_.data.id)
    }
  }
}

By defining PartitioningIdProvider as a trait, you can mix this functionality into any class that needs it without being constrained by inheritance. This makes the code more flexible and reusable. The self-type constraint (self: Reader[F] =>) ensures that PartitioningIdProvider can only be mixed into classes that also mix in Reader[F]. This guarantees that the getQuery method from Reader[F] is available in PartitioningIdProvider, eliminating the need for abstract methods and making the code more cohesive.

case class PartitioningReader[F[_]](partitioning: AtumPartitions)
                                   (implicit serverConfig: ServerConfig, backend: SttpBackend[F, Any], ev: MonadError[F])
  extends Reader[F] with PartitioningIdProvider[F] {
  def foo(): String = {
    // just to have some testable content
    "bar"
  }
}

class FlowReader[F[_]](val mainFlowPartitioning: AtumPartitions)
                      (implicit serverConfig: ServerConfig, backend: SttpBackend[F, Any], ev: MonadError[F])
  extends Reader[F] with PartitioningIdProvider[F] {

  override def partitioning: AtumPartitions = mainFlowPartitioning
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I don't see the inheritance that much limiting here (not expecting a wide variety of successors, it's a cool code. Will switch to it. 👍

echo "Total 'model' module coverage ${{ steps.jacoco-model.outputs.coverage-overall }}"
echo "Changed files of 'model' module coverage ${{ steps.jacoco-model.outputs.coverage-changed-files }}"
echo "Total 'server' module coverage ${{ steps.jacoco-server.outputs.coverage-overall }}"
echo "Changed files of'server' module coverage ${{ steps.jacoco-server.outputs.coverage-changed-files }}"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
echo "Changed files of'server' module coverage ${{ steps.jacoco-server.outputs.coverage-changed-files }}"
echo "Changed files of 'server' module coverage ${{ steps.jacoco-server.outputs.coverage-changed-files }}"

@@ -247,6 +246,7 @@ Code coverage wil be generated on path:
To make this project runnable via IntelliJ, do the following:
- Make sure that your configuration in `server/src/main/resources/reference.conf`
is configured according to your needs
- When building within the UI be sure to have the option `-language:higherKinds` on in the compiler options
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand - what UI do you mean?

case Right(value) => value
case Left(error) => throw new RuntimeException(s"Failed to decode JSON: $error")
case Left(error) => throw error
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why removing the prefix of the exception msg? I think it was quite useful, we would exactly know what this is coming from

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add the ability to query REST endpoints from Reader module
3 participants