Skip to content

Commit

Permalink
Use head object request for S3 file metadata, verify in integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
dantb committed Apr 18, 2024
1 parent 233cd06 commit 22a20ce
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ final class Files(
} yield res
}.span("updateFileMetadata")

def registerS3File(
def registerFile(
id: FileId,
storageId: Option[IdSegment],
metadata: Option[FileCustomMetadata],
Expand All @@ -239,10 +239,9 @@ final class Files(
for {
(iri, pc) <- id.expandIri(fetchContext.onCreate)
(storageRef, storage) <- fetchAndValidateActiveStorage(storageId, id.project, pc)
storageMetadata <- fileOperations.register(storage, path)
_ <- IO.println(s"Registering existing file in S3 bucket ${storageMetadata} at path $path")
_ <- IO.println(metadata)
attr = FileAttributes.from(FileDescription("TODO get from s3", None, metadata), storageMetadata)
s3Metadata <- fileOperations.register(storage, path)
filename <- IO.fromOption(path.lastSegment)(InvalidFilePath)
attr = FileAttributes.from(FileDescription(filename, s3Metadata.contentType.some, metadata), s3Metadata.metadata)
res <- eval(
CreateFile(
iri,
Expand All @@ -255,7 +254,7 @@ final class Files(
)
)
} yield res
}
}.span("registerFile")

/**
* Update a new file linking it from an existing file in a storage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,9 @@ object FileRejection {
* Rejection returned when attempting to link a file without providing a filename or a path that ends with a
* filename.
*/
final case object InvalidFileLink
final case object InvalidFilePath
extends FileRejection(
s"Linking a file cannot be performed without a 'filename' or a 'path' that does not end with a filename."
s"Linking or registering a file cannot be performed without a 'filename' or a 'path' that does not end with a filename."
)

final case class CopyRejection(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,25 +268,27 @@ final class FilesRoutes(
pathPrefix("register") {
(idSegment & indexingMode) { (id, mode) =>
pathEndOrSingleSlash {
parameters("storage".as[IdSegment].?, "tag".as[UserTag].?) { (storage, tag) =>
entity(as[RegisterFileRequest]) { registerRequest =>
val fileId = FileId(id, project)
emit(
files
.registerS3File(
fileId,
storage,
registerRequest.metadata,
registerRequest.path,
tag
)
.index(mode)
.attemptNarrow[FileRejection]
)
operationName(s"$prefixSegment/files/{org}/{project}/register/{id}") {
parameters("storage".as[IdSegment].?, "tag".as[UserTag].?) { (storage, tag) =>
entity(as[RegisterFileRequest]) { registerRequest =>
val fileId = FileId(id, project)
emit(
Created,
files
.registerFile(
fileId,
storage,
registerRequest.metadata,
registerRequest.path,
tag
)
.index(mode)
.attemptNarrow[FileRejection]
)
}
}
}
}

}
}
)
Expand Down Expand Up @@ -364,7 +366,7 @@ object FilesRoutes {
def fileDescriptionFromRequest(f: LinkFileRequest): IO[FileDescription] =
f.filename.orElse(f.path.lastSegment) match {
case Some(value) => IO.pure(FileDescription(value, f.mediaType, f.metadata))
case None => IO.raiseError(InvalidFileLink)
case None => IO.raiseError(InvalidFilePath)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.Storage
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.disk.DiskFileOperations
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.RemoteDiskFileOperations
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.S3FileOperations
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.S3FileOperations.S3FileMetadata
import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef

Expand All @@ -22,7 +23,7 @@ trait FileOperations extends StorageAccess {

def link(storage: Storage, sourcePath: Uri.Path, filename: String): IO[FileStorageMetadata]

def register(storage: Storage, path: Uri.Path): IO[FileStorageMetadata]
def register(storage: Storage, path: Uri.Path): IO[S3FileMetadata]

def fetchAttributes(storage: Storage, attributes: FileAttributes): IO[ComputedFileAttributes]
}
Expand Down Expand Up @@ -65,7 +66,7 @@ object FileOperations {
case s => IO.raiseError(FetchAttributeRejection.UnsupportedOperation(s.tpe))
}

override def register(storage: Storage, path: Uri.Path): IO[FileStorageMetadata] =
override def register(storage: Storage, path: Uri.Path): IO[S3FileMetadata] =
storage match {
case s: S3Storage => s3FileOps.register(s.value.bucket, path)
case s => IO.raiseError(RegisterFileRejection.UnsupportedOperation(s.tpe))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations

import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.{StatusCodes, Uri}
import cats.data.NonEmptyList
import ch.epfl.bluebrain.nexus.delta.kernel.error.Rejection
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageType
Expand Down Expand Up @@ -199,6 +199,12 @@ object StorageFileRejection {
final case class MissingS3Attributes(missingAttributes: NonEmptyList[String])
extends RegisterFileRejection(s"Missing attributes from S3: ${missingAttributes.toList.mkString(", ")}")

/**
  * Rejection returned when the Content-Type reported by S3 for an object cannot be parsed
  * into an Akka HTTP `ContentType`. Carries the raw value received for diagnostics.
  */
final case class InvalidContentType(received: String)
    extends RegisterFileRejection(s"Invalid content type returned from S3: $received")

/**
  * Rejection returned when the supplied S3 path has no final segment, i.e. no filename can
  * be derived from it.
  */
final case class InvalidPath(path: Uri.Path)
    extends RegisterFileRejection(s"An S3 path must contain at least the filename. Path was $path")

/**
  * Rejection returned when attempting to register a file in-place on a storage type that
  * does not support the operation.
  *
  * Fix: this rejection lives in the register-file section, its message describes
  * registration, and callers reference it as `RegisterFileRejection.UnsupportedOperation`,
  * but it extended `MoveFileRejection` (copy-paste slip). It now extends
  * `RegisterFileRejection`, consistent with the sibling `MissingS3Attributes`.
  */
final case class UnsupportedOperation(tpe: StorageType)
    extends RegisterFileRejection(s"Registering a file in-place is not supported for storages of type '${tpe.iri}'")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3

import akka.actor.ActorSystem
import akka.http.scaladsl.model.{BodyPartEntity, Uri}
import akka.http.scaladsl.model.{BodyPartEntity, ContentType, Uri}
import akka.stream.scaladsl.Source
import akka.util.ByteString
import cats.data.Validated
import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
Expand All @@ -15,11 +14,13 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgori
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.S3Storage
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageRejection.StorageNotAccessible
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.FetchFileRejection.UnexpectedFetchError
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.RegisterFileRejection.MissingS3Attributes
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.RegisterFileRejection.InvalidContentType
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.S3FileOperations.S3FileMetadata
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.rdf.syntax.uriSyntax
import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource
import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter
import software.amazon.awssdk.services.s3.model.HeadObjectResponse

import java.net.URLDecoder
import java.nio.charset.StandardCharsets.UTF_8
Expand All @@ -37,10 +38,11 @@ trait S3FileOperations {
entity: BodyPartEntity
): IO[FileStorageMetadata]

def register(bucket: String, path: Uri.Path): IO[FileStorageMetadata]
def register(bucket: String, path: Uri.Path): IO[S3FileMetadata]
}

object S3FileOperations {
final case class S3FileMetadata(contentType: ContentType, metadata: FileStorageMetadata)

def mk(client: S3StorageClient)(implicit as: ActorSystem, uuidf: UUIDF): S3FileOperations = new S3FileOperations {

Expand Down Expand Up @@ -73,28 +75,33 @@ object S3FileOperations {
override def save(storage: S3Storage, filename: String, entity: BodyPartEntity): IO[FileStorageMetadata] =
saveFile.apply(storage, filename, entity)

override def register(bucket: String, path: Uri.Path): IO[FileStorageMetadata] =
client.getFileAttributes(bucket, path.toString()).flatMap { resp =>
val maybeSize = Option(resp.objectSize()).toRight("object size").toValidatedNel
println(maybeSize)
val maybeEtag = Option(resp.objectSize()).toRight("etag").toValidatedNel
println(maybeEtag)
val maybeChecksum = Option(resp.checksum()).toRight("checksum").toValidatedNel
println(maybeChecksum)

maybeSize.product(maybeEtag).product(maybeChecksum) match {
case Validated.Valid(((size, _), checksum)) =>
FileStorageMetadata(
UUID.randomUUID(),
size,
Digest.ComputedDigest(DigestAlgorithm.MD5, checksum.toString),
FileAttributesOrigin.External,
location = client.baseEndpoint / bucket / path,
path = path
).pure[IO]
case Validated.Invalid(errors) => IO.raiseError(MissingS3Attributes(errors))
}
/**
  * Registers an already-existing S3 object as a file: issues a HEAD request for the object
  * and derives the file's storage metadata from the response. Failures (missing object,
  * unparseable content type) are logged before being propagated.
  */
override def register(bucket: String, path: Uri.Path): IO[S3FileMetadata] = {
  val lookup =
    log.info(s"Fetching attributes for S3 file. Bucket $bucket at path $path") >>
      client.headObject(bucket, path.toString()).flatMap { response =>
        parseContentType(response.contentType())
          .map(contentType => mkS3Metadata(bucket, path, response, contentType))
      }
  // Same error-logging scope as before: covers the whole lookup, then rethrows.
  lookup.onError { e =>
    log.error(e)(s"Failed fetching required attributes for S3 file registration. Bucket $bucket and path $path")
  }
}

/**
  * Parses the raw Content-Type string returned by S3 into an Akka HTTP `ContentType`.
  *
  * Fix: `HeadObjectResponse.contentType()` returns null when the object has no stored
  * Content-Type, and the previous code passed it straight to `ContentType.parse`, which
  * would throw a NullPointerException. Guard with `Option` so that both the missing and
  * the unparseable case surface as an `InvalidContentType` rejection.
  */
private def parseContentType(raw: String): IO[ContentType] = {
  val parsed = Option(raw).flatMap(value => ContentType.parse(value).toOption)
  IO.fromOption(parsed)(InvalidContentType(s"$raw"))
}

/**
  * Builds the [[S3FileMetadata]] for a registered object from its HEAD response: content
  * type plus the derived storage metadata (size, digest, location).
  */
private def mkS3Metadata(bucket: String, path: Uri.Path, resp: HeadObjectResponse, ct: ContentType) = {
  // S3 wraps the ETag in double quotes; strip them to obtain the bare digest string.
  // NOTE(review): treated as an MD5 digest here — true for non-multipart uploads, confirm for multipart.
  val bareEtag = resp.eTag().filterNot(_ == '"')
  val location = client.baseEndpoint / bucket / path
  val storageMetadata = FileStorageMetadata(
    UUID.randomUUID(), // TODO unused field for this use case, but mandatory everywhere?
    resp.contentLength(),
    Digest.ComputedDigest(DigestAlgorithm.MD5, bareEtag),
    FileAttributesOrigin.External,
    location,
    path
  )
  S3FileMetadata(ct, storageMetadata)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,16 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit
}

private def validateObjectDoesNotExist(bucket: String, key: String) =
getFileAttributes(bucket, key).redeemWith(
{
case _: NoSuchKeyException => IO.unit
case e => IO.raiseError(e)
},
_ => IO.raiseError(ResourceAlreadyExists(key))
)
s3StorageClient
.headObject(bucket, key)
.void
.redeemWith(
{
case _: NoSuchKeyException => IO.unit
case e => IO.raiseError(e)
},
_ => IO.raiseError(ResourceAlreadyExists(key))
)

private def convertStream(source: Source[ByteString, Any]): Stream[IO, Byte] =
StreamConverter(
Expand All @@ -130,9 +133,6 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit
.compile
.to(List)

private def getFileAttributes(bucket: String, key: String): IO[GetObjectAttributesResponse] =
s3StorageClient.getFileAttributes(bucket, key)

// TODO issue fetching attributes when tested against localstack, only after the object is saved
// Verify if it's the same for real S3. Error msg: 'Could not parse XML response.'
// For now we just compute it manually.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import io.laserdisc.pure.s3.tagless.{Interpreter, S3AsyncClientOp}
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, AwsCredentialsProvider, StaticCredentialsProvider}
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.model.{GetObjectAttributesRequest, GetObjectAttributesResponse, ListObjectsV2Request, ListObjectsV2Response, ObjectAttributes}
import software.amazon.awssdk.services.s3.model.{HeadObjectRequest, HeadObjectResponse, ListObjectsV2Request, ListObjectsV2Response}

import java.net.URI

Expand All @@ -26,7 +26,7 @@ trait S3StorageClient {

def readFile(bucket: BucketName, fileKey: FileKey): Stream[IO, Byte]

def getFileAttributes(bucket: String, key: String): IO[GetObjectAttributesResponse]
def headObject(bucket: String, key: String): IO[HeadObjectResponse]

def underlyingClient: S3[IO]

Expand Down Expand Up @@ -69,16 +69,8 @@ object S3StorageClient {
override def readFile(bucket: BucketName, fileKey: FileKey): Stream[IO, Byte] =
s3.readFile(bucket, fileKey)

override def getFileAttributes(bucket: String, key: String): IO[GetObjectAttributesResponse] =
client
.getObjectAttributes(
GetObjectAttributesRequest
.builder()
.bucket(bucket)
.key(key)
.objectAttributes(ObjectAttributes.knownValues())
.build()
)
/** Issues a HEAD request for the given bucket/key, returning the object's metadata without its body. */
override def headObject(bucket: String, key: String): IO[HeadObjectResponse] = {
  val request = HeadObjectRequest.builder().bucket(bucket).key(key).build
  client.headObject(request)
}

override def underlyingClient: S3[IO] = s3
}
Expand All @@ -93,7 +85,7 @@ object S3StorageClient {

override def readFile(bucket: BucketName, fileKey: FileKey): Stream[IO, Byte] = Stream.raiseError[IO](disabledErr)

override def getFileAttributes(bucket: String, key: String): IO[GetObjectAttributesResponse] = raiseDisabledErr
override def headObject(bucket: String, key: String): IO[HeadObjectResponse] = raiseDisabledErr

override def underlyingClient: S3[IO] = throw disabledErr

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ object FileOperationsMock {
def link(storage: Storage, sourcePath: Uri.Path, filename: String): IO[FileStorageMetadata] = ???
def fetch(storage: Storage, attributes: FileAttributes): IO[AkkaSource] = ???
def fetchAttributes(storage: Storage, attributes: FileAttributes): IO[ComputedFileAttributes] = ???
def register(storage: Storage, path: Uri.Path): IO[FileStorageMetadata] = ???
def register(storage: Storage, path: Uri.Path): IO[S3FileOperations.S3FileMetadata] = ???
}

def diskUnimplemented: DiskFileOperations = new DiskFileOperations {
Expand All @@ -49,6 +49,6 @@ object FileOperationsMock {
def checkBucketExists(bucket: String): IO[Unit] = ???
def fetch(bucket: String, path: Uri.Path): IO[AkkaSource] = ???
def save(storage: Storage.S3Storage, filename: String, entity: BodyPartEntity): IO[FileStorageMetadata] = ???
def register(bucket: String, path: Uri.Path): IO[FileStorageMetadata] = ???
def register(bucket: String, path: Uri.Path): IO[S3FileOperations.S3FileMetadata] = ???
}
}
Loading

0 comments on commit 22a20ce

Please sign in to comment.