diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventStreamer.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventStreamer.scala index 6515264e13..7a96c78d2f 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventStreamer.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventStreamer.scala @@ -5,7 +5,8 @@ import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.RowEvent import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.ship.EventStreamer.logger -import eu.timepit.refined.types.string.NonEmptyString +import eu.timepit.refined.collection.NonEmpty +import eu.timepit.refined.refineV import fs2.aws.s3.S3 import fs2.aws.s3.models.Models.{BucketName, FileKey} import fs2.io.file.{Files, Path} @@ -52,24 +53,33 @@ object EventStreamer { private val logger = Logger[EventStreamer] - def s3eventStreamer(client: S3AsyncClientOp[IO], bucket: BucketName): EventStreamer = new EventStreamer { + def s3eventStreamer(client: S3AsyncClientOp[IO], bucket: String): EventStreamer = new EventStreamer { + + import cats.implicits._ override def streamLines(path: Path): Stream[IO, String] = - S3.create(client) - .readFile(bucket, FileKey.apply(NonEmptyString.unsafeFrom(path.toString))) - .through(text.utf8.decode) - .through(text.lines) + for { + bk <- Stream.fromEither[IO](refineString(bucket)) + key <- Stream.fromEither[IO](refineString(path.toString)) + lines <- S3.create(client) + .readFile(BucketName(bk), FileKey(key)) + .through(text.utf8.decode) + .through(text.lines) + } yield lines override def fileList(path: Path): IO[List[Path]] = client - .listObjectsV2(ListObjectsV2Request.builder().bucket(bucket.value.value).prefix(path.toString).build()) + .listObjectsV2(ListObjectsV2Request.builder().bucket(bucket).prefix(path.toString).build()) .map(_.contents().asScala.map(obj => Path(obj.key())).toList) override def isDirectory(path: Path): IO[Boolean] = client - .listObjectsV2(ListObjectsV2Request.builder().bucket(bucket.value.value).prefix(path.toString).build()) + .listObjectsV2(ListObjectsV2Request.builder().bucket(bucket).prefix(path.toString).build()) .map(_.keyCount() > 1) + private def refineString(str: String) = + refineV[NonEmpty](str).leftMap(e => new IllegalArgumentException(e)) + } def localStreamer: EventStreamer = new EventStreamer { diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala index 79048e37cf..014e4518f5 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala @@ -19,9 +19,7 @@ import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverProcessor import ch.epfl.bluebrain.nexus.ship.resources.{ResourceProcessor, ResourceWiring} import ch.epfl.bluebrain.nexus.ship.schemas.{SchemaProcessor, SchemaWiring} import ch.epfl.bluebrain.nexus.ship.views.{BlazegraphViewProcessor, CompositeViewProcessor, ElasticSearchViewProcessor} -import eu.timepit.refined.types.string.NonEmptyString import fs2.Stream -import fs2.aws.s3.models.Models.BucketName import fs2.io.file.Path import io.laserdisc.pure.s3.tagless.S3AsyncClientOp @@ -94,12 +92,10 @@ object RunShip { } def s3Ship(client: S3AsyncClientOp[IO], bucket: String) = new RunShip { - override def eventsStream(path: Path, fromOffset: Offset): Stream[IO, RowEvent] = { - val bk = BucketName(NonEmptyString.unsafeFrom(bucket)) + override def eventsStream(path: Path, fromOffset: Offset): Stream[IO, RowEvent] = EventStreamer - .s3eventStreamer(client, bk) + .s3eventStreamer(client, bucket) .stream(path, fromOffset) - } } } diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/RunShipSuite.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/RunShipSuite.scala index cfa850d899..4f362b63eb 100644 --- a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/RunShipSuite.scala +++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/RunShipSuite.scala @@ -51,7 +51,8 @@ class RunShipSuite extends NexusSuite with RunShipSuite.Fixture with LocalStackS val directoryPath = Path("/import/multi-part-import") for { _ <- uploadImportFileToS3(s3Client, "bucket", Path("/import/multi-part-import/2024-04-05T14:38:31.165389Z.json")) - _ <- uploadImportFileToS3(s3Client, "bucket", Path("/import/multi-part-import/2024-04-05T14:38:31.165389Z.success")) + _ <- + uploadImportFileToS3(s3Client, "bucket", Path("/import/multi-part-import/2024-04-05T14:38:31.165389Z.success")) _ <- uploadImportFileToS3(s3Client, "bucket", Path("/import/multi-part-import/2024-04-06T11:34:31.165389Z.json")) _ <- RunShip .s3Ship(s3Client.underlyingClient, "bucket")