Skip to content

Commit

Permalink
Group upload metadata into case class
Browse files Browse the repository at this point in the history
  • Loading branch information
shinyhappydan committed Apr 22, 2024
1 parent d1dd9f7 commit f93a3b7
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.S3St
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.FileOperations.intermediateFolders
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.SaveFileRejection._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient.UploadMetadata
import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter
import fs2.Stream

Expand Down Expand Up @@ -51,12 +52,12 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit
val fileData: Stream[IO, Byte] = convertStream(entity.dataBytes)

(for {
_ <- log(bucket, key, s"Checking for object existence")
_ <- validateObjectDoesNotExist(bucket, key)
_ <- log(bucket, key, s"Beginning upload")
(digest, fileSize, location) <- s3StorageClient.uploadFile(fileData, bucket, key, algorithm)
_ <- log(bucket, key, s"Finished upload. Digest: $digest")
attr = fileMetadata(key, uuid, fileSize, algorithm, digest, location)
_ <- log(bucket, key, s"Checking for object existence")
_ <- validateObjectDoesNotExist(bucket, key)
_ <- log(bucket, key, s"Beginning upload")
uploadMetadata <- s3StorageClient.uploadFile(fileData, bucket, key, algorithm)
_ <- log(bucket, key, s"Finished upload. Digest: ${uploadMetadata.checksum}")
attr = fileMetadata(key, uuid, algorithm, uploadMetadata)
} yield attr)
.onError(e => logger.error(e)("Unexpected error when storing file"))
.adaptError { err => UnexpectedSaveError(key, err.getMessage) }
Expand All @@ -65,17 +66,15 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit
private def fileMetadata(
key: String,
uuid: UUID,
fileSize: Long,
algorithm: DigestAlgorithm,
digest: String,
location: Uri
uploadMetadata: UploadMetadata
): FileStorageMetadata =
FileStorageMetadata(
uuid = uuid,
bytes = fileSize,
digest = Digest.ComputedDigest(algorithm, digest),
bytes = uploadMetadata.fileSize,
digest = Digest.ComputedDigest(algorithm, uploadMetadata.checksum),
origin = Client,
location = location,
location = uploadMetadata.location,
path = Uri.Path(key)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import cats.effect.{IO, Ref, Resource}
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.S3StorageConfig
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient.UploadMetadata
import ch.epfl.bluebrain.nexus.delta.rdf.syntax.uriSyntax
import ch.epfl.bluebrain.nexus.delta.sdk.error.ServiceError.FeatureDisabled
import fs2.{Chunk, Pipe, Stream}
Expand Down Expand Up @@ -38,14 +39,17 @@ trait S3StorageClient {
bucket: String,
key: String,
algorithm: DigestAlgorithm
): IO[(String, Long, Uri)]
): IO[UploadMetadata]

def objectExists(bucket: String, key: String): IO[Boolean]

def baseEndpoint: Uri
}

object S3StorageClient {

case class UploadMetadata(checksum: String, fileSize: Long, location: Uri)

def resource(s3Config: Option[S3StorageConfig]): Resource[IO, S3StorageClient] = s3Config match {
case Some(cfg) =>
val creds =
Expand Down Expand Up @@ -100,7 +104,7 @@ object S3StorageClient {
bucket: String,
key: String,
algorithm: DigestAlgorithm
): IO[(String, Long, Uri)] = {
): IO[UploadMetadata] = {
for {
fileSizeAcc <- Ref.of[IO, Long](0L)
digest <- fileData
Expand All @@ -112,7 +116,7 @@ object S3StorageClient {
.onlyOrError
fileSize <- fileSizeAcc.get
location = baseEndpoint / bucket / Uri.Path(key)
} yield (digest, fileSize, location)
} yield UploadMetadata(digest, fileSize, location)
}

private def uploadFilePipe(bucket: String, key: String, algorithm: DigestAlgorithm): Pipe[IO, Byte, String] = {
Expand Down Expand Up @@ -176,6 +180,6 @@ object S3StorageClient {
bucket: String,
key: String,
algorithm: DigestAlgorithm
): IO[(String, Long, Uri)] = raiseDisabledErr
): IO[UploadMetadata] = raiseDisabledErr
}
}

0 comments on commit f93a3b7

Please sign in to comment.