
Commit

Allow linking with copy/delete in storage service (#4728)
dantb authored Feb 9, 2024
1 parent 473aeb1 commit 9058232
Showing 7 changed files with 157 additions and 83 deletions.
3 changes: 3 additions & 0 deletions build.sbt
@@ -21,6 +21,7 @@ val akkaCorsVersion = "1.2.0"
val akkaVersion = "2.6.21"
val alpakkaVersion = "3.0.4"
val apacheCompressVersion = "1.25.0"
val apacheIOVersion = "2.15.1"
val awsSdkVersion = "2.17.184"
val byteBuddyAgentVersion = "1.10.17"
val betterMonadicForVersion = "0.3.1"
@@ -74,6 +75,7 @@ lazy val alpakkaFile = "com.lightbend.akka" %% "akka-stream-alp
lazy val alpakkaSse = "com.lightbend.akka" %% "akka-stream-alpakka-sse" % alpakkaVersion
lazy val alpakkaS3 = "com.lightbend.akka" %% "akka-stream-alpakka-s3" % alpakkaVersion
lazy val apacheCompress = "org.apache.commons" % "commons-compress" % apacheCompressVersion
lazy val apacheIO = "commons-io" % "commons-io" % apacheIOVersion
lazy val awsSdk = "software.amazon.awssdk" % "s3" % awsSdkVersion
lazy val betterMonadicFor = "com.olegpy" %% "better-monadic-for" % betterMonadicForVersion
lazy val byteBuddyAgent = "net.bytebuddy" % "byte-buddy-agent" % byteBuddyAgentVersion
@@ -756,6 +758,7 @@ lazy val storage = project
Docker / packageName := "nexus-storage",
libraryDependencies ++= Seq(
apacheCompress,
apacheIO,
akkaHttp,
akkaHttpCirce,
akkaStream,
@@ -6,12 +6,16 @@ import cats.implicits._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.error.Rejection
import fs2.io.file.{CopyFlag, CopyFlags, Files, Path}
import java.nio.file.{Path => JPath}

trait TransactionalFileCopier {
def copyAll(files: NonEmptyList[CopyBetween]): IO[Unit]
}

final case class CopyBetween(source: Path, destination: Path)
object CopyBetween {
def mk(source: JPath, dest: JPath) = CopyBetween(Path.fromNioPath(source), Path.fromNioPath(dest))
}

final case class CopyOperationFailed(failingCopy: CopyBetween, e: Throwable) extends Rejection {
override def reason: String =
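
A minimal usage sketch (not part of this commit) of the new CopyBetween.mk helper together with the copier's batch API. The paths are hypothetical and the TransactionalFileCopier instance is assumed to be supplied by the caller:

    import cats.data.NonEmptyList
    import cats.effect.IO
    import ch.epfl.bluebrain.nexus.delta.kernel.utils.{CopyBetween, TransactionalFileCopier}
    import java.nio.file.Paths

    object CopyBetweenUsage {
      // Build a CopyBetween from plain java.nio paths and run the whole batch.
      def linkByCopy(copier: TransactionalFileCopier): IO[Unit] = {
        val copy = CopyBetween.mk(Paths.get("/data/source/file.bin"), Paths.get("/data/dest/file.bin"))
        copier.copyAll(NonEmptyList.one(copy))
      }
    }
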
2 changes: 2 additions & 0 deletions storage/src/main/resources/app.conf
@@ -41,6 +41,8 @@ app {
# permissions fixer
fixer-enabled = false
fixer-command = []
# if atomic move (e.g. mv or rename) isn't supported, link using a copy and delete instead
link-with-atomic-move = false
}

# Allows to define default media types for the given file extensions
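
For context on the new setting: Files.move with ATOMIC_MOVE fails with AtomicMoveNotSupportedException when the move cannot be performed atomically (typically across filesystems or volumes), which is the situation the copy-and-delete fallback covers; when the setting is absent, DiskStorage below keeps atomic moves via getOrElse(true). A rough illustration with hypothetical paths, not code from this commit:

    import java.nio.file.{AtomicMoveNotSupportedException, Files, Paths}
    import java.nio.file.StandardCopyOption.ATOMIC_MOVE

    object AtomicMoveCheck extends App {
      // Hypothetical paths on two different volumes.
      val source = Paths.get("/volume-a/file.bin")
      val dest   = Paths.get("/volume-b/file.bin")
      try Files.move(source, dest, ATOMIC_MOVE)
      catch {
        // The failure mode that link-with-atomic-move = false is meant to work around.
        case _: AtomicMoveNotSupportedException => println("atomic move not supported here")
      }
    }
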
@@ -83,6 +83,8 @@ object Main extends IOApp {
logger.warning("The application has been configured with anonymous, the caller will not be verified !")
}

logger.info(s"==== Full configuration is $appConfig ====")

val routes: Route = Routes(storages)

val httpBinding: Future[Http.ServerBinding] = {
@@ -6,6 +6,7 @@ import akka.stream.alpakka.file.scaladsl.Directory
import akka.stream.scaladsl.{FileIO, Keep}
import cats.data.{EitherT, NonEmptyList}
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.utils.{CopyBetween, TransactionalFileCopier}
import ch.epfl.bluebrain.nexus.storage.File._
import ch.epfl.bluebrain.nexus.storage.Rejection.PathNotFound
@@ -18,10 +19,10 @@ import ch.epfl.bluebrain.nexus.storage.attributes.{AttributesCache, ContentTypeD
import ch.epfl.bluebrain.nexus.storage.config.AppConfig.{DigestConfig, StorageConfig}
import ch.epfl.bluebrain.nexus.storage.files.{CopyFileOutput, ValidateFile}
import ch.epfl.bluebrain.nexus.storage.routes.CopyFile
import org.apache.commons.io.FileUtils

import java.nio.file.StandardCopyOption._
import java.nio.file.{Files, Path}
import fs2.io.file.{Path => Fs2Path}
import java.security.MessageDigest
import scala.concurrent.{ExecutionContext, Future}
import scala.sys.process._
@@ -159,9 +160,6 @@ object Storages {
type PathDoesNotExist = PathDoesNotExist.type
}

/**
* An Disk implementation of Storage interface.
*/
final class DiskStorage(
config: StorageConfig,
contentTypeDetector: ContentTypeDetector,
@@ -174,18 +172,35 @@
mt: Materializer
) extends Storages[AkkaSource] {

private val log = Logger[DiskStorage]
private val linkWithAtomicMove = config.linkWithAtomicMove.getOrElse(true)

private def logUnsafe(msg: String): Unit = {
import cats.effect.unsafe.implicits.global
log.info(msg).unsafeRunSync()
}

def exists(name: String): BucketExistence = {
val path = basePath(config, name)
if (path.getParent.getParent != config.rootVolume) BucketDoesNotExist
else if (Files.isDirectory(path) && Files.isReadable(path)) BucketExists
else BucketDoesNotExist
logUnsafe(s"Checking bucket existence at path $path")
if (path.getParent.getParent != config.rootVolume) {
logUnsafe(s"Invalid bucket because the root volume is not two directories above $path")
BucketDoesNotExist
} else if (Files.isDirectory(path) && Files.isReadable(path)) BucketExists
else {
logUnsafe(s"Invalid bucket because $path is not a readable directory")
BucketDoesNotExist
}
}

def pathExists(name: String, path: Uri.Path): PathExistence = {
val absPath = filePath(config, name, path)
if (Files.exists(absPath) && Files.isReadable(absPath) && descendantOf(absPath, basePath(config, name)))
PathExists
else PathDoesNotExist
else {
logUnsafe(s"Invalid absolute path $absPath for bucket $name and relative path $path")
PathDoesNotExist
}
}

def createFile(
@@ -195,6 +210,7 @@
)(implicit bucketEv: BucketExists, pathEv: PathDoesNotExist): IO[FileAttributes] =
for {
validated <- validateFile.forCreate(name, path)
_ <- log.info(s"Creating file in bucket $name at path $path")
_ <- IO.blocking(Files.createDirectories(validated.absDestPath.getParent))
msgDigest <- IO.delay(MessageDigest.getInstance(digestConfig.algorithm))
attributes <- streamFileContents(source, path, validated.absDestPath, msgDigest)
@@ -244,10 +260,11 @@
val process = Process(config.fixerCommand :+ absPath)

for {
_ <- log.info(s"Fixing permissions for file at $absPath")
exitCode <- IO.blocking(process ! logger)
_ <- IO.raiseUnless(exitCode == 0)(PermissionsFixingFailed(absPath, logger.toString))
} yield ()
} else IO.unit
} else log.info(s"Not changing permissions for file at $path")

private def computeSizeAndMoveFile(
absSourcePath: Path,
@@ -256,12 +273,26 @@
): IO[FileAttributes] =
for {
computedSize <- size(absSourcePath)
_ <- IO.blocking(Files.createDirectories(absDestPath.getParent))
_ <- IO.blocking(Files.move(absSourcePath, absDestPath, ATOMIC_MOVE))
msg = if (linkWithAtomicMove) "atomic move" else "copy and delete"
_ <- log.info(s"Performing link with $msg from $absSourcePath to $absDestPath")
_ <- if (linkWithAtomicMove) doMove(absSourcePath, absDestPath)
else doCopyAndDelete(absSourcePath, absDestPath, isDir)
_ <- IO.delay(cache.asyncComputePut(absDestPath, digestConfig.algorithm))
mediaType <- IO.blocking(contentTypeDetector(absDestPath, isDir))
} yield FileAttributes(absDestPath.toAkkaUri, computedSize, Digest.empty, mediaType)

private def doMove(absSourcePath: Path, absDestPath: Path): IO[Unit] =
IO.blocking(Files.createDirectories(absDestPath.getParent)) >>
IO.blocking(Files.move(absSourcePath, absDestPath, ATOMIC_MOVE)).void

private def doCopyAndDelete(absSourcePath: Path, absDestPath: Path, isDir: Boolean): IO[Unit] =
if (isDir)
IO.blocking(FileUtils.copyDirectory(absSourcePath.toFile, absDestPath.toFile)) >>
IO.blocking(FileUtils.deleteDirectory(absSourcePath.toFile))
else
copyFiles.copyAll(NonEmptyList.of(CopyBetween.mk(absSourcePath, absDestPath))) >>
IO.blocking(Files.delete(absSourcePath))

private def size(absPath: Path): IO[Long] =
if (Files.isDirectory(absPath)) {
IO.fromFuture(IO.delay(Directory.walk(absPath).filter(Files.isRegularFile(_)).runFold(0L)(_ + Files.size(_))))
@@ -279,8 +310,7 @@
files.traverse(f =>
EitherT(validateFile.forCopyWithinProtectedDir(f.sourceBucket, destBucket, f.source, f.destination))
)
copyBetween =
validated.map(v => CopyBetween(Fs2Path.fromNioPath(v.absSourcePath), Fs2Path.fromNioPath(v.absDestPath)))
copyBetween = validated.map(v => CopyBetween.mk(v.absSourcePath, v.absDestPath))
_ <- EitherT.right[Rejection](copyFiles.copyAll(copyBetween))
} yield files.zip(validated).map { case (raw, valid) =>
CopyFileOutput(raw.source, raw.destination, valid.absSourcePath, valid.absDestPath)
@@ -293,7 +323,10 @@
val absPath = filePath(config, name, path)
if (Files.isRegularFile(absPath)) Right(fileSource(absPath) -> Some(absPath.getFileName.toString))
else if (Files.isDirectory(absPath)) Right(folderSource(absPath) -> None)
else Left(PathNotFound(name, path))
else {
logUnsafe(s"Invalid absolute path $absPath for bucket $name and relative path $path")
Left(PathNotFound(name, path))
}
}

def getAttributes(
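
A rough sanity-check sketch (not taken from the repository's test suite) of the copy-then-delete semantics that doCopyAndDelete above relies on, using plain java.nio for the single-file branch and the commons-io calls for the directory branch:

    import java.nio.charset.StandardCharsets.UTF_8
    import java.nio.file.Files
    import org.apache.commons.io.FileUtils

    object CopyDeleteCheck extends App {
      // Single file: copy, then delete the source, mirroring the non-directory branch.
      val srcFile = Files.createTempFile("link-src", ".txt")
      Files.write(srcFile, "payload".getBytes(UTF_8))
      val dstFile = Files.createTempDirectory("link-dst").resolve("copy.txt")
      Files.copy(srcFile, dstFile)
      Files.delete(srcFile)
      assert(!Files.exists(srcFile) && new String(Files.readAllBytes(dstFile), UTF_8) == "payload")

      // Directory: the commons-io calls used by the directory branch.
      val srcDir = Files.createTempDirectory("dir-src")
      Files.write(srcDir.resolve("a.txt"), "a".getBytes(UTF_8))
      val dstDir = Files.createTempDirectory("dir-dst").resolve("copied")
      FileUtils.copyDirectory(srcDir.toFile, dstDir.toFile)
      FileUtils.deleteDirectory(srcDir.toFile)
      assert(!Files.exists(srcDir) && Files.exists(dstDir.resolve("a.txt")))
    }
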
@@ -89,7 +89,8 @@ object AppConfig {
extraPrefixes: List[Path],
protectedDirectory: Path,
fixerEnabled: Boolean,
fixerCommand: Vector[String]
fixerCommand: Vector[String],
linkWithAtomicMove: Option[Boolean]
)

/**