Skip to content

Commit

Permalink
Add FileProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
olivergrabinski committed Apr 22, 2024
1 parent a404d42 commit 1e163ba
Show file tree
Hide file tree
Showing 20 changed files with 456 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ object StorageScopeInitialization {
* Creates a [[StorageScopeInitialization]] that creates a default S3Storage with the provided default fields
*/
def s3(
storage: Storages,
storages: Storages,
serviceAccount: ServiceAccount,
defaultFields: S3StorageFields
): StorageScopeInitialization =
new StorageScopeInitialization(storage, serviceAccount, defaultS3StorageId, defaultFields)
new StorageScopeInitialization(storages, serviceAccount, defaultS3StorageId, defaultFields)

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

sealed trait FormDataExtractor {
trait FormDataExtractor {

/**
* Extracts the part with fieldName ''file'' from the passed ''entity'' MultiPart/FormData. Any other part is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ trait S3FileOperations {
object S3FileOperations {
final case class S3FileMetadata(contentType: ContentType, metadata: FileStorageMetadata)

private val log = Logger[S3FileOperations]

def mk(client: S3StorageClient)(implicit as: ActorSystem, uuidf: UUIDF): S3FileOperations = new S3FileOperations {

private val log = Logger[S3FileOperations]
private lazy val saveFile = new S3StorageSaveFile(client)

override def checkBucketExists(bucket: String): IO[Unit] =
Expand Down Expand Up @@ -76,47 +77,61 @@ object S3FileOperations {
override def save(storage: S3Storage, filename: String, entity: BodyPartEntity): IO[FileStorageMetadata] =
saveFile.apply(storage, filename, entity)

override def register(bucket: String, path: Uri.Path): IO[S3FileMetadata] = {
for {
_ <- log.info(s"Fetching attributes for S3 file. Bucket $bucket at path $path")
resp <- client.headObject(bucket, path.toString())
contentType <- parseContentType(resp.contentType())
metadata <- mkS3Metadata(bucket, path, resp, contentType)
} yield metadata
}
.onError { e =>
log.error(e)(s"Failed fetching required attributes for S3 file registration. Bucket $bucket and path $path")
}
override def register(bucket: String, path: Uri.Path): IO[S3FileMetadata] =
registerInternal(client, bucket, path)

private def parseContentType(raw: String): IO[ContentType] =
ContentType.parse(raw).map(_.pure[IO]).getOrElse(IO.raiseError(InvalidContentType(raw)))

private def mkS3Metadata(bucket: String, path: Uri.Path, resp: HeadObjectResponse, ct: ContentType) = {
for {
uuid <- uuidf()
checksum <- checksumFrom(resp)
} yield S3FileMetadata(
ct,
FileStorageMetadata(
uuid,
resp.contentLength(),
checksum,
FileAttributesOrigin.External,
client.baseEndpoint / bucket / path,
path
)
)
}

def registerInternal(client: S3StorageClient, bucket: String, path: Uri.Path)(implicit
uuidF: UUIDF
): IO[S3FileMetadata] = {
for {
_ <- log.info(s"Fetching attributes for S3 file. Bucket $bucket at path $path")
resp <- client.headObject(bucket, path.toString())
contentType <- parseContentType(resp.contentType())
metadata <- mkS3Metadata(client, bucket, path, resp, contentType)
} yield metadata
}
.onError { e =>
log.error(e)(s"Failed fetching required attributes for S3 file registration. Bucket $bucket and path $path")
}

private def checksumFrom(response: HeadObjectResponse) = IO.fromOption {
Option(response.checksumSHA256())
.map { checksum =>
Digest.ComputedDigest(
DigestAlgorithm.default,
Hex.encodeHexString(Base64.getDecoder.decode(checksum))
)
}
}(new IllegalArgumentException("Missing checksum"))
private def parseContentType(raw: String): IO[ContentType] =
ContentType.parse(raw).map(_.pure[IO]).getOrElse(IO.raiseError(InvalidContentType(raw)))

private def mkS3Metadata(
client: S3StorageClient,
bucket: String,
path: Uri.Path,
resp: HeadObjectResponse,
ct: ContentType
)(implicit
uuidf: UUIDF
) = {
for {
uuid <- uuidf()
checksum <- checksumFrom(resp)
} yield S3FileMetadata(
ct,
FileStorageMetadata(
uuid,
resp.contentLength(),
checksum,
FileAttributesOrigin.External,
client.baseEndpoint / bucket / path,
path
)
)
}

private def checksumFrom(response: HeadObjectResponse) = IO.fromOption {
Option(response.checksumSHA256())
.map { checksum =>
Digest.ComputedDigest(
DigestAlgorithm.default,
Hex.encodeHexString(Base64.getDecoder.decode(checksum))
)
}
}(new IllegalArgumentException("Missing checksum"))

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgori
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient.UploadMetadata
import ch.epfl.bluebrain.nexus.delta.rdf.syntax.uriSyntax
import ch.epfl.bluebrain.nexus.delta.sdk.error.ServiceError.FeatureDisabled
import fs2.{Chunk, Pipe, Stream}
import fs2.aws.s3.S3
import fs2.aws.s3.models.Models.{BucketName, FileKey}
import fs2.{Chunk, Pipe, Stream}
import io.laserdisc.pure.s3.tagless.{Interpreter, S3AsyncClientOp}
import org.apache.commons.codec.binary.Hex
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, AwsCredentialsProvider, StaticCredentialsProvider}
import software.amazon.awssdk.core.async.AsyncRequestBody
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.model.{ChecksumAlgorithm, ChecksumMode, HeadObjectRequest, HeadObjectResponse, ListObjectsV2Request, ListObjectsV2Response, NoSuchKeyException, PutObjectRequest, PutObjectResponse}
import software.amazon.awssdk.services.s3.model._

import java.net.URI
import java.util.Base64
Expand All @@ -34,6 +34,13 @@ trait S3StorageClient {

def headObject(bucket: String, key: String): IO[HeadObjectResponse]

/**
 * Copies the object stored under ''sourceKey'' in ''sourceBucket'' to ''destinationKey'' in ''destinationBucket''.
 *
 * @param sourceBucket
 *   the bucket holding the object to copy
 * @param sourceKey
 *   the key of the object to copy
 * @param destinationBucket
 *   the bucket receiving the copy
 * @param destinationKey
 *   the key under which the copy is stored
 * @return
 *   the copy response produced by the client implementation
 */
def copyObject(
sourceBucket: BucketName,
sourceKey: FileKey,
destinationBucket: BucketName,
destinationKey: FileKey
): IO[CopyObjectResponse]

def uploadFile(
fileData: Stream[IO, Byte],
bucket: String,
Expand Down Expand Up @@ -88,6 +95,23 @@ object S3StorageClient {
override def headObject(bucket: String, key: String): IO[HeadObjectResponse] =
client.headObject(HeadObjectRequest.builder().bucket(bucket).key(key).checksumMode(ChecksumMode.ENABLED).build)

/**
  * Builds a copy request from the source bucket/key to the destination bucket/key, asking for a SHA-256 checksum
  * algorithm, and submits it through the underlying client.
  */
override def copyObject(
    sourceBucket: BucketName,
    sourceKey: FileKey,
    destinationBucket: BucketName,
    destinationKey: FileKey
): IO[CopyObjectResponse] = {
  // The bucket and key wrappers hold refined strings, hence the double `.value` to reach the raw String
  val copyRequest = CopyObjectRequest
    .builder()
    .sourceBucket(sourceBucket.value.value)
    .sourceKey(sourceKey.value.value)
    .destinationBucket(destinationBucket.value.value)
    .destinationKey(destinationKey.value.value)
    .checksumAlgorithm(ChecksumAlgorithm.SHA256) // TODO: See what to do with this
    .build()

  client.copyObject(copyRequest)
}

override def objectExists(bucket: String, key: String): IO[Boolean] = {
headObject(bucket, key)
.redeemWith(
Expand Down Expand Up @@ -173,6 +197,13 @@ object S3StorageClient {

override def baseEndpoint: Uri = throw disabledErr

// Disabled client: no copy is attempted, the call evaluates to `raiseDisabledErr` just like the other
// IO-returning operations of this implementation.
override def copyObject(
sourceBucket: BucketName,
sourceKey: FileKey,
destinationBucket: BucketName,
destinationKey: FileKey
): IO[CopyObjectResponse] = raiseDisabledErr

override def objectExists(bucket: String, key: String): IO[Boolean] = raiseDisabledErr

override def uploadFile(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,18 @@ import java.nio.file.Paths
object LocalStackS3StorageClient {
val ServiceType = Service.S3

/**
 * Sends a create-bucket request for the given bucket through the provided client.
 *
 * @param s3Client
 *   the client used to issue the request
 * @param bucket
 *   the name of the bucket to create
 * @return
 *   the effect returned by the client's ''createBucket'' call
 */
def createBucket(s3Client: S3AsyncClientOp[IO], bucket: BucketName) =
s3Client.createBucket(CreateBucketRequest.builder().bucket(bucket.value.value).build)

def uploadFileToS3(s3Client: S3AsyncClientOp[IO], bucket: BucketName, path: Path): IO[PutObjectResponse] = {
s3Client.createBucket(CreateBucketRequest.builder().bucket(bucket.value.value).build) >>
val absoluteResourcePath = if (path.isAbsolute) path else Path("/" + path.toString)
createBucket(s3Client, bucket) >>
s3Client.putObject(
PutObjectRequest.builder.bucket(bucket.value.value).key(path.toString).build,
Paths.get(getClass.getResource(path.toString).toURI)
PutObjectRequest.builder
.bucket(bucket.value.value)
.key(path.toString)
.build,
Paths.get(getClass.getResource(absoluteResourcePath.toString).toURI)
)
}

Expand Down
1 change: 1 addition & 0 deletions ship/src/main/resources/ship-default.conf
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ ship {
realm: "internal"
}

import-bucket = ${ship.s-3.import-bucket}
# The bucket to which the files will be copied by the Nexus Ship
target-bucket = "nexus-delta-production"

Expand Down
39 changes: 19 additions & 20 deletions ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/InitShip.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,30 @@ import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider

object InitShip {

def apply(run: RunCommand): Resource[IO, (ShipConfig, fs2.Stream[IO, RowEvent], Transactors)] =
Resource.eval(configAndStream(run)).flatMap { case (config, eventStream) =>
def apply(run: RunCommand): Resource[IO, (ShipConfig, fs2.Stream[IO, RowEvent], S3StorageClient, Transactors)] =
Resource.eval(configAndStream(run)).flatMap { case (config, eventStream, s3Client) =>
Transactors
.init(config.database)
.map { xas => (config, eventStream, xas) }
.map { xas => (config, eventStream, s3Client, xas) }
}

private def configAndStream(run: RunCommand): IO[(ShipConfig, fs2.Stream[IO, RowEvent])] = run.mode match {
case RunMode.Local =>
val eventsStream = EventStreamer.localStreamer.stream(run.path, run.offset)
ShipConfig.load(run.config).map(_ -> eventsStream)
case RunMode.S3 =>
for {
localConfig <- ShipConfig.load(None)
s3Config = localConfig.s3
(config, eventsStream) <-
S3StorageClient.resource(s3Config.endpoint, DefaultCredentialsProvider.create()).use { client =>
val eventsStream = EventStreamer.s3eventStreamer(client, s3Config.importBucket).stream(run.path, run.offset)
private def configAndStream(run: RunCommand): IO[(ShipConfig, fs2.Stream[IO, RowEvent], S3StorageClient)] = {
ShipConfig.load(run.config).flatMap { shipConfig =>
S3StorageClient.resource(shipConfig.s3.endpoint, DefaultCredentialsProvider.create()).use { s3Client =>
run.mode match {
case RunMode.Local =>
val eventsStream = EventStreamer.localStreamer.stream(run.path, run.offset)
ShipConfig.load(run.config).map((_, eventsStream, s3Client))
case RunMode.S3 =>
val eventsStream =
EventStreamer.s3eventStreamer(s3Client, shipConfig.s3.importBucket).stream(run.path, run.offset)
val config = run.config match {
case Some(configPath) => ShipConfig.loadFromS3(client, s3Config.importBucket, configPath)
case None => IO.pure(localConfig)
case Some(configPath) => ShipConfig.loadFromS3(s3Client, shipConfig.s3.importBucket, configPath)
case None => IO.pure(shipConfig)
}
config.map(_ -> eventsStream)
}
} yield (config, eventsStream)
config.map((_, eventsStream, s3Client))
}
}
}
}

}
4 changes: 2 additions & 2 deletions ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ object Main

private[ship] def run(r: RunCommand): IO[Unit] = {
val clock = Clock[IO]
InitShip(r).use { case (config, eventsStream, xas) =>
InitShip(r).use { case (config, eventsStream, s3Client, xas) =>
for {
start <- clock.realTimeInstant
reportOrError <- RunShip(eventsStream, config.input, xas).attempt
reportOrError <- RunShip(eventsStream, s3Client, config.input, xas).attempt
end <- clock.realTimeInstant
_ <- ShipSummaryStore.save(xas, start, end, r, reportOrError)
_ <- IO.fromEither(reportOrError)
Expand Down
13 changes: 11 additions & 2 deletions ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ch.epfl.bluebrain.nexus.ship

import cats.effect.{Clock, IO}
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.{JsonLdApi, JsonLdJavaApi}
import ch.epfl.bluebrain.nexus.delta.rdf.shacl.ValidateShacl
import ch.epfl.bluebrain.nexus.delta.sdk.organizations.FetchActiveOrganization
Expand All @@ -11,6 +12,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.quotas.Quotas
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.RowEvent
import ch.epfl.bluebrain.nexus.ship.config.InputConfig
import ch.epfl.bluebrain.nexus.ship.files.FileProcessor
import ch.epfl.bluebrain.nexus.ship.organizations.OrganizationProvider
import ch.epfl.bluebrain.nexus.ship.projects.ProjectProcessor
import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverProcessor
Expand All @@ -21,7 +23,12 @@ import fs2.Stream

object RunShip {

def apply(eventsStream: Stream[IO, RowEvent], config: InputConfig, xas: Transactors): IO[ImportReport] = {
def apply(
eventsStream: Stream[IO, RowEvent],
s3Client: S3StorageClient,
config: InputConfig,
xas: Transactors
): IO[ImportReport] = {
val clock = Clock[IO]
val uuidF = UUIDF.random
// Resources may have been created with different configurations so we adopt the lenient one for the import
Expand Down Expand Up @@ -55,6 +62,7 @@ object RunShip {
esViewsProcessor = ElasticSearchViewProcessor(fetchContext, rcr, projectMapper, eventLogConfig, eventClock, xas)
bgViewsProcessor = BlazegraphViewProcessor(fetchContext, rcr, projectMapper, eventLogConfig, eventClock, xas)
compositeViewsProcessor = CompositeViewProcessor(fetchContext, rcr, projectMapper, eventLogConfig, eventClock, xas)
fileProcessor = FileProcessor(fetchContext, s3Client, projectMapper, rcr, config, eventClock, xas)
// format: on
report <- EventProcessor
.run(
Expand All @@ -65,7 +73,8 @@ object RunShip {
resourceProcessor,
esViewsProcessor,
bgViewsProcessor,
compositeViewsProcessor
compositeViewsProcessor,
fileProcessor
)
} yield report
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ final case class InputConfig(
viewDefaults: ViewDefaults,
serviceAccount: ServiceAccountConfig,
storages: StoragesConfig,
importBucket: BucketName,
targetBucket: BucketName
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package ch.epfl.bluebrain.nexus.ship.files

import akka.http.scaladsl.model.Uri
import cats.effect.IO
import cats.implicits._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import eu.timepit.refined.collection.NonEmpty
import eu.timepit.refined.refineV
import fs2.aws.s3.models.Models.{BucketName, FileKey}

/**
 * Copies a file identified by its path. The implementations built by the companion object either copy the matching S3
 * object from the import bucket to the target bucket under the same key, or do nothing at all.
 */
trait FileCopier {

/**
 * Copies the file located at the provided path.
 *
 * @param path
 *   the path of the file to copy
 * @return
 *   an effect yielding no value; how the copy is performed depends on the implementation
 */
def copyFile(path: Uri.Path): IO[Unit]

}

object FileCopier {

  /**
    * Creates a [[FileCopier]] which copies S3 objects from the import bucket to the target bucket, keeping the same
    * key on both sides.
    *
    * @param s3StorageClient
    *   the client used to issue the copy
    * @param importBucket
    *   the bucket the objects are read from
    * @param targetBucket
    *   the bucket the objects are copied to
    */
  def apply(
      s3StorageClient: S3StorageClient,
      importBucket: BucketName,
      targetBucket: BucketName
  ): FileCopier =
    new FileCopier {
      override def copyFile(path: Uri.Path): IO[Unit] =
        for {
          objectKey <- toFileKey(path)
          _         <- s3StorageClient.copyObject(importBucket, objectKey, targetBucket, objectKey)
        } yield ()
    }

  /**
    * Creates a [[FileCopier]] which ignores the provided path and performs no copy.
    */
  def apply(): FileCopier =
    new FileCopier {
      override def copyFile(path: Uri.Path): IO[Unit] = IO.unit
    }

  /**
    * Turns the string form of the path into a file key, failing with an [[IllegalArgumentException]] when that string
    * is empty.
    */
  private def toFileKey(path: Uri.Path): IO[FileKey] = {
    val refinedKey = refineV[NonEmpty](path.toString)
      .leftMap(reason => new IllegalArgumentException(reason))
      .map(FileKey(_))
    IO.fromEither(refinedKey)
  }

}
Loading

0 comments on commit 1e163ba

Please sign in to comment.