From 905823299bfb8a3873e47cb5cdcb29da62c0fd73 Mon Sep 17 00:00:00 2001 From: dantb Date: Fri, 9 Feb 2024 12:38:12 +0000 Subject: [PATCH] Allow linking with copy/delete in storage service (#4728) --- build.sbt | 3 + .../utils/TransactionalFileCopier.scala | 4 + storage/src/main/resources/app.conf | 2 + .../epfl/bluebrain/nexus/storage/Main.scala | 2 + .../bluebrain/nexus/storage/Storages.scala | 61 +++++-- .../nexus/storage/config/AppConfig.scala | 3 +- .../nexus/storage/DiskStorageSpec.scala | 165 ++++++++++-------- 7 files changed, 157 insertions(+), 83 deletions(-) diff --git a/build.sbt b/build.sbt index 265a63b611..4c04d2c3ac 100755 --- a/build.sbt +++ b/build.sbt @@ -21,6 +21,7 @@ val akkaCorsVersion = "1.2.0" val akkaVersion = "2.6.21" val alpakkaVersion = "3.0.4" val apacheCompressVersion = "1.25.0" +val apacheIOVersion = "2.15.1" val awsSdkVersion = "2.17.184" val byteBuddyAgentVersion = "1.10.17" val betterMonadicForVersion = "0.3.1" @@ -74,6 +75,7 @@ lazy val alpakkaFile = "com.lightbend.akka" %% "akka-stream-alp lazy val alpakkaSse = "com.lightbend.akka" %% "akka-stream-alpakka-sse" % alpakkaVersion lazy val alpakkaS3 = "com.lightbend.akka" %% "akka-stream-alpakka-s3" % alpakkaVersion lazy val apacheCompress = "org.apache.commons" % "commons-compress" % apacheCompressVersion +lazy val apacheIO = "commons-io" % "commons-io" % apacheIOVersion lazy val awsSdk = "software.amazon.awssdk" % "s3" % awsSdkVersion lazy val betterMonadicFor = "com.olegpy" %% "better-monadic-for" % betterMonadicForVersion lazy val byteBuddyAgent = "net.bytebuddy" % "byte-buddy-agent" % byteBuddyAgentVersion @@ -756,6 +758,7 @@ lazy val storage = project Docker / packageName := "nexus-storage", libraryDependencies ++= Seq( apacheCompress, + apacheIO, akkaHttp, akkaHttpCirce, akkaStream, diff --git a/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/utils/TransactionalFileCopier.scala b/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/utils/TransactionalFileCopier.scala index 254d5b5273..e343354485 100644 --- a/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/utils/TransactionalFileCopier.scala +++ b/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/utils/TransactionalFileCopier.scala @@ -6,12 +6,16 @@ import cats.implicits._ import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.kernel.error.Rejection import fs2.io.file.{CopyFlag, CopyFlags, Files, Path} +import java.nio.file.{Path => JPath} trait TransactionalFileCopier { def copyAll(files: NonEmptyList[CopyBetween]): IO[Unit] } final case class CopyBetween(source: Path, destination: Path) +object CopyBetween { + def mk(source: JPath, dest: JPath) = CopyBetween(Path.fromNioPath(source), Path.fromNioPath(dest)) +} final case class CopyOperationFailed(failingCopy: CopyBetween, e: Throwable) extends Rejection { override def reason: String = diff --git a/storage/src/main/resources/app.conf b/storage/src/main/resources/app.conf index c98e2f043b..76d1d3c168 100644 --- a/storage/src/main/resources/app.conf +++ b/storage/src/main/resources/app.conf @@ -41,6 +41,8 @@ app { # permissions fixer fixer-enabled = false fixer-command = [] + # if atomic move (e.g. mv or rename) isn't supported, link using a copy and delete instead + link-with-atomic-move = false } # Allows to define default media types for the given file extensions diff --git a/storage/src/main/scala/ch/epfl/bluebrain/nexus/storage/Main.scala b/storage/src/main/scala/ch/epfl/bluebrain/nexus/storage/Main.scala index 74e66b9e2b..2688434dc2 100644 --- a/storage/src/main/scala/ch/epfl/bluebrain/nexus/storage/Main.scala +++ b/storage/src/main/scala/ch/epfl/bluebrain/nexus/storage/Main.scala @@ -83,6 +83,8 @@ object Main extends IOApp { logger.warning("The application has been configured with anonymous, the caller will not be verified !") } + logger.info(s"==== Full configuration is $appConfig ====") + val routes: Route = Routes(storages) val httpBinding: Future[Http.ServerBinding] = { diff --git a/storage/src/main/scala/ch/epfl/bluebrain/nexus/storage/Storages.scala b/storage/src/main/scala/ch/epfl/bluebrain/nexus/storage/Storages.scala index 67c880f792..f0d46e822c 100644 --- a/storage/src/main/scala/ch/epfl/bluebrain/nexus/storage/Storages.scala +++ b/storage/src/main/scala/ch/epfl/bluebrain/nexus/storage/Storages.scala @@ -6,6 +6,7 @@ import akka.stream.alpakka.file.scaladsl.Directory import akka.stream.scaladsl.{FileIO, Keep} import cats.data.{EitherT, NonEmptyList} import cats.effect.IO +import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.kernel.utils.{CopyBetween, TransactionalFileCopier} import ch.epfl.bluebrain.nexus.storage.File._ import ch.epfl.bluebrain.nexus.storage.Rejection.PathNotFound @@ -18,10 +19,10 @@ import ch.epfl.bluebrain.nexus.storage.attributes.{AttributesCache, ContentTypeD import ch.epfl.bluebrain.nexus.storage.config.AppConfig.{DigestConfig, StorageConfig} import ch.epfl.bluebrain.nexus.storage.files.{CopyFileOutput, ValidateFile} import ch.epfl.bluebrain.nexus.storage.routes.CopyFile +import org.apache.commons.io.FileUtils import java.nio.file.StandardCopyOption._ import java.nio.file.{Files, Path} -import fs2.io.file.{Path => Fs2Path} import java.security.MessageDigest import scala.concurrent.{ExecutionContext, Future} import scala.sys.process._ @@ -159,9 +160,6 @@ object Storages { type PathDoesNotExist = PathDoesNotExist.type } - /** - * An Disk implementation of Storage interface. - */ final class DiskStorage( config: StorageConfig, contentTypeDetector: ContentTypeDetector, @@ -174,18 +172,35 @@ object Storages { mt: Materializer ) extends Storages[AkkaSource] { + private val log = Logger[DiskStorage] + private val linkWithAtomicMove = config.linkWithAtomicMove.getOrElse(true) + + private def logUnsafe(msg: String): Unit = { + import cats.effect.unsafe.implicits.global + log.info(msg).unsafeRunSync() + } + def exists(name: String): BucketExistence = { val path = basePath(config, name) - if (path.getParent.getParent != config.rootVolume) BucketDoesNotExist - else if (Files.isDirectory(path) && Files.isReadable(path)) BucketExists - else BucketDoesNotExist + logUnsafe(s"Checking bucket existence at path $path") + if (path.getParent.getParent != config.rootVolume) { + logUnsafe(s"Invalid bucket because the root volume is not two directories above $path") + BucketDoesNotExist + } else if (Files.isDirectory(path) && Files.isReadable(path)) BucketExists + else { + logUnsafe(s"Invalid bucket because $path is not a readable directory") + BucketDoesNotExist + } } def pathExists(name: String, path: Uri.Path): PathExistence = { val absPath = filePath(config, name, path) if (Files.exists(absPath) && Files.isReadable(absPath) && descendantOf(absPath, basePath(config, name))) PathExists - else PathDoesNotExist + else { + logUnsafe(s"Invalid absolute path $absPath for bucket $name and relative path $path") + PathDoesNotExist + } } def createFile( @@ -195,6 +210,7 @@ object Storages { )(implicit bucketEv: BucketExists, pathEv: PathDoesNotExist): IO[FileAttributes] = for { validated <- validateFile.forCreate(name, path) + _ <- log.info(s"Creating file in bucket $name at path $path") _ <- IO.blocking(Files.createDirectories(validated.absDestPath.getParent)) msgDigest <- IO.delay(MessageDigest.getInstance(digestConfig.algorithm)) attributes <- streamFileContents(source, path, validated.absDestPath, msgDigest) @@ -244,10 +260,11 @@ object Storages { val process = Process(config.fixerCommand :+ absPath) for { + _ <- log.info(s"Fixing permissions for file at $absPath") exitCode <- IO.blocking(process ! logger) _ <- IO.raiseUnless(exitCode == 0)(PermissionsFixingFailed(absPath, logger.toString)) } yield () - } else IO.unit + } else log.info(s"Not changing permissions for file at $path") private def computeSizeAndMoveFile( absSourcePath: Path, @@ -256,12 +273,26 @@ object Storages { ): IO[FileAttributes] = for { computedSize <- size(absSourcePath) - _ <- IO.blocking(Files.createDirectories(absDestPath.getParent)) - _ <- IO.blocking(Files.move(absSourcePath, absDestPath, ATOMIC_MOVE)) + msg = if (linkWithAtomicMove) "atomic move" else "copy and delete" + _ <- log.info(s"Performing link with $msg from $absSourcePath to $absDestPath") + _ <- if (linkWithAtomicMove) doMove(absSourcePath, absDestPath) + else doCopyAndDelete(absSourcePath, absDestPath, isDir) _ <- IO.delay(cache.asyncComputePut(absDestPath, digestConfig.algorithm)) mediaType <- IO.blocking(contentTypeDetector(absDestPath, isDir)) } yield FileAttributes(absDestPath.toAkkaUri, computedSize, Digest.empty, mediaType) + private def doMove(absSourcePath: Path, absDestPath: Path): IO[Unit] = + IO.blocking(Files.createDirectories(absDestPath.getParent)) >> + IO.blocking(Files.move(absSourcePath, absDestPath, ATOMIC_MOVE)).void + + private def doCopyAndDelete(absSourcePath: Path, absDestPath: Path, isDir: Boolean): IO[Unit] = + if (isDir) + IO.blocking(FileUtils.copyDirectory(absSourcePath.toFile, absDestPath.toFile)) >> + IO.blocking(FileUtils.deleteDirectory(absSourcePath.toFile)) + else + copyFiles.copyAll(NonEmptyList.of(CopyBetween.mk(absSourcePath, absDestPath))) >> + IO.blocking(Files.delete(absSourcePath)) + private def size(absPath: Path): IO[Long] = if (Files.isDirectory(absPath)) { IO.fromFuture(IO.delay(Directory.walk(absPath).filter(Files.isRegularFile(_)).runFold(0L)(_ + Files.size(_)))) @@ -279,8 +310,7 @@ object Storages { files.traverse(f => EitherT(validateFile.forCopyWithinProtectedDir(f.sourceBucket, destBucket, f.source, f.destination)) ) - copyBetween = - validated.map(v => CopyBetween(Fs2Path.fromNioPath(v.absSourcePath), Fs2Path.fromNioPath(v.absDestPath))) + copyBetween = validated.map(v => CopyBetween.mk(v.absSourcePath, v.absDestPath)) _ <- EitherT.right[Rejection](copyFiles.copyAll(copyBetween)) } yield files.zip(validated).map { case (raw, valid) => CopyFileOutput(raw.source, raw.destination, valid.absSourcePath, valid.absDestPath) @@ -293,7 +323,10 @@ object Storages { val absPath = filePath(config, name, path) if (Files.isRegularFile(absPath)) Right(fileSource(absPath) -> Some(absPath.getFileName.toString)) else if (Files.isDirectory(absPath)) Right(folderSource(absPath) -> None) - else Left(PathNotFound(name, path)) + else { + logUnsafe(s"Invalid absolute path $absPath for bucket $name and relative path $path") + Left(PathNotFound(name, path)) + } } def getAttributes( diff --git a/storage/src/main/scala/ch/epfl/bluebrain/nexus/storage/config/AppConfig.scala b/storage/src/main/scala/ch/epfl/bluebrain/nexus/storage/config/AppConfig.scala index 130c3d402f..7c40524cb6 100644 --- a/storage/src/main/scala/ch/epfl/bluebrain/nexus/storage/config/AppConfig.scala +++ b/storage/src/main/scala/ch/epfl/bluebrain/nexus/storage/config/AppConfig.scala @@ -89,7 +89,8 @@ object AppConfig { extraPrefixes: List[Path], protectedDirectory: Path, fixerEnabled: Boolean, - fixerCommand: Vector[String] + fixerCommand: Vector[String], + linkWithAtomicMove: Option[Boolean] ) /** diff --git a/storage/src/test/scala/ch/epfl/bluebrain/nexus/storage/DiskStorageSpec.scala b/storage/src/test/scala/ch/epfl/bluebrain/nexus/storage/DiskStorageSpec.scala index f9c1d33afb..a2ef4bc76a 100644 --- a/storage/src/test/scala/ch/epfl/bluebrain/nexus/storage/DiskStorageSpec.scala +++ b/storage/src/test/scala/ch/epfl/bluebrain/nexus/storage/DiskStorageSpec.scala @@ -47,13 +47,17 @@ class DiskStorageSpec val rootPath = Files.createTempDirectory("storage-test") val scratchPath = Files.createTempDirectory("scratch") - val sConfig = StorageConfig(rootPath, List(scratchPath), Paths.get("nexus"), fixerEnabled = true, Vector("/bin/echo")) + val sConfig = + StorageConfig(rootPath, List(scratchPath), Paths.get("nexus"), fixerEnabled = true, Vector("/bin/echo"), None) val dConfig = DigestConfig("SHA-256", 1L, 1, 1, 1.second) val contentTypeDetector = new ContentTypeDetector(MediaTypeDetectorConfig.Empty) val cache = mock[AttributesCache] val validateFile = ValidateFile.mk(sConfig) val copyFiles = TransactionalFileCopier.mk() - val storage = new DiskStorage(sConfig, contentTypeDetector, dConfig, cache, validateFile, copyFiles) + val storage = mkDiskStorage(sConfig) + + private def mkDiskStorage(cfg: StorageConfig) = + new DiskStorage(cfg, contentTypeDetector, dConfig, cache, validateFile, copyFiles) override def afterAll(): Unit = { Directory(rootPath.toFile).deleteRecursively() @@ -167,15 +171,7 @@ class DiskStorageSpec "fail when call to nexus-fixer fails" in new AbsoluteDirectoryCreated { val falseBinary = if (new File("/bin/false").exists()) "/bin/false" else "/usr/bin/false" - val badStorage = - new DiskStorage( - sConfig.copy(fixerCommand = Vector(falseBinary)), - contentTypeDetector, - dConfig, - cache, - validateFile, - copyFiles - ) + val badStorage = mkDiskStorage(sConfig.copy(fixerCommand = Vector(falseBinary))) val file = "some/folder/my !file.txt" val absoluteFile = baseRootPath.resolve(Paths.get(file)) Files.createDirectories(absoluteFile.getParent) @@ -262,72 +258,105 @@ class DiskStorageSpec result shouldEqual PathInvalid(name, Uri.Path(absoluteDir.toString)) } - "pass on file specified using a relative path" in new AbsoluteDirectoryCreated { - val file = "some/folder/my !file.txt" - val absoluteFile = baseRootPath.resolve(Paths.get(file.toString)) - Files.createDirectories(absoluteFile.getParent) + val linkingOptions: List[Option[Boolean]] = List(Option(true), Option(false), None) - val content = "some content" - Files.write(absoluteFile, content.getBytes(StandardCharsets.UTF_8)) + "pass on file specified using a relative path" in { + forAll(linkingOptions) { linkWithAtomicMove => + val diskStorage = mkDiskStorage(sConfig.copy(linkWithAtomicMove = linkWithAtomicMove)) + new AbsoluteDirectoryCreated { + val file = "some/folder/my !file.txt" + val absoluteFile = baseRootPath.resolve(Paths.get(file.toString)) + Files.createDirectories(absoluteFile.getParent) - storage.moveFile(name, Uri.Path(file), Uri.Path("some/other path.txt")).accepted.rightValue shouldEqual - FileAttributes(s"file://${basePath.resolve("some/other%20path.txt")}", 12L, Digest.empty, `text/plain(UTF-8)`) - Files.exists(absoluteFile) shouldEqual false - Files.exists(basePath.resolve("some/other path.txt")) shouldEqual true + val content = "some content" + Files.write(absoluteFile, content.getBytes(StandardCharsets.UTF_8)) + + diskStorage.moveFile(name, Uri.Path(file), Uri.Path("some/other path.txt")).accepted.rightValue shouldEqual + FileAttributes( + s"file://${basePath.resolve("some/other%20path.txt")}", + 12L, + Digest.empty, + `text/plain(UTF-8)` + ) + Files.exists(absoluteFile) shouldEqual false + Files.exists(basePath.resolve("some/other path.txt")) shouldEqual true + } + } } - "pass on file specified using an absolute path" in new AbsoluteDirectoryCreated { - val file = "some/folder/my !file.txt" - val absoluteFile = scratchPath.resolve(Paths.get(file)) - Files.createDirectories(absoluteFile.getParent) - - val content = "some content" - Files.write(absoluteFile, content.getBytes(StandardCharsets.UTF_8)) - - storage - .moveFile(name, Uri.Path(absoluteFile.toString), Uri.Path("some/other path.txt")) - .accepted - .rightValue shouldEqual - FileAttributes(s"file://${basePath.resolve("some/other%20path.txt")}", 12L, Digest.empty, `text/plain(UTF-8)`) + "pass on file specified using an absolute path" in { + forAll(linkingOptions) { linkWithAtomicMove => + val diskStorage = mkDiskStorage(sConfig.copy(linkWithAtomicMove = linkWithAtomicMove)) + new AbsoluteDirectoryCreated { + val file = "some/folder/my !file.txt" + val absoluteFile = scratchPath.resolve(Paths.get(file)) + Files.createDirectories(absoluteFile.getParent) - Files.exists(absoluteFile) shouldEqual false - Files.exists(basePath.resolve("some/other path.txt")) shouldEqual true + val content = "some content" + Files.write(absoluteFile, content.getBytes(StandardCharsets.UTF_8)) + + diskStorage + .moveFile(name, Uri.Path(absoluteFile.toString), Uri.Path("some/other path.txt")) + .accepted + .rightValue shouldEqual + FileAttributes( + s"file://${basePath.resolve("some/other%20path.txt")}", + 12L, + Digest.empty, + `text/plain(UTF-8)` + ) + + Files.exists(absoluteFile) shouldEqual false + Files.exists(basePath.resolve("some/other path.txt")) shouldEqual true + } + } } - "pass on directory specified with a relative path" in new AbsoluteDirectoryCreated { - val dir = "some/folder" - val absoluteDir = baseRootPath.resolve(Paths.get(dir.toString)) - Files.createDirectories(absoluteDir) - - val absoluteFile = absoluteDir.resolve(Paths.get("my !file.txt")) - val content = "some content" - Files.write(absoluteFile, content.getBytes(StandardCharsets.UTF_8)) - - val result = storage.moveFile(name, Uri.Path(dir), Uri.Path("some/other")).accepted.rightValue - val resolvedDir = basePath.resolve("some/other") - result shouldEqual FileAttributes(s"file://$resolvedDir", 12L, Digest.empty, `application/x-tar`) - Files.exists(absoluteDir) shouldEqual false - Files.exists(absoluteFile) shouldEqual false - Files.exists(resolvedDir) shouldEqual true - Files.exists(basePath.resolve("some/other/my !file.txt")) shouldEqual true + "pass on directory specified with a relative path" in { + forAll(linkingOptions) { linkWithAtomicMove => + val diskStorage = mkDiskStorage(sConfig.copy(linkWithAtomicMove = linkWithAtomicMove)) + new AbsoluteDirectoryCreated { + val dir = "some/folder" + val absoluteDir = baseRootPath.resolve(Paths.get(dir.toString)) + Files.createDirectories(absoluteDir) + + val absoluteFile = absoluteDir.resolve(Paths.get("my !file.txt")) + val content = "some content" + Files.write(absoluteFile, content.getBytes(StandardCharsets.UTF_8)) + + val result = diskStorage.moveFile(name, Uri.Path(dir), Uri.Path("some/other")).accepted.rightValue + val resolvedDir = basePath.resolve("some/other") + result shouldEqual FileAttributes(s"file://$resolvedDir", 12L, Digest.empty, `application/x-tar`) + Files.exists(absoluteDir) shouldEqual false + Files.exists(absoluteFile) shouldEqual false + Files.exists(resolvedDir) shouldEqual true + Files.exists(basePath.resolve("some/other/my !file.txt")) shouldEqual true + } + } } - "pass on directory specified with an absolute path" in new AbsoluteDirectoryCreated { - val dir = "some/folder" - val absoluteDir = scratchPath.resolve(Paths.get(dir.toString)) - Files.createDirectories(absoluteDir) - - val absoluteFile = absoluteDir.resolve(Paths.get("my !file.txt")) - val content = "some content" - Files.write(absoluteFile, content.getBytes(StandardCharsets.UTF_8)) - - val result = storage.moveFile(name, Uri.Path(absoluteDir.toString), Uri.Path("some/other")).accepted.rightValue - val resolvedDir = basePath.resolve("some/other") - result shouldEqual FileAttributes(s"file://$resolvedDir", 12L, Digest.empty, `application/x-tar`) - Files.exists(absoluteDir) shouldEqual false - Files.exists(absoluteFile) shouldEqual false - Files.exists(resolvedDir) shouldEqual true - Files.exists(basePath.resolve("some/other/my !file.txt")) shouldEqual true + "pass on directory specified with an absolute path" in { + forAll(linkingOptions) { linkWithAtomicMove => + val diskStorage = mkDiskStorage(sConfig.copy(linkWithAtomicMove = linkWithAtomicMove)) + new AbsoluteDirectoryCreated { + val dir = "some/folder" + val absoluteDir = scratchPath.resolve(Paths.get(dir.toString)) + Files.createDirectories(absoluteDir) + + val absoluteFile = absoluteDir.resolve(Paths.get("my !file.txt")) + val content = "some content" + Files.write(absoluteFile, content.getBytes(StandardCharsets.UTF_8)) + + val result = + diskStorage.moveFile(name, Uri.Path(absoluteDir.toString), Uri.Path("some/other")).accepted.rightValue + val resolvedDir = basePath.resolve("some/other") + result shouldEqual FileAttributes(s"file://$resolvedDir", 12L, Digest.empty, `application/x-tar`) + Files.exists(absoluteDir) shouldEqual false + Files.exists(absoluteFile) shouldEqual false + Files.exists(resolvedDir) shouldEqual true + Files.exists(basePath.resolve("some/other/my !file.txt")) shouldEqual true + } + } } }