Replace alpakka S3 with fs2-aws and minio with localstack #4852
Conversation
@shinyhappydan sorry, bit of a mess. We may have an annoying problem with the digest + algorithm: the integration tests assume it's always SHA-256, but S3-computed digests will be based on MD5, so the hardcoded responses will be wrong (and StorageSpec is leaking again 😬).
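To illustrate the mismatch, here is a minimal sketch (the payload and helper are made up, not from the PR): for a plain, non-multipart upload S3 reports an ETag that is the hex MD5 of the body, while the integration tests hardcode a SHA-256 hex digest, so the two values never line up.

```scala
import java.security.MessageDigest

// Generic hex digest helper over an in-memory payload (illustrative only).
def hexDigest(algorithm: String, bytes: Array[Byte]): String =
  MessageDigest.getInstance(algorithm).digest(bytes).map("%02x".format(_)).mkString

val body = "some file content".getBytes("UTF-8")

val md5ETag = hexDigest("MD5", body)     // what S3 reports as the ETag for a simple put
val sha256  = hexDigest("SHA-256", body) // what the hardcoded test responses expect
// md5ETag != sha256, hence the failing expectations described above.
```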
@@ -56,7 +56,7 @@ final class Files(
    storages: FetchStorage,
    storagesStatistics: StoragesStatistics,
    remoteDiskStorageClient: RemoteDiskStorageClient,
-   config: StorageTypeConfig
+   s3Client: S3StorageClient
I didn't get round to it yet, but I was planning to pull out the file operations like SaveFile; that will need some refactoring of the interface. The clients should really be abstracted away from here.
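A rough sketch of the kind of abstraction meant here (the trait name and signature are hypothetical, not the PR's actual interface): Files would depend on a storage-agnostic save operation instead of the concrete S3 and remote-disk clients.

```scala
import cats.effect.IO
import fs2.Stream

// Hypothetical interface: one implementation per storage backend (S3, remote disk, ...),
// so Files never touches S3StorageClient or RemoteDiskStorageClient directly.
// FileStorageMetadata is the project's existing metadata type (import omitted here).
trait SaveFile {
  def save(key: String, data: Stream[IO, Byte]): IO[FileStorageMetadata]
}
```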
): IO[FileStorageMetadata] =
  maybeEtags.sequence match {
    case Some(onlyPartETag :: Nil) =>
      // TODO our tests expect specific values for digests and the only algorithm currently used is SHA-256.
Can be done separately, and will be much simpler if we just use a regular put object (as @imsdu suggested)
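For reference, a regular put against the plain AWS SDK v2 async client could look roughly like this (a sketch only: bucket, key and payload are placeholders, and the fs2-aws wrapper used in the PR may expose the call differently).

```scala
import software.amazon.awssdk.core.async.AsyncRequestBody
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.model.{ChecksumAlgorithm, PutObjectRequest}

// Single-request upload: S3 computes the SHA-256 checksum itself, so there is
// no multipart ETag to reconcile with the digests expected by the tests.
def putObject(client: S3AsyncClient, bucket: String, key: String, bytes: Array[Byte]) = {
  val request = PutObjectRequest
    .builder()
    .bucket(bucket)
    .key(key)
    .checksumAlgorithm(ChecksumAlgorithm.SHA256)
    .build()
  client.putObject(request, AsyncRequestBody.fromBytes(bytes))
}
```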
// TODO issue fetching attributes when tested against localstack, only after the object is saved
// Verify if it's the same for real S3. Error msg: 'Could not parse XML response.'
// For now we just compute it manually.
// private def getFileSize(key: String) =
//   getFileAttributes(key).flatMap { attr =>
//     log(key, s"File attributes from S3: $attr").as(attr.objectSize())
//   }
The idea was both to verify the save in S3 and to use the returned attributes (object size and checksum). Currently blocked by some error. Again, this can be done separately.
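For the record, fetching just those attributes with the raw SDK request could look like the sketch below (illustrative: the method name is made up, and the PR's getFileAttributes may build the request differently).

```scala
import software.amazon.awssdk.services.s3.model.{GetObjectAttributesRequest, ObjectAttributes}

// Ask S3 only for the size and checksum of the stored object; the response's
// objectSize() and checksum() would then replace the manual computations below.
def attributesRequest(bucket: String, key: String): GetObjectAttributesRequest =
  GetObjectAttributesRequest
    .builder()
    .bucket(bucket)
    .key(key)
    .objectAttributes(ObjectAttributes.OBJECT_SIZE, ObjectAttributes.CHECKSUM)
    .build()
```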
private def computeSize(bytes: Stream[IO, Byte]): IO[Long] =
  bytes.fold(0L)((acc, _) => acc + 1).compile.lastOrError

private def computeDigest(bytes: Stream[IO, Byte], algorithm: DigestAlgorithm): IO[String] = {
  val digest = algorithm.digest
  bytes.chunks
    .evalMap(chunk => IO(digest.update(chunk.toArray)))
    .compile
    .lastOrError
    .map(_ => digest.digest().map("%02x".format(_)).mkString)
}
Both are temporary hacks until we fetch the data.
def underlyingClient: S3AsyncClientOp[IO]

def baseEndpoint: IO[Uri]
Purely for convenience; ideally we shouldn't need to expose these here, it's a smell. All S3 interactions can be abstracted by the client once we know exactly which calls we need, with the correct parameters.
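A sketch of the narrower surface that would result (hypothetical method set, not the PR's actual S3StorageClient): callers see only the operations they need, and underlyingClient / baseEndpoint stay hidden behind the client.

```scala
import cats.effect.IO
import fs2.Stream

// Hypothetical narrowed interface: only the calls the storage code needs,
// with no access to the raw S3AsyncClientOp or the endpoint.
trait NarrowedS3Client {
  def readFile(bucket: String, key: String): Stream[IO, Byte]
  def save(bucket: String, key: String, data: Stream[IO, Byte]): IO[Unit]
  def exists(bucket: String, key: String): IO[Boolean]
}
```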
private def raiseUnexpectedErr[A](key: String, msg: String): IO[A] = IO.raiseError(UnexpectedSaveError(key, msg))

private def log(key: String, msg: String) = logger.info(s"Bucket: ${bucket.value}. Key: $key. $msg")
This should be logged at debug level.
private def uploadFileMultipart(fileData: Stream[IO, Byte], key: String): IO[List[Option[ETag]]] =
  fileData
    .through(
      s3.uploadFileMultipart(
We could do a put object here; not sure what multipart brings us. Is there only 1 part in the end? Using a multipart upload will also have an impact on computing the checksum.
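On that checksum impact, a small local sketch of the standard S3 convention (illustrative, not code from the PR): the multipart ETag is the MD5 of the concatenated binary part MD5s with a "-&lt;partCount&gt;" suffix, so it can't be compared directly against a digest of the whole object.

```scala
import java.security.MessageDigest

def md5(bytes: Array[Byte]): Array[Byte] =
  MessageDigest.getInstance("MD5").digest(bytes)

// ETag S3 reports for a multipart upload: md5(md5(part1) ++ md5(part2) ++ ...) plus "-<partCount>".
def expectedMultipartETag(parts: List[Array[Byte]]): String = {
  val concatenatedPartDigests = parts.map(md5).foldLeft(Array.empty[Byte])(_ ++ _)
  md5(concatenatedPartDigests).map("%02x".format(_)).mkString + s"-${parts.size}"
}
```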
private val s3 = S3.create(client)
private val multipartETagValidation = MultipartETagValidation.create[IO]
private val logger = Logger[S3StorageSaveFile]
private val partSizeMB: PartSizeMB = refineMV(5)
This could be configured; is that a recommended value? The PutObject operation is recommended for objects up to 100MB (but can go further).
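If it were made configurable, a minimal sketch could validate the value at startup instead of hardcoding refineMV(5) (the config class and the Positive refinement below are assumptions, not the fs2-aws PartSizeMB type).

```scala
import eu.timepit.refined.api.Refined
import eu.timepit.refined.numeric.Positive
import eu.timepit.refined.refineV

// Hypothetical configuration value, validated once when the storage module is wired up.
final case class S3UploadConfig(partSizeMB: Int)

def validatedPartSize(config: S3UploadConfig): Either[String, Int Refined Positive] =
  refineV[Positive](config.partSizeMB)
```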
    .compile
    .to(List)

private def getFileAttributes(key: String): IO[GetObjectAttributesResponse] =
Some of these operations could go in the S3StorageClient, no?
import scala.jdk.CollectionConverters.ListHasAsScala

class S3StorageFetchSaveSpec
    extends NexusSuite
It seems to be mixing ScalaTest and MUnit.
- A test on a missing file? (a possible shape is sketched below)
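Something along these lines could cover the missing-file case, MUnit-style (hypothetical: the bucket name, the readFile helper under test and the expected exception are assumptions, and the suite is assumed to provide munit-cats-effect's interceptIO).

```scala
import software.amazon.awssdk.services.s3.model.NoSuchKeyException

// Inside the suite: read a key that was never written and expect the SDK error to surface.
test("reading a key that does not exist fails") {
  interceptIO[NoSuchKeyException](
    readFile("some-test-bucket", "does-not-exist").compile.drain
  )
}
```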
def readFile(bucket: String, fileKey: String): fs2.Stream[IO, Byte] =
  for {
    bk <- Stream.fromEither[IO](refineV[NonEmpty](bucket).leftMap(e => new IllegalArgumentException(e)))
Could be improved
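One possible cleanup (a sketch; the helper name and error message are made up): push the refinement into a small helper so the for-comprehension only deals with the S3 call.

```scala
import cats.effect.IO
import cats.syntax.either._
import eu.timepit.refined.collection.NonEmpty
import eu.timepit.refined.refineV
import eu.timepit.refined.types.string.NonEmptyString

// Hypothetical helper: validate an input once, with a descriptive error instead of the raw predicate message.
private def refineNonEmpty(label: String, value: String): IO[NonEmptyString] =
  IO.fromEither(
    refineV[NonEmpty](value).leftMap(e => new IllegalArgumentException(s"$label must be non-empty: $e"))
  )
```

readFile could then start with `Stream.eval(refineNonEmpty("bucket", bucket))` and do the same for the file key.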