Skip to content

Commit

Permalink
Filter out directories from import batch (#4985)
Browse files Browse the repository at this point in the history
Co-authored-by: Simon Dumas <[email protected]>
  • Loading branch information
imsdu and Simon Dumas authored May 17, 2024
1 parent 660bfda commit 16cbed1
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import scala.concurrent.duration.DurationInt

trait FileCopier {

def copyFile(project: ProjectRef, attributes: FileAttributes): IO[Uri.Path]
def copyFile(project: ProjectRef, attributes: FileAttributes): IO[Option[Uri.Path]]

}

Expand Down Expand Up @@ -51,16 +51,25 @@ object FileCopier {
s3StorageClient.copyObjectMultiPart(importBucket, originKey, targetBucket, targetKey).void
else
s3StorageClient.copyObject(importBucket, originKey, targetBucket, targetKey).void
}

copy.timed.flatMap { case (duration, _) =>
}.timed.flatMap { case (duration, _) =>
IO.whenA(duration > longCopyThreshold)(
logger.info(s"Copy file ${attributes.path} of size ${attributes.bytes} took ${duration.toSeconds} seconds.")
).as(target)
)
}

for {
isObject <- s3StorageClient.objectExists(importBucket, originKey)
isFolder <-
if (isObject) IO.pure(false) else s3StorageClient.listObjectsV2(importBucket, originKey).map(_.hasContents)
_ <- IO.whenA(isObject) { copy }
_ <- IO.whenA(isFolder) { logger.info(s"$target has been found to be a folder, skipping the file copy...") }
_ <- IO.whenA(!isFolder && !isObject) {
logger.error(s"$target is neither an object or folder, something is wrong.")
}
} yield Option.when(isObject)(target)
}
}

def apply(): FileCopier = (_: ProjectRef, attributes: FileAttributes) => IO.pure(attributes.path)
def apply(): FileCopier = (_: ProjectRef, attributes: FileAttributes) => IO.some(attributes.path)

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import ch.epfl.bluebrain.nexus.ship.files.FileProcessor.logger
import ch.epfl.bluebrain.nexus.ship.files.FileWiring._
import ch.epfl.bluebrain.nexus.ship.storages.StorageWiring
import io.circe.Decoder
import software.amazon.awssdk.services.s3.model.NoSuchKeyException

class FileProcessor private (
files: Files,
Expand Down Expand Up @@ -63,56 +62,51 @@ class FileProcessor private (

// TODO: Remove the 5_000_000_000L limit when the multipart works correctly
event match {
case e: FileCreated =>
case e: FileCreated if e.attributes.bytes < 5_000_000_000L =>
val attrs = e.attributes
val customMetadata = Some(getCustomMetadata(attrs))
IO.whenA(attrs.bytes < 5_000_000_000L) {
fileCopier.copyFile(e.project, attrs).flatMap { newPath =>
files.registerFile(fileId, None, customMetadata, newPath, e.tag, attrs.mediaType).void
}
fileCopier.copyFile(e.project, attrs).flatMap {
case Some(newPath) =>
files
.registerFile(fileId, None, customMetadata, newPath, e.tag, attrs.mediaType)
.as(ImportStatus.Success)
case None => IO.pure(ImportStatus.Dropped)
}
case e: FileUpdated =>
case _: FileCreated => IO.pure(ImportStatus.Dropped)
case e: FileUpdated if e.attributes.bytes < 5_000_000_000L =>
val attrs = e.attributes
val customMetadata = Some(getCustomMetadata(attrs))
IO.whenA(attrs.bytes < 5_000_000_000L) {
fileCopier.copyFile(e.project, attrs).flatMap { newPath =>
files.updateRegisteredFile(fileId, None, customMetadata, cRev, newPath, e.tag, attrs.mediaType).void
}
fileCopier.copyFile(e.project, attrs).flatMap {
case Some(newPath) =>
files
.updateRegisteredFile(fileId, None, customMetadata, cRev, newPath, e.tag, attrs.mediaType)
.as(ImportStatus.Success)
case None => IO.pure(ImportStatus.Dropped)
}
case e: FileCustomMetadataUpdated =>
files.updateMetadata(fileId, cRev, e.metadata, e.tag)
case e: FileAttributesUpdated =>
case _: FileUpdated => IO.pure(ImportStatus.Dropped)
case e: FileCustomMetadataUpdated =>
files.updateMetadata(fileId, cRev, e.metadata, e.tag).as(ImportStatus.Success)
case e: FileAttributesUpdated =>
val reason = "`FileAttributesUpdated` are events related to deprecated remote storages."
files.cancelEvent(CancelEvent(e.id, e.project, reason, cRev, e.subject))
case e: FileTagAdded =>
files.tag(fileId, e.tag, e.targetRev, cRev)
case e: FileTagDeleted =>
files.deleteTag(fileId, e.tag, cRev)
case _: FileDeprecated =>
files.deprecate(fileId, cRev)
case _: FileUndeprecated =>
files.undeprecate(fileId, cRev)
case _: FileCancelledEvent => IO.unit // Not present in the export anyway
files.cancelEvent(CancelEvent(e.id, e.project, reason, cRev, e.subject)).as(ImportStatus.Success)
case e: FileTagAdded =>
files.tag(fileId, e.tag, e.targetRev, cRev).as(ImportStatus.Success)
case e: FileTagDeleted =>
files.deleteTag(fileId, e.tag, cRev).as(ImportStatus.Success)
case _: FileDeprecated =>
files.deprecate(fileId, cRev).as(ImportStatus.Success)
case _: FileUndeprecated =>
files.undeprecate(fileId, cRev).as(ImportStatus.Success)
case _: FileCancelledEvent => IO.pure(ImportStatus.Dropped) // Not present in the export anyway
}
}.redeemWith(
{
case a: ResourceAlreadyExists => logger.warn(a)("The resource already exists").as(ImportStatus.Dropped)
case i: IncorrectRev => logger.warn(i)("An incorrect revision has been provided").as(ImportStatus.Dropped)
case f: FileNotFound =>
// TODO: Remove this redemption when empty filenames are handled correctly
logger.warn(f)(s"The file ${f.id} in project ${f.project} does not exist.").as(ImportStatus.Dropped)
case n: NoSuchKeyException =>
event match {
// format: off
case e: FileCreated => logger.error(n)(s"The file ${e.id} in project ${e.project} at path ${e.attributes.path} does not exist in the source bucket. ").as(ImportStatus.Dropped)
case e: FileUpdated => logger.error(n)(s"The file ${e.id} in project ${e.project} at path ${e.attributes.path} does not exist in the source bucket. ").as(ImportStatus.Dropped)
case e => logger.error(n)(s"This error should not occur as event for file ${e.id} at rev ${e.rev} is not moving any file.").as(ImportStatus.Dropped)
// format: on
}
case other => IO.raiseError(other)
},
_ => IO.pure(ImportStatus.Success)
)
}.recoverWith {
case a: ResourceAlreadyExists => logger.warn(a)("The resource already exists").as(ImportStatus.Dropped)
case i: IncorrectRev => logger.warn(i)("An incorrect revision has been provided").as(ImportStatus.Dropped)
case f: FileNotFound =>
// TODO: Remove this redemption when empty filenames are handled correctly
logger.warn(f)(s"The file ${f.id} in project ${f.project} does not exist.").as(ImportStatus.Dropped)
case other => IO.raiseError(other)
}

}

Expand Down
5 changes: 3 additions & 2 deletions ship/src/test/resources/import/file-import.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{"ordering":1,"type":"project","org":"public","project":"sscx","id":"projects/public/sscx" ,"rev":1,"value":{"rev": 1, "base": "https://bbp.epfl.ch/neurosciencegraph/data/", "uuid": "c7d70522-4305-480a-b190-75d757ed9a49", "@type": "ProjectCreated", "label": "sscx", "vocab": "https://bbp.epfl.ch/nexus/v1/resources/public/sscx/_/", "instant": "2020-01-22T16:03:33.105Z", "subject": {"@type": "User", "realm": "bbp", "subject": "bob"}, "apiMappings": {"prov": "http://www.w3.org/ns/prov#", "context": "https://incf.github.io/neuroshapes/contexts/", "schemaorg": "http://schema.org/", "datashapes": "https://neuroshapes.org/dash/", "ontologies": "https://neuroshapes.org/dash/ontology", "taxonomies": "https://neuroshapes.org/dash/taxonomy", "commonshapes": "https://neuroshapes.org/commons/", "provdatashapes": "https://provshapes.org/datashapes/", "provcommonshapes": "https://provshapes.org/commons/"}, "description": "This project contains somatosensorycortex dissemination publication data.", "organizationUuid": "d098a020-508b-4131-a28d-75f73b7f5f0e", "organizationLabel": "public"},"instant":"2020-01-22T17:03:33.105+01:00"}
{"ordering":2,"type":"file" ,"org":"public","project":"sscx","id":"https://bbp.epfl.ch/neurosciencegraph/data/old-path","rev":1,"value":{"id": "https://bbp.epfl.ch/neurosciencegraph/data/old-path", "rev": 1, "@type": "FileCreated", "instant": "2020-04-15T14:34:05.190Z", "project": "public/sscx", "storage": "https://bbp.epfl.ch/neurosciencegraph/data/storage?rev=1", "subject": {"@type": "User", "realm": "bbp", "subject": "bob"}, "attributes": {"path": "75b85666-b66f-4d90-8fd2-c6fb04beb5c6/0/a/7/9/a/d/1/d/002_160120B3_OH.nwb", "uuid": "0a79ad1d-4e74-4234-8efc-7fc8d41bafe6", "bytes": 9917692, "digest": {"@type": "ComputedDigest", "value": "4b9fcb8143f50d8a3949eb7f4be1dfb5790e13f3aa681211a637ef345dfafa00", "algorithm": "SHA-256"}, "origin": "Client", "filename": "002_160120B3_OH.nwb", "location": "file:///path/nexus/75b85666-b66f-4d90-8fd2-c6fb04beb5c6/0/a/7/9/a/d/1/d/002_160120B3_OH.nwb", "mediaType": "application/octet-stream"}, "storageType": "RemoteDiskStorage"},"instant":"2020-04-15T16:34:05.19+02:00"}
{"ordering":3,"type":"file" ,"org":"public","project":"sscx","id":"https://bbp.epfl.ch/neurosciencegraph/data/old-path","rev":2,"value":{"id": "https://bbp.epfl.ch/neurosciencegraph/data/old-path", "rev": 2, "@type": "FileAttributesUpdated", "bytes": 10488964, "digest": {"@type": "ComputedDigest", "value": "e237a2e1a4d56ffd63f9d2222788bc4fd9fa5e9c05466570b58675fde255dcc0", "algorithm": "SHA-256"}, "instant": "2020-04-27T07:49:39.161Z", "project": "public/sscx", "storage": "https://bbp.epfl.ch/neurosciencegraph/data/0617f3c3-8d30-4054-807f-c3708922f973?rev=1", "subject": {"@type": "User", "realm": "bbp", "subject": "alice"}, "mediaType": "application/x-hdf", "storageType": "RemoteDiskStorage"},"instant":"2020-04-27T09:49:39.161+02:00"}
{"ordering":4,"type":"file" ,"org":"public","project":"sscx","id":"https://bbp.epfl.ch/neurosciencegraph/data/old-path","rev":3,"value":{"id": "https://bbp.epfl.ch/neurosciencegraph/data/old-path", "rev": 3, "@type": "FileUpdated", "instant": "2020-09-24T09:58:43.479Z", "project": "public/sscx", "storage": "https://bbp.epfl.ch/neurosciencegraph/data/0617f3c3-8d30-4054-807f-c3708922f973?rev=1", "subject": {"@type": "User", "realm": "bbp", "subject": "bob"}, "attributes": {"path": "75b85666-b66f-4d90-8fd2-c6fb04beb5c6/8/9/5/4/c/3/e/c/002_160120B3_OH_updated.nwb", "uuid": "8954c3ec-da81-47b9-bcec-b72a1706a6a3", "bytes": 10701815, "digest": {"@type": "NotComputedDigest"}, "origin": "Storage", "filename": "002_160120B3_OH_updated.nwb", "location": "file:///path/nexus/75b85666-b66f-4d90-8fd2-c6fb04beb5c6/8/9/5/4/c/3/e/c/002_160120B3_updated.nwb", "mediaType": "application/nwb"}, "storageType": "RemoteDiskStorage"},"instant":"2020-09-24T11:58:43.479+02:00"}
{"ordering":4,"type":"file" ,"org":"public","project":"sscx","id":"https://bbp.epfl.ch/neurosciencegraph/data/old-path","rev":3,"value":{"id": "https://bbp.epfl.ch/neurosciencegraph/data/old-path", "rev": 3, "@type": "FileUpdated", "instant": "2020-09-24T09:58:43.479Z", "project": "public/sscx", "storage": "https://bbp.epfl.ch/neurosciencegraph/data/storage?rev=1", "subject": {"@type": "User", "realm": "bbp", "subject": "bob"}, "attributes": {"path": "75b85666-b66f-4d90-8fd2-c6fb04beb5c6/8/9/5/4/c/3/e/c/002_160120B3_OH_updated.nwb", "uuid": "8954c3ec-da81-47b9-bcec-b72a1706a6a3", "bytes": 10701815, "digest": {"@type": "NotComputedDigest"}, "origin": "Storage", "filename": "002_160120B3_OH_updated.nwb", "location": "file:///path/nexus/75b85666-b66f-4d90-8fd2-c6fb04beb5c6/8/9/5/4/c/3/e/c/002_160120B3_updated.nwb", "mediaType": "application/nwb"}, "storageType": "RemoteDiskStorage"},"instant":"2020-09-24T11:58:43.479+02:00"}
{"ordering":5,"type":"file" ,"org":"public","project":"sscx","id":"https://bbp.epfl.ch/neurosciencegraph/data/empty-filename","rev":1,"value":{"id": "https://bbp.epfl.ch/neurosciencegraph/data/empty-filename", "rev": 1, "@type": "FileCreated", "instant": "2023-03-02T10:46:36.969Z", "project": "public/sscx", "storage": "https://bbp.epfl.ch/neurosciencegraph/data/storage?rev=1", "subject": {"@type": "User", "realm": "bbp", "subject": "alice"}, "attributes": {"path": "public/sscx/2/b/3/9/7/9/3/0/", "uuid": "2b397930-0f69-4dad-bf6a-51825e940e12", "bytes": 538, "digest": {"@type": "ComputedDigest", "value": "b39a754a0988ca1f62e04a34d70479e9610b87beab91c58766f80c6ef6f93f3d", "algorithm": "SHA-256"}, "origin": "Client", "filename": "", "location": "file:///path/public/sscx/2/b/3/9/7/9/3/0", "mediaType": "text/plain"}, "storageType": "RemoteDiskStorage"},"instant":"2023-03-02T11:46:36.969+01:00"}
{"ordering":6,"type":"file" ,"org":"public","project":"sscx","id":"https://bbp.epfl.ch/neurosciencegraph/data/special-chars-filename","rev":1,"value":{"id": "https://bbp.epfl.ch/neurosciencegraph/data/special-chars-filename", "rev": 1, "@type": "FileCreated", "instant": "2023-03-02T10:46:36.969Z", "project": "public/sscx", "storage": "https://bbp.epfl.ch/neurosciencegraph/data/storage?rev=1", "subject": {"@type": "User", "realm": "bbp", "subject": "alice"}, "attributes": {"path": "public/sscx/1/2/3/4/5/6/7/8/special%20%5Bfile%5D.json", "uuid": "12345678-0f69-4dad-bf6a-51825e940e12", "bytes": 538, "digest": {"@type": "ComputedDigest", "value": "b39a754a0988ca1f62e04a34d70479e9610b87beab91c58766f80c6ef6f93f3d", "algorithm": "SHA-256"}, "origin": "Client", "filename": "special [file].json", "location": "file:///path/public/sscx/1/2/3/4/5/6/7/8/special%20%5Bfile%5D.json", "mediaType": "text/plain"}, "storageType": "RemoteDiskStorage"},"instant":"2023-03-02T11:46:36.969+01:00"}
{"ordering":6,"type":"file" ,"org":"public","project":"sscx","id":"https://bbp.epfl.ch/neurosciencegraph/data/special-chars-filename","rev":1,"value":{"id": "https://bbp.epfl.ch/neurosciencegraph/data/special-chars-filename", "rev": 1, "@type": "FileCreated", "instant": "2023-03-02T10:46:36.969Z", "project": "public/sscx", "storage": "https://bbp.epfl.ch/neurosciencegraph/data/storage?rev=1", "subject": {"@type": "User", "realm": "bbp", "subject": "alice"}, "attributes": {"path": "public/sscx/1/2/3/4/5/6/7/8/special%20%5Bfile%5D.json", "uuid": "12345678-0f69-4dad-bf6a-51825e940e12", "bytes": 538, "digest": {"@type": "ComputedDigest", "value": "b39a754a0988ca1f62e04a34d70479e9610b87beab91c58766f80c6ef6f93f3d", "algorithm": "SHA-256"}, "origin": "Client", "filename": "special [file].json", "location": "file:///path/public/sscx/1/2/3/4/5/6/7/8/special%20%5Bfile%5D.json", "mediaType": "text/plain"}, "storageType": "RemoteDiskStorage"},"instant":"2023-03-02T11:46:36.969+01:00"}
{"ordering":7,"type":"file" ,"org":"public","project":"sscx","id":"https://bbp.epfl.ch/neurosciencegraph/data/directory","rev":1,"value":{"id": "https://bbp.epfl.ch/neurosciencegraph/data/directory", "rev": 1, "@type": "FileCreated", "instant": "2022-09-28T12:59:11.800Z", "project": "public/sscx", "storage": "https://bbp.epfl.ch/neurosciencegraph/data/storage?rev=1", "subject": {"@type": "User", "realm": "bbp", "subject": "akkaufma"}, "attributes": {"path": "public/sscx/0/d/8/b/7/b/3/7/test_linking", "uuid": "0d8b7b37-bf62-4576-a5b4-ba2398731b8f", "bytes": 0, "digest": {"@type": "NotComputedDigest"}, "origin": "Storage", "filename": "test_linking", "location": "file:///path/nexus/bbp/atlas/0/d/8/b/7/b/3/7/test_linking", "mediaType": "application/x-directory"}, "storageType": "RemoteDiskStorage"},"instant":"2022-09-28T14:59:11.8+02:00"}
Loading

0 comments on commit 16cbed1

Please sign in to comment.