Skip to content

Commit

Permalink
Merge branch 'master' into ship-file-processor
Browse files Browse the repository at this point in the history
  • Loading branch information
olivergrabinski authored Apr 23, 2024
2 parents e12b69d + 090a78e commit ceca8e3
Show file tree
Hide file tree
Showing 23 changed files with 325 additions and 36 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ lazy val ship = project
tests % "test->compile;test->test"
)
.settings(
libraryDependencies ++= Seq(declineEffect, logback, fs2Aws, fs2AwsS3),
libraryDependencies ++= Seq(declineEffect, logback, circeOptics),
addCompilerPlugin(betterMonadicFor),
run / fork := true,
buildInfoKeys := Seq[BuildInfoKey](version),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import cats.effect.IO
import cats.implicits._
import cats.effect.unsafe.implicits._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.plugins.archive.FileSelf.ParsingError
import ch.epfl.bluebrain.nexus.delta.plugins.storage.FileSelf.ParsingError
import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.ArchiveReference.{FileReference, FileSelfReference, ResourceReference}
import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.ArchiveRejection._
import ch.epfl.bluebrain.nexus.delta.plugins.archive.model._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.FileSelf
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.Files
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{FileId, FileRejection}
import ch.epfl.bluebrain.nexus.delta.rdf.implicits._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import cats.effect.{Clock, IO}
import ch.epfl.bluebrain.nexus.delta.kernel.utils.{ClasspathResourceLoader, UUIDF}
import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.contexts
import ch.epfl.bluebrain.nexus.delta.plugins.archive.routes.ArchiveRoutes
import ch.epfl.bluebrain.nexus.delta.plugins.storage.FileSelf
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.Files
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.archive.model
import akka.http.scaladsl.model.StatusCodes
import ch.epfl.bluebrain.nexus.delta.kernel.error.Rejection
import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClassUtils
import ch.epfl.bluebrain.nexus.delta.plugins.archive.FileSelf
import ch.epfl.bluebrain.nexus.delta.plugins.storage.FileSelf
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileRejection
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.AbsolutePath
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import akka.util.ByteString
import cats.data.NonEmptySet
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UrlUtils.encode
import ch.epfl.bluebrain.nexus.delta.plugins.archive.FileSelf.ParsingError
import ch.epfl.bluebrain.nexus.delta.plugins.storage.FileSelf.ParsingError
import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.ArchiveReference.{FileReference, FileSelfReference, ResourceReference}
import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.ArchiveRejection.{InvalidFileSelf, ResourceNotFound}
import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.{ArchiveRejection, ArchiveValue}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.RemoteContextResolutionFixture
import ch.epfl.bluebrain.nexus.delta.plugins.storage.{FileSelf, RemoteContextResolutionFixture}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.generators.FileGen
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes.FileAttributesOrigin.Client
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileRejection.FileNotFound
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import akka.util.ByteString
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UrlUtils.encode
import ch.epfl.bluebrain.nexus.delta.kernel.utils.{StatefulUUIDF, UUIDF}
import ch.epfl.bluebrain.nexus.delta.plugins.archive.FileSelf.ParsingError.InvalidPath
import ch.epfl.bluebrain.nexus.delta.plugins.storage.FileSelf.ParsingError.InvalidPath
import ch.epfl.bluebrain.nexus.delta.plugins.archive.routes.ArchiveRoutes
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.generators.FileGen
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.Digest.ComputedDigest
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package ch.epfl.bluebrain.nexus.delta.plugins.archive
package ch.epfl.bluebrain.nexus.delta.plugins.storage

import akka.http.scaladsl.model.Uri
import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UrlUtils
import ch.epfl.bluebrain.nexus.delta.plugins.archive.FileSelf.ParsingError._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.FileSelf.ParsingError._
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.implicits._
import ch.epfl.bluebrain.nexus.delta.sdk.error.SDKError
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package ch.epfl.bluebrain.nexus.delta.plugins.archive
package ch.epfl.bluebrain.nexus.delta.plugins.storage.files

import ch.epfl.bluebrain.nexus.delta.kernel.utils.UrlUtils.encode
import ch.epfl.bluebrain.nexus.delta.plugins.archive.FileSelf.ParsingError._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.FileSelf
import ch.epfl.bluebrain.nexus.delta.plugins.storage.FileSelf.ParsingError._
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.rdf.implicits._
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
DROP TABLE IF EXISTS public.ship_original_project_context;
DROP TABLE IF EXISTS public.ship_reports;
DROP TABLE IF EXISTS public.global_events;
DROP TABLE IF EXISTS public.global_states;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Stores the original (pre-renaming / pre-patching) JSON-LD context of each
-- imported project, keyed by (org, project), so resource iris can later be
-- expanded against the context the source deployment actually used.
CREATE TABLE IF NOT EXISTS public.ship_original_project_context(
org text NOT NULL,
project text NOT NULL,
context JSONB NOT NULL,
PRIMARY KEY(org, project)
)

3 changes: 2 additions & 1 deletion ship/src/main/resources/ship-default.conf
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ ship {
}

input {
base-uri = "http://localhost:8080/v1"
original-base-uri = "http://localhost:8080/v1"
target-base-uri = "http://localhost:8080/v1"

event-log {
query-config = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceLoader
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.{contexts => bgContexts}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.{contexts => compositeViewContexts}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.{contexts => esContexts}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.{contexts => storageContext}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.{contexts => fileContext}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.{contexts => storageContext}
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.contexts
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
Expand Down
22 changes: 14 additions & 8 deletions ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ch.epfl.bluebrain.nexus.ship
import cats.effect.{Clock, IO}
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.FileSelf
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.{JsonLdApi, JsonLdJavaApi}
import ch.epfl.bluebrain.nexus.delta.rdf.shacl.ValidateShacl
import ch.epfl.bluebrain.nexus.delta.sdk.organizations.FetchActiveOrganization
Expand All @@ -16,8 +17,9 @@ import ch.epfl.bluebrain.nexus.ship.files.FileProcessor
import ch.epfl.bluebrain.nexus.ship.organizations.OrganizationProvider
import ch.epfl.bluebrain.nexus.ship.projects.ProjectProcessor
import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverProcessor
import ch.epfl.bluebrain.nexus.ship.resources.{ResourceProcessor, ResourceWiring}
import ch.epfl.bluebrain.nexus.ship.resources.{DistributionPatcher, ResourceProcessor, ResourceWiring}
import ch.epfl.bluebrain.nexus.ship.schemas.{SchemaProcessor, SchemaWiring}
import ch.epfl.bluebrain.nexus.ship.projects.OriginalProjectContext
import ch.epfl.bluebrain.nexus.ship.views.{BlazegraphViewProcessor, CompositeViewProcessor, ElasticSearchViewProcessor}
import fs2.Stream

Expand All @@ -35,12 +37,14 @@ object RunShip {
implicit val jsonLdApi: JsonLdApi = JsonLdJavaApi.lenient
for {
report <- {
val orgProvider =
val orgProvider =
OrganizationProvider(config.eventLog, config.serviceAccount.value, xas, clock)(uuidF)
val fetchContext = FetchContext(ApiMappings.empty, xas, Quotas.disabled)
val eventLogConfig = config.eventLog
val baseUri = config.baseUri
val projectMapper = ProjectMapper(config.projectMapping)
val fetchContext = FetchContext(ApiMappings.empty, xas, Quotas.disabled)
val originalProjectContext = new OriginalProjectContext(xas)
val eventLogConfig = config.eventLog
val originalBaseUri = config.originalBaseUri
val targetBaseUri = config.targetBaseUri
val projectMapper = ProjectMapper(config.projectMapping)
for {
// Provision organizations
_ <- orgProvider.create(config.organizations.values)
Expand All @@ -55,10 +59,12 @@ object RunShip {
rcr = ContextWiring.resolverContextResolution(fetchResource, fetchContext, remoteContextResolution, eventLogConfig, eventClock, xas)
schemaImports = SchemaWiring.schemaImports(fetchResource, fetchSchema, fetchContext, eventLogConfig, eventClock, xas)
// Processors
projectProcessor <- ProjectProcessor(fetchActiveOrg, fetchContext, rcr, projectMapper, config, eventClock, xas)(baseUri, jsonLdApi)
projectProcessor <- ProjectProcessor(fetchActiveOrg, fetchContext, rcr, originalProjectContext, projectMapper, config, eventClock, xas)(targetBaseUri, jsonLdApi)
resolverProcessor = ResolverProcessor(fetchContext, projectMapper, eventLogConfig, eventClock, xas)
schemaProcessor = SchemaProcessor(schemaLog, fetchContext, schemaImports, rcr, projectMapper, eventClock)
resourceProcessor = ResourceProcessor(resourceLog, rcr, projectMapper, fetchContext, eventClock)
fileSelf = FileSelf(originalProjectContext)(originalBaseUri)
distributionPatcher = new DistributionPatcher(fileSelf, projectMapper, targetBaseUri)
resourceProcessor = ResourceProcessor(resourceLog, rcr, projectMapper, fetchContext, distributionPatcher, eventClock)
esViewsProcessor = ElasticSearchViewProcessor(fetchContext, rcr, projectMapper, eventLogConfig, eventClock, xas)
bgViewsProcessor = BlazegraphViewProcessor(fetchContext, rcr, projectMapper, eventLogConfig, eventClock, xas)
compositeViewsProcessor = CompositeViewProcessor(fetchContext, rcr, projectMapper, eventLogConfig, eventClock, xas)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import pureconfig.error.{CannotConvert, FailureReason}
import pureconfig.generic.semiauto.deriveReader

final case class InputConfig(
baseUri: BaseUri,
originalBaseUri: BaseUri,
targetBaseUri: BaseUri,
eventLog: EventLogConfig,
organizations: OrganizationCreationConfig,
projectMapping: ProjectMapping = Map.empty,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package ch.epfl.bluebrain.nexus.ship.projects

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.{ApiMappings, ProjectContext}
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Identity, ProjectRef}
import doobie.implicits._
import io.circe.generic.semiauto.deriveCodec
import io.circe.syntax.EncoderOps
import io.circe.{Codec, Json}

/**
* Allows to keep track of the original project context which will change because of project renaming / patching
* configuration so as to be able to use them to expand iris in resource payload
*/
class OriginalProjectContext(xas: Transactors) extends FetchContext {

  // Codec used to round-trip the project context through the JSONB column
  implicit val projectContextCodec: Codec[ProjectContext] = deriveCodec[ProjectContext]

  // Original contexts carry no additional default API mappings
  override def defaultApiMappings: ApiMappings = ApiMappings.empty

  /**
    * Fetches the original context saved for the given project.
    *
    * Fails the effect if no row exists for the project (`unique`) or if the
    * stored JSON can no longer be decoded into a [[ProjectContext]].
    */
  override def onRead(ref: ProjectRef): IO[ProjectContext] =
    sql"""|SELECT context
          |FROM public.ship_original_project_context
          |WHERE org = ${ref.organization}
          |AND project = ${ref.project}""".stripMargin.query[Json].unique.transact(xas.read).flatMap { json =>
      IO.fromEither(json.as[ProjectContext])
    }

  // This store is populated exclusively via `save`; project lifecycle events
  // must never be routed through it.
  override def onCreate(ref: ProjectRef)(implicit subject: Identity.Subject): IO[ProjectContext] =
    IO.raiseError(new IllegalStateException("OnCreate should not be called in this context"))

  // Fixed: the message previously said "OnCreate" here too (copy-paste error),
  // which would mislead anyone debugging an unexpected modify call.
  override def onModify(ref: ProjectRef)(implicit subject: Identity.Subject): IO[ProjectContext] =
    IO.raiseError(new IllegalStateException("OnModify should not be called in this context"))

  /**
    * Upserts the original context for the given project: inserts the row on
    * first sight, overwrites the stored context on conflict.
    */
  def save(project: ProjectRef, context: ProjectContext): IO[Unit] =
    sql"""INSERT INTO public.ship_original_project_context (org, project, context)
         |VALUES (
         |  ${project.organization}, ${project.project}, ${context.asJson}
         |)
         |ON CONFLICT (org, project)
         |DO UPDATE set
         |  context = EXCLUDED.context;
         |""".stripMargin.update.run
      .transact(xas.write)
      .void
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.organizations.FetchActiveOrganization
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectEvent._
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectRejection.NotFound
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.{ApiMappings, ProjectEvent, ProjectFields, ProjectRejection}
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.{ApiMappings, ProjectBase, ProjectContext, ProjectEvent, ProjectFields, ProjectRejection}
import ch.epfl.bluebrain.nexus.delta.sdk.projects.{FetchContext, Projects, ProjectsImpl, ValidateProjectDeletion}
import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResolverContextResolution
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
Expand All @@ -22,6 +22,7 @@ import io.circe.Decoder

final class ProjectProcessor private (
projects: Projects,
originalProjectContext: OriginalProjectContext,
projectMapper: ProjectMapper,
clock: EventClock,
uuidF: EventUUIDF,
Expand All @@ -46,12 +47,16 @@ final class ProjectProcessor private (

event match {
case ProjectCreated(_, _, _, _, _, description, apiMappings, base, vocab, enforceSchema, _, _) =>
val fields = ProjectFields(description, apiMappings, Some(base), Some(vocab), enforceSchema)
projects.create(projectRef, fields) >>
val fields = ProjectFields(description, apiMappings, Some(base), Some(vocab), enforceSchema)
val context = ProjectContext(apiMappings, ProjectBase.unsafe(base.value), vocab.value, enforceSchema)
originalProjectContext.save(projectRef, context) >>
projects.create(projectRef, fields) >>
scopeInitializer.initializeProject(projectRef)
case ProjectUpdated(_, _, _, _, _, description, apiMappings, base, vocab, enforceSchema, _, _) =>
val fields = ProjectFields(description, apiMappings, Some(base), Some(vocab), enforceSchema)
projects.update(projectRef, cRev, fields)
val fields = ProjectFields(description, apiMappings, Some(base), Some(vocab), enforceSchema)
val context = ProjectContext(apiMappings, ProjectBase.unsafe(base.value), vocab.value, enforceSchema)
originalProjectContext.save(projectRef, context) >>
projects.update(projectRef, cRev, fields)
case _: ProjectDeprecated =>
projects.deprecate(projectRef, cRev)
case _: ProjectUndeprecated =>
Expand All @@ -76,6 +81,7 @@ object ProjectProcessor {
fetchActiveOrg: FetchActiveOrganization,
fetchContext: FetchContext,
rcr: ResolverContextResolution,
originalProjectContext: OriginalProjectContext,
projectMapper: ProjectMapper,
config: InputConfig,
clock: EventClock,
Expand All @@ -98,6 +104,6 @@ object ProjectProcessor {
xas,
clock
)(base, uuidF)
new ProjectProcessor(projects, projectMapper, clock, uuidF, initializer)
new ProjectProcessor(projects, originalProjectContext, projectMapper, clock, uuidF, initializer)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package ch.epfl.bluebrain.nexus.ship.resources

import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.plugins.storage.FileSelf
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.defaultS3StorageId
import ch.epfl.bluebrain.nexus.delta.rdf.utils.UriUtils
import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, ResourceUris}
import ch.epfl.bluebrain.nexus.ship.ProjectMapper
import ch.epfl.bluebrain.nexus.ship.resources.DistributionPatcher._
import io.circe.Json
import io.circe.optics.JsonPath.root
import io.circe.syntax.KeyOps

final class DistributionPatcher(fileSelfParser: FileSelf, projectMapper: ProjectMapper, targetBase: BaseUri) {

  /**
    * Distribution may be defined as an object or as an array in original payloads.
    *
    * Rewrites the `distribution` field of the payload (if present), patching
    * each entry via [[single]]; non-distribution parts of the json are untouched.
    */
  def singleOrArray: Json => IO[Json] = root.distribution.json.modifyA { json =>
    json.asArray match {
      case Some(array) => array.traverse(single).map(Json.arr(_: _*))
      case None        => single(json)
    }
  }(_)

  // Patches one distribution object: rewrite its contentUrl to the target
  // deployment, then point its storage reference at the default S3 storage.
  private[resources] def single: Json => IO[Json] = (json: Json) => fileContentUrl(json).map(toS3Location)

  // Replaces `atLocation.store` with the stable S3 storage reference
  // (see `targetStorage` in the companion object).
  private def toS3Location: Json => Json = root.atLocation.store.json.replace(targetStorage)

  // Rewrites `contentUrl`: parse it as a file self on the original deployment,
  // map the project through the configured project mapping, and rebuild the
  // access uri against the target base. Best-effort: if the value is not a
  // parsable file self, the failure is logged and the original value is kept.
  private def fileContentUrl: Json => IO[Json] = root.contentUrl.string.modifyA { string: String =>
    for {
      uri             <- parseAsUri(string)
      fileSelfAttempt <- fileSelfParser.parse(uri).attempt
      result          <- fileSelfAttempt match {
                           case Right((project, resourceRef)) =>
                             val targetProject = projectMapper.map(project)
                             IO.pure(ResourceUris("files", targetProject, resourceRef.original).accessUri(targetBase).toString())
                           case Left(error) =>
                             // We log and keep the value
                             logger.error(error)(s"'$string' could not be parsed as a file self").as(string)
                         }
    } yield result
  }(_)

  // Lifts uri parsing into IO; a malformed uri fails the effect (caught and
  // logged by the `.attempt` above only after this step, so a string that is
  // not a uri at all fails the whole patch — NOTE(review): confirm this is intended)
  private def parseAsUri(string: String) = IO.fromEither(UriUtils.uri(string).leftMap(new IllegalArgumentException(_)))

}

object DistributionPatcher {
  private val logger = Logger[DistributionPatcher]

  // All files are moved to a storage in S3 with a stable id;
  // every patched distribution points at this same storage reference.
  private val targetStorage = Json.obj("@id" := defaultS3StorageId, "@type" := "S3Storage", "_rev" := 1)

}
Loading

0 comments on commit ceca8e3

Please sign in to comment.