diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/StorageScopeInitialization.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/StorageScopeInitialization.scala index a53c1644b6..2c2f1fd68c 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/StorageScopeInitialization.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/StorageScopeInitialization.scala @@ -83,10 +83,10 @@ object StorageScopeInitialization { * Creates a [[StorageScopeInitialization]] that creates a default S3Storage with the provided default fields */ def s3( - storage: Storages, + storages: Storages, serviceAccount: ServiceAccount, defaultFields: S3StorageFields ): StorageScopeInitialization = - new StorageScopeInitialization(storage, serviceAccount, defaultS3StorageId, defaultFields) + new StorageScopeInitialization(storages, serviceAccount, defaultS3StorageId, defaultFields) } diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/FormDataExtractor.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/FormDataExtractor.scala index 6d431e3074..b8be837c76 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/FormDataExtractor.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/FormDataExtractor.scala @@ -19,7 +19,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import scala.concurrent.{ExecutionContext, Future} import scala.util.Try -sealed trait FormDataExtractor { +trait FormDataExtractor { /** * Extracts the part with fieldName ''file'' from the passed ''entity'' MultiPart/FormData. Any other part is diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperations.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperations.scala index bea15744ab..e0e1138ac6 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperations.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperations.scala @@ -45,9 +45,10 @@ trait S3FileOperations { object S3FileOperations { final case class S3FileMetadata(contentType: ContentType, metadata: FileStorageMetadata) + private val log = Logger[S3FileOperations] + def mk(client: S3StorageClient)(implicit as: ActorSystem, uuidf: UUIDF): S3FileOperations = new S3FileOperations { - private val log = Logger[S3FileOperations] private lazy val saveFile = new S3StorageSaveFile(client) override def checkBucketExists(bucket: String): IO[Unit] = @@ -76,47 +77,61 @@ object S3FileOperations { override def save(storage: S3Storage, filename: String, entity: BodyPartEntity): IO[FileStorageMetadata] = saveFile.apply(storage, filename, entity) - override def register(bucket: String, path: Uri.Path): IO[S3FileMetadata] = { - for { - _ <- log.info(s"Fetching attributes for S3 file. 
Bucket $bucket at path $path") - resp <- client.headObject(bucket, path.toString()) - contentType <- parseContentType(resp.contentType()) - metadata <- mkS3Metadata(bucket, path, resp, contentType) - } yield metadata - } - .onError { e => - log.error(e)(s"Failed fetching required attributes for S3 file registration. Bucket $bucket and path $path") - } + override def register(bucket: String, path: Uri.Path): IO[S3FileMetadata] = + registerInternal(client, bucket, path) - private def parseContentType(raw: String): IO[ContentType] = - ContentType.parse(raw).map(_.pure[IO]).getOrElse(IO.raiseError(InvalidContentType(raw))) - - private def mkS3Metadata(bucket: String, path: Uri.Path, resp: HeadObjectResponse, ct: ContentType) = { - for { - uuid <- uuidf() - checksum <- checksumFrom(resp) - } yield S3FileMetadata( - ct, - FileStorageMetadata( - uuid, - resp.contentLength(), - checksum, - FileAttributesOrigin.External, - client.baseEndpoint / bucket / path, - path - ) - ) + } + + def registerInternal(client: S3StorageClient, bucket: String, path: Uri.Path)(implicit + uuidF: UUIDF + ): IO[S3FileMetadata] = { + for { + _ <- log.info(s"Fetching attributes for S3 file. Bucket $bucket at path $path") + resp <- client.headObject(bucket, path.toString()) + contentType <- parseContentType(resp.contentType()) + metadata <- mkS3Metadata(client, bucket, path, resp, contentType) + } yield metadata + } + .onError { e => + log.error(e)(s"Failed fetching required attributes for S3 file registration. Bucket $bucket and path $path") } - private def checksumFrom(response: HeadObjectResponse) = IO.fromOption { - Option(response.checksumSHA256()) - .map { checksum => - Digest.ComputedDigest( - DigestAlgorithm.default, - Hex.encodeHexString(Base64.getDecoder.decode(checksum)) - ) - } - }(new IllegalArgumentException("Missing checksum")) + private def parseContentType(raw: String): IO[ContentType] = + ContentType.parse(raw).map(_.pure[IO]).getOrElse(IO.raiseError(InvalidContentType(raw))) + + private def mkS3Metadata( + client: S3StorageClient, + bucket: String, + path: Uri.Path, + resp: HeadObjectResponse, + ct: ContentType + )(implicit + uuidf: UUIDF + ) = { + for { + uuid <- uuidf() + checksum <- checksumFrom(resp) + } yield S3FileMetadata( + ct, + FileStorageMetadata( + uuid, + resp.contentLength(), + checksum, + FileAttributesOrigin.External, + client.baseEndpoint / bucket / path, + path + ) + ) } + private def checksumFrom(response: HeadObjectResponse) = IO.fromOption { + Option(response.checksumSHA256()) + .map { checksum => + Digest.ComputedDigest( + DigestAlgorithm.default, + Hex.encodeHexString(Base64.getDecoder.decode(checksum)) + ) + } + }(new IllegalArgumentException("Missing checksum")) + } diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala index 35d68afd4a..94b02cdb7b 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala @@ -8,16 +8,16 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgori import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient.UploadMetadata import 
ch.epfl.bluebrain.nexus.delta.rdf.syntax.uriSyntax import ch.epfl.bluebrain.nexus.delta.sdk.error.ServiceError.FeatureDisabled -import fs2.{Chunk, Pipe, Stream} import fs2.aws.s3.S3 import fs2.aws.s3.models.Models.{BucketName, FileKey} +import fs2.{Chunk, Pipe, Stream} import io.laserdisc.pure.s3.tagless.{Interpreter, S3AsyncClientOp} import org.apache.commons.codec.binary.Hex import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, AwsCredentialsProvider, StaticCredentialsProvider} import software.amazon.awssdk.core.async.AsyncRequestBody import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.s3.S3AsyncClient -import software.amazon.awssdk.services.s3.model.{ChecksumAlgorithm, ChecksumMode, HeadObjectRequest, HeadObjectResponse, ListObjectsV2Request, ListObjectsV2Response, NoSuchKeyException, PutObjectRequest, PutObjectResponse} +import software.amazon.awssdk.services.s3.model._ import java.net.URI import java.util.Base64 @@ -34,6 +34,14 @@ trait S3StorageClient { def headObject(bucket: String, key: String): IO[HeadObjectResponse] + def copyObject( + sourceBucket: BucketName, + sourceKey: FileKey, + destinationBucket: BucketName, + destinationKey: FileKey, + checksumAlgorithm: ChecksumAlgorithm + ): IO[CopyObjectResponse] + def uploadFile( fileData: Stream[IO, Byte], bucket: String, @@ -88,6 +96,24 @@ object S3StorageClient { override def headObject(bucket: String, key: String): IO[HeadObjectResponse] = client.headObject(HeadObjectRequest.builder().bucket(bucket).key(key).checksumMode(ChecksumMode.ENABLED).build) + override def copyObject( + sourceBucket: BucketName, + sourceKey: FileKey, + destinationBucket: BucketName, + destinationKey: FileKey, + checksumAlgorithm: ChecksumAlgorithm + ): IO[CopyObjectResponse] = + client.copyObject( + CopyObjectRequest + .builder() + .sourceBucket(sourceBucket.value.value) + .sourceKey(sourceKey.value.value) + .destinationBucket(destinationBucket.value.value) + .destinationKey(destinationKey.value.value) + .checksumAlgorithm(checksumAlgorithm) + .build() + ) + override def objectExists(bucket: String, key: String): IO[Boolean] = { headObject(bucket, key) .redeemWith( @@ -173,6 +199,14 @@ object S3StorageClient { override def baseEndpoint: Uri = throw disabledErr + override def copyObject( + sourceBucket: BucketName, + sourceKey: FileKey, + destinationBucket: BucketName, + destinationKey: FileKey, + checksumAlgorithm: ChecksumAlgorithm + ): IO[CopyObjectResponse] = raiseDisabledErr + override def objectExists(bucket: String, key: String): IO[Boolean] = raiseDisabledErr override def uploadFile( diff --git a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/LocalStackS3StorageClient.scala b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/LocalStackS3StorageClient.scala index 41778c02c3..200c631b34 100644 --- a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/LocalStackS3StorageClient.scala +++ b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/LocalStackS3StorageClient.scala @@ -21,11 +21,18 @@ import java.nio.file.Paths object LocalStackS3StorageClient { val ServiceType = Service.S3 + def createBucket(s3Client: S3AsyncClientOp[IO], bucket: BucketName) = + s3Client.createBucket(CreateBucketRequest.builder().bucket(bucket.value.value).build) + def uploadFileToS3(s3Client: S3AsyncClientOp[IO], 
bucket: BucketName, path: Path): IO[PutObjectResponse] = { - s3Client.createBucket(CreateBucketRequest.builder().bucket(bucket.value.value).build) >> + val absoluteResourcePath = if (path.isAbsolute) path else Path("/" + path.toString) + createBucket(s3Client, bucket) >> s3Client.putObject( - PutObjectRequest.builder.bucket(bucket.value.value).key(path.toString).build, - Paths.get(getClass.getResource(path.toString).toURI) + PutObjectRequest.builder + .bucket(bucket.value.value) + .key(path.toString) + .build, + Paths.get(getClass.getResource(absoluteResourcePath.toString).toURI) ) } diff --git a/ship/src/main/resources/ship-default.conf b/ship/src/main/resources/ship-default.conf index 261d435892..8add4a5c2b 100644 --- a/ship/src/main/resources/ship-default.conf +++ b/ship/src/main/resources/ship-default.conf @@ -75,6 +75,7 @@ ship { realm: "internal" } + import-bucket = ${ship.s-3.import-bucket} # The bucket to which the files will be copied by the Nexus Ship target-bucket = "nexus-delta-production" diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/InitShip.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/InitShip.scala index c40b1c1fb4..490a42b29e 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/InitShip.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/InitShip.scala @@ -10,31 +10,30 @@ import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider object InitShip { - def apply(run: RunCommand): Resource[IO, (ShipConfig, fs2.Stream[IO, RowEvent], Transactors)] = - Resource.eval(configAndStream(run)).flatMap { case (config, eventStream) => + def apply(run: RunCommand): Resource[IO, (ShipConfig, fs2.Stream[IO, RowEvent], S3StorageClient, Transactors)] = + Resource.eval(configAndStream(run)).flatMap { case (config, eventStream, s3Client) => Transactors .init(config.database) - .map { xas => (config, eventStream, xas) } + .map { xas => (config, eventStream, s3Client, xas) } } - private def configAndStream(run: RunCommand): IO[(ShipConfig, fs2.Stream[IO, RowEvent])] = run.mode match { - case RunMode.Local => - val eventsStream = EventStreamer.localStreamer.stream(run.path, run.offset) - ShipConfig.load(run.config).map(_ -> eventsStream) - case RunMode.S3 => - for { - localConfig <- ShipConfig.load(None) - s3Config = localConfig.s3 - (config, eventsStream) <- - S3StorageClient.resource(s3Config.endpoint, DefaultCredentialsProvider.create()).use { client => - val eventsStream = EventStreamer.s3eventStreamer(client, s3Config.importBucket).stream(run.path, run.offset) + private def configAndStream(run: RunCommand): IO[(ShipConfig, fs2.Stream[IO, RowEvent], S3StorageClient)] = { + ShipConfig.load(run.config).flatMap { shipConfig => + S3StorageClient.resource(shipConfig.s3.endpoint, DefaultCredentialsProvider.create()).use { s3Client => + run.mode match { + case RunMode.Local => + val eventsStream = EventStreamer.localStreamer.stream(run.path, run.offset) + ShipConfig.load(run.config).map((_, eventsStream, s3Client)) + case RunMode.S3 => + val eventsStream = + EventStreamer.s3eventStreamer(s3Client, shipConfig.s3.importBucket).stream(run.path, run.offset) val config = run.config match { - case Some(configPath) => ShipConfig.loadFromS3(client, s3Config.importBucket, configPath) - case None => IO.pure(localConfig) + case Some(configPath) => ShipConfig.loadFromS3(s3Client, shipConfig.s3.importBucket, configPath) + case None => IO.pure(shipConfig) } - config.map(_ -> eventsStream) - } - } yield (config, eventsStream) + config.map((_, 
eventsStream, s3Client)) + } + } + } } - } diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala index 45607738a1..3d8fc9fe55 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala @@ -57,10 +57,10 @@ object Main private[ship] def run(r: RunCommand): IO[Unit] = { val clock = Clock[IO] - InitShip(r).use { case (config, eventsStream, xas) => + InitShip(r).use { case (config, eventsStream, s3Client, xas) => for { start <- clock.realTimeInstant - reportOrError <- RunShip(eventsStream, config.input, xas).attempt + reportOrError <- RunShip(eventsStream, s3Client, config.input, xas).attempt end <- clock.realTimeInstant _ <- ShipSummaryStore.save(xas, start, end, r, reportOrError) _ <- IO.fromEither(reportOrError) diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala index 18ecd56f98..0d42b68d77 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala @@ -2,6 +2,7 @@ package ch.epfl.bluebrain.nexus.ship import cats.effect.{Clock, IO} import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient import ch.epfl.bluebrain.nexus.delta.plugins.storage.FileSelf import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.{JsonLdApi, JsonLdJavaApi} import ch.epfl.bluebrain.nexus.delta.rdf.shacl.ValidateShacl @@ -12,6 +13,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.quotas.Quotas import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.RowEvent import ch.epfl.bluebrain.nexus.ship.config.InputConfig +import ch.epfl.bluebrain.nexus.ship.files.FileProcessor import ch.epfl.bluebrain.nexus.ship.organizations.OrganizationProvider import ch.epfl.bluebrain.nexus.ship.projects.ProjectProcessor import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverProcessor @@ -23,7 +25,12 @@ import fs2.Stream object RunShip { - def apply(eventsStream: Stream[IO, RowEvent], config: InputConfig, xas: Transactors): IO[ImportReport] = { + def apply( + eventsStream: Stream[IO, RowEvent], + s3Client: S3StorageClient, + config: InputConfig, + xas: Transactors + ): IO[ImportReport] = { val clock = Clock[IO] val uuidF = UUIDF.random // Resources may have been created with different configurations so we adopt the lenient one for the import @@ -55,12 +62,13 @@ object RunShip { projectProcessor <- ProjectProcessor(fetchActiveOrg, fetchContext, rcr, originalProjectContext, projectMapper, config, eventClock, xas)(targetBaseUri, jsonLdApi) resolverProcessor = ResolverProcessor(fetchContext, projectMapper, eventLogConfig, eventClock, xas) schemaProcessor = SchemaProcessor(schemaLog, fetchContext, schemaImports, rcr, projectMapper, eventClock) - fileSelf = FileSelf(originalProjectContext)(originalBaseUri) + fileSelf = FileSelf(originalProjectContext)(originalBaseUri) distributionPatcher = new DistributionPatcher(fileSelf, projectMapper, targetBaseUri) resourceProcessor = ResourceProcessor(resourceLog, rcr, projectMapper, fetchContext, distributionPatcher, eventClock) esViewsProcessor = ElasticSearchViewProcessor(fetchContext, rcr, projectMapper, eventLogConfig, eventClock, xas) bgViewsProcessor = BlazegraphViewProcessor(fetchContext, rcr, projectMapper, eventLogConfig, 
eventClock, xas) compositeViewsProcessor = CompositeViewProcessor(fetchContext, rcr, projectMapper, eventLogConfig, eventClock, xas) + fileProcessor = FileProcessor(fetchContext, s3Client, projectMapper, rcr, config, eventClock, xas) // format: on report <- EventProcessor .run( @@ -71,7 +79,8 @@ object RunShip { resourceProcessor, esViewsProcessor, bgViewsProcessor, - compositeViewsProcessor + compositeViewsProcessor, + fileProcessor ) } yield report } diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/InputConfig.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/InputConfig.scala index c469556291..e72af35b5a 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/InputConfig.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/InputConfig.scala @@ -23,6 +23,7 @@ final case class InputConfig( viewDefaults: ViewDefaults, serviceAccount: ServiceAccountConfig, storages: StoragesConfig, + importBucket: BucketName, targetBucket: BucketName ) diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/files/FileCopier.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/files/FileCopier.scala new file mode 100644 index 0000000000..b3cb640f58 --- /dev/null +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/files/FileCopier.scala @@ -0,0 +1,40 @@ +package ch.epfl.bluebrain.nexus.ship.files + +import akka.http.scaladsl.model.Uri +import cats.effect.IO +import cats.implicits._ +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient +import eu.timepit.refined.collection.NonEmpty +import eu.timepit.refined.refineV +import fs2.aws.s3.models.Models.{BucketName, FileKey} +import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm + +trait FileCopier { + + def copyFile(path: Uri.Path): IO[Unit] + +} + +object FileCopier { + + def apply( + s3StorageClient: S3StorageClient, + importBucket: BucketName, + targetBucket: BucketName + ): FileCopier = + (path: Uri.Path) => { + def refineString(str: String) = + refineV[NonEmpty](str).leftMap(e => new IllegalArgumentException(e)) + + val fileKey = IO.fromEither(refineString(path.toString).map(FileKey)) + + fileKey.flatMap { key => + // TODO: Check if we only use SHA256 or not? 
If not, we need to pass the right algorithm
+          s3StorageClient.copyObject(importBucket, key, targetBucket, key, ChecksumAlgorithm.SHA256)
+        }.void
+      }
+
+  def apply(): FileCopier =
+    (_: Uri.Path) => IO.unit
+
+}
diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/files/FileProcessor.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/files/FileProcessor.scala
new file mode 100644
index 0000000000..7e458fd4ed
--- /dev/null
+++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/files/FileProcessor.scala
@@ -0,0 +1,119 @@
+package ch.epfl.bluebrain.nexus.ship.files
+
+import cats.effect.IO
+import ch.epfl.bluebrain.nexus.delta.kernel.Logger
+import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.Files
+import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.Files.definition
+import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileEvent._
+import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileRejection.{IncorrectRev, ResourceAlreadyExists}
+import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{FileEvent, FileId}
+import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
+import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.{FetchStorage, StorageResource}
+import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
+import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
+import ch.epfl.bluebrain.nexus.delta.sdk.model.IdSegmentRef
+import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
+import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResolverContextResolution
+import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
+import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef}
+import ch.epfl.bluebrain.nexus.delta.sourcing.{ScopedEventLog, Transactors}
+import ch.epfl.bluebrain.nexus.ship._
+import ch.epfl.bluebrain.nexus.ship.acls.AclWiring.alwaysAuthorize
+import ch.epfl.bluebrain.nexus.ship.config.InputConfig
+import ch.epfl.bluebrain.nexus.ship.files.FileProcessor.logger
+import ch.epfl.bluebrain.nexus.ship.files.FileWiring._
+import ch.epfl.bluebrain.nexus.ship.storages.StorageWiring
+import io.circe.Decoder
+
+class FileProcessor private (
+    files: Files,
+    projectMapper: ProjectMapper,
+    fileCopier: FileCopier,
+    clock: EventClock
+) extends EventProcessor[FileEvent] {
+
+  override def resourceType: EntityType = Files.entityType
+
+  override def decoder: Decoder[FileEvent] = FileEvent.serializer.codec
+
+  override def evaluate(event: FileEvent): IO[ImportStatus] =
+    for {
+      _      <- clock.setInstant(event.instant)
+      result <- evaluateInternal(event)
+    } yield result
+
+  private def evaluateInternal(event: FileEvent): IO[ImportStatus] = {
+    implicit val s: Subject = event.subject
+    implicit val c: Caller  = Caller(s, Set.empty)
+    val cRev                = event.rev - 1
+    val project             = projectMapper.map(event.project)
+
+    event match {
+      case e: FileCreated               =>
+        fileCopier.copyFile(e.attributes.path) >>
+          files.registerFile(FileId(e.id, project), None, None, e.attributes.path, e.tag).void
+      case e: FileUpdated               =>
+        fileCopier.copyFile(e.attributes.path)
+      case e: FileCustomMetadataUpdated =>
+        files.updateMetadata(FileId(e.id, project), cRev, e.metadata, e.tag)
+      case _: FileAttributesUpdated     => IO.unit
+      case e: FileTagAdded              =>
+        files.tag(FileId(e.id, project), e.tag, e.targetRev, cRev)
+      case e: FileTagDeleted            =>
+        files.deleteTag(FileId(e.id, project), e.tag, cRev)
+      case e: FileDeprecated            =>
+        files.deprecate(FileId(e.id, 
project), cRev) + case e: FileUndeprecated => + files.undeprecate(FileId(e.id, project), cRev) + } + }.redeemWith( + { + case a: ResourceAlreadyExists => logger.warn(a)("The resource already exists").as(ImportStatus.Dropped) + case i: IncorrectRev => logger.warn(i)("An incorrect revision has been provided").as(ImportStatus.Dropped) + case other => IO.raiseError(other) + }, + _ => IO.pure(ImportStatus.Success) + ) + +} + +object FileProcessor { + + private val logger = Logger[FileProcessor] + + def apply( + fetchContext: FetchContext, + s3Client: S3StorageClient, + projectMapper: ProjectMapper, + rcr: ResolverContextResolution, + config: InputConfig, + clock: EventClock, + xas: Transactors + )(implicit jsonLdApi: JsonLdApi): FileProcessor = { + + val storages = StorageWiring.storages(fetchContext, rcr, config, clock, xas) + + val fe = new FetchStorage { + override def fetch(id: IdSegmentRef, project: ProjectRef): IO[StorageResource] = + storages.flatMap(_.fetch(id, project)) + + override def fetchDefault(project: ProjectRef): IO[StorageResource] = + storages.flatMap(_.fetchDefault(project)) + } + + val fileCopier = FileCopier(s3Client, config.importBucket, config.targetBucket) + + val files = + new Files( + failingFormDataExtractor, + ScopedEventLog(definition(clock), config.eventLog, xas), + alwaysAuthorize, + fetchContext, + fe, + registerOperationOnly(s3Client) + )(FailingUUID) + + new FileProcessor(files, projectMapper, fileCopier, clock) + } + +} diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/files/FileWiring.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/files/FileWiring.scala new file mode 100644 index 0000000000..9454ffa880 --- /dev/null +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/files/FileWiring.scala @@ -0,0 +1,24 @@ +package ch.epfl.bluebrain.nexus.ship.files + +import akka.http.scaladsl.model.HttpEntity +import cats.effect.IO +import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.FormDataExtractor +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.FileOperations +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient +import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode +import ch.epfl.bluebrain.nexus.ship.storages.StorageWiring.{failingDiskFileOperations, failingRemoteDiskFileOperations, registerS3FileOperationOnly} + +object FileWiring { + + def registerOperationOnly(s3StorageClient: S3StorageClient): FileOperations = + FileOperations.mk( + failingDiskFileOperations, + failingRemoteDiskFileOperations, + registerS3FileOperationOnly(s3StorageClient) + ) + + def failingFormDataExtractor: FormDataExtractor = + (_: IriOrBNode.Iri, _: HttpEntity, _: Long) => + IO.raiseError(new IllegalArgumentException("FormDataExtractor should not be called")) + +} diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/storages/StorageWiring.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/storages/StorageWiring.scala index c12c057769..2da32bd655 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/storages/StorageWiring.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/storages/StorageWiring.scala @@ -1,18 +1,26 @@ package ch.epfl.bluebrain.nexus.ship.storages +import akka.http.scaladsl.model.{BodyPartEntity, Uri} import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF import ch.epfl.bluebrain.nexus.delta.plugins.storage.StorageScopeInitialization +import 
ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{ComputedFileAttributes, FileStorageMetadata} import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.Storages import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.S3StorageConfig +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.{DiskStorage, RemoteDiskStorage, S3Storage} import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageFields.S3StorageFields -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageValue +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.{AbsolutePath, StorageValue} import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageAccess +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.disk.DiskFileOperations +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.RemoteDiskFileOperations +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.S3FileOperations +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi -import ch.epfl.bluebrain.nexus.delta.sdk.Defaults import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResolverContextResolution +import ch.epfl.bluebrain.nexus.delta.sdk.{AkkaSource, Defaults} import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors +import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label import ch.epfl.bluebrain.nexus.ship.EventClock import ch.epfl.bluebrain.nexus.ship.config.InputConfig import fs2.aws.s3.models.Models.BucketName @@ -75,4 +83,46 @@ object StorageWiring { ) } + def failingDiskFileOperations: DiskFileOperations = new DiskFileOperations { + override def checkVolumeExists(path: AbsolutePath): IO[Unit] = + IO.raiseError(new IllegalArgumentException("DiskFileOperations should not be called")) + + override def fetch(path: Uri.Path): IO[AkkaSource] = + IO.raiseError(new IllegalArgumentException("DiskFileOperations should not be called")) + + override def save(storage: DiskStorage, filename: String, entity: BodyPartEntity): IO[FileStorageMetadata] = + IO.raiseError(new IllegalArgumentException("DiskFileOperations should not be called")) + } + + def failingRemoteDiskFileOperations: RemoteDiskFileOperations = new RemoteDiskFileOperations { + override def checkFolderExists(folder: Label): IO[Unit] = + IO.raiseError(new IllegalArgumentException("RemoteDiskFileOperations should not be called")) + + override def fetch(folder: Label, path: Uri.Path): IO[AkkaSource] = + IO.raiseError(new IllegalArgumentException("RemoteDiskFileOperations should not be called")) + + override def save(storage: RemoteDiskStorage, filename: String, entity: BodyPartEntity): IO[FileStorageMetadata] = + IO.raiseError(new IllegalArgumentException("RemoteDiskFileOperations should not be called")) + + override def link(storage: RemoteDiskStorage, sourcePath: Uri.Path, filename: String): IO[FileStorageMetadata] = + IO.raiseError(new IllegalArgumentException("RemoteDiskFileOperations should not be called")) + + override def fetchAttributes(folder: Label, path: Uri.Path): IO[ComputedFileAttributes] = + IO.raiseError(new IllegalArgumentException("RemoteDiskFileOperations should not be called")) + } + + def registerS3FileOperationOnly(s3Client: S3StorageClient): S3FileOperations = new S3FileOperations { + override def checkBucketExists(bucket: 
String): IO[Unit] = + IO.raiseError(new IllegalArgumentException("S3FileOperations should not be called")) + + override def fetch(bucket: String, path: Uri.Path): IO[AkkaSource] = + IO.raiseError(new IllegalArgumentException("S3FileOperations should not be called")) + + override def save(storage: S3Storage, filename: String, entity: BodyPartEntity): IO[FileStorageMetadata] = + IO.raiseError(new IllegalArgumentException("S3FileOperations should not be called")) + + override def register(bucket: String, path: Uri.Path): IO[S3FileOperations.S3FileMetadata] = + S3FileOperations.registerInternal(s3Client, bucket, path)(UUIDF.random) + } + } diff --git a/ship/src/test/resources/gpfs/cat_scream.gif b/ship/src/test/resources/gpfs/cat_scream.gif new file mode 100644 index 0000000000..cc5a14fd56 Binary files /dev/null and b/ship/src/test/resources/gpfs/cat_scream.gif differ diff --git a/ship/src/test/resources/import/file-events-import.json b/ship/src/test/resources/import/file-events-import.json new file mode 100644 index 0000000000..3097a9004c --- /dev/null +++ b/ship/src/test/resources/import/file-events-import.json @@ -0,0 +1,2 @@ +{"ordering":100,"type":"project","org":"public","project":"sscx","id":"projects/public/sscx","rev":1,"value":{"rev": 1, "base": "https://bbp.epfl.ch/neurosciencegraph/data/", "uuid": "c7d70522-4305-480a-b190-75d757ed9a49", "@type": "ProjectCreated", "label": "sscx", "vocab": "https://bbp.epfl.ch/nexus/v1/resources/public/sscx/_/", "instant": "2020-01-22T16:03:33.105Z", "subject": {"@type": "User", "realm": "bbp", "subject": "bob"}, "apiMappings": {"prov": "http://www.w3.org/ns/prov#", "context": "https://incf.github.io/neuroshapes/contexts/", "schemaorg": "http://schema.org/", "datashapes": "https://neuroshapes.org/dash/", "ontologies": "https://neuroshapes.org/dash/ontology", "taxonomies": "https://neuroshapes.org/dash/taxonomy", "commonshapes": "https://neuroshapes.org/commons/", "provdatashapes": "https://provshapes.org/datashapes/", "provcommonshapes": "https://provshapes.org/commons/"}, "description": "This project contains somatosensorycortex dissemination publication data.", "organizationUuid": "d098a020-508b-4131-a28d-75f73b7f5f0e", "organizationLabel": "public"},"instant":"2020-01-22T17:03:33.105+01:00"} +{"ordering": 111, "type": "file", "org": "public", "project": "sscx", "id": "https://staging.nise.bbp.epfl.ch/nexus/v1/resources/tests/oliver/_/c6eb0e30-14c3-4460-a569-662fe1359e9d", "rev": 1, "value": {"id":"https://staging.nise.bbp.epfl.ch/nexus/v1/resources/tests/oliver/_/c6eb0e30-14c3-4460-a569-662fe1359e9d","rev":1,"@type":"FileCreated","instant":"2024-04-18T13:26:14.235032Z","project":"public/sscx","storage":"https://bluebrain.github.io/nexus/vocabulary/diskStorageDefault?rev=1","subject":{"@type":"User","realm":"bbp","subject":"grabinsk"},"attributes":{"path":"gpfs/cat_scream.gif","uuid":"53c531c3-7544-44e3-9b8b-a3412769d586","bytes":80030,"digest":{"@type":"ComputedDigest","value":"26620d2de1d6c77479a77bf0439e84b8a8c92ade96db70feaf62066cab14c65f","algorithm":"SHA-256"},"origin":"Client","filename":"cat_scream.gif","location":"file:///opt/binaries/tests/oliver/5/3/c/5/3/1/c/3/cat_scream.gif","mediaType":"image/gif"},"storageType":"DiskStorage"}, "instant":"2023-07-16T20:42:59.53+02:00"} \ No newline at end of file diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/RunShipSuite.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/RunShipSuite.scala index 2faae90f80..4026b63f79 100644 --- 
a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/RunShipSuite.scala +++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/RunShipSuite.scala @@ -1,6 +1,9 @@ package ch.epfl.bluebrain.nexus.ship +import akka.http.scaladsl.model.Uri import cats.effect.IO +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv import ch.epfl.bluebrain.nexus.delta.sdk.projects.Projects @@ -11,12 +14,14 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label, ProjectR import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie import ch.epfl.bluebrain.nexus.ship.ImportReport.Statistics -import ch.epfl.bluebrain.nexus.ship.RunShipSuite.{checkFor, expectedImportReport, getDistinctOrgProjects} +import ch.epfl.bluebrain.nexus.ship.RunShipSuite.{checkFor, expectedImportReport, getDistinctOrgProjects, noopS3Client} import ch.epfl.bluebrain.nexus.ship.config.ShipConfigFixtures import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite import doobie.implicits._ +import fs2.aws.s3.models.Models import fs2.io.file.Path import munit.AnyFixture +import software.amazon.awssdk.services.s3.model.{ChecksumAlgorithm, CopyObjectResponse, HeadObjectResponse, ListObjectsV2Response} import java.time.Instant @@ -35,7 +40,7 @@ class RunShipSuite extends NexusSuite with Doobie.Fixture with ShipConfigFixture test("Run import by providing the path to a file") { for { events <- eventsStream("import/import.json") - _ <- RunShip(events, inputConfig, xas).assertEquals(expectedImportReport) + _ <- RunShip(events, noopS3Client, inputConfig, xas).assertEquals(expectedImportReport) _ <- checkFor("elasticsearch", nxv + "defaultElasticSearchIndex", xas).assertEquals(1) _ <- checkFor("blazegraph", nxv + "defaultSparqlIndex", xas).assertEquals(1) _ <- checkFor("storage", nxv + "defaultS3Storage", xas).assertEquals(1) @@ -45,7 +50,7 @@ class RunShipSuite extends NexusSuite with Doobie.Fixture with ShipConfigFixture test("Run import by providing the path to a directory") { for { events <- eventsStream("import/multi-part-import") - _ <- RunShip(events, inputConfig, xas).assertEquals(expectedImportReport) + _ <- RunShip(events, noopS3Client, inputConfig, xas).assertEquals(expectedImportReport) } yield () } @@ -53,7 +58,7 @@ class RunShipSuite extends NexusSuite with Doobie.Fixture with ShipConfigFixture val start = Offset.at(2) for { events <- eventsStream("import/two-projects.json", offset = start) - _ <- RunShip(events, inputConfig, xas).map { report => + _ <- RunShip(events, noopS3Client, inputConfig, xas).map { report => assert(report.offset == Offset.at(2L)) assert(thereIsOneProjectEventIn(report)) } @@ -68,7 +73,7 @@ class RunShipSuite extends NexusSuite with Doobie.Fixture with ShipConfigFixture ) for { events <- eventsStream("import/import.json") - _ <- RunShip(events, configWithProjectMapping, xas) + _ <- RunShip(events, noopS3Client, configWithProjectMapping, xas) _ <- getDistinctOrgProjects(xas).map { project => assertEquals(project, target) } @@ -96,6 +101,42 @@ object RunShipSuite { | AND id = ${id.toString} """.stripMargin.query[Int].unique.transact(xas.read) + private val noopS3Client = new S3StorageClient { + override def listObjectsV2(bucket: String): IO[ListObjectsV2Response] = + IO.raiseError(new 
NotImplementedError("listObjectsV2 is not implemented")) + + override def listObjectsV2(bucket: Models.BucketName, prefix: String): IO[ListObjectsV2Response] = + IO.raiseError(new NotImplementedError("listObjectsV2 is not implemented")) + + override def readFile(bucket: Models.BucketName, fileKey: Models.FileKey): fs2.Stream[IO, Byte] = + fs2.Stream.empty + + override def headObject(bucket: String, key: String): IO[HeadObjectResponse] = + IO.raiseError(new NotImplementedError("headObject is not implemented")) + + override def copyObject( + sourceBucket: Models.BucketName, + sourceKey: Models.FileKey, + destinationBucket: Models.BucketName, + destinationKey: Models.FileKey, + checksumAlgorithm: ChecksumAlgorithm + ): IO[CopyObjectResponse] = + IO.raiseError(new NotImplementedError("copyObject is not implemented")) + + override def baseEndpoint: Uri = Uri.apply("http://localhost:4566") + + override def uploadFile( + fileData: fs2.Stream[IO, Byte], + bucket: String, + key: String, + algorithm: DigestAlgorithm + ): IO[S3StorageClient.UploadMetadata] = + IO.raiseError(new NotImplementedError("uploadFile is not implemented")) + + override def objectExists(bucket: String, key: String): IO[Boolean] = + IO.raiseError(new NotImplementedError("objectExists is not implemented")) + } + // The expected import report for the import.json file, as well as for the /import/multi-part-import directory val expectedImportReport: ImportReport = ImportReport( Offset.at(9999999L), diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/S3RunShipSuite.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/S3RunShipSuite.scala index 3341fc5f9a..78c8fe9e58 100644 --- a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/S3RunShipSuite.scala +++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/S3RunShipSuite.scala @@ -1,7 +1,8 @@ package ch.epfl.bluebrain.nexus.ship import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.LocalStackS3StorageClient -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.LocalStackS3StorageClient.uploadFileToS3 +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.LocalStackS3StorageClient.{createBucket, uploadFileToS3} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie import ch.epfl.bluebrain.nexus.ship.RunShipSuite.expectedImportReport @@ -11,6 +12,7 @@ import eu.timepit.refined.types.string.NonEmptyString import fs2.aws.s3.models.Models.BucketName import fs2.io.file.Path import munit.AnyFixture +import software.amazon.awssdk.services.s3.model.GetObjectAttributesRequest import scala.concurrent.duration.{Duration, DurationInt} @@ -33,10 +35,39 @@ class S3RunShipSuite for { _ <- uploadFileToS3(fs2S3client, bucket, importFilePath) events = EventStreamer.s3eventStreamer(s3Client, bucket).stream(importFilePath, Offset.start) - _ <- RunShip(events, inputConfig, xas).assertEquals(expectedImportReport) + _ <- RunShip(events, s3Client, inputConfig, xas).assertEquals(expectedImportReport) } yield () } + test("Run import with file events") { + val importFilePath = Path("/import/file-events-import.json") + val gif = Path("gpfs/cat_scream.gif") + + val importBucket = BucketName(NonEmptyString.unsafeFrom("nexus-ship-production")) + val targetBucket = BucketName(NonEmptyString.unsafeFrom("nexus-delta-production")) + val shipConfig = inputConfig.copy(importBucket = importBucket, targetBucket = 
targetBucket)
+
+    {
+      for {
+        _      <- uploadFileToS3(fs2S3client, importBucket, importFilePath)
+        _      <- uploadFileToS3(fs2S3client, importBucket, gif)
+        _      <- createBucket(fs2S3client, targetBucket)
+        events  = EventStreamer.s3eventStreamer(s3Client, importBucket).stream(importFilePath, Offset.start)
+        report <- RunShip(events, s3Client, shipConfig, xas)
+        _       = assertEquals(report.progress(EntityType("file")).success, 1L)
+        _      <- fs2S3client.getObjectAttributes(
+                    GetObjectAttributesRequest
+                      .builder()
+                      .bucket(targetBucket.value.value)
+                      .key(gif.toString)
+                      .objectAttributesWithStrings(java.util.List.of("Checksum"))
+                      .build()
+                  )
+      } yield ()
+    }.accepted
+  }
+
   test("Run import from S3 providing a directory") {
     val directoryPath = Path("/import/multi-part-import")
     for {
@@ -44,7 +75,7 @@
       _      <- uploadFileToS3(fs2S3client, bucket, Path("/import/multi-part-import/2024-04-05T14:38:31.165389Z.success"))
       _      <- uploadFileToS3(fs2S3client, bucket, Path("/import/multi-part-import/2024-04-06T11:34:31.165389Z.json"))
       events  = EventStreamer.s3eventStreamer(s3Client, bucket).stream(directoryPath, Offset.start)
-      _      <- RunShip(events, inputConfig, xas).assertEquals(expectedImportReport)
+      _      <- RunShip(events, s3Client, inputConfig, xas).assertEquals(expectedImportReport)
     } yield ()
   }
diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigFixtures.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigFixtures.scala
index 8835112481..a093de901f 100644
--- a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigFixtures.scala
+++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigFixtures.scala
@@ -43,6 +43,7 @@ trait ShipConfigFixtures extends ConfigFixtures with StorageFixtures with Classp
     10737418240L
   )
 
+  private val importBucket = BucketName(NonEmptyString.unsafeFrom("nexus-ship-production"))
   private val targetBucket = BucketName(NonEmptyString.unsafeFrom("nexus-delta-production"))
 
   def inputConfig: InputConfig =
@@ -55,6 +56,7 @@
       viewDefaults,
       serviceAccount,
       StoragesConfig(eventLogConfig, pagination, config.copy(amazon = Some(amazonConfig))),
+      importBucket,
       targetBucket
     )
 
diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigSuite.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigSuite.scala
index 66f8f00558..da2d18285c 100644
--- a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigSuite.scala
+++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigSuite.scala
@@ -66,6 +66,13 @@ class ShipConfigSuite extends NexusSuite with ShipConfigFixtures with LocalStack
     } yield ()
   }
 
+  test("Should read the import bucket") {
+    for {
+      config <- ShipConfig.load(None).map(_.input)
+      _       = assertEquals(config.importBucket, inputConfig.importBucket)
+    } yield ()
+  }
+
   test("Should read the target bucket") {
     for {
       config <- ShipConfig.load(None).map(_.input)