Skip to content

Commit

Permalink
Clean-up refined types usage
Browse files Browse the repository at this point in the history
  • Loading branch information
olivergrabinski committed Apr 10, 2024
1 parent 3783e2f commit c8e9eaf
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.RowEvent
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.ship.EventStreamer.logger
import eu.timepit.refined.types.string.NonEmptyString
import eu.timepit.refined.collection.NonEmpty
import eu.timepit.refined.refineV
import fs2.aws.s3.S3
import fs2.aws.s3.models.Models.{BucketName, FileKey}
import fs2.io.file.{Files, Path}
Expand Down Expand Up @@ -52,24 +53,33 @@ object EventStreamer {

private val logger = Logger[EventStreamer]

def s3eventStreamer(client: S3AsyncClientOp[IO], bucket: BucketName): EventStreamer = new EventStreamer {
def s3eventStreamer(client: S3AsyncClientOp[IO], bucket: String): EventStreamer = new EventStreamer {

import cats.implicits._

override def streamLines(path: Path): Stream[IO, String] =
S3.create(client)
.readFile(bucket, FileKey.apply(NonEmptyString.unsafeFrom(path.toString)))
.through(text.utf8.decode)
.through(text.lines)
for {
bk <- Stream.fromEither[IO](refineString(bucket))
key <- Stream.fromEither[IO](refineString(path.toString))
lines <- S3.create(client)
.readFile(BucketName(bk), FileKey(key))
.through(text.utf8.decode)
.through(text.lines)
} yield lines

override def fileList(path: Path): IO[List[Path]] =
client
.listObjectsV2(ListObjectsV2Request.builder().bucket(bucket.value.value).prefix(path.toString).build())
.listObjectsV2(ListObjectsV2Request.builder().bucket(bucket).prefix(path.toString).build())
.map(_.contents().asScala.map(obj => Path(obj.key())).toList)

override def isDirectory(path: Path): IO[Boolean] =
client
.listObjectsV2(ListObjectsV2Request.builder().bucket(bucket.value.value).prefix(path.toString).build())
.listObjectsV2(ListObjectsV2Request.builder().bucket(bucket).prefix(path.toString).build())
.map(_.keyCount() > 1)

private def refineString(str: String) =
refineV[NonEmpty](str).leftMap(e => new IllegalArgumentException(e))

}

def localStreamer: EventStreamer = new EventStreamer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverProcessor
import ch.epfl.bluebrain.nexus.ship.resources.{ResourceProcessor, ResourceWiring}
import ch.epfl.bluebrain.nexus.ship.schemas.{SchemaProcessor, SchemaWiring}
import ch.epfl.bluebrain.nexus.ship.views.{BlazegraphViewProcessor, CompositeViewProcessor, ElasticSearchViewProcessor}
import eu.timepit.refined.types.string.NonEmptyString
import fs2.Stream
import fs2.aws.s3.models.Models.BucketName
import fs2.io.file.Path
import io.laserdisc.pure.s3.tagless.S3AsyncClientOp

Expand Down Expand Up @@ -94,12 +92,10 @@ object RunShip {
}

def s3Ship(client: S3AsyncClientOp[IO], bucket: String) = new RunShip {
override def eventsStream(path: Path, fromOffset: Offset): Stream[IO, RowEvent] = {
val bk = BucketName(NonEmptyString.unsafeFrom(bucket))
override def eventsStream(path: Path, fromOffset: Offset): Stream[IO, RowEvent] =
EventStreamer
.s3eventStreamer(client, bk)
.s3eventStreamer(client, bucket)
.stream(path, fromOffset)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ class RunShipSuite extends NexusSuite with RunShipSuite.Fixture with LocalStackS
val directoryPath = Path("/import/multi-part-import")
for {
_ <- uploadImportFileToS3(s3Client, "bucket", Path("/import/multi-part-import/2024-04-05T14:38:31.165389Z.json"))
_ <- uploadImportFileToS3(s3Client, "bucket", Path("/import/multi-part-import/2024-04-05T14:38:31.165389Z.success"))
_ <-
uploadImportFileToS3(s3Client, "bucket", Path("/import/multi-part-import/2024-04-05T14:38:31.165389Z.success"))
_ <- uploadImportFileToS3(s3Client, "bucket", Path("/import/multi-part-import/2024-04-06T11:34:31.165389Z.json"))
_ <- RunShip
.s3Ship(s3Client.underlyingClient, "bucket")
Expand Down

0 comments on commit c8e9eaf

Please sign in to comment.