diff --git a/build.sbt b/build.sbt index c62c0df1e5..ab30d7a8de 100755 --- a/build.sbt +++ b/build.sbt @@ -854,7 +854,9 @@ lazy val tests = project awsSdk % Test, scalaTest % Test, akkaSlf4j % Test, - alpakkaSse % Test + alpakkaSse % Test, + fs2Aws % Test, + fs2AwsS3 % Test ), scalacOptions ~= { options: Seq[String] => options.filterNot(Set("-Wunused:imports")) }, Test / parallelExecution := false, diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala index 9e04b9e26f..225b2aa2fb 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala @@ -218,6 +218,33 @@ final class Files( } yield res }.span("updateFileMetadata") + def registerFile( + id: FileId, + storageId: Option[IdSegment], + metadata: Option[FileCustomMetadata], + path: Uri.Path, + tag: Option[UserTag] + )(implicit caller: Caller): IO[FileResource] = { + for { + (iri, pc) <- id.expandIri(fetchContext.onCreate) + (storageRef, storage) <- fetchAndValidateActiveStorage(storageId, id.project, pc) + s3Metadata <- fileOperations.register(storage, path) + filename <- IO.fromOption(path.lastSegment)(InvalidFilePath) + attr = FileAttributes.from(FileDescription(filename, s3Metadata.contentType.some, metadata), s3Metadata.metadata) + res <- eval( + CreateFile( + iri, + id.project, + storageRef, + storage.tpe, + attr, + caller.subject, + tag + ) + ) + } yield res + }.span("registerFile") + /** * Update a new file linking it from an existing file in a storage * diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/model/FileAttributes.scala 
b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/model/FileAttributes.scala index f25db277c4..399bad2d85 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/model/FileAttributes.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/model/FileAttributes.scala @@ -95,17 +95,20 @@ object FileAttributes { object FileAttributesOrigin { final case object Client extends FileAttributesOrigin - final case object Storage extends FileAttributesOrigin + final case object Storage extends FileAttributesOrigin + final case object External extends FileAttributesOrigin implicit val fileAttributesEncoder: Encoder[FileAttributesOrigin] = Encoder.encodeString.contramap { - case Client => "Client" - case Storage => "Storage" + case Client => "Client" + case Storage => "Storage" + case External => "External" } implicit val fileAttributesDecoder: Decoder[FileAttributesOrigin] = Decoder.decodeString.emap { - case "Client" => Right(Client) - case "Storage" => Right(Storage) - case str => Left(s"'$str' is not a 'FileAttributesOrigin'") + case "Client" => Right(Client) + case "Storage" => Right(Storage) + case "External" => Right(External) + case str => Left(s"'$str' is not a 'FileAttributesOrigin'") } } diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/model/FileRejection.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/model/FileRejection.scala index 0eedccb8c1..bc12dcf47f 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/model/FileRejection.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/model/FileRejection.scala @@ -226,9 +226,9 @@ object FileRejection { * Rejection returned when attempting to link a file without providing a filename or a path that 
ends with a * filename. */ - final case object InvalidFileLink + final case object InvalidFilePath extends FileRejection( - s"Linking a file cannot be performed without a 'filename' or a 'path' that does not end with a filename." + s"Linking or registering a file cannot be performed without a 'filename' or a 'path' that does not end with a filename." ) final case class CopyRejection( diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/routes/FilesRoutes.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/routes/FilesRoutes.scala index 29a86ece58..9abf0a8975 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/routes/FilesRoutes.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/routes/FilesRoutes.scala @@ -11,7 +11,7 @@ import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileRejection._ import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model._ import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.permissions.{read => Read, write => Write} -import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.routes.FilesRoutes.LinkFileRequest.{fileDescriptionFromRequest, linkFileDecoder} +import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.routes.FilesRoutes.LinkFileRequest.{fileDescriptionFromRequest, linkFileDecoder, regFileDecoder} import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.routes.FilesRoutes._ import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.{schemas, FileResource, Files} import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.ShowFileLocation @@ -264,6 +264,32 @@ final class FilesRoutes( } } ) + }, + pathPrefix("register") { + (idSegment & indexingMode) { (id, mode) => + pathEndOrSingleSlash { + operationName(s"$prefixSegment/files/{org}/{project}/register/{id}") { + 
parameters("storage".as[IdSegment].?, "tag".as[UserTag].?) { (storage, tag) => + entity(as[RegisterFileRequest]) { registerRequest => + val fileId = FileId(id, project) + emit( + Created, + files + .registerFile( + fileId, + storage, + registerRequest.metadata, + registerRequest.path, + tag + ) + .index(mode) + .attemptNarrow[FileRejection] + ) + } + } + } + } + } } ) } @@ -329,15 +355,18 @@ object FilesRoutes { metadata: Option[FileCustomMetadata] ) + final case class RegisterFileRequest(path: Path, metadata: Option[FileCustomMetadata]) + object LinkFileRequest { @nowarn("cat=unused") - implicit private val config: Configuration = Configuration.default - implicit val linkFileDecoder: Decoder[LinkFileRequest] = deriveConfiguredDecoder[LinkFileRequest] + implicit private val config: Configuration = Configuration.default + implicit val linkFileDecoder: Decoder[LinkFileRequest] = deriveConfiguredDecoder[LinkFileRequest] + implicit val regFileDecoder: Decoder[RegisterFileRequest] = deriveConfiguredDecoder[RegisterFileRequest] def fileDescriptionFromRequest(f: LinkFileRequest): IO[FileDescription] = f.filename.orElse(f.path.lastSegment) match { case Some(value) => IO.pure(FileDescription(value, f.mediaType, f.metadata)) - case None => IO.raiseError(InvalidFileLink) + case None => IO.raiseError(InvalidFilePath) } } } diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/FileOperations.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/FileOperations.scala index a18e6688b9..f687e13965 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/FileOperations.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/FileOperations.scala @@ -6,10 +6,11 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{ComputedFileAt import 
ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.{DiskStorage, RemoteDiskStorage, S3Storage} import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageValue.{DiskStorageValue, RemoteDiskStorageValue, S3StorageValue} import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.{Storage, StorageValue} -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.{FetchAttributeRejection, MoveFileRejection} +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.{FetchAttributeRejection, MoveFileRejection, RegisterFileRejection} import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.disk.DiskFileOperations import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.RemoteDiskFileOperations import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.S3FileOperations +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.S3FileOperations.S3FileMetadata import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef @@ -22,6 +23,8 @@ trait FileOperations extends StorageAccess { def link(storage: Storage, sourcePath: Uri.Path, filename: String): IO[FileStorageMetadata] + def register(storage: Storage, path: Uri.Path): IO[S3FileMetadata] + def fetchAttributes(storage: Storage, attributes: FileAttributes): IO[ComputedFileAttributes] } @@ -62,6 +65,12 @@ object FileOperations { case s: RemoteDiskStorage => remoteDiskFileOps.fetchAttributes(s.value.folder, attributes.path) case s => IO.raiseError(FetchAttributeRejection.UnsupportedOperation(s.tpe)) } + + override def register(storage: Storage, path: Uri.Path): IO[S3FileMetadata] = + storage match { + case s: S3Storage => s3FileOps.register(s.value.bucket, path) + case s => IO.raiseError(RegisterFileRejection.UnsupportedOperation(s.tpe)) + } } /** diff --git 
a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/StorageFileRejection.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/StorageFileRejection.scala index 8e7036481a..c4758b9994 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/StorageFileRejection.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/StorageFileRejection.scala @@ -1,6 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations -import akka.http.scaladsl.model.StatusCodes +import akka.http.scaladsl.model.{StatusCodes, Uri} +import cats.data.NonEmptyList import ch.epfl.bluebrain.nexus.delta.kernel.error.Rejection import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageType import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri @@ -192,4 +193,20 @@ object StorageFileRejection { } + sealed abstract class RegisterFileRejection(loggedDetails: String) extends StorageFileRejection(loggedDetails) + + object RegisterFileRejection { + final case class MissingS3Attributes(missingAttributes: NonEmptyList[String]) + extends RegisterFileRejection(s"Missing attributes from S3: ${missingAttributes.toList.mkString(", ")}") + + final case class InvalidContentType(received: String) + extends RegisterFileRejection(s"Invalid content type returned from S3: $received") + + final case class InvalidPath(path: Uri.Path) + extends RegisterFileRejection(s"An S3 path must contain at least the filename. 
Path was $path") + + final case class UnsupportedOperation(tpe: StorageType) + extends RegisterFileRejection(s"Registering a file in-place is not supported for storages of type '${tpe.iri}'") + } + } diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperations.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperations.scala index 6881a341ab..bea15744ab 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperations.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperations.scala @@ -1,22 +1,31 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3 import akka.actor.ActorSystem -import akka.http.scaladsl.model.{BodyPartEntity, Uri} +import akka.http.scaladsl.model.{BodyPartEntity, ContentType, Uri} import akka.stream.scaladsl.Source import akka.util.ByteString import cats.effect.IO +import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF -import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileStorageMetadata +import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes.FileAttributesOrigin +import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{Digest, FileStorageMetadata} +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.S3Storage import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageRejection.StorageNotAccessible import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.FetchFileRejection.UnexpectedFetchError +import 
ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.RegisterFileRejection.InvalidContentType +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.S3FileOperations.S3FileMetadata import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient +import ch.epfl.bluebrain.nexus.delta.rdf.syntax.uriSyntax import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter +import org.apache.commons.codec.binary.Hex +import software.amazon.awssdk.services.s3.model.HeadObjectResponse import java.net.URLDecoder import java.nio.charset.StandardCharsets.UTF_8 +import java.util.Base64 import scala.concurrent.duration.DurationInt trait S3FileOperations { @@ -29,9 +38,12 @@ trait S3FileOperations { filename: String, entity: BodyPartEntity ): IO[FileStorageMetadata] + + def register(bucket: String, path: Uri.Path): IO[S3FileMetadata] } object S3FileOperations { + final case class S3FileMetadata(contentType: ContentType, metadata: FileStorageMetadata) def mk(client: S3StorageClient)(implicit as: ActorSystem, uuidf: UUIDF): S3FileOperations = new S3FileOperations { @@ -63,6 +75,48 @@ object S3FileOperations { override def save(storage: S3Storage, filename: String, entity: BodyPartEntity): IO[FileStorageMetadata] = saveFile.apply(storage, filename, entity) + + override def register(bucket: String, path: Uri.Path): IO[S3FileMetadata] = { + for { + _ <- log.info(s"Fetching attributes for S3 file. Bucket $bucket at path $path") + resp <- client.headObject(bucket, path.toString()) + contentType <- parseContentType(resp.contentType()) + metadata <- mkS3Metadata(bucket, path, resp, contentType) + } yield metadata + } + .onError { e => + log.error(e)(s"Failed fetching required attributes for S3 file registration. 
Bucket $bucket and path $path") + } + + private def parseContentType(raw: String): IO[ContentType] = + ContentType.parse(raw).map(_.pure[IO]).getOrElse(IO.raiseError(InvalidContentType(raw))) + + private def mkS3Metadata(bucket: String, path: Uri.Path, resp: HeadObjectResponse, ct: ContentType) = { + for { + uuid <- uuidf() + checksum <- checksumFrom(resp) + } yield S3FileMetadata( + ct, + FileStorageMetadata( + uuid, + resp.contentLength(), + checksum, + FileAttributesOrigin.External, + client.baseEndpoint / bucket / path, + path + ) + ) + } + + private def checksumFrom(response: HeadObjectResponse) = IO.fromOption { + Option(response.checksumSHA256()) + .map { checksum => + Digest.ComputedDigest( + DigestAlgorithm.default, + Hex.encodeHexString(Base64.getDecoder.decode(checksum)) + ) + } + }(new IllegalArgumentException("Missing checksum")) } } diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala index 744d007b20..3fe7e41850 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala @@ -62,7 +62,7 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit _ <- log(bucket, key, s"Beginning upload") (digest, fileSize) <- uploadFile(fileData, bucket, key, algorithm) _ <- log(bucket, key, s"Finished upload. 
Digest: $digest") - attr <- fileMetadata(bucket, key, uuid, fileSize, algorithm, digest) + attr = fileMetadata(bucket, key, uuid, fileSize, algorithm, digest) } yield attr) .onError(e => logger.error(e)("Unexpected error when storing file")) .adaptError { err => UnexpectedSaveError(key, err.getMessage) } @@ -75,26 +75,27 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit fileSize: Long, algorithm: DigestAlgorithm, digest: String - ): IO[FileStorageMetadata] = - s3StorageClient.baseEndpoint.map { base => - FileStorageMetadata( - uuid = uuid, - bytes = fileSize, - digest = Digest.ComputedDigest(algorithm, digest), - origin = Client, - location = base / bucket / Uri.Path(key), - path = Uri.Path(key) - ) - } + ): FileStorageMetadata = + FileStorageMetadata( + uuid = uuid, + bytes = fileSize, + digest = Digest.ComputedDigest(algorithm, digest), + origin = Client, + location = s3StorageClient.baseEndpoint / bucket / Uri.Path(key), + path = Uri.Path(key) + ) private def validateObjectDoesNotExist(bucket: String, key: String) = - getFileAttributes(bucket, key).redeemWith( - { - case _: NoSuchKeyException => IO.unit - case e => IO.raiseError(e) - }, - _ => IO.raiseError(ResourceAlreadyExists(key)) - ) + s3StorageClient + .headObject(bucket, key) + .void + .redeemWith( + { + case _: NoSuchKeyException => IO.unit + case e => IO.raiseError(e) + }, + _ => IO.raiseError(ResourceAlreadyExists(key)) + ) private def convertStream(source: Source[ByteString, Any]): Stream[IO, Byte] = StreamConverter( @@ -152,9 +153,6 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit } } - private def getFileAttributes(bucket: String, key: String): IO[GetObjectAttributesResponse] = - s3StorageClient.getFileAttributes(bucket, key) - private def log(bucket: String, key: String, msg: String): IO[Unit] = logger.info(s"Bucket: ${bucket}. Key: $key. 
$msg") } diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala index 75f1424876..da606a3cfe 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala @@ -12,7 +12,7 @@ import io.laserdisc.pure.s3.tagless.{Interpreter, S3AsyncClientOp} import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, AwsCredentialsProvider, StaticCredentialsProvider} import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.s3.S3AsyncClient -import software.amazon.awssdk.services.s3.model.{GetObjectAttributesRequest, GetObjectAttributesResponse, ListObjectsV2Request, ListObjectsV2Response, ObjectAttributes} +import software.amazon.awssdk.services.s3.model.{ChecksumMode, HeadObjectRequest, HeadObjectResponse, ListObjectsV2Request, ListObjectsV2Response} import java.net.URI @@ -26,11 +26,11 @@ trait S3StorageClient { def readFile(bucket: BucketName, fileKey: FileKey): Stream[IO, Byte] - def getFileAttributes(bucket: String, key: String): IO[GetObjectAttributesResponse] + def headObject(bucket: String, key: String): IO[HeadObjectResponse] def underlyingClient: S3AsyncClientOp[IO] - def baseEndpoint: IO[Uri] + def baseEndpoint: Uri } object S3StorageClient { @@ -57,7 +57,7 @@ object S3StorageClient { ) .map(new S3StorageClientImpl(_, endpoint.toString)) - final class S3StorageClientImpl(client: S3AsyncClientOp[IO], baseEndpoint: Uri) extends S3StorageClient { + final class S3StorageClientImpl(client: S3AsyncClientOp[IO], val baseEndpoint: Uri) extends S3StorageClient { private val s3: S3[IO] = 
S3.create(client) override def listObjectsV2(bucket: String): IO[ListObjectsV2Response] = @@ -69,20 +69,10 @@ object S3StorageClient { override def readFile(bucket: BucketName, fileKey: FileKey): Stream[IO, Byte] = s3.readFile(bucket, fileKey) - override def getFileAttributes(bucket: String, key: String): IO[GetObjectAttributesResponse] = - client - .getObjectAttributes( - GetObjectAttributesRequest - .builder() - .bucket(bucket) - .key(key) - .objectAttributes(ObjectAttributes.knownValues()) // TODO get all values - .build() - ) + override def headObject(bucket: String, key: String): IO[HeadObjectResponse] = + client.headObject(HeadObjectRequest.builder().bucket(bucket).key(key).checksumMode(ChecksumMode.ENABLED).build) override def underlyingClient: S3AsyncClientOp[IO] = client - - override def baseEndpoint: IO[Uri] = IO.pure(baseEndpoint) } final case object S3StorageClientDisabled extends S3StorageClient { @@ -95,10 +85,10 @@ object S3StorageClient { override def readFile(bucket: BucketName, fileKey: FileKey): Stream[IO, Byte] = Stream.raiseError[IO](disabledErr) - override def getFileAttributes(bucket: String, key: String): IO[GetObjectAttributesResponse] = raiseDisabledErr + override def headObject(bucket: String, key: String): IO[HeadObjectResponse] = raiseDisabledErr override def underlyingClient: S3AsyncClientOp[IO] = throw disabledErr - override def baseEndpoint: IO[Uri] = raiseDisabledErr + override def baseEndpoint: Uri = throw disabledErr } } diff --git a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/mocks/FileOperationsMock.scala b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/mocks/FileOperationsMock.scala index aaae15727c..860a3b163e 100644 --- a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/mocks/FileOperationsMock.scala +++ 
b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/mocks/FileOperationsMock.scala @@ -36,6 +36,7 @@ object FileOperationsMock { def link(storage: Storage, sourcePath: Uri.Path, filename: String): IO[FileStorageMetadata] = ??? def fetch(storage: Storage, attributes: FileAttributes): IO[AkkaSource] = ??? def fetchAttributes(storage: Storage, attributes: FileAttributes): IO[ComputedFileAttributes] = ??? + def register(storage: Storage, path: Uri.Path): IO[S3FileOperations.S3FileMetadata] = ??? } def diskUnimplemented: DiskFileOperations = new DiskFileOperations { @@ -48,5 +49,6 @@ object FileOperationsMock { def checkBucketExists(bucket: String): IO[Unit] = ??? def fetch(bucket: String, path: Uri.Path): IO[AkkaSource] = ??? def save(storage: Storage.S3Storage, filename: String, entity: BodyPartEntity): IO[FileStorageMetadata] = ??? + def register(bucket: String, path: Uri.Path): IO[S3FileOperations.S3FileMetadata] = ??? } } diff --git a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/routes/FilesRoutesSpec.scala b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/routes/FilesRoutesSpec.scala index af67235e8b..287fff1181 100644 --- a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/routes/FilesRoutesSpec.scala +++ b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/routes/FilesRoutesSpec.scala @@ -270,8 +270,8 @@ class FilesRoutesSpec json""" { "@context" : "https://bluebrain.github.io/nexus/contexts/error.json", - "@type" : "InvalidFileLink", - "reason" : "Linking a file cannot be performed without a 'filename' or a 'path' that does not end with a filename." + "@type" : "InvalidFilePath", + "reason" : "Linking or registering a file cannot be performed without a 'filename' or a 'path' that does not end with a filename." 
} """ } diff --git a/tests/src/test/resources/kg/files/registration-metadata.json b/tests/src/test/resources/kg/files/registration-metadata.json new file mode 100644 index 0000000000..8a5ec60b8f --- /dev/null +++ b/tests/src/test/resources/kg/files/registration-metadata.json @@ -0,0 +1,31 @@ +{ + "@context" : [ + "https://bluebrain.github.io/nexus/contexts/files.json", + "https://bluebrain.github.io/nexus/contexts/metadata.json" + ], + "@id": "{{id}}", + "@type": "File", + "_storage": { + "@id": "https://bluebrain.github.io/nexus/vocabulary/{{storageId}}", + "@type" : "S3Storage", + "_rev": 3 + }, + "_bytes": 29625, + "_digest": { + "_algorithm": "SHA-256", + "_value": "{{digestValue}}" + }, + "_filename": "nexus-logo.png", + "_location": "{{location}}", + "_mediaType": "image/png", + "_origin" : "External", + "_incoming": "{{self}}/incoming", + "_outgoing": "{{self}}/outgoing", + "_self": "{{self}}", + "_constrainedBy": "https://bluebrain.github.io/nexus/schemas/files.json", + "_project": "{{deltaUri}}/projects/{{projId}}", + "_rev": 1, + "_deprecated": false, + "_createdBy": "{{deltaUri}}/realms/{{realm}}/users/{{user}}", + "_updatedBy": "{{deltaUri}}/realms/{{realm}}/users/{{user}}" +} \ No newline at end of file diff --git a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/files/S3StorageSpec.scala b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/files/S3StorageSpec.scala index 2ec325b63b..d0e5d579f8 100644 --- a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/files/S3StorageSpec.scala +++ b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/files/S3StorageSpec.scala @@ -2,20 +2,24 @@ package ch.epfl.bluebrain.nexus.tests.kg.files import akka.http.scaladsl.model.StatusCodes import cats.effect.IO +import cats.implicits.toTraverseOps import ch.epfl.bluebrain.nexus.tests.Identity.storages.Coyote import ch.epfl.bluebrain.nexus.tests.Optics.{error, filterMetadataKeys} import ch.epfl.bluebrain.nexus.tests.config.S3Config import 
ch.epfl.bluebrain.nexus.tests.iam.types.Permission import io.circe.Json import io.circe.syntax.EncoderOps +import io.laserdisc.pure.s3.tagless.Interpreter +import org.apache.commons.codec.binary.Hex import org.scalatest.Assertion import software.amazon.awssdk.auth.credentials.{AnonymousCredentialsProvider, AwsBasicCredentials, StaticCredentialsProvider} import software.amazon.awssdk.regions.Region -import software.amazon.awssdk.services.s3.S3Client +import software.amazon.awssdk.services.s3.S3AsyncClient import software.amazon.awssdk.services.s3.model._ import java.net.URI import java.nio.file.Paths +import java.util.Base64 import scala.jdk.CollectionConverters._ class S3StorageSpec extends StorageSpec { @@ -30,8 +34,11 @@ class S3StorageSpec extends StorageSpec { val s3Config: S3Config = storageConfig.s3 - private val bucket = genId() - private val logoKey = "some/path/to/nexus-logo.png" + private val bucket = genId() + private val logoFilename = "nexus-logo.png" + private val logoKey = s"some/path/to/$logoFilename" + private val logoSha256Base64Digest = "Bb9EKBAhO55f7NUkLu/v8fPSB5E4YclmWMdcz1iZfoc=" + private val logoSha256HexDigest = Hex.encodeHexString(Base64.getDecoder.decode(logoSha256Base64Digest)) val s3Endpoint: String = "http://s3.localhost.localstack.cloud:4566" val s3BucketEndpoint: String = s"http://s3.localhost.localstack.cloud:4566/$bucket" @@ -41,29 +48,45 @@ class S3StorageSpec extends StorageSpec { case _ => AnonymousCredentialsProvider.create() } - private val s3Client = S3Client.builder - .endpointOverride(new URI(s3Endpoint)) - .credentialsProvider(credentialsProvider) - .region(Region.US_EAST_1) - .build + val s3Client = Interpreter[IO] + .S3AsyncClientOpResource( + S3AsyncClient + .builder() + .credentialsProvider(credentialsProvider) + .endpointOverride(new URI(s3Endpoint)) + .forcePathStyle(true) + .region(Region.US_EAST_1) + ) + .allocated + .map(_._1) + .accepted override def beforeAll(): Unit = { super.beforeAll() - // Configure minio - 
s3Client.createBucket(CreateBucketRequest.builder.bucket(bucket).build) - s3Client.putObject( - PutObjectRequest.builder.bucket(bucket).key(logoKey).build, - Paths.get(getClass.getResource("/kg/files/nexus-logo.png").toURI) - ) + (s3Client.createBucket(CreateBucketRequest.builder.bucket(bucket).build) >> uploadLogoFile(logoKey)).accepted () } + private def uploadLogoFile(key: String): IO[PutObjectResponse] = s3Client.putObject( + PutObjectRequest.builder + .bucket(bucket) + .key(key) + .checksumAlgorithm(ChecksumAlgorithm.SHA256) + .checksumSHA256(logoSha256Base64Digest) + .build, + Paths.get(getClass.getResource("/kg/files/nexus-logo.png").toURI) + ) + override def afterAll(): Unit = { - val objects = s3Client.listObjects(ListObjectsRequest.builder.bucket(bucket).build) - objects.contents.asScala.foreach { obj => - s3Client.deleteObject(DeleteObjectRequest.builder.bucket(bucket).key(obj.key).build) - } - s3Client.deleteBucket(DeleteBucketRequest.builder.bucket(bucket).build) + val cleanup: IO[Unit] = for { + resp <- s3Client.listObjects(ListObjectsRequest.builder.bucket(bucket).build) + objects = resp.contents.asScala.toList + _ <- objects.traverse(obj => s3Client.deleteObject(DeleteObjectRequest.builder.bucket(bucket).key(obj.key).build)) + _ <- s3Client.deleteBucket(DeleteBucketRequest.builder.bucket(bucket).build) + } yield () + + cleanup.accepted + super.afterAll() } @@ -145,4 +168,40 @@ class S3StorageSpec extends StorageSpec { } } } + + private def registrationResponse(id: String, digestValue: String, location: String): Json = + jsonContentOf( + "kg/files/registration-metadata.json", + replacements( + Coyote, + "id" -> id, + "storageId" -> storageId, + "self" -> fileSelf(projectRef, id), + "projId" -> s"$projectRef", + "digestValue" -> digestValue, + "location" -> location + ): _* + ) + + s"Registering an S3 file in-place" should { + "succeed" in { + val id = genId() + val path = s"$id/nexus-logo.png" + val payload = Json.obj("path" -> Json.fromString(path)) 
+ + for { + _ <- uploadLogoFile(path) + _ <- deltaClient.put[Json](s"/files/$projectRef/register/$id?storage=nxv:$storageId", payload, Coyote) { + (_, response) => response.status shouldEqual StatusCodes.Created + } + fullId = s"$attachmentPrefix$id" + location = s"$s3BucketEndpoint/$path" + expected = registrationResponse(fullId, logoSha256HexDigest, location) + assertion <- deltaClient.get[Json](s"/files/$projectRef/$id", Coyote) { (json, response) => + response.status shouldEqual StatusCodes.OK + filterMetadataKeys(json) shouldEqual expected + } + } yield assertion + } + } }