
Replace alpakka S3 with fs2-aws and minio with localstack #4852

Merged: 12 commits into BlueBrain:master from s3-readfile-fs2, Apr 9, 2024

Conversation

@dantb (Contributor) commented Apr 8, 2024

No description provided.

@dantb (author) left a comment:

@shinyhappydan sorry, bit of a mess. We may have an annoying problem with the digest + algorithm: the integration tests assume it's always SHA-256, but S3-computed digests will be based on MD5, so the hardcoded responses will be wrong (and StorageSpec is leaking again 😬).
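For context on why hardcoded SHA-256 values cannot match: per AWS's documented behaviour, a multipart ETag is the MD5 of the concatenated per-part MD5 digests, suffixed with the part count; it is not a digest of the object bytes at all. A minimal sketch of that derivation (the helper is illustrative, not part of this PR):

```scala
import java.security.MessageDigest

// Sketch: how S3 derives a multipart ETag. `partMd5s` holds the raw MD5
// digest of each uploaded part, in upload order.
def multipartETag(partMd5s: List[Array[Byte]]): String = {
  val md5OfMd5s = MessageDigest.getInstance("MD5").digest(partMd5s.flatten.toArray)
  s"${md5OfMd5s.map("%02x".format(_)).mkString}-${partMd5s.size}"
}
```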

@@ -56,7 +56,7 @@ final class Files(
     storages: FetchStorage,
     storagesStatistics: StoragesStatistics,
     remoteDiskStorageClient: RemoteDiskStorageClient,
-    config: StorageTypeConfig
+    s3Client: S3StorageClient
@dantb (author) commented:

Didn't get round to it yet, but I was planning to pull out the file operations like SaveFile; it will need some refactoring of the interface. The clients should really be abstracted away from here.

  ): IO[FileStorageMetadata] =
    maybeEtags.sequence match {
      case Some(onlyPartETag :: Nil) =>
        // TODO our tests expect specific values for digests and the only algorithm currently used is SHA-256.
@dantb (author) commented:
This can be done separately, and will be much simpler if we just use a regular put object (as @imsdu suggested).
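A rough sketch of what that single put could look like through the S3AsyncClientOp[IO] that fs2-aws exposes; the checksum wiring and the helper's name are assumptions, not this PR's code:

```scala
import cats.effect.IO
import software.amazon.awssdk.core.async.AsyncRequestBody
import software.amazon.awssdk.services.s3.model.{ChecksumAlgorithm, PutObjectRequest, PutObjectResponse}

// Sketch: one PutObject call, asking S3 to compute a SHA-256 checksum for us.
// Assumes `client: S3AsyncClientOp[IO]` is in scope, as elsewhere in this PR.
def putFile(bucket: String, key: String, bytes: Array[Byte]): IO[PutObjectResponse] =
  client.putObject(
    PutObjectRequest
      .builder()
      .bucket(bucket)
      .key(key)
      .checksumAlgorithm(ChecksumAlgorithm.SHA256)
      .build(),
    AsyncRequestBody.fromBytes(bytes)
  )
```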

Comment on lines +133 to +139
  // TODO issue fetching attributes when tested against localstack, only after the object is saved.
  // Verify if it's the same for real S3. Error msg: 'Could not parse XML response.'
  // For now we just compute it manually.
  // private def getFileSize(key: String) =
  //   getFileAttributes(key).flatMap { attr =>
  //     log(key, s"File attributes from S3: $attr").as(attr.objectSize())
  //   }
@dantb (author) commented:

The idea was both to verify the save in S3 and to use the returned attributes (object size and checksum). Currently blocked by some error. Again, this can be done separately.
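As an aside, if GetObjectAttributes keeps failing against localstack, the object size at least is also available from a plain HeadObject. A sketch, assuming the same `client: S3AsyncClientOp[IO]` as above:

```scala
import cats.effect.IO
import software.amazon.awssdk.services.s3.model.HeadObjectRequest

// Sketch: HeadObject as a fallback for the object size while GetObjectAttributes misbehaves.
def getFileSize(bucket: String, key: String): IO[Long] =
  client
    .headObject(HeadObjectRequest.builder().bucket(bucket).key(key).build())
    .map(_.contentLength().longValue())
```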

Comment on lines 140 to 148
  // Note: count the bytes via chunk sizes; folding the stream with `_ + _` would sum the byte values instead.
  private def computeSize(bytes: Stream[IO, Byte]): IO[Long] =
    bytes.chunks.fold(0L)(_ + _.size).compile.lastOrError

  private def computeDigest(bytes: Stream[IO, Byte], algorithm: DigestAlgorithm): IO[String] = {
    val digest = algorithm.digest
    bytes.chunks
      .evalMap(chunk => IO(digest.update(chunk.toArray)))
      .compile
      .drain // lastOrError would fail on an empty stream; drain tolerates empty input
      .map(_ => digest.digest().map("%02x".format(_)).mkString)
  }
@dantb (author) commented:
Both are temporary hacks until we fetch the data.
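Incidentally, fs2 ships a hashing pipe that could replace the manual MessageDigest loop for the SHA-256 case. A minimal sketch (`fs2.hash.sha256` is fs2-core's, not part of this PR):

```scala
import cats.effect.IO
import fs2.Stream

// Sketch: SHA-256 hex digest of a byte stream via fs2's built-in hashing pipe.
private def sha256Hex(bytes: Stream[IO, Byte]): IO[String] =
  bytes.through(fs2.hash.sha256).compile.to(Array).map(_.map("%02x".format(_)).mkString)
```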

Comment on lines +26 to +28
def underlyingClient: S3AsyncClientOp[IO]

def baseEndpoint: IO[Uri]
@dantb (author) commented:
Purely for convenience; ideally we shouldn't need to expose these here, it's a smell. All S3 interactions can be abstracted by the client once we know exactly which calls we need, with the correct parameters.
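Something like the following shape, once the call set settles. Purely illustrative: the method names and parameter types here are assumptions, not a proposal from the PR; `FileStorageMetadata` is the type already used in this PR.

```scala
import cats.effect.IO
import fs2.Stream

// Hypothetical narrowed interface: callers see storage operations only,
// never the underlying S3AsyncClientOp or the endpoint.
trait S3StorageClient {
  def readFile(bucket: String, key: String): Stream[IO, Byte]
  def saveFile(bucket: String, key: String, data: Stream[IO, Byte]): IO[FileStorageMetadata]
  def fileSize(bucket: String, key: String): IO[Long]
}
```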

@dantb requested a review from @shinyhappydan on April 8, 2024 at 22:55

private def raiseUnexpectedErr[A](key: String, msg: String): IO[A] = IO.raiseError(UnexpectedSaveError(key, msg))

private def log(key: String, msg: String) = logger.info(s"Bucket: ${bucket.value}. Key: $key. $msg")
A reviewer commented:
This should be logged at debug level.
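i.e., applied to the `log` helper above, a one-line change:

```scala
private def log(key: String, msg: String) = logger.debug(s"Bucket: ${bucket.value}. Key: $key. $msg")
```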

  private def uploadFileMultipart(fileData: Stream[IO, Byte], key: String): IO[List[Option[ETag]]] =
    fileData
      .through(
        s3.uploadFileMultipart(
@imsdu (Contributor) commented Apr 9, 2024:

We could do a put object here; I'm not sure what the multipart brings.
Is there only one part in the end?
Using a multipart upload will have an impact on computing the checksum.
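To illustrate that impact: with a plain PutObject plus a requested checksum algorithm, the digest comes back directly on the response, while a multipart upload only yields the composite ETag. A sketch, reusing the hypothetical `putFile` helper from the earlier comment:

```scala
// Sketch: with a single PutObject, the SHA-256 arrives on the PutObjectResponse
// itself (base64-encoded), so no ETag reassembly is needed.
def putAndChecksum(bucket: String, key: String, bytes: Array[Byte]): IO[Option[String]] =
  putFile(bucket, key, bytes).map(resp => Option(resp.checksumSHA256()))
```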

private val s3 = S3.create(client)
private val multipartETagValidation = MultipartETagValidation.create[IO]
private val logger = Logger[S3StorageSaveFile]
private val partSizeMB: PartSizeMB = refineMV(5)
A reviewer commented:

This could be configurable. Is that a recommended value?
A single PutObject is recommended for objects up to 100 MB (but can go further).
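If the part size does become configurable, the refined `PartSizeMB` type still has to be honoured at load time. A sketch, assuming the value arrives from configuration as a plain Int:

```scala
import cats.effect.IO
import eu.timepit.refined.api.RefType

// Sketch: lift a configured Int into fs2-aws's refined PartSizeMB, failing fast
// at startup if the value does not satisfy the refinement.
def partSizeFromConfig(configuredPartSize: Int): IO[PartSizeMB] =
  IO.fromEither(
    RefType
      .applyRef[PartSizeMB](configuredPartSize)
      .left.map(err => new IllegalArgumentException(s"Invalid part size: $err"))
  )
```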

      .compile
      .to(List)

  private def getFileAttributes(key: String): IO[GetObjectAttributesResponse] =
A reviewer commented:

Some of these operations could go in the S3StorageClient, no?

import scala.jdk.CollectionConverters.ListHasAsScala

class S3StorageFetchSaveSpec
    extends NexusSuite
@imsdu (Contributor) commented Apr 9, 2024:

It seems to be mixing ScalaTest and munit.

  • A test on a missing file?
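The suggested missing-file case could look roughly like this, assuming NexusSuite pulls in munit-cats-effect's interceptIO and that a fetch helper like `fileOps.readFile` exists (both assumptions):

```scala
import software.amazon.awssdk.services.s3.model.NoSuchKeyException

// Sketch of the missing-file test: fetching a key that was never written should fail.
test("fetching a missing key raises NoSuchKeyException") {
  interceptIO[NoSuchKeyException] {
    fileOps.readFile(bucket, "does-not-exist").compile.drain
  }
}
```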


  def readFile(bucket: String, fileKey: String): fs2.Stream[IO, Byte] =
    for {
      bk <- Stream.fromEither[IO](refineV[NonEmpty](bucket).leftMap(e => new IllegalArgumentException(e)))
A reviewer commented:

Could be improved
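One possible cleanup, as a sketch: refine both inputs up front and surface a single error rather than refining inside the stream. This assumes fs2-aws's `s3.readFile(BucketName, FileKey)` signature and its refined wrapper types; import paths may differ by version.

```scala
import cats.effect.IO
import cats.syntax.all._
import eu.timepit.refined.collection.NonEmpty
import eu.timepit.refined.refineV
import fs2.Stream
import fs2.aws.s3.models.Models.{BucketName, FileKey}

// Sketch: refine both inputs once; any refinement failure becomes one raised error.
// Assumes `s3` is the fs2-aws client already in scope in this class.
def readFile(bucket: String, fileKey: String): Stream[IO, Byte] =
  (refineV[NonEmpty](bucket), refineV[NonEmpty](fileKey))
    .mapN((bk, key) => s3.readFile(BucketName(bk), FileKey(key)))
    .valueOr(e => Stream.raiseError[IO](new IllegalArgumentException(e)))
```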

@imsdu marked this pull request as ready for review on April 9, 2024 at 13:55
@imsdu merged commit 96a1c94 into BlueBrain:master on April 9, 2024
9 checks passed
@dantb deleted the s3-readfile-fs2 branch on April 11, 2024 at 11:38