S3 content type user supplied #4895

Merged
@@ -3,7 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.files
import akka.actor.typed.ActorSystem
import akka.actor.{ActorSystem => ClassicActorSystem}
import akka.http.scaladsl.model.ContentTypes.`application/octet-stream`
import akka.http.scaladsl.model.{BodyPartEntity, HttpEntity, Uri}
import akka.http.scaladsl.model.{BodyPartEntity, ContentType, HttpEntity, Uri}
import cats.effect.{Clock, IO}
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent
@@ -223,14 +223,18 @@ final class Files(
storageId: Option[IdSegment],
metadata: Option[FileCustomMetadata],
path: Uri.Path,
tag: Option[UserTag]
tag: Option[UserTag],
mediaType: Option[ContentType]
)(implicit caller: Caller): IO[FileResource] = {
for {
(iri, pc) <- id.expandIri(fetchContext.onCreate)
(storageRef, storage) <- fetchAndValidateActiveStorage(storageId, id.project, pc)
s3Metadata <- fileOperations.register(storage, path)
filename <- IO.fromOption(path.lastSegment)(InvalidFilePath)
attr = FileAttributes.from(FileDescription(filename, s3Metadata.contentType.some, metadata), s3Metadata.metadata)
attr = FileAttributes.from(
FileDescription(filename, mediaType.orElse(s3Metadata.contentType), metadata),
s3Metadata.metadata
)
res <- eval(
CreateFile(
iri,
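The behavioural core of the change above is the precedence between the caller-supplied media type and the content type reported by S3: mediaType.orElse(s3Metadata.contentType) prefers the user value and only falls back to the detected one. A minimal illustration of that fallback with hypothetical values, not part of the PR:

// Sketch of the orElse precedence used in FileAttributes.from above.
import akka.http.scaladsl.model.{ContentType, ContentTypes}

val userSupplied: Option[ContentType] = Some(ContentTypes.`application/json`)
val detectedOnS3: Option[ContentType] = Some(ContentTypes.`application/octet-stream`)

userSupplied.orElse(detectedOnS3)              // Some(application/json): user value wins
Option.empty[ContentType].orElse(detectedOnS3) // Some(application/octet-stream): fall back to S3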
@@ -280,7 +280,8 @@ final class FilesRoutes(
storage,
registerRequest.metadata,
registerRequest.path,
tag
tag,
registerRequest.mediaType
)
.index(mode)
.attemptNarrow[FileRejection]
@@ -355,7 +356,7 @@ object FilesRoutes {
metadata: Option[FileCustomMetadata]
)

final case class RegisterFileRequest(path: Path, metadata: Option[FileCustomMetadata])
final case class RegisterFileRequest(path: Path, mediaType: Option[ContentType], metadata: Option[FileCustomMetadata])

object LinkFileRequest {
@nowarn("cat=unused")
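With the new mediaType field on RegisterFileRequest, clients can pass a media type alongside the path when registering an existing S3 object. A hypothetical request body, expressed as a Scala string; the field names assume the decoder derived for RegisterFileRequest mirrors the case class, and the path and metadata values are placeholders:

// Hypothetical payload for the register endpoint; values are placeholders.
val registerPayload: String =
  """{
    |  "path": "org/project/cat.png",
    |  "mediaType": "image/png",
    |  "metadata": { "name": "cat picture", "keywords": { "species": "cat" } }
    |}""".stripMargin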
@@ -5,7 +5,6 @@ import akka.http.scaladsl.model.{BodyPartEntity, ContentType, Uri}
import akka.stream.scaladsl.Source
import akka.util.ByteString
import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes.FileAttributesOrigin
@@ -17,11 +16,11 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.Storage
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.RegisterFileRejection.InvalidContentType
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.S3FileOperations.S3FileMetadata
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient.HeadObject
import ch.epfl.bluebrain.nexus.delta.rdf.syntax.uriSyntax
import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource
import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter
import org.apache.commons.codec.binary.Hex
import software.amazon.awssdk.services.s3.model.HeadObjectResponse

import java.net.URLDecoder
import java.nio.charset.StandardCharsets.UTF_8
@@ -43,21 +42,19 @@ trait S3FileOperations {
}

object S3FileOperations {
final case class S3FileMetadata(contentType: ContentType, metadata: FileStorageMetadata)
final case class S3FileMetadata(contentType: Option[ContentType], metadata: FileStorageMetadata)

private val log = Logger[S3FileOperations]

def mk(client: S3StorageClient)(implicit as: ActorSystem, uuidf: UUIDF): S3FileOperations = new S3FileOperations {

private lazy val saveFile = new S3StorageSaveFile(client)

override def checkBucketExists(bucket: String): IO[Unit] =
client
.listObjectsV2(bucket)
.redeemWith(
err => IO.raiseError(StorageNotAccessible(err.getMessage)),
response => log.info(s"S3 bucket $bucket contains ${response.keyCount()} objects")
)
override def checkBucketExists(bucket: String): IO[Unit] = {
client.bucketExists(bucket).flatMap { exists =>
IO.raiseUnless(exists)(StorageNotAccessible(s"Bucket $bucket does not exist"))
}
}

override def fetch(bucket: String, path: Uri.Path): IO[AkkaSource] = IO
.delay(
@@ -86,36 +83,38 @@ object S3FileOperations {
uuidF: UUIDF
): IO[S3FileMetadata] = {
for {
_ <- log.info(s"Fetching attributes for S3 file. Bucket $bucket at path $path")
resp <- client.headObject(bucket, path.toString())
contentType <- parseContentType(resp.contentType())
metadata <- mkS3Metadata(client, bucket, path, resp, contentType)
_ <- log.info(s"Fetching attributes for S3 file. Bucket $bucket at path $path")
resp <- client.headObject(bucket, path.toString())
metadata <- mkS3Metadata(client, bucket, path, resp)
} yield metadata
}
.onError { e =>
log.error(e)(s"Failed fetching required attributes for S3 file registration. Bucket $bucket and path $path")
}

private def parseContentType(raw: String): IO[ContentType] =
ContentType.parse(raw).map(_.pure[IO]).getOrElse(IO.raiseError(InvalidContentType(raw)))
private def parseContentType(raw: Option[String]): IO[Option[ContentType]] = {
raw match {
case Some(value) =>
ContentType.parse(value) match {
case Left(_) => IO.raiseError(InvalidContentType(value))
case Right(value) => IO.pure(Some(value))
}
case None => IO.none
}
}
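  // Illustrative behaviour of parseContentType (editorial sketch, not part of the diff):
  //   parseContentType(None)               -> IO.pure(None)            no content type stored on the object
  //   parseContentType(Some("image/png"))  -> IO.pure(Some(image/png)) any value Akka HTTP can parse
  //   parseContentType(Some("not a type")) -> IO.raiseError(InvalidContentType("not a type"))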

private def mkS3Metadata(
client: S3StorageClient,
bucket: String,
path: Uri.Path,
resp: HeadObjectResponse,
ct: ContentType
)(implicit
private def mkS3Metadata(client: S3StorageClient, bucket: String, path: Uri.Path, resp: HeadObject)(implicit
uuidf: UUIDF
) = {
for {
uuid <- uuidf()
checksum <- checksumFrom(resp)
uuid <- uuidf()
contentType <- parseContentType(resp.contentType)
checksum <- checksumFrom(resp)
} yield S3FileMetadata(
ct,
contentType,
FileStorageMetadata(
uuid,
resp.contentLength(),
resp.fileSize,
checksum,
FileAttributesOrigin.External,
client.baseEndpoint / bucket / path,
@@ -124,8 +123,8 @@
)
}

private def checksumFrom(response: HeadObjectResponse) = IO.fromOption {
Option(response.checksumSHA256())
private def checksumFrom(response: HeadObject) = IO.fromOption {
response.sha256Checksum
.map { checksum =>
Digest.ComputedDigest(
DigestAlgorithm.default,
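The tail of checksumFrom is truncated in this view. A plausible completion, assuming S3 returns the SHA-256 checksum base64-encoded and the platform stores digests as lowercase hex (which the commons-codec Hex import suggests); the method name and the fallback error are placeholders:

// Editorial sketch of the truncated conversion; not the literal code from the PR.
import java.util.Base64
import org.apache.commons.codec.binary.Hex

def checksumFromSketch(response: HeadObject): IO[Digest.ComputedDigest] =
  IO.fromOption {
    response.sha256Checksum.map { checksum =>
      Digest.ComputedDigest(
        DigestAlgorithm.default,
        Hex.encodeHexString(Base64.getDecoder.decode(checksum))
      )
    }
  }(new IllegalArgumentException("SHA-256 checksum missing")) // the PR presumably raises a domain rejection here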
@@ -5,7 +5,8 @@ import cats.effect.{IO, Ref, Resource}
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.S3StorageConfig
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient.UploadMetadata
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageRejection.StorageNotAccessible
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient.{HeadObject, UploadMetadata}
import ch.epfl.bluebrain.nexus.delta.rdf.syntax.uriSyntax
import ch.epfl.bluebrain.nexus.delta.sdk.error.ServiceError.FeatureDisabled
import fs2.aws.s3.S3
@@ -32,7 +33,7 @@ trait S3StorageClient {

def readFile(bucket: BucketName, fileKey: FileKey): Stream[IO, Byte]

def headObject(bucket: String, key: String): IO[HeadObjectResponse]
def headObject(bucket: String, key: String): IO[HeadObject]

def copyObject(
sourceBucket: BucketName,
@@ -50,13 +51,20 @@
): IO[UploadMetadata]

def objectExists(bucket: String, key: String): IO[Boolean]
def bucketExists(bucket: String): IO[Boolean]

def baseEndpoint: Uri
}

object S3StorageClient {

case class UploadMetadata(checksum: String, fileSize: Long, location: Uri)
case class HeadObject(
fileSize: Long,
contentType: Option[String],
Contributor: It could have directly a ContentType here

sha256Checksum: Option[String],
Contributor: Same it could be a Digest

sha1Checksum: Option[String]
)
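A possible follow-up to the two review comments above would be to parse the raw strings while mapping the head response, so the case class carries typed values; a sketch, not part of this PR:

// Hypothetical richer shape suggested by the review comments.
final case class HeadObject(
  fileSize: Long,
  contentType: Option[ContentType],
  sha256Checksum: Option[Digest],
  sha1Checksum: Option[Digest]
)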

def resource(s3Config: Option[S3StorageConfig]): Resource[IO, S3StorageClient] = s3Config match {
case Some(cfg) =>
@@ -93,8 +101,24 @@ object S3StorageClient {
override def readFile(bucket: BucketName, fileKey: FileKey): Stream[IO, Byte] =
s3.readFile(bucket, fileKey)

override def headObject(bucket: String, key: String): IO[HeadObjectResponse] =
client.headObject(HeadObjectRequest.builder().bucket(bucket).key(key).checksumMode(ChecksumMode.ENABLED).build)
override def headObject(bucket: String, key: String): IO[HeadObject] =
client
.headObject(
HeadObjectRequest
.builder()
.bucket(bucket)
.key(key)
.checksumMode(ChecksumMode.ENABLED)
.build
)
.map(resp =>
HeadObject(
resp.contentLength(),
Option(resp.contentType()),
Option(resp.checksumSHA256()),
Option(resp.checksumSHA1())
)
)

override def copyObject(
sourceBucket: BucketName,
@@ -175,13 +199,16 @@
}
}

implicit class PutObjectRequestOps(request: PutObjectRequest.Builder) {
def deltaDigest(algorithm: DigestAlgorithm): PutObjectRequest.Builder =
algorithm.value match {
case "SHA-256" => request.checksumAlgorithm(ChecksumAlgorithm.SHA256)
case "SHA-1" => request.checksumAlgorithm(ChecksumAlgorithm.SHA1)
case _ => throw new IllegalArgumentException(s"Unsupported algorithm for S3: ${algorithm.value}")
}
override def bucketExists(bucket: String): IO[Boolean] = {
listObjectsV2(bucket)
.redeemWith(
err =>
err match {
case _: NoSuchBucketException => IO.pure(false)
case e => IO.raiseError(StorageNotAccessible(e.getMessage))
},
_ => IO.pure(true)
)
}
}

@@ -195,7 +222,7 @@

override def readFile(bucket: BucketName, fileKey: FileKey): Stream[IO, Byte] = Stream.raiseError[IO](disabledErr)

override def headObject(bucket: String, key: String): IO[HeadObjectResponse] = raiseDisabledErr
override def headObject(bucket: String, key: String): IO[HeadObject] = raiseDisabledErr

override def baseEndpoint: Uri = throw disabledErr

@@ -215,5 +242,16 @@
key: String,
algorithm: DigestAlgorithm
): IO[UploadMetadata] = raiseDisabledErr

override def bucketExists(bucket: String): IO[Boolean] = raiseDisabledErr
}

implicit class PutObjectRequestOps(request: PutObjectRequest.Builder) {
def deltaDigest(algorithm: DigestAlgorithm): PutObjectRequest.Builder =
algorithm.value match {
case "SHA-256" => request.checksumAlgorithm(ChecksumAlgorithm.SHA256)
case "SHA-1" => request.checksumAlgorithm(ChecksumAlgorithm.SHA1)
case _ => throw new IllegalArgumentException(s"Unsupported algorithm for S3: ${algorithm.value}")
}
}
}
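For context, the deltaDigest syntax defined above would typically be applied while building an upload request; a minimal usage sketch with placeholder bucket and key names:

// Sketch only: "my-bucket" and "my/key" are placeholders.
val request = PutObjectRequest
  .builder()
  .bucket("my-bucket")
  .key("my/key")
  .deltaDigest(DigestAlgorithm.default) // maps the platform algorithm to the matching S3 ChecksumAlgorithm
  .build()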
@@ -1,40 +1,19 @@
package ch.epfl.bluebrain.nexus.testkit.scalatest

import ch.epfl.bluebrain.nexus.testkit.scalatest.JsonMatchers.field
import io.circe.Json
import org.scalatest.matchers.{HavePropertyMatchResult, HavePropertyMatcher}
import org.scalatest.matchers.HavePropertyMatcher

object FileMatchers {

def keywords(expected: (String, String)*): HavePropertyMatcher[Json, Map[String, String]] = keywords(expected.toMap)

def keywords(expected: Map[String, String]): HavePropertyMatcher[Json, Map[String, String]] = HavePropertyMatcher {
json =>
val actual = json.hcursor.downField("_keywords").as[Map[String, String]].toOption
HavePropertyMatchResult(
actual.contains(expected),
"keywords",
expected,
actual.orNull
)
}
def keywords(expected: Map[String, String]): HavePropertyMatcher[Json, Map[String, String]] =
field("_keywords", expected)

def description(expected: String): HavePropertyMatcher[Json, String] = HavePropertyMatcher { json =>
val actual = json.hcursor.downField("description").as[String].toOption
HavePropertyMatchResult(
actual.contains(expected),
"description",
expected,
actual.orNull
)
}
def description(expected: String): HavePropertyMatcher[Json, String] = field("description", expected)

def name(expected: String): HavePropertyMatcher[Json, String] = HavePropertyMatcher { json =>
val actual = json.hcursor.downField("name").as[String].toOption
HavePropertyMatchResult(
actual.contains(expected),
"name",
expected,
actual.orNull
)
}
def name(expected: String): HavePropertyMatcher[Json, String] = field("name", expected)

def mediaType(expected: String): HavePropertyMatcher[Json, String] = field("_mediaType", expected)
}
@@ -0,0 +1,20 @@
package ch.epfl.bluebrain.nexus.testkit.scalatest

import io.circe.{Decoder, Json}
import org.scalatest.matchers.{HavePropertyMatchResult, HavePropertyMatcher}

import scala.reflect.ClassTag

object JsonMatchers {
def field[A: Decoder: ClassTag](key: String, expectedValue: A)(implicit
ev: Null <:< A
): HavePropertyMatcher[Json, A] = HavePropertyMatcher { json =>
val actual = json.hcursor.downField(key).as[A].toOption
HavePropertyMatchResult(
actual.contains(expectedValue),
key,
expectedValue,
actual.orNull
)
}
}
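A usage sketch for the generic matcher, inside a ScalaTest suite that mixes in Matchers; fileJson and the expected values are placeholders:

// Sketch: fileJson is some io.circe.Json describing a file resource.
fileJson should have(
  field("name", "cat.png"),
  field("_mediaType", "image/png")
)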
@@ -51,7 +51,9 @@ class FileProcessor private (
event match {
case e: FileCreated =>
fileCopier.copyFile(e.attributes.path) >>
files.registerFile(FileId(e.id, project), None, None, e.attributes.path, e.tag).flatMap(IO.println)
files
.registerFile(FileId(e.id, project), None, None, e.attributes.path, e.tag, e.attributes.mediaType)
.flatMap(IO.println)
case e: FileUpdated =>
fileCopier.copyFile(e.attributes.path) >> IO.unit
case e: FileCustomMetadataUpdated =>
@@ -4,6 +4,7 @@ import akka.http.scaladsl.model.Uri
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient.HeadObject
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.sdk.projects.Projects
@@ -21,7 +22,7 @@
import fs2.aws.s3.models.Models
import fs2.io.file.Path
import munit.AnyFixture
import software.amazon.awssdk.services.s3.model.{ChecksumAlgorithm, CopyObjectResponse, HeadObjectResponse, ListObjectsV2Response}
import software.amazon.awssdk.services.s3.model.{ChecksumAlgorithm, CopyObjectResponse, ListObjectsV2Response}

import java.time.Instant

@@ -111,7 +112,7 @@ object RunShipSuite {
override def readFile(bucket: Models.BucketName, fileKey: Models.FileKey): fs2.Stream[IO, Byte] =
fs2.Stream.empty

override def headObject(bucket: String, key: String): IO[HeadObjectResponse] =
override def headObject(bucket: String, key: String): IO[HeadObject] =
IO.raiseError(new NotImplementedError("headObject is not implemented"))

override def copyObject(
@@ -135,6 +136,9 @@

override def objectExists(bucket: String, key: String): IO[Boolean] =
IO.raiseError(new NotImplementedError("objectExists is not implemented"))

override def bucketExists(bucket: String): IO[Boolean] =
IO.raiseError(new NotImplementedError("bucketExists is not implemented"))
}

// The expected import report for the import.json file, as well as for the /import/multi-part-import directory