Merge branch 'master' into 4876-patch-distributions
imsdu authored Apr 23, 2024
2 parents cf50158 + a404d42 commit 607fab2
Showing 18 changed files with 492 additions and 175 deletions.
4 changes: 3 additions & 1 deletion build.sbt
@@ -854,7 +854,9 @@ lazy val tests = project
awsSdk % Test,
scalaTest % Test,
akkaSlf4j % Test,
alpakkaSse % Test
alpakkaSse % Test,
fs2Aws % Test,
fs2AwsS3 % Test
),
scalacOptions ~= { options: Seq[String] => options.filterNot(Set("-Wunused:imports")) },
Test / parallelExecution := false,
@@ -218,6 +218,33 @@ final class Files(
} yield res
}.span("updateFileMetadata")

def registerFile(
id: FileId,
storageId: Option[IdSegment],
metadata: Option[FileCustomMetadata],
path: Uri.Path,
tag: Option[UserTag]
)(implicit caller: Caller): IO[FileResource] = {
for {
(iri, pc) <- id.expandIri(fetchContext.onCreate)
(storageRef, storage) <- fetchAndValidateActiveStorage(storageId, id.project, pc)
s3Metadata <- fileOperations.register(storage, path)
filename <- IO.fromOption(path.lastSegment)(InvalidFilePath)
attr = FileAttributes.from(FileDescription(filename, s3Metadata.contentType.some, metadata), s3Metadata.metadata)
res <- eval(
CreateFile(
iri,
id.project,
storageRef,
storage.tpe,
attr,
caller.subject,
tag
)
)
} yield res
}.span("registerFile")

/**
* Update a file by linking it to an existing file in a storage
*
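The new registerFile flow creates a file resource from an object that already lives in the bucket, without moving bytes: it expands the id, validates that the target storage is active, issues a HEAD request to S3 for the object's metadata, derives the file attributes, and evaluates a CreateFile command. A minimal usage sketch under assumed identifiers (files, project, and the implicit caller are hypothetical values in scope, not part of the diff):

import akka.http.scaladsl.model.Uri
import cats.effect.IO

// Sketch only: all identifiers below are illustrative.
val registered: IO[FileResource] =
  files.registerFile(
    id = FileId(IdSegment("my-file"), project),   // resource id within the project
    storageId = Some(IdSegment("my-s3-storage")), // must resolve to an S3 storage
    metadata = None,                              // optional custom metadata
    path = Uri.Path("dir/data.json"),             // object key inside the bucket
    tag = None                                    // optional user tag
  )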
@@ -95,17 +95,20 @@ object FileAttributes {
object FileAttributesOrigin {
final case object Client extends FileAttributesOrigin

final case object Storage extends FileAttributesOrigin
final case object Storage extends FileAttributesOrigin
final case object External extends FileAttributesOrigin

implicit val fileAttributesEncoder: Encoder[FileAttributesOrigin] = Encoder.encodeString.contramap {
case Client => "Client"
case Storage => "Storage"
case Client => "Client"
case Storage => "Storage"
case External => "External"
}

implicit val fileAttributesDecoder: Decoder[FileAttributesOrigin] = Decoder.decodeString.emap {
case "Client" => Right(Client)
case "Storage" => Right(Storage)
case str => Left(s"'$str' is not a 'FileAttributesOrigin'")
case "Client" => Right(Client)
case "Storage" => Right(Storage)
case "External" => Right(External)
case str => Left(s"'$str' is not a 'FileAttributesOrigin'")
}
}
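With External added to both codecs, the new origin round-trips through circe. For illustration (a sketch, assuming circe on the classpath):

import io.circe.Json
import io.circe.syntax._

Json.fromString("External").as[FileAttributesOrigin] // Right(External)
(External: FileAttributesOrigin).asJson              // Json.fromString("External")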

@@ -226,9 +226,9 @@ object FileRejection {
* Rejection returned when attempting to link or register a file without providing a filename or a path that ends
* with a filename.
*/
final case object InvalidFileLink
final case object InvalidFilePath
extends FileRejection(
s"Linking a file cannot be performed without a 'filename' or a 'path' that does not end with a filename."
s"Linking or registering a file cannot be performed without a 'filename' or a 'path' that does not end with a filename."
)

final case class CopyRejection(
@@ -11,7 +11,7 @@ import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileRejection._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.permissions.{read => Read, write => Write}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.routes.FilesRoutes.LinkFileRequest.{fileDescriptionFromRequest, linkFileDecoder}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.routes.FilesRoutes.LinkFileRequest.{fileDescriptionFromRequest, linkFileDecoder, regFileDecoder}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.routes.FilesRoutes._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.{schemas, FileResource, Files}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.ShowFileLocation
@@ -264,6 +264,32 @@ final class FilesRoutes(
}
}
)
},
pathPrefix("register") {
(idSegment & indexingMode) { (id, mode) =>
pathEndOrSingleSlash {
operationName(s"$prefixSegment/files/{org}/{project}/register/{id}") {
parameters("storage".as[IdSegment].?, "tag".as[UserTag].?) { (storage, tag) =>
entity(as[RegisterFileRequest]) { registerRequest =>
val fileId = FileId(id, project)
emit(
Created,
files
.registerFile(
fileId,
storage,
registerRequest.metadata,
registerRequest.path,
tag
)
.index(mode)
.attemptNarrow[FileRejection]
)
}
}
}
}
}
}
)
}
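The new register sub-route reads a RegisterFileRequest body, optionally takes storage and tag query parameters, and answers 201 Created with the indexed resource. A hypothetical request shape (the HTTP verb is outside the shown hunk, so PUT is an assumption, and all names are illustrative):

import akka.http.scaladsl.model._

val request = HttpRequest(
  method = HttpMethods.PUT, // assumption: the verb directive is not visible in this hunk
  uri = Uri("/v1/files/myorg/myproject/register/my-file?storage=my-s3-storage&tag=v1.0"),
  entity = HttpEntity(ContentTypes.`application/json`, """{ "path": "dir/data.json" }""")
)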
@@ -329,15 +355,18 @@ object FilesRoutes {
metadata: Option[FileCustomMetadata]
)

final case class RegisterFileRequest(path: Path, metadata: Option[FileCustomMetadata])

object LinkFileRequest {
@nowarn("cat=unused")
implicit private val config: Configuration = Configuration.default
implicit val linkFileDecoder: Decoder[LinkFileRequest] = deriveConfiguredDecoder[LinkFileRequest]
implicit private val config: Configuration = Configuration.default
implicit val linkFileDecoder: Decoder[LinkFileRequest] = deriveConfiguredDecoder[LinkFileRequest]
implicit val regFileDecoder: Decoder[RegisterFileRequest] = deriveConfiguredDecoder[RegisterFileRequest]

def fileDescriptionFromRequest(f: LinkFileRequest): IO[FileDescription] =
f.filename.orElse(f.path.lastSegment) match {
case Some(value) => IO.pure(FileDescription(value, f.mediaType, f.metadata))
case None => IO.raiseError(InvalidFileLink)
case None => IO.raiseError(InvalidFilePath)
}
}
}
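RegisterFileRequest carries only the object path and optional custom metadata; with Configuration.default, a missing metadata key decodes to None. A decoding sketch (assuming the same implicit Decoder for Uri.Path that the derivation above relies on is in scope):

import io.circe.parser.decode

decode[RegisterFileRequest]("""{ "path": "dir/data.json" }""")
// Right(RegisterFileRequest(Path("dir/data.json"), None))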
@@ -10,10 +10,11 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.model.search.PaginationConfig
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.model.Permission
import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig
import pureconfig.ConfigReader.Result
import pureconfig.ConvertHelpers.{catchReadError, optF}
import pureconfig.error.{CannotConvert, ConfigReaderFailures, ConvertFailure, FailureReason}
import pureconfig.generic.auto._
import pureconfig.{ConfigConvert, ConfigReader}
import pureconfig.{ConfigConvert, ConfigObjectCursor, ConfigReader}

import scala.annotation.nowarn
import scala.concurrent.duration.FiniteDuration
@@ -80,6 +81,29 @@ object StoragesConfig {
val description: String = s"'allowed-volumes' must contain at least '$defaultVolume' (default-volume)"
}

final case class DigestNotSupportedOnS3(digestAlgorithm: DigestAlgorithm) extends FailureReason {
val description: String = s"Digest algorithm '${digestAlgorithm.value}' is not supported on S3"
}

private def assertValidS3Algorithm(
digestAlgorithm: DigestAlgorithm,
amazonCursor: ConfigObjectCursor
): Result[Unit] = {
digestAlgorithm.value match {
case "SHA-256" | "SHA-1" => Right(())
case _ =>
Left(
ConfigReaderFailures(
ConvertFailure(
DigestNotSupportedOnS3(digestAlgorithm),
None,
amazonCursor.atKeyOrUndefined("digest-algorithm").path
)
)
)
}
}

implicit val storageTypeConfigReader: ConfigReader[StorageTypeConfig] = ConfigReader.fromCursor { cursor =>
for {
obj <- cursor.asObjectCursor
@@ -96,6 +120,7 @@
amazonEnabledCursor <- amazonCursor.atKey("enabled")
amazonEnabled <- amazonEnabledCursor.asBoolean
amazon <- ConfigReader[S3StorageConfig].from(amazonCursor)
_ <- assertValidS3Algorithm(amazon.digestAlgorithm, amazonCursor)
remoteCursor <- obj.atKeyOrUndefined("remote-disk").asObjectCursor
remoteEnabledCursor <- remoteCursor.atKey("enabled")
remoteEnabled <- remoteEnabledCursor.asBoolean
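assertValidS3Algorithm gates the S3 storage configuration at load time: only SHA-256 and SHA-1 pass, and anything else fails the reader with DigestNotSupportedOnS3 at the digest-algorithm key. The two accepted values correspond to the constants in the DigestAlgorithm hunk just below:

DigestAlgorithm.default.value // "SHA-256"
DigestAlgorithm.SHA1.value    // "SHA-1"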
@@ -29,6 +29,9 @@ object DigestAlgorithm {
final val default: DigestAlgorithm =
new DigestAlgorithm("SHA-256")

final val SHA1: DigestAlgorithm =
new DigestAlgorithm("SHA-1")

/**
* Safely construct an [[DigestAlgorithm]]
*/
@@ -128,7 +128,6 @@ object StorageValue {
) extends StorageValue {

override val tpe: StorageType = StorageType.S3Storage

}

object S3StorageValue {
@@ -6,10 +6,11 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{ComputedFileAt
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.{DiskStorage, RemoteDiskStorage, S3Storage}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageValue.{DiskStorageValue, RemoteDiskStorageValue, S3StorageValue}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.{Storage, StorageValue}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.{FetchAttributeRejection, MoveFileRejection}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.{FetchAttributeRejection, MoveFileRejection, RegisterFileRejection}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.disk.DiskFileOperations
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.RemoteDiskFileOperations
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.S3FileOperations
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.S3FileOperations.S3FileMetadata
import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef

@@ -22,6 +23,8 @@ trait FileOperations extends StorageAccess {

def link(storage: Storage, sourcePath: Uri.Path, filename: String): IO[FileStorageMetadata]

def register(storage: Storage, path: Uri.Path): IO[S3FileMetadata]

def fetchAttributes(storage: Storage, attributes: FileAttributes): IO[ComputedFileAttributes]
}

@@ -62,6 +65,12 @@ object FileOperations {
case s: RemoteDiskStorage => remoteDiskFileOps.fetchAttributes(s.value.folder, attributes.path)
case s => IO.raiseError(FetchAttributeRejection.UnsupportedOperation(s.tpe))
}

override def register(storage: Storage, path: Uri.Path): IO[S3FileMetadata] =
storage match {
case s: S3Storage => s3FileOps.register(s.value.bucket, path)
case s => IO.raiseError(RegisterFileRejection.UnsupportedOperation(s.tpe))
}
}

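FileOperations.register dispatches on the storage type and is S3-only; every other backend is rejected. A sketch (fileOps and diskStorage are hypothetical values):

import akka.http.scaladsl.model.Uri

// Fails with RegisterFileRejection.UnsupportedOperation(StorageType.DiskStorage)
fileOps.register(diskStorage, Uri.Path("dir/file.txt"))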
@@ -1,6 +1,7 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations

import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.{StatusCodes, Uri}
import cats.data.NonEmptyList
import ch.epfl.bluebrain.nexus.delta.kernel.error.Rejection
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageType
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
@@ -192,4 +193,20 @@ object StorageFileRejection {

}

sealed abstract class RegisterFileRejection(loggedDetails: String) extends StorageFileRejection(loggedDetails)

object RegisterFileRejection {
final case class MissingS3Attributes(missingAttributes: NonEmptyList[String])
extends RegisterFileRejection(s"Missing attributes from S3: ${missingAttributes.toList.mkString(", ")}")

final case class InvalidContentType(received: String)
extends RegisterFileRejection(s"Invalid content type returned from S3: $received")

final case class InvalidPath(path: Uri.Path)
extends RegisterFileRejection(s"An S3 path must contain at least the filename. Path was $path")

final case class UnsupportedOperation(tpe: StorageType)
extends RegisterFileRejection(s"Registering a file in-place is not supported for storages of type '${tpe.iri}'")
}

}
@@ -1,22 +1,31 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3

import akka.actor.ActorSystem
import akka.http.scaladsl.model.{BodyPartEntity, Uri}
import akka.http.scaladsl.model.{BodyPartEntity, ContentType, Uri}
import akka.stream.scaladsl.Source
import akka.util.ByteString
import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileStorageMetadata
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes.FileAttributesOrigin
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{Digest, FileStorageMetadata}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.S3Storage
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageRejection.StorageNotAccessible
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.FetchFileRejection.UnexpectedFetchError
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.RegisterFileRejection.InvalidContentType
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.S3FileOperations.S3FileMetadata
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.rdf.syntax.uriSyntax
import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource
import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter
import org.apache.commons.codec.binary.Hex
import software.amazon.awssdk.services.s3.model.HeadObjectResponse

import java.net.URLDecoder
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Base64
import scala.concurrent.duration.DurationInt

trait S3FileOperations {
@@ -29,9 +38,12 @@
filename: String,
entity: BodyPartEntity
): IO[FileStorageMetadata]

def register(bucket: String, path: Uri.Path): IO[S3FileMetadata]
}

object S3FileOperations {
final case class S3FileMetadata(contentType: ContentType, metadata: FileStorageMetadata)

def mk(client: S3StorageClient)(implicit as: ActorSystem, uuidf: UUIDF): S3FileOperations = new S3FileOperations {

@@ -63,6 +75,48 @@

override def save(storage: S3Storage, filename: String, entity: BodyPartEntity): IO[FileStorageMetadata] =
saveFile.apply(storage, filename, entity)

override def register(bucket: String, path: Uri.Path): IO[S3FileMetadata] = {
for {
_ <- log.info(s"Fetching attributes for S3 file. Bucket $bucket at path $path")
resp <- client.headObject(bucket, path.toString())
contentType <- parseContentType(resp.contentType())
metadata <- mkS3Metadata(bucket, path, resp, contentType)
} yield metadata
}
.onError { e =>
log.error(e)(s"Failed fetching required attributes for S3 file registration. Bucket $bucket and path $path")
}

private def parseContentType(raw: String): IO[ContentType] =
ContentType.parse(raw).map(_.pure[IO]).getOrElse(IO.raiseError(InvalidContentType(raw)))

private def mkS3Metadata(bucket: String, path: Uri.Path, resp: HeadObjectResponse, ct: ContentType) = {
for {
uuid <- uuidf()
checksum <- checksumFrom(resp)
} yield S3FileMetadata(
ct,
FileStorageMetadata(
uuid,
resp.contentLength(),
checksum,
FileAttributesOrigin.External,
client.baseEndpoint / bucket / path,
path
)
)
}

private def checksumFrom(response: HeadObjectResponse) = IO.fromOption {
Option(response.checksumSHA256())
.map { checksum =>
Digest.ComputedDigest(
DigestAlgorithm.default,
Hex.encodeHexString(Base64.getDecoder.decode(checksum))
)
}
}(new IllegalArgumentException("Missing checksum"))
}

}
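register relies on two conversions: the raw content type from the HEAD response is parsed with akka-http (a bad value surfaces as InvalidContentType), and the SHA-256 checksum arrives base64-encoded while Delta stores digests as hex. A self-contained illustration (the checksum below is the well-known digest of empty content):

import akka.http.scaladsl.model.ContentType
import java.util.Base64
import org.apache.commons.codec.binary.Hex

ContentType.parse("application/json") // Right(application/json)

val s3Checksum = "47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU="
Hex.encodeHexString(Base64.getDecoder.decode(s3Checksum))
// "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"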