
Fix S3 file download (#5008)
Co-authored-by: Simon Dumas <[email protected]>
imsdu and Simon Dumas authored Jun 4, 2024
1 parent 3df27f6 commit 069afbf
Showing 6 changed files with 23 additions and 23 deletions.
S3FileOperations.scala
@@ -19,8 +19,6 @@ import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource
 import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter
 import software.amazon.awssdk.services.s3.model.NoSuchKeyException
 
-import scala.concurrent.duration.DurationInt
-
 trait S3FileOperations {
   def checkBucketExists(bucket: String): IO[Unit]
 
@@ -34,8 +32,7 @@ trait S3FileOperations {
 object S3FileOperations {
   final case class S3FileMetadata(contentType: Option[ContentType], metadata: FileStorageMetadata)
 
-  private val log       = Logger[S3FileOperations]
-  private val ChunkSize = 8 * 1024
+  private val log = Logger[S3FileOperations]
 
   def mk(client: S3StorageClient, locationGenerator: S3LocationGenerator)(implicit
       as: ActorSystem,
@@ -55,8 +52,7 @@ object S3FileOperations {
       StreamConverter(
         client
           .readFile(bucket, UrlUtils.decode(path))
-          .groupWithin(ChunkSize, 1.second)
-          .map(bytes => ByteString(bytes.toArray))
+          .map(bytes => ByteString(bytes))
          .adaptError {
            case _: NoSuchKeyException => FetchFileRejection.FileNotFound(path.toString)
            case err                   => UnexpectedFetchError(path.toString, err.getMessage)
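
The hunk above is the heart of the fix: the client used to hand back the object as a stream of single bytes, which then had to be regrouped with groupWithin(ChunkSize, 1.second) before each ByteString could be built. The SDK's buffers are now wrapped directly. A minimal sketch of the two pipeline shapes, with illustrative names that are not part of the codebase:

import java.nio.ByteBuffer
import scala.concurrent.duration.DurationInt

import akka.util.ByteString
import cats.effect.IO
import fs2.Stream

object ReadPipelineSketch {

  // Old shape: bytes arrive one by one and are regrouped on a size/time
  // window, paying per-byte overhead before every ByteString allocation.
  def before(bytes: Stream[IO, Byte]): Stream[IO, ByteString] =
    bytes
      .groupWithin(8 * 1024, 1.second)
      .map(chunk => ByteString(chunk.toArray))

  // New shape: the SDK's buffers flow through untouched; ByteString copies
  // each buffer's contents in a single step.
  def after(buffers: Stream[IO, ByteBuffer]): Stream[IO, ByteString] =
    buffers.map(bb => ByteString(bb))
}
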
S3StorageClient.scala
@@ -22,7 +22,7 @@ trait S3StorageClient
 
   def listObjectsV2(bucket: String, prefix: String): IO[ListObjectsV2Response]
 
-  def readFile(bucket: String, fileKey: String): Stream[IO, Byte]
+  def readFile(bucket: String, fileKey: String): Stream[IO, ByteBuffer]
 
   def readFileMultipart(bucket: String, fileKey: String): Stream[IO, Byte]
 
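
With the trait now exposing Stream[IO, ByteBuffer], each caller decides whether it needs single bytes at all. A sketch of a buffer-at-a-time consumer (the helper name is hypothetical):

import java.nio.ByteBuffer

import cats.effect.IO
import fs2.Stream

// Sums the size of every buffer to get the content length, without ever
// materialising a per-byte stream.
def contentLength(buffers: Stream[IO, ByteBuffer]): IO[Long] =
  buffers.compile.fold(0L)((acc, bb) => acc + bb.remaining())
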
S3StorageClientDisabled.scala
@@ -16,7 +16,7 @@ private[client] object S3StorageClientDisabled extends S3StorageClient {
 
   override def listObjectsV2(bucket: String, prefix: String): IO[ListObjectsV2Response] = raiseDisabledErr
 
-  override def readFile(bucket: String, fileKey: String): Stream[IO, Byte] = Stream.raiseError[IO](disabledErr)
+  override def readFile(bucket: String, fileKey: String): Stream[IO, ByteBuffer] = Stream.raiseError[IO](disabledErr)
 
   override def headObject(bucket: String, key: String): IO[HeadObject] = raiseDisabledErr
 
S3StorageClientImpl.scala
@@ -8,10 +8,10 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageRejec
 import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{checksumAlgorithm, CopyOptions, HeadObject, PutObjectRequest}
 import eu.timepit.refined.refineMV
 import eu.timepit.refined.types.string.NonEmptyString
+import fs2.Stream
 import fs2.aws.s3.S3
 import fs2.aws.s3.models.Models.{BucketName, FileKey, PartSizeMB}
 import fs2.interop.reactivestreams.{PublisherOps, _}
-import fs2.{Chunk, Stream}
 import io.laserdisc.pure.s3.tagless.S3AsyncClientOp
 import org.reactivestreams.Subscriber
 import software.amazon.awssdk.core.async.AsyncRequestBody
@@ -31,11 +31,10 @@ final private[client] class S3StorageClientImpl(client: S3AsyncClientOp[IO]) ext
   override def listObjectsV2(bucket: String, prefix: String): IO[ListObjectsV2Response] =
     client.listObjectsV2(ListObjectsV2Request.builder().bucket(bucket).prefix(prefix).build())
 
-  override def readFile(bucket: String, fileKey: String): Stream[IO, Byte] = {
+  override def readFile(bucket: String, fileKey: String): Stream[IO, ByteBuffer] =
     Stream
       .eval(client.getObject(getObjectRequest(bucket, fileKey), new Fs2StreamAsyncResponseTransformer))
-      .flatMap(_.toStreamBuffered[IO](2).flatMap(bb => Stream.chunk(Chunk.byteBuffer(bb))))
-  }
+      .flatMap(_.toStreamBuffered[IO](2))
 
   override def readFileMultipart(bucket: String, fileKey: String): Stream[IO, Byte] = {
     val bucketName = BucketName(NonEmptyString.unsafeFrom(bucket))
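
The implementation rides on fs2's Reactive Streams interop: the response transformer completes with a publisher of ByteBuffers, and toStreamBuffered[IO](2) lifts it into fs2 with a prefetch of two buffers. The removed flatMap is where every buffer was previously exploded into individual bytes. A minimal sketch of that interop step, assuming some Publisher[ByteBuffer] is at hand:

import java.nio.ByteBuffer

import cats.effect.IO
import fs2.Stream
import fs2.interop.reactivestreams._
import org.reactivestreams.Publisher

// Lifts a Reactive Streams publisher into an fs2 stream, requesting at most
// two buffers ahead of the consumer.
def lift(publisher: Publisher[ByteBuffer]): Stream[IO, ByteBuffer] =
  publisher.toStreamBuffered[IO](2)
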
ShipConfig.scala
@@ -6,13 +6,14 @@ import ch.epfl.bluebrain.nexus.delta.kernel.config.Configs
 import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
 import ch.epfl.bluebrain.nexus.delta.sourcing.config.DatabaseConfig
 import com.typesafe.config.Config
-import fs2.Stream
+import fs2.{Chunk, Stream}
 import fs2.io.file.Path
 import pureconfig.ConfigReader
 import pureconfig.backend.ConfigFactoryWrapper
 import pureconfig.error.ConfigReaderException
 import pureconfig.generic.semiauto.deriveReader
 
+import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets.UTF_8
 
 final case class ShipConfig(database: DatabaseConfig, s3: S3Config, input: InputConfig)
@@ -28,7 +29,7 @@ object ShipConfig {
     externalConfig.flatMap(mergeFromConfig)
   }
 
-  def merge(externalConfigStream: Stream[IO, Byte]): IO[(ShipConfig, Config)] = {
+  def merge(externalConfigStream: Stream[IO, ByteBuffer]): IO[(ShipConfig, Config)] = {
     val externalConfig = configFromStream(externalConfigStream)
     externalConfig.flatMap(mergeFromConfig)
   }
@@ -51,9 +52,9 @@ object ShipConfig {
    * Loads a config from a stream. Taken from
    * https://github.com/pureconfig/pureconfig/tree/master/modules/fs2/src/main/scala/pureconfig/module/fs2
    */
-  private def configFromStream(configStream: Stream[IO, Byte]): IO[Config] =
+  private def configFromStream(configStream: Stream[IO, ByteBuffer]): IO[Config] =
     for {
-      bytes         <- configStream.compile.to(Array)
+      bytes         <- configStream.flatMap(bb => Stream.chunk(Chunk.byteBuffer(bb))).compile.to(Array)
       string         = new String(bytes, UTF_8)
       configOrError <- IO.delay(ConfigFactoryWrapper.parseString(string))
       config        <- IO.fromEither(configOrError.leftMap(ConfigReaderException[Config]))
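
The ship's config loader is one of the few consumers that really needs raw bytes, so it flattens each buffer through Chunk.byteBuffer, which wraps a ByteBuffer as an fs2 chunk in one step instead of emitting bytes one by one. The same pattern in isolation, as a sketch with a hypothetical name:

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

import cats.effect.IO
import fs2.{Chunk, Stream}

// Concatenates a buffer stream chunk-wise and decodes the result as UTF-8.
def decodeUtf8(buffers: Stream[IO, ByteBuffer]): IO[String] =
  buffers
    .flatMap(bb => Stream.chunk(Chunk.byteBuffer(bb)))
    .compile
    .to(Array)
    .map(bytes => new String(bytes, UTF_8))
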
S3StorageSpec.scala
@@ -337,8 +337,8 @@ class S3StorageSpec extends StorageSpec
     }
   }
 
-  "Uploading a large file" should {
-    "succeed" ignore {
+  "Uploading and downloading a large file" should {
+    "succeed" in {
       val content = {
         val sb = new StringBuilder
         (1 to 100_000_000).foreach(_ => sb.append('1'))
@@ -351,11 +351,15 @@
         content
       )
       for {
-        _ <- IO.println("Starting the upload")
-        _ <- deltaClient.uploadFile(projectRef, storageId, fileInput, None) { expectCreated }
-        _ <- deltaClient.get[ByteString](s"/files/$projectRef/${fileInput.fileId}", Coyote, acceptAll) {
-          expectOk
-        }
+        _           <- IO.println("Starting the upload")
+        startUpload <- IO.delay(System.currentTimeMillis())
+        _           <- deltaClient.uploadFile(projectRef, storageId, fileInput, None) { expectCreated }.timed
+        endUpload   <- IO.delay(System.currentTimeMillis())
+        _           <- IO.println(s"End of upload after ${endUpload - startUpload}")
+        _           <- IO.println("Starting the download")
+        _           <- deltaClient.get[ByteString](s"/files/$projectRef/${fileInput.fileId}", Coyote, acceptAll) { expectOk }
+        endDownload <- IO.delay(System.currentTimeMillis())
+        _           <- IO.println(s"End of download after ${endDownload - endUpload}")
       } yield succeed
     }
   }
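
The re-enabled test brackets the upload and download with System.currentTimeMillis to log rough timings. Cats Effect's timed combinator is a more self-contained way to express the same measurement; a sketch, not what the spec itself uses:

import cats.effect.IO

// Runs an action, then logs how long it took before returning its result.
def logged[A](label: String)(action: IO[A]): IO[A] =
  action.timed.flatMap { case (elapsed, a) =>
    IO.println(s"$label took ${elapsed.toMillis} ms").as(a)
  }
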
