Skip to content

Commit

Permalink
Fix origin location decoding for local files with special chars (#5235)
Browse files Browse the repository at this point in the history
Co-authored-by: Simon Dumas <[email protected]>
  • Loading branch information
imsdu and Simon Dumas authored Nov 15, 2024
1 parent 11ff716 commit e6dbb51
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,13 @@ trait S3Helpers { self: Generators =>

/**
  * Runs `test` against a file stored in `bucket` under a key obtained from `genString()`.
  *
  * Delegates to the overload taking an explicit key.
  *
  * @param bucket   the bucket to use
  * @param contents the text content of the file
  * @param test     the check to run, given the key that was used
  */
def givenAFileInABucket(bucket: String, contents: String)(
    test: String => IO[Unit]
)(implicit client: S3StorageClient): IO[Unit] = {
  val generatedKey = genString()
  givenAFileInABucket(bucket, generatedKey, contents)(test)
}

/**
  * Uploads `contents` as UTF-8 encoded plain text under `key` in `bucket`, then runs `test` with that key.
  *
  * @param bucket   the bucket named in the put request
  * @param key      the object key to upload to; it is passed unchanged to `test`
  * @param contents the text to upload
  * @param test     the check to run once the upload has been sequenced
  */
def givenAFileInABucket(bucket: String, key: String, contents: String)(
    test: String => IO[Unit]
)(implicit client: S3StorageClient): IO[Unit] = {
  val bytes = contents.getBytes(StandardCharsets.UTF_8)
  // The caller-provided key must be used as-is: redefining `key` locally with a generated
  // value would shadow the parameter and upload the object under an unrelated key.
  val put   = PutObjectRequest(bucket, key, Some(ContentTypes.`text/plain(UTF-8)`), bytes.length.toLong)
  client.uploadFile(put, Stream.emit(ByteBuffer.wrap(bytes))) >> test(key)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,25 @@ class S3StorageClientSuite extends NexusSuite with LocalStackS3StorageClient.Fix

override def munitFixtures: Seq[AnyFixture[_]] = List(localStackS3Client)

test("Copy a file containing special characters between buckets") {
  // Key mixing spaces, a plus sign and parentheses; the same key is used in the source and target buckets
  val specialKey  = "/org/proj/9/f/0/3/2/4/f/e/0925_Rhi13.3.13 cell 1+2 (superficial).asc"
  val copyOptions = CopyOptions(overwriteTarget = false, None)
  givenAnS3Bucket { sourceBucket =>
    givenAnS3Bucket { targetBucket =>
      givenAFileInABucket(sourceBucket, specialKey, fileContents) { _ =>
        s3StorageClient
          .copyObject(sourceBucket, specialKey, targetBucket, specialKey, copyOptions)
          .flatMap { copyResult =>
            // Inspect the copied object in the target bucket before asserting
            s3StorageClient.headObject(targetBucket, specialKey).map { copied =>
              assertEquals(copyResult, S3OperationResult.Success)
              assertEquals(copied.fileSize, contentLength)
              assertEquals(copied.contentType, Some(expectedContentType))
            }
          }
      }
    }
  }
}

test("Copy the file to its new location if none is already there without a content type") {
givenAnS3Bucket { bucket =>
val options = CopyOptions(overwriteTarget = false, None)
Expand Down
3 changes: 3 additions & 0 deletions delta/testkit/src/main/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,7 @@

<logger name="ch.epfl.bluebrain.nexus" level="ERROR" />

<!-- Uncomment to log the underlying calls made by the S3 SDK -->
<!-- <logger name="software.amazon.awssdk.request" level="DEBUG" />-->

</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ import ch.epfl.bluebrain.nexus.ship.files.FileCopier.FileCopyResult
import ch.epfl.bluebrain.nexus.ship.files.FileCopier.FileCopyResult.{FileCopySkipped, FileCopySuccess}
import software.amazon.awssdk.services.s3.model.S3Exception

import java.net.URI
import java.nio.file.Paths
import scala.concurrent.duration.DurationInt

/**
  * Copies the file described by a set of file attributes for a project and reports the outcome
  * as a [[FileCopyResult]].
  */
trait FileCopier {

// Variant that does not carry the origin flag.
def copyFile(project: ProjectRef, attributes: FileAttributes): IO[FileCopyResult]
/**
  * @param project     the project the file belongs to
  * @param attributes  the attributes of the file to copy
  * @param localOrigin flag describing the origin path; the S3-backed implementation below uses it to
  *                    pick `localDiskPath` over `UrlUtils.decode` when computing the origin key
  */
def copyFile(project: ProjectRef, attributes: FileAttributes, localOrigin: Boolean): IO[FileCopyResult]

}

Expand All @@ -37,6 +39,8 @@ object FileCopier {
logError(logger, "s3Copy")
)

/**
  * Turns a relative path into a local filesystem path string by wrapping it in a `file:` URI
  * and resolving that URI with `java.nio.file.Paths`.
  *
  * Going through a URI percent-decodes the path while leaving `+` characters untouched,
  * e.g. `a%20b+c` yields `/a b+c`.
  *
  * @param relative the path to convert; it is appended right after `file:/`
  */
def localDiskPath(relative: Path): String = {
  val fileUri = URI.create(s"file:/$relative")
  Paths.get(fileUri).toString
}

sealed trait FileCopyResult extends Product with Serializable

object FileCopyResult {
Expand All @@ -54,14 +58,14 @@ object FileCopier {
val importBucket = config.importBucket
val targetBucket = config.targetBucket
val locationGenerator = new S3LocationGenerator(config.prefix.getOrElse(Path.Empty))
(project: ProjectRef, attributes: FileAttributes) =>
(project: ProjectRef, attributes: FileAttributes, localOrigin: Boolean) =>
{
val origin = attributes.path
val patchedFileName = if (attributes.filename.isEmpty) "file" else attributes.filename
val target = locationGenerator.file(project, attributes.uuid, patchedFileName).path
val FIVE_GB = 5_000_000_000L

val originKey = UrlUtils.decode(origin)
val originKey = if (localOrigin) localDiskPath(origin) else UrlUtils.decode(origin)
val targetKey = UrlUtils.decode(target)

val copyOptions = CopyOptions(overwriteTarget = false, attributes.mediaType)
Expand All @@ -86,14 +90,17 @@ object FileCopier {
isFolder <-
if (isObject) IO.pure(false) else s3StorageClient.listObjectsV2(importBucket, originKey).map(_.hasContents)
_ <- IO.whenA(isObject) { copy }
_ <- IO.whenA(isFolder) { logger.info(s"$target has been found to be a folder, skipping the file copy...") }
_ <- IO.whenA(isFolder) {
logger.info(s"'$originKey' has been found to be a folder, skipping the file copy...")
}
_ <- IO.whenA(!isFolder && !isObject) {
logger.error(s"$target is neither an object or folder, something is wrong.")
logger.error(s"'$originKey' is neither an object or folder, something is wrong.")
}
} yield if (isObject) FileCopySuccess(target) else FileCopySkipped
}.retry(copyRetryStrategy)
}

// No-op copier: performs no copy and reports success with the path already held by the attributes.
def apply(): FileCopier = (_: ProjectRef, attributes: FileAttributes) => IO.pure(FileCopySuccess(attributes.path))
// No-op copier for the three-argument `copyFile`: the project and the origin flag are ignored and
// success is reported with the path already held by the attributes.
def apply(): FileCopier = (_: ProjectRef, attributes: FileAttributes, _: Boolean) =>
IO.pure(FileCopySuccess(attributes.path))

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.{Files, MediaTypeDetector}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.FetchStorage
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageType.DiskStorage
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.LinkFileAction
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode
Expand Down Expand Up @@ -74,7 +75,7 @@ class FileProcessor private (
val newMediaType = patchMediaType(attrs.filename, attrs.mediaType)
val newAttrs = e.attributes.copy(mediaType = newMediaType)
val customMetadata = Some(getCustomMetadata(newAttrs))
fileCopier.copyFile(e.project, newAttrs).flatMap {
fileCopier.copyFile(e.project, newAttrs, e.storageType == DiskStorage).flatMap {
case FileCopySuccess(newPath) =>
val linkRequest = FileLinkRequest(newPath, newMediaType, customMetadata)
files
Expand All @@ -87,7 +88,7 @@ class FileProcessor private (
val newMediaType = patchMediaType(attrs.filename, attrs.mediaType)
val newAttrs = e.attributes.copy(mediaType = newMediaType)
val customMetadata = Some(getCustomMetadata(newAttrs))
fileCopier.copyFile(e.project, newAttrs).flatMap {
fileCopier.copyFile(e.project, newAttrs, e.storageType == DiskStorage).flatMap {
case FileCopySuccess(newPath) =>
val linkRequest = FileLinkRequest(newPath, newMediaType, customMetadata)
files
Expand Down
3 changes: 2 additions & 1 deletion ship/src/test/resources/import/file-import/000000001.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
{"ordering":4,"type":"file" ,"org":"public","project":"sscx","id":"https://bbp.epfl.ch/neurosciencegraph/data/old-path","rev":3,"value":{"id": "https://bbp.epfl.ch/neurosciencegraph/data/old-path", "rev": 3, "@type": "FileUpdated", "instant": "2020-09-24T09:58:43.479Z", "project": "public/sscx", "storage": "https://bbp.epfl.ch/neurosciencegraph/data/storage?rev=1", "subject": {"@type": "User", "realm": "bbp", "subject": "bob"}, "attributes": {"path": "75b85666-b66f-4d90-8fd2-c6fb04beb5c6/8/9/5/4/c/3/e/c/002_160120B3_OH_updated.nwb", "uuid": "8954c3ec-da81-47b9-bcec-b72a1706a6a3", "bytes": 10701815, "digest": {"@type": "NotComputedDigest"}, "origin": "Storage", "filename": "002_160120B3_OH_updated.nwb", "location": "file:///path/nexus/75b85666-b66f-4d90-8fd2-c6fb04beb5c6/8/9/5/4/c/3/e/c/002_160120B3_updated.nwb", "mediaType": "application/object-stream"}, "storageType": "RemoteDiskStorage"},"instant":"2020-09-24T11:58:43.479+02:00"}
{"ordering":5,"type":"file" ,"org":"public","project":"sscx","id":"https://bbp.epfl.ch/neurosciencegraph/data/empty-filename","rev":1,"value":{"id": "https://bbp.epfl.ch/neurosciencegraph/data/empty-filename", "rev": 1, "@type": "FileCreated", "instant": "2023-03-02T10:46:36.969Z", "project": "public/sscx", "storage": "https://bbp.epfl.ch/neurosciencegraph/data/storage?rev=1", "subject": {"@type": "User", "realm": "bbp", "subject": "alice"}, "attributes": {"path": "public/sscx/2/b/3/9/7/9/3/0/", "uuid": "2b397930-0f69-4dad-bf6a-51825e940e12", "bytes": 538, "digest": {"@type": "ComputedDigest", "value": "b39a754a0988ca1f62e04a34d70479e9610b87beab91c58766f80c6ef6f93f3d", "algorithm": "SHA-256"}, "origin": "Client", "filename": "", "location": "file:///path/public/sscx/2/b/3/9/7/9/3/0", "mediaType": "text/plain"}, "storageType": "RemoteDiskStorage"},"instant":"2023-03-02T11:46:36.969+01:00"}
{"ordering":6,"type":"file" ,"org":"public","project":"sscx","id":"https://bbp.epfl.ch/neurosciencegraph/data/special-chars-filename","rev":1,"value":{"id": "https://bbp.epfl.ch/neurosciencegraph/data/special-chars-filename", "rev": 1, "@type": "FileCreated", "instant": "2023-03-02T10:46:36.969Z", "project": "public/sscx", "storage": "https://bbp.epfl.ch/neurosciencegraph/data/storage?rev=1", "subject": {"@type": "User", "realm": "bbp", "subject": "alice"}, "attributes": {"path": "public/sscx/1/2/3/4/5/6/7/8/special%20%5Bfile%5D.json", "uuid": "12345678-0f69-4dad-bf6a-51825e940e12", "bytes": 538, "digest": {"@type": "ComputedDigest", "value": "b39a754a0988ca1f62e04a34d70479e9610b87beab91c58766f80c6ef6f93f3d", "algorithm": "SHA-256"}, "origin": "Client", "filename": "special [file].json", "location": "file:///path/public/sscx/1/2/3/4/5/6/7/8/special%20%5Bfile%5D.json", "mediaType": "text/plain"}, "storageType": "RemoteDiskStorage"},"instant":"2023-03-02T11:46:36.969+01:00"}
{"ordering":7,"type":"file" ,"org":"public","project":"sscx","id":"https://bbp.epfl.ch/neurosciencegraph/data/directory","rev":1,"value":{"id": "https://bbp.epfl.ch/neurosciencegraph/data/directory", "rev": 1, "@type": "FileCreated", "instant": "2022-09-28T12:59:11.800Z", "project": "public/sscx", "storage": "https://bbp.epfl.ch/neurosciencegraph/data/storage?rev=1", "subject": {"@type": "User", "realm": "bbp", "subject": "akkaufma"}, "attributes": {"path": "public/sscx/0/d/8/b/7/b/3/7/test_linking", "uuid": "0d8b7b37-bf62-4576-a5b4-ba2398731b8f", "bytes": 0, "digest": {"@type": "NotComputedDigest"}, "origin": "Storage", "filename": "test_linking", "location": "file:///path/nexus/bbp/atlas/0/d/8/b/7/b/3/7/test_linking", "mediaType": "application/x-directory"}, "storageType": "RemoteDiskStorage"},"instant":"2022-09-28T14:59:11.8+02:00"}
{"ordering":7,"type":"file" ,"org":"public","project":"sscx","id":"https://bbp.epfl.ch/neurosciencegraph/data/directory","rev":1,"value":{"id": "https://bbp.epfl.ch/neurosciencegraph/data/directory", "rev": 1, "@type": "FileCreated", "instant": "2022-09-28T12:59:11.800Z", "project": "public/sscx", "storage": "https://bbp.epfl.ch/neurosciencegraph/data/storage?rev=1", "subject": {"@type": "User", "realm": "bbp", "subject": "akkaufma"}, "attributes": {"path": "public/sscx/0/d/8/b/7/b/3/7/test_linking", "uuid": "0d8b7b37-bf62-4576-a5b4-ba2398731b8f", "bytes": 0, "digest": {"@type": "NotComputedDigest"}, "origin": "Storage", "filename": "test_linking", "location": "file:///path/nexus/bbp/atlas/0/d/8/b/7/b/3/7/test_linking", "mediaType": "application/x-directory"}, "storageType": "RemoteDiskStorage"},"instant":"2022-09-28T14:59:11.8+02:00"}
{"ordering":8,"type":"file" ,"org":"public","project":"sscx","id":"https://bbp.epfl.ch/neurosciencegraph/data/local-plus","rev":1,"value":{"id": "https://bbp.epfl.ch/neurosciencegraph/data/local-plus", "rev": 1, "@type": "FileCreated", "instant": "2022-10-14T12:59:11.800Z", "project": "public/sscx", "storage": "https://bbp.epfl.ch/neurosciencegraph/data/storage?rev=1", "subject": {"@type": "User", "realm": "bbp", "subject": "akkaufma"}, "attributes": {"path": "95b0ee1e-a6a5-43e9-85fb-938b3c38dfc0/9/f/0/3/2/4/f/e/0925_Rhi13.3.13%20cell%201+2%20(superficial).asc", "uuid": "9f0324fe-9aac-4d34-84f0-5a44cd278197", "bytes": 0, "digest": {"@type": "NotComputedDigest"}, "origin": "Storage", "filename": "0925_Rhi13.3.13 cell 1+2 (superficial).asc", "location": "file:///path/nexus/public/sscx/9/f/0/3/2/4/f/e", "mediaType": "application/octet-stream"}, "storageType": "DiskStorage"},"instant":"2022-10-14T14:59:11.8+02:00"}
Original file line number Diff line number Diff line change
@@ -1,31 +1,32 @@
package ch.epfl.bluebrain.nexus.ship

import akka.http.scaladsl.model.{ContentType, ContentTypes, MediaTypes}
import akka.http.scaladsl.model.{ContentType, ContentTypes, MediaTypes, Uri}
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.Hex
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UrlUtils
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.Files
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.Digest.ComputedDigest
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileRejection.FileNotFound
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{FileAttributes, FileState}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageType.DiskStorage
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.{DigestAlgorithm, StorageType}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{LocalStackS3StorageClient, PutObjectRequest}
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.rdf.syntax.iriStringContextSyntax
import ch.epfl.bluebrain.nexus.delta.sdk.implicits._
import ch.epfl.bluebrain.nexus.delta.sdk.projects.Projects
import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.Resolvers
import ch.epfl.bluebrain.nexus.delta.sdk.resources.Resources
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.RowEvent
import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie
import ch.epfl.bluebrain.nexus.delta.sourcing.state.ScopedStateGet
import ch.epfl.bluebrain.nexus.ship.ImportReport.Statistics
import ch.epfl.bluebrain.nexus.ship.RunShipSuite.{checkFor, expectedImportReport, fetchFileAttributes, getDistinctOrgProjects}
import ch.epfl.bluebrain.nexus.ship.config.ShipConfigFixtures
import ch.epfl.bluebrain.nexus.ship.files.FileCopier.localDiskPath
import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
import doobie.Get
import doobie.syntax.all._
Expand Down Expand Up @@ -65,8 +66,19 @@ class RunShipSuite
s3Client.uploadFile(put, Stream.emit(contentAsBuffer))
}

private def decodedFilePath(json: Json) = root.attributes.path.string.getOption(json).map(UrlUtils.decode)
private def fileContentType = root.attributes.mediaType.string.getOption(_)
/**
  * Reads `attributes.path` from a file event payload and decodes it according to the payload's `storageType`.
  *
  * Paths of `DiskStorage` events are decoded with `localDiskPath`; every other storage type goes
  * through `UrlUtils.decode`. Yields `None` when the storage type or the path cannot be read.
  */
private def decodedFilePath(json: Json) =
  root.storageType.as[StorageType].getOption(json).flatMap {
    case DiskStorage =>
      // localDiskPath already returns a String, no further conversion is needed
      root.attributes.path.as[Uri.Path].getOption(json).map { path =>
        localDiskPath(path)
      }
    case _           =>
      root.attributes.path.string.getOption(json).map(UrlUtils.decode)
  }

// Function reading the string found at `attributes.mediaType` in a JSON payload, when there is one.
private def fileContentType = root.attributes.mediaType.string.getOption(_)

private def generatePhysicalFile(row: RowEvent) =
IO.whenA(row.`type` == Files.entityType) {
Expand Down Expand Up @@ -136,7 +148,8 @@ class RunShipSuite
report.progress == Map(Projects.entityType -> Statistics(1L, 0L))

test("Import files in S3 and in the primary store") {
val textPlain = MediaTypes.`text/plain`.withMissingCharset
val textPlain = MediaTypes.`text/plain`.withMissingCharset
val applicationOctetStream = MediaTypes.`application/octet-stream`
for {
events <- eventsStream("import/file-import/000000001.json")
report <- RunShip(events, s3Client, inputConfig, xas)
Expand Down Expand Up @@ -166,13 +179,23 @@ class RunShipSuite
_ <- checkFor("file", specialCharsId, xas).assertEquals(1)
_ <- assertS3Object(specialCharsLocation, Some(textPlain))
_ <- assertFileAttributes(project, specialCharsId)(specialCharsLocation, "special [file].json", Some(textPlain))
// Local file containing a plus
localPlusId = iri"https://bbp.epfl.ch/neurosciencegraph/data/local-plus"
localPlusLocation = "/prefix/public/sscx/files/9/f/0/3/2/4/f/e/0925_Rhi13.3.13 cell 1 2 (superficial).asc"
_ <- checkFor("file", localPlusId, xas).assertEquals(1)
_ <- assertS3Object(localPlusLocation, Some(applicationOctetStream))
_ <- assertFileAttributes(project, localPlusId)(
localPlusLocation,
"0925_Rhi13.3.13 cell 1+2 (superficial).asc",
Some(applicationOctetStream)
)
// Directory, should be skipped
directoryId = iri"https://bbp.epfl.ch/neurosciencegraph/data/directory"
_ <- checkFor("file", directoryId, xas).assertEquals(0)
// Summary S3 check, 5 objects should have been imported in total
_ <- s3Client.listObjectsV2(targetBucket).map(_.keyCount().intValue()).assertEquals(4)
_ <- s3Client.listObjectsV2(targetBucket).map(_.keyCount().intValue()).assertEquals(5)
// Summary report check, only the directory event should have been skipped
_ = assertEquals(report.progress(Files.entityType).success, 5L)
_ = assertEquals(report.progress(Files.entityType).success, 6L)
_ = assertEquals(report.progress(Files.entityType).dropped, 1L)
} yield ()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package ch.epfl.bluebrain.nexus.ship.files

import akka.http.scaladsl.model.{ContentTypes, MediaTypes}
import akka.http.scaladsl.model.{ContentTypes, MediaTypes, Uri}
import ch.epfl.bluebrain.nexus.delta.kernel.http.MediaTypeDetectorConfig
import ch.epfl.bluebrain.nexus.ship.files.FileCopier.localDiskPath
import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite

class FileProcessorSuite extends NexusSuite {
Expand All @@ -11,6 +12,13 @@ class FileProcessorSuite extends NexusSuite {
"pdf" -> MediaTypes.`application/pdf`
)

test("Correctly decode a local path") {
  // Percent-encoded spaces must be decoded while the plus sign has to be kept verbatim
  val encodedPath = Uri.Path("org/proj/9/f/0/3/2/4/f/e/0925_Rhi13.3.13%20cell%201+2%20(superficial).asc")
  assertEquals(
    localDiskPath(encodedPath),
    "/org/proj/9/f/0/3/2/4/f/e/0925_Rhi13.3.13 cell 1+2 (superficial).asc"
  )
}

test("Return a new content type matching the config") {
assertEquals(
FileProcessor.patchMediaType("file.json", None),
Expand Down

0 comments on commit e6dbb51

Please sign in to comment.