Register S3 files in-place (#4878)
* WIP - register S3 files in-place

* Use head object request for S3 file metadata, verify in integration test

* Update FileRoutesSpec test

* Deal with uuid and checksum in mkS3Metadata

* Fix import

* Use fs2-aws client in S3 integration tests, enable checksum

* Formatting

---------

Co-authored-by: Oliver <[email protected]>
dantb and olivergrabinski authored Apr 22, 2024
1 parent 82238c6 commit 265d07d
Showing 14 changed files with 299 additions and 78 deletions.
4 changes: 3 additions & 1 deletion build.sbt
@@ -854,7 +854,9 @@ lazy val tests = project
awsSdk % Test,
scalaTest % Test,
akkaSlf4j % Test,
-    alpakkaSse % Test
+    alpakkaSse % Test,
+    fs2Aws % Test,
+    fs2AwsS3 % Test
),
scalacOptions ~= { options: Seq[String] => options.filterNot(Set("-Wunused:imports")) },
Test / parallelExecution := false,
Files.scala

@@ -218,6 +218,33 @@ final class Files(
} yield res
}.span("updateFileMetadata")

def registerFile(
id: FileId,
storageId: Option[IdSegment],
metadata: Option[FileCustomMetadata],
path: Uri.Path,
tag: Option[UserTag]
)(implicit caller: Caller): IO[FileResource] = {
for {
(iri, pc) <- id.expandIri(fetchContext.onCreate)
(storageRef, storage) <- fetchAndValidateActiveStorage(storageId, id.project, pc)
s3Metadata <- fileOperations.register(storage, path)
filename <- IO.fromOption(path.lastSegment)(InvalidFilePath)
attr = FileAttributes.from(FileDescription(filename, s3Metadata.contentType.some, metadata), s3Metadata.metadata)
res <- eval(
CreateFile(
iri,
id.project,
storageRef,
storage.tpe,
attr,
caller.subject,
tag
)
)
} yield res
}.span("registerFile")

/**
* Update a new file linking it from an existing file in a storage
*
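For context, a hedged usage sketch of the new registerFile; every identifier below is an illustrative placeholder, not a value from this commit:

// Register an object that already sits in the bucket; no bytes are uploaded.
implicit val caller: Caller = Caller.Anonymous // hypothetical caller
val project = ProjectRef.unsafe("myorg", "myproject")
val fileId  = FileId("my-file", project)
val s3Path  = Uri.Path("some/dir/data.json")

val created: IO[FileResource] =
  files.registerFile(fileId, storageId = None, metadata = None, s3Path, tag = None)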
FileAttributes.scala

@@ -95,17 +95,20 @@ object FileAttributes {
object FileAttributesOrigin {
final case object Client extends FileAttributesOrigin

-  final case object Storage extends FileAttributesOrigin
+  final case object Storage  extends FileAttributesOrigin
+  final case object External extends FileAttributesOrigin

  implicit val fileAttributesEncoder: Encoder[FileAttributesOrigin] = Encoder.encodeString.contramap {
-    case Client => "Client"
-    case Storage => "Storage"
+    case Client   => "Client"
+    case Storage  => "Storage"
+    case External => "External"
  }

implicit val fileAttributesDecoder: Decoder[FileAttributesOrigin] = Decoder.decodeString.emap {
case "Client" => Right(Client)
case "Storage" => Right(Storage)
case str => Left(s"'$str' is not a 'FileAttributesOrigin'")
case "Client" => Right(Client)
case "Storage" => Right(Storage)
case "External" => Right(External)
case str => Left(s"'$str' is not a 'FileAttributesOrigin'")
}
}

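The new External origin round-trips through the string codecs above; a quick illustration, assuming circe syntax in scope:

import io.circe.syntax._

val json = (External: FileAttributesOrigin).asJson // "External"
json.as[FileAttributesOrigin]                      // Right(External)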
FileRejection.scala

@@ -226,9 +226,9 @@ object FileRejection {
* Rejection returned when attempting to link a file without providing a filename or a path that ends with a
* filename.
*/
-  final case object InvalidFileLink
+  final case object InvalidFilePath
      extends FileRejection(
-      s"Linking a file cannot be performed without a 'filename' or a 'path' that does not end with a filename."
+      s"Linking or registering a file cannot be performed without a 'filename' or a 'path' that does not end with a filename."
)

final case class CopyRejection(
FilesRoutes.scala

@@ -11,7 +11,7 @@ import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileRejection._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.permissions.{read => Read, write => Write}
-import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.routes.FilesRoutes.LinkFileRequest.{fileDescriptionFromRequest, linkFileDecoder}
+import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.routes.FilesRoutes.LinkFileRequest.{fileDescriptionFromRequest, linkFileDecoder, regFileDecoder}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.routes.FilesRoutes._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.{schemas, FileResource, Files}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.ShowFileLocation
@@ -264,6 +264,32 @@ final class FilesRoutes(
}
}
)
},
pathPrefix("register") {
(idSegment & indexingMode) { (id, mode) =>
pathEndOrSingleSlash {
operationName(s"$prefixSegment/files/{org}/{project}/register/{id}") {
parameters("storage".as[IdSegment].?, "tag".as[UserTag].?) { (storage, tag) =>
entity(as[RegisterFileRequest]) { registerRequest =>
val fileId = FileId(id, project)
emit(
Created,
files
.registerFile(
fileId,
storage,
registerRequest.metadata,
registerRequest.path,
tag
)
.index(mode)
.attemptNarrow[FileRejection]
)
}
}
}
}
}
}
)
}
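The HTTP method guarding this inner route sits outside the hunk; assuming PUT (consistent with the Created status emitted above), a hypothetical akka-http testkit sketch of a call to the new endpoint:

// The PUT verb, identifiers and query parameters are assumptions for illustration.
Put("/v1/files/myorg/myproject/register/my-file?storage=s3&tag=v1.0.0", payload) ~>
  routes ~> check {
    status shouldEqual StatusCodes.Created
  }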
@@ -329,15 +355,18 @@ object FilesRoutes {
metadata: Option[FileCustomMetadata]
)

final case class RegisterFileRequest(path: Path, metadata: Option[FileCustomMetadata])

object LinkFileRequest {
@nowarn("cat=unused")
-    implicit private val config: Configuration = Configuration.default
-    implicit val linkFileDecoder: Decoder[LinkFileRequest] = deriveConfiguredDecoder[LinkFileRequest]
+    implicit private val config: Configuration                = Configuration.default
+    implicit val linkFileDecoder: Decoder[LinkFileRequest]    = deriveConfiguredDecoder[LinkFileRequest]
+    implicit val regFileDecoder: Decoder[RegisterFileRequest] = deriveConfiguredDecoder[RegisterFileRequest]

def fileDescriptionFromRequest(f: LinkFileRequest): IO[FileDescription] =
f.filename.orElse(f.path.lastSegment) match {
case Some(value) => IO.pure(FileDescription(value, f.mediaType, f.metadata))
-        case None => IO.raiseError(InvalidFileLink)
+        case None        => IO.raiseError(InvalidFilePath)
}
}
}
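RegisterFileRequest is derived with the default circe configuration, so only path is mandatory and metadata may be omitted. An illustrative decode, assuming a Decoder[Uri.Path] in scope (the existing LinkFileRequest decoder already requires one):

import io.circe.parser.decode

val body = """{ "path": "some/dir/data.json" }"""
decode[RegisterFileRequest](body) // Right(RegisterFileRequest(some/dir/data.json, None))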
FileOperations.scala

@@ -6,10 +6,11 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{ComputedFileAt
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.{DiskStorage, RemoteDiskStorage, S3Storage}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageValue.{DiskStorageValue, RemoteDiskStorageValue, S3StorageValue}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.{Storage, StorageValue}
-import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.{FetchAttributeRejection, MoveFileRejection}
+import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.{FetchAttributeRejection, MoveFileRejection, RegisterFileRejection}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.disk.DiskFileOperations
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.RemoteDiskFileOperations
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.S3FileOperations
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.S3FileOperations.S3FileMetadata
import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef

@@ -22,6 +23,8 @@ trait FileOperations extends StorageAccess {

def link(storage: Storage, sourcePath: Uri.Path, filename: String): IO[FileStorageMetadata]

def register(storage: Storage, path: Uri.Path): IO[S3FileMetadata]

def fetchAttributes(storage: Storage, attributes: FileAttributes): IO[ComputedFileAttributes]
}

@@ -62,6 +65,12 @@ object FileOperations {
case s: RemoteDiskStorage => remoteDiskFileOps.fetchAttributes(s.value.folder, attributes.path)
case s => IO.raiseError(FetchAttributeRejection.UnsupportedOperation(s.tpe))
}

override def register(storage: Storage, path: Uri.Path): IO[S3FileMetadata] =
storage match {
case s: S3Storage => s3FileOps.register(s.value.bucket, path)
case s => IO.raiseError(RegisterFileRejection.UnsupportedOperation(s.tpe))
}
}
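Like fetchAttributes above, register dispatches on the storage type and is only implemented for S3. As a behaviour sketch, given some hypothetical diskStorage value:

fileOps.register(diskStorage, path)
// fails with RegisterFileRejection.UnsupportedOperation(StorageType.DiskStorage)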

/**
StorageFileRejection.scala

@@ -1,6 +1,7 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations

-import akka.http.scaladsl.model.StatusCodes
+import akka.http.scaladsl.model.{StatusCodes, Uri}
+import cats.data.NonEmptyList
import ch.epfl.bluebrain.nexus.delta.kernel.error.Rejection
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageType
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
@@ -192,4 +193,20 @@ object StorageFileRejection {

}

sealed abstract class RegisterFileRejection(loggedDetails: String) extends StorageFileRejection(loggedDetails)

object RegisterFileRejection {
final case class MissingS3Attributes(missingAttributes: NonEmptyList[String])
extends RegisterFileRejection(s"Missing attributes from S3: ${missingAttributes.toList.mkString(", ")}")

final case class InvalidContentType(received: String)
extends RegisterFileRejection(s"Invalid content type returned from S3: $received")

final case class InvalidPath(path: Uri.Path)
extends RegisterFileRejection(s"An S3 path must contain at least the filename. Path was $path")

final case class UnsupportedOperation(tpe: StorageType)
extends MoveFileRejection(s"Registering a file in-place is not supported for storages of type '${tpe.iri}'")
}

}
S3FileOperations.scala

@@ -1,22 +1,31 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3

import akka.actor.ActorSystem
-import akka.http.scaladsl.model.{BodyPartEntity, Uri}
+import akka.http.scaladsl.model.{BodyPartEntity, ContentType, Uri}
import akka.stream.scaladsl.Source
import akka.util.ByteString
import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
-import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileStorageMetadata
+import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes.FileAttributesOrigin
+import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{Digest, FileStorageMetadata}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.S3Storage
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageRejection.StorageNotAccessible
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.FetchFileRejection.UnexpectedFetchError
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.RegisterFileRejection.InvalidContentType
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.S3FileOperations.S3FileMetadata
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.rdf.syntax.uriSyntax
import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource
import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter
import org.apache.commons.codec.binary.Hex
import software.amazon.awssdk.services.s3.model.HeadObjectResponse

import java.net.URLDecoder
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Base64
import scala.concurrent.duration.DurationInt

trait S3FileOperations {
@@ -29,9 +38,12 @@
filename: String,
entity: BodyPartEntity
): IO[FileStorageMetadata]

def register(bucket: String, path: Uri.Path): IO[S3FileMetadata]
}

object S3FileOperations {
final case class S3FileMetadata(contentType: ContentType, metadata: FileStorageMetadata)

def mk(client: S3StorageClient)(implicit as: ActorSystem, uuidf: UUIDF): S3FileOperations = new S3FileOperations {

@@ -63,6 +75,48 @@

override def save(storage: S3Storage, filename: String, entity: BodyPartEntity): IO[FileStorageMetadata] =
saveFile.apply(storage, filename, entity)

override def register(bucket: String, path: Uri.Path): IO[S3FileMetadata] = {
for {
_ <- log.info(s"Fetching attributes for S3 file. Bucket $bucket at path $path")
resp <- client.headObject(bucket, path.toString())
contentType <- parseContentType(resp.contentType())
metadata <- mkS3Metadata(bucket, path, resp, contentType)
} yield metadata
}
.onError { e =>
log.error(e)(s"Failed fetching required attributes for S3 file registration. Bucket $bucket and path $path")
}

private def parseContentType(raw: String): IO[ContentType] =
ContentType.parse(raw).map(_.pure[IO]).getOrElse(IO.raiseError(InvalidContentType(raw)))

private def mkS3Metadata(bucket: String, path: Uri.Path, resp: HeadObjectResponse, ct: ContentType) = {
for {
uuid <- uuidf()
checksum <- checksumFrom(resp)
} yield S3FileMetadata(
ct,
FileStorageMetadata(
uuid,
resp.contentLength(),
checksum,
FileAttributesOrigin.External,
client.baseEndpoint / bucket / path,
path
)
)
}

private def checksumFrom(response: HeadObjectResponse) = IO.fromOption {
Option(response.checksumSHA256())
.map { checksum =>
Digest.ComputedDigest(
DigestAlgorithm.default,
Hex.encodeHexString(Base64.getDecoder.decode(checksum))
)
}
}(new IllegalArgumentException("Missing checksum"))
}

}
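S3 reports the object's SHA-256 checksum base64-encoded, while Delta stores digests as hex, hence the Base64-decode/Hex-encode round-trip in checksumFrom. The checksum is only present on objects uploaded with checksums enabled, which is why the integration tests turn them on. A worked example:

// Base64-encoded SHA-256 of "hello", as checksumSHA256() would report it:
val b64 = "LPJNul+wow4m6DsqxbninhsWHlwfp0JecwQzYpOLmCQ="
Hex.encodeHexString(Base64.getDecoder.decode(b64))
// "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"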
S3StorageSaveFile.scala

@@ -62,7 +62,7 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit
_ <- log(bucket, key, s"Beginning upload")
(digest, fileSize) <- uploadFile(fileData, bucket, key, algorithm)
_ <- log(bucket, key, s"Finished upload. Digest: $digest")
-      attr              <- fileMetadata(bucket, key, uuid, fileSize, algorithm, digest)
+      attr               = fileMetadata(bucket, key, uuid, fileSize, algorithm, digest)
} yield attr)
.onError(e => logger.error(e)("Unexpected error when storing file"))
.adaptError { err => UnexpectedSaveError(key, err.getMessage) }
@@ -75,26 +75,27 @@
fileSize: Long,
algorithm: DigestAlgorithm,
digest: String
-  ): IO[FileStorageMetadata] =
-    s3StorageClient.baseEndpoint.map { base =>
-      FileStorageMetadata(
-        uuid = uuid,
-        bytes = fileSize,
-        digest = Digest.ComputedDigest(algorithm, digest),
-        origin = Client,
-        location = base / bucket / Uri.Path(key),
-        path = Uri.Path(key)
-      )
-    }
+  ): FileStorageMetadata =
+    FileStorageMetadata(
+      uuid = uuid,
+      bytes = fileSize,
+      digest = Digest.ComputedDigest(algorithm, digest),
+      origin = Client,
+      location = s3StorageClient.baseEndpoint / bucket / Uri.Path(key),
+      path = Uri.Path(key)
+    )

private def validateObjectDoesNotExist(bucket: String, key: String) =
-    getFileAttributes(bucket, key).redeemWith(
-      {
-        case _: NoSuchKeyException => IO.unit
-        case e                     => IO.raiseError(e)
-      },
-      _ => IO.raiseError(ResourceAlreadyExists(key))
-    )
+    s3StorageClient
+      .headObject(bucket, key)
+      .void
+      .redeemWith(
+        {
+          case _: NoSuchKeyException => IO.unit
+          case e                     => IO.raiseError(e)
+        },
+        _ => IO.raiseError(ResourceAlreadyExists(key))
+      )

private def convertStream(source: Source[ByteString, Any]): Stream[IO, Byte] =
StreamConverter(
@@ -152,9 +153,6 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit
}
}

-  private def getFileAttributes(bucket: String, key: String): IO[GetObjectAttributesResponse] =
-    s3StorageClient.getFileAttributes(bucket, key)

private def log(bucket: String, key: String, msg: String): IO[Unit] =
logger.info(s"Bucket: ${bucket}. Key: $key. $msg")
}
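Two consequences of this hunk: S3StorageClient.baseEndpoint is evidently now a plain Uri rather than an effectful lookup, making fileMetadata a pure function, and the pre-upload existence check reuses headObject in place of the removed getFileAttributes. A minimal sketch of the now-pure call shape, with hypothetical arguments:

// No IO involved any more; the metadata record is assembled directly.
val meta: FileStorageMetadata =
  fileMetadata("mybucket", "some/key", uuid, 1024L, DigestAlgorithm.default, digest)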
