Skip to content

Commit

Permalink
Ensure S3 storage can't be configured with an invalid digest algorithm
Browse files · Browse the repository at this point in the history
  • Loading branch information
shinyhappydan committed Apr 19, 2024
1 parent e986f80 commit 5ea25b5
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.model.search.PaginationConfig
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.model.Permission
import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig
import pureconfig.ConfigReader.Result
import pureconfig.ConvertHelpers.{catchReadError, optF}
import pureconfig.error.{CannotConvert, ConfigReaderFailures, ConvertFailure, FailureReason}
import pureconfig.generic.auto._
import pureconfig.{ConfigConvert, ConfigReader}
import pureconfig.{ConfigConvert, ConfigObjectCursor, ConfigReader}

import scala.annotation.nowarn
import scala.concurrent.duration.FiniteDuration
Expand Down Expand Up @@ -80,6 +81,29 @@ object StoragesConfig {
val description: String = s"'allowed-volumes' must contain at least '$defaultVolume' (default-volume)"
}

/**
  * Configuration failure raised when an S3 storage is configured with a digest
  * algorithm that S3 does not support.
  *
  * @param digestAlgorithm the unsupported algorithm taken from the configuration
  */
final case class DigestNotSupportedOnS3(digestAlgorithm: DigestAlgorithm) extends FailureReason {
  override val description: String =
    s"Digest algorithm '${digestAlgorithm.value}' is not supported on S3"
}

/**
  * Rejects configuration loading when the configured digest algorithm is one S3
  * cannot provide, so an invalid algorithm fails fast at startup rather than at
  * upload time.
  *
  * @param digestAlgorithm the algorithm read from the S3 storage configuration
  * @param amazonCursor    cursor over the `amazon` config object, used to point
  *                        the failure at the offending key's path
  * @return `Right(())` when supported, otherwise a [[ConvertFailure]] wrapping
  *         [[DigestNotSupportedOnS3]]
  */
private def assertValidS3Algorithm(
    digestAlgorithm: DigestAlgorithm,
    amazonCursor: ConfigObjectCursor
): Result[Unit] = {
  // The only digests S3 uploads can report back (ETag for MD5, checksum headers otherwise).
  val supportedOnS3 = Set("SHA-256", "SHA-1", "MD5")
  if (supportedOnS3.contains(digestAlgorithm.value))
    Right(())
  else
    Left(
      ConfigReaderFailures(
        ConvertFailure(
          DigestNotSupportedOnS3(digestAlgorithm),
          None,
          // Anchor the error message on the `digest-algorithm` key for a readable path.
          amazonCursor.atKeyOrUndefined("digest-algorithm").path
        )
      )
    )
}

implicit val storageTypeConfigReader: ConfigReader[StorageTypeConfig] = ConfigReader.fromCursor { cursor =>
for {
obj <- cursor.asObjectCursor
Expand All @@ -96,6 +120,7 @@ object StoragesConfig {
amazonEnabledCursor <- amazonCursor.atKey("enabled")
amazonEnabled <- amazonEnabledCursor.asBoolean
amazon <- ConfigReader[S3StorageConfig].from(amazonCursor)
_ <- assertValidS3Algorithm(amazon.digestAlgorithm, amazonCursor)
remoteCursor <- obj.atKeyOrUndefined("remote-disk").asObjectCursor
remoteEnabledCursor <- remoteCursor.atKey("enabled")
remoteEnabled <- remoteEnabledCursor.asBoolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgori
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.S3Storage
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.FileOperations.intermediateFolders
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.SaveFileRejection._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.S3StorageSaveFile.PutObjectRequestOps
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.rdf.syntax.uriSyntax
import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter
Expand Down Expand Up @@ -124,27 +125,25 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit
// Uploads the streamed bytes to `bucket`/`key` in a single PUT and emits the checksum
// (hex digest or ETag) computed from the response.
// NOTE(review): this region is a flattened diff view — it contains BOTH the pre-change
// variant (setAlgorithm / parseResponse) and the post-change variant (deltaDigest /
// checksumFromResponse) of the same upload logic; only one variant exists in the real file.
private def uploadFilePipe(bucket: String, key: String, algorithm: DigestAlgorithm): Pipe[IO, Byte, String] = { in =>
fs2.Stream.eval {
// Materializes the whole stream into one Chunk before the PUT — assumes the file
// fits in memory; TODO confirm there is an upstream size limit.
in.compile.to(Chunk).flatMap { chunks =>
val bs = chunks.toByteBuffer
// Pre-change variant: build the request, then set the checksum algorithm effectfully.
val request = PutObjectRequest
.builder()
.bucket(bucket)
.key(key)

val bs = chunks.toByteBuffer
for {
fullRequest <- setAlgorithm(request, algorithm)
response <- s3.putObject(
fullRequest
.build(),
AsyncRequestBody.fromByteBuffer(bs)
)
// Post-change variant: configure the digest inline via the deltaDigest syntax extension.
response <- s3.putObject(
PutObjectRequest
.builder()
.bucket(bucket)
.deltaDigest(algorithm)
.key(key)
.build(),
AsyncRequestBody.fromByteBuffer(bs)
)
} yield {
// parseResponse was renamed to checksumFromResponse in this commit; both calls
// below are the old/new diff lines for the same yield expression.
parseResponse(response, algorithm)
checksumFromResponse(response, algorithm)
}
}
}
}

private def parseResponse(response: PutObjectResponse, algorithm: DigestAlgorithm): String = {
private def checksumFromResponse(response: PutObjectResponse, algorithm: DigestAlgorithm): String = {
algorithm.value match {
case "MD5" => response.eTag().stripPrefix("\"").stripSuffix("\"")
case "SHA-256" => Hex.encodeHexString(Base64.getDecoder.decode(response.checksumSHA256()))
Expand All @@ -153,20 +152,21 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit
}
}

/**
  * Mirrors the digest algorithm onto the AWS SDK request builder.
  *
  * MD5 needs no explicit checksum — S3 reports it via the object's ETag (see the
  * MD5 branch of the response parsing) — while SHA-1/SHA-256 map to the SDK's
  * [[ChecksumAlgorithm]] values.
  *
  * @param request   the builder to enrich
  * @param algorithm the digest algorithm configured for the storage
  * @return the (possibly enriched) builder, or a failed IO for unsupported algorithms
  */
private def setAlgorithm(
    request: PutObjectRequest.Builder,
    algorithm: DigestAlgorithm
): IO[PutObjectRequest.Builder] =
  algorithm.value match {
    case "MD5"     => IO.pure(request) // nothing to set; the default ETag carries the MD5
    case "SHA-1"   => IO.delay(request.checksumAlgorithm(ChecksumAlgorithm.SHA1))
    case "SHA-256" => IO.delay(request.checksumAlgorithm(ChecksumAlgorithm.SHA256))
    case other     => IO.raiseError(new IllegalArgumentException(s"Unsupported algorithm for S3: $other"))
  }

// Fetches the S3 object attributes for `bucket`/`key`; pure delegation to the storage client.
private def getFileAttributes(bucket: String, key: String): IO[GetObjectAttributesResponse] =
s3StorageClient.getFileAttributes(bucket, key)

/** Emits an info-level log line scoped to the given bucket and object key. */
private def log(bucket: String, key: String, msg: String): IO[Unit] = {
  val line = s"Bucket: $bucket. Key: $key. $msg"
  logger.info(line)
}
}

object S3StorageSaveFile {

  /**
    * Syntax extension enriching [[PutObjectRequest.Builder]] with digest configuration.
    *
    * Declared as an implicit *value* class (`extends AnyVal`) so no wrapper is
    * allocated at call sites, per the standard Scala extension-method idiom.
    */
  implicit class PutObjectRequestOps(private val request: PutObjectRequest.Builder) extends AnyVal {

    /**
      * Sets the checksum algorithm corresponding to `algorithm` on the request.
      *
      * MD5 needs no explicit checksum — S3 reports it through the object's ETag —
      * while SHA-256/SHA-1 map to the SDK's [[ChecksumAlgorithm]] values.
      *
      * @throws IllegalArgumentException for any other algorithm; configuration
      *         validation should make this branch unreachable at runtime.
      */
    def deltaDigest(algorithm: DigestAlgorithm): PutObjectRequest.Builder =
      algorithm.value match {
        case "MD5"     => request
        case "SHA-256" => request.checksumAlgorithm(ChecksumAlgorithm.SHA256)
        case "SHA-1"   => request.checksumAlgorithm(ChecksumAlgorithm.SHA1)
        case _         => throw new IllegalArgumentException(s"Unsupported algorithm for S3: ${algorithm.value}")
      }
  }
}

0 comments on commit 5ea25b5

Please sign in to comment.