
Commit 7aa3385
Add endpoint to update previously registered S3 files
dantb committed Apr 23, 2024
1 parent a404d42 commit 7aa3385
Showing 8 changed files with 157 additions and 38 deletions.
@@ -245,6 +245,37 @@ final class Files(
} yield res
}.span("registerFile")

def updateRegisteredFile(
id: FileId,
storageId: Option[IdSegment],
rev: Int,
entity: HttpEntity,
tag: Option[UserTag],
metadata: Option[FileCustomMetadata]
)(implicit caller: Caller): IO[FileResource] = {
for {
(iri, pc) <- id.expandIri(fetchContext.onModify)
_ <- test(UpdateFile(iri, id.project, testStorageRef, testStorageType, testAttributes, rev, caller.subject, tag))
(storageRef, storage) <- fetchAndValidateActiveStorage(storageId, id.project, pc)
uploadedInfo <- formDataExtractor(iri, entity, storage.storageValue.maxFileSize)
path <- fetchState(id, iri).map(_.attributes.path)
storageMetadata <- fileOperations.registerUpdate(storage, path, uploadedInfo.contents)
attr = FileAttributes.from(FileDescription.from(uploadedInfo, metadata), storageMetadata)
res <- eval(
UpdateFile(
iri,
id.project,
storageRef,
storage.tpe,
attr,
rev,
caller.subject,
tag
)
)
} yield res
}.span("updateRegisteredFile")

/**
* Update a file by linking it to an existing file in a storage
*
@@ -290,6 +290,32 @@ final class FilesRoutes(
}
}
}
},
pathPrefix("register-update") {
(idSegment & indexingMode) { (id, mode) =>
pathEndOrSingleSlash {
operationName(s"$prefixSegment/files/{org}/{project}/register-update/{id}") {
parameters("rev".as[Int], "storage".as[IdSegment].?, "tag".as[UserTag].?) { (rev, storage, tag) =>
(requestEntityPresent & extractRequestEntity & extractFileMetadata) { (entity, metadata) =>
val fileId = FileId(id, project)
emit(
files
.updateRegisteredFile(
fileId,
storage,
rev,
entity,
tag,
metadata
)
.index(mode)
.attemptNarrow[FileRejection]
)
}
}
}
}
}
}
)
}
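For context, the new route matches the integration test added later in this commit. A minimal sketch of a client call, assuming the test harness's deltaClient, projectRef, storageId, fileId and Coyote identity (none of these are defined in this file):

  // Hypothetical usage: replace the contents of a previously registered file,
  // expecting revision 1 and targeting the storage identified by nxv:$storageId.
  deltaClient.uploadFile[Json](
    s"/files/$projectRef/register-update/$fileId?storage=nxv:$storageId&rev=1",
    "updated file contents",          // body of the multipart file part
    ContentTypes.`text/plain(UTF-8)`, // content type of the uploaded part
    "my-file.txt",                    // filename reported in the form data
    Coyote                            // authenticated identity making the request
  )(expectOk)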
@@ -25,6 +25,8 @@ trait FileOperations extends StorageAccess {

def register(storage: Storage, path: Uri.Path): IO[S3FileMetadata]

def registerUpdate(storage: Storage, path: Uri.Path, entity: BodyPartEntity): IO[FileStorageMetadata]

def fetchAttributes(storage: Storage, attributes: FileAttributes): IO[ComputedFileAttributes]
}

@@ -71,6 +73,12 @@ object FileOperations {
case s: S3Storage => s3FileOps.register(s.value.bucket, path)
case s => IO.raiseError(RegisterFileRejection.UnsupportedOperation(s.tpe))
}

override def registerUpdate(storage: Storage, path: Uri.Path, entity: BodyPartEntity): IO[FileStorageMetadata] =
storage match {
case s: S3Storage => s3FileOps.registerUpdate(s, path, entity)
case s => IO.raiseError(RegisterFileRejection.UnsupportedOperation(s.tpe))
}
}

/**
@@ -1,7 +1,6 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations

import akka.http.scaladsl.model.{StatusCodes, Uri}
import ch.epfl.bluebrain.nexus.delta.kernel.error.Rejection
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageType
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
@@ -138,6 +137,9 @@ object StorageFileRejection {
s"File cannot be saved because it already exists on path '$path'."
)

final case class FileNotFound(path: String)
extends SaveFileRejection(s"File could not be retrieved from expected path '$path'.")

/**
* Rejection returned when a storage cannot save a file due to an unexpected reason
*/
@@ -196,12 +198,12 @@ object StorageFileRejection {
sealed abstract class RegisterFileRejection(loggedDetails: String) extends StorageFileRejection(loggedDetails)

object RegisterFileRejection {

final case class InvalidContentType(received: String)
extends RegisterFileRejection(s"Invalid content type returned from S3: $received")

final case class MissingChecksum(key: String)
    extends RegisterFileRejection(s"Missing SHA-256 checksum for S3 object at key: $key")

final case class InvalidPath(path: Uri.Path)
extends RegisterFileRejection(s"An S3 path must contain at least the filename. Path was $path")

@@ -14,7 +14,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgori
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.S3Storage
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageRejection.StorageNotAccessible
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.FetchFileRejection.UnexpectedFetchError
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.RegisterFileRejection.{InvalidContentType, MissingChecksum}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.S3FileOperations.S3FileMetadata
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.rdf.syntax.uriSyntax
@@ -40,6 +40,8 @@ trait S3FileOperations {
): IO[FileStorageMetadata]

def register(bucket: String, path: Uri.Path): IO[S3FileMetadata]

def registerUpdate(storage: S3Storage, path: Uri.Path, entity: BodyPartEntity): IO[FileStorageMetadata]
}

object S3FileOperations {
@@ -74,7 +76,7 @@ object S3FileOperations {
}

override def save(storage: S3Storage, filename: String, entity: BodyPartEntity): IO[FileStorageMetadata] =
saveFile.saveNewFile(storage, filename, entity)

override def register(bucket: String, path: Uri.Path): IO[S3FileMetadata] = {
for {
@@ -88,13 +90,16 @@ object S3FileOperations {
log.error(e)(s"Failed fetching required attributes for S3 file registration. Bucket $bucket and path $path")
}

override def registerUpdate(storage: S3Storage, path: Uri.Path, entity: BodyPartEntity): IO[FileStorageMetadata] =
saveFile.overwriteFile(storage, path, entity)

private def parseContentType(raw: String): IO[ContentType] =
ContentType.parse(raw).map(_.pure[IO]).getOrElse(IO.raiseError(InvalidContentType(raw)))

private def mkS3Metadata(bucket: String, path: Uri.Path, resp: HeadObjectResponse, ct: ContentType) =
for {
uuid <- uuidf()
checksum <- checksumFrom(resp, path.toString())
} yield S3FileMetadata(
ct,
FileStorageMetadata(
@@ -106,17 +111,16 @@ object S3FileOperations {
path
)
)

private def checksumFrom(response: HeadObjectResponse, key: String) = IO.fromOption {
Option(response.checksumSHA256())
.map { checksum =>
Digest.ComputedDigest(
DigestAlgorithm.default,
Hex.encodeHexString(Base64.getDecoder.decode(checksum))
)
}
}(MissingChecksum(key))
}

}
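S3 reports checksumSHA256 as the Base64 encoding of the raw digest bytes, while Nexus stores digests hex-encoded, which is why checksumFrom decodes and re-encodes. A self-contained sketch of the same conversion (the sample value is the Base64-encoded SHA-256 of "foo"):

  import java.util.Base64
  import org.apache.commons.codec.binary.Hex

  // Base64-encoded digest bytes, as returned by HeadObjectResponse.checksumSHA256()
  val fromS3: String = "LCa0a2j/xo/5m0U8HTBBNBNCLXBkg7+g+YpeiGJm564="
  // Decode to raw bytes, then hex-encode into the form stored by Nexus metadata
  val hexDigest: String = Hex.encodeHexString(Base64.getDecoder.decode(fromS3))
  // hexDigest == "2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae"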
@@ -29,20 +29,37 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit

private val logger = Logger[S3StorageSaveFile]

def saveNewFile(
storage: S3Storage,
filename: String,
entity: BodyPartEntity
): IO[FileStorageMetadata] =
for {
uuid <- uuidf()
path = Uri.Path(intermediateFolders(storage.project, uuid, filename))
bucket = storage.value.bucket
key = path.toString()
_ <- log(bucket, key, s"Checking object does not exist in S3")
_ <- validateObjectDoesNotExist(bucket, key)
attr <- uploadFile(bucket, key, uuid, entity, storage.value.algorithm)
} yield attr

def overwriteFile(
storage: S3Storage,
path: Uri.Path,
entity: BodyPartEntity
): IO[FileStorageMetadata] = {
val bucket = storage.value.bucket
val key = path.toString()
for {
_ <- log(bucket, key, s"Checking object exists in S3")
_ <- validateObjectExists(bucket, key)
uuid <- uuidf()
result <- uploadFile(bucket, key, uuid, entity, storage.value.algorithm)
} yield result
}

private def uploadFile(
bucket: String,
key: String,
uuid: UUID,
@@ -52,14 +69,12 @@
val fileData: Stream[IO, Byte] = convertStream(entity.dataBytes)

(for {
_ <- log(bucket, key, s"Beginning upload")
uploadMetadata <- s3StorageClient.uploadFile(fileData, bucket, key, algorithm)
_ <- log(bucket, key, s"Finished upload. Digest: ${uploadMetadata.checksum}")
attr = fileMetadata(key, uuid, algorithm, uploadMetadata)
} yield attr)
.onError(e => logger.error(e)("Unexpected error when storing file"))
.onError(e => logger.error(e)(s"Unexpected error when uploading file with key $key to bucket $bucket"))
.adaptError { err => UnexpectedSaveError(key, err.getMessage) }
}

@@ -78,13 +93,11 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit
path = Uri.Path(key)
)

private def validateObjectExists(bucket: String, key: String) =
s3StorageClient.objectExists(bucket, key).ifM(IO.unit, IO.raiseError(FileNotFound(key)))

private def validateObjectDoesNotExist(bucket: String, key: String) =
s3StorageClient.objectExists(bucket, key).ifM(IO.raiseError(ResourceAlreadyExists(key)), IO.unit)

private def convertStream(source: Source[ByteString, Any]): Stream[IO, Byte] =
StreamConverter(
@@ -94,5 +107,5 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit
)

private def log(bucket: String, key: String, msg: String): IO[Unit] =
logger.info(s"Bucket: ${bucket}. Key: $key. $msg")
logger.info(s"Bucket: $bucket. Key: $key. $msg")
}
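Both existence checks above now use cats' ifM combinator, which runs one of two effects depending on the result of an effectful Boolean, in place of an explicit flatMap over true/false. A minimal illustration of the pattern, with hypothetical names not taken from this codebase:

  import cats.effect.IO
  import cats.syntax.all._

  // Fail when the object already exists, succeed otherwise;
  // the same shape as validateObjectDoesNotExist above.
  def ensureAbsent(objectExists: IO[Boolean], key: String): IO[Unit] =
    objectExists.ifM(
      IO.raiseError(new IllegalStateException(s"Object '$key' already exists")),
      IO.unit
    )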
@@ -31,12 +31,13 @@ object FileOperationsMock {
)

def unimplemented: FileOperations = new FileOperations {
def validateStorageAccess(storage: StorageValue): IO[Unit] = ???
def save(storage: Storage, filename: String, entity: BodyPartEntity): IO[FileStorageMetadata] = ???
def link(storage: Storage, sourcePath: Uri.Path, filename: String): IO[FileStorageMetadata] = ???
def fetch(storage: Storage, attributes: FileAttributes): IO[AkkaSource] = ???
def fetchAttributes(storage: Storage, attributes: FileAttributes): IO[ComputedFileAttributes] = ???
def register(storage: Storage, path: Uri.Path): IO[S3FileOperations.S3FileMetadata] = ???
def registerUpdate(storage: Storage, path: Uri.Path, entity: BodyPartEntity): IO[FileStorageMetadata] = ???
}

def diskUnimplemented: DiskFileOperations = new DiskFileOperations {
@@ -46,9 +47,11 @@
}

def s3Unimplemented: S3FileOperations = new S3FileOperations {
def checkBucketExists(bucket: String): IO[Unit] = ???
def fetch(bucket: String, path: Uri.Path): IO[AkkaSource] = ???
def save(storage: Storage.S3Storage, filename: String, entity: BodyPartEntity): IO[FileStorageMetadata] = ???
def register(bucket: String, path: Uri.Path): IO[S3FileOperations.S3FileMetadata] = ???
def registerUpdate(storage: Storage.S3Storage, path: Uri.Path, entity: BodyPartEntity): IO[FileStorageMetadata] =
???
}
}
@@ -7,6 +7,11 @@ import ch.epfl.bluebrain.nexus.tests.Identity.storages.Coyote
import ch.epfl.bluebrain.nexus.tests.Optics.{error, filterMetadataKeys}
import ch.epfl.bluebrain.nexus.tests.config.S3Config
import ch.epfl.bluebrain.nexus.tests.iam.types.Permission
import ch.epfl.bluebrain.nexus.tests.kg.files.model.FileInput.textFileWithContentType
import eu.timepit.refined.types.all.NonEmptyString
import fs2.aws.s3.S3
import fs2.aws.s3.models.Models.{BucketName, FileKey}
import fs2.text
import io.circe.Json
import io.circe.syntax.EncoderOps
import io.laserdisc.pure.s3.tagless.Interpreter
@@ -184,9 +189,10 @@ class S3StorageSpec extends StorageSpec {
)

s"Registering an S3 file in-place" should {
val id = genId()
val path = s"$id/nexus-logo.png"

"succeed" in {
val payload = Json.obj("path" -> Json.fromString(path))

for {
@@ -203,5 +209,31 @@
}
} yield assertion
}

"be updated" in {
val file = textFileWithContentType.copy(fileId = id)

for {
_ <- deltaClient.uploadFile[Json](
s"/files/$projectRef/register-update/${file.fileId}?storage=nxv:$storageId&rev=1",
file.contents,
file.ct,
file.filename,
Coyote
)(expectOk)
_ <- deltaClient.get[Json](s"/files/$projectRef/$id", Coyote)(expectOk)
bucketName = BucketName(NonEmptyString.unsafeFrom(bucket))
fileKey = FileKey(NonEmptyString.unsafeFrom(path))
lines <- S3.create(s3Client)
.readFile(bucketName, fileKey)
.through(text.utf8.decode)
.through(fs2.text.lines)
.compile
.toVector
} yield {
val receivedFileContents: String = lines.mkString
receivedFileContents shouldEqual file.contents
}
}
}
}
