Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Patch distributions in import batch #4880

Merged
merged 3 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ lazy val ship = project
tests % "test->compile;test->test"
)
.settings(
libraryDependencies ++= Seq(declineEffect, logback, fs2Aws, fs2AwsS3),
libraryDependencies ++= Seq(declineEffect, logback, circeOptics),
addCompilerPlugin(betterMonadicFor),
run / fork := true,
buildInfoKeys := Seq[BuildInfoKey](version),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import cats.effect.IO
import cats.implicits._
import cats.effect.unsafe.implicits._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.plugins.archive.FileSelf.ParsingError
import ch.epfl.bluebrain.nexus.delta.plugins.storage.FileSelf.ParsingError
import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.ArchiveReference.{FileReference, FileSelfReference, ResourceReference}
import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.ArchiveRejection._
import ch.epfl.bluebrain.nexus.delta.plugins.archive.model._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.FileSelf
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.Files
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{FileId, FileRejection}
import ch.epfl.bluebrain.nexus.delta.rdf.implicits._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import cats.effect.{Clock, IO}
import ch.epfl.bluebrain.nexus.delta.kernel.utils.{ClasspathResourceLoader, UUIDF}
import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.contexts
import ch.epfl.bluebrain.nexus.delta.plugins.archive.routes.ArchiveRoutes
import ch.epfl.bluebrain.nexus.delta.plugins.storage.FileSelf
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.Files
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.archive.model
import akka.http.scaladsl.model.StatusCodes
import ch.epfl.bluebrain.nexus.delta.kernel.error.Rejection
import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClassUtils
import ch.epfl.bluebrain.nexus.delta.plugins.archive.FileSelf
import ch.epfl.bluebrain.nexus.delta.plugins.storage.FileSelf
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileRejection
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.AbsolutePath
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import akka.util.ByteString
import cats.data.NonEmptySet
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UrlUtils.encode
import ch.epfl.bluebrain.nexus.delta.plugins.archive.FileSelf.ParsingError
import ch.epfl.bluebrain.nexus.delta.plugins.storage.FileSelf.ParsingError
import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.ArchiveReference.{FileReference, FileSelfReference, ResourceReference}
import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.ArchiveRejection.{InvalidFileSelf, ResourceNotFound}
import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.{ArchiveRejection, ArchiveValue}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.RemoteContextResolutionFixture
import ch.epfl.bluebrain.nexus.delta.plugins.storage.{FileSelf, RemoteContextResolutionFixture}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.generators.FileGen
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes.FileAttributesOrigin.Client
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileRejection.FileNotFound
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import akka.util.ByteString
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UrlUtils.encode
import ch.epfl.bluebrain.nexus.delta.kernel.utils.{StatefulUUIDF, UUIDF}
import ch.epfl.bluebrain.nexus.delta.plugins.archive.FileSelf.ParsingError.InvalidPath
import ch.epfl.bluebrain.nexus.delta.plugins.storage.FileSelf.ParsingError.InvalidPath
import ch.epfl.bluebrain.nexus.delta.plugins.archive.routes.ArchiveRoutes
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.generators.FileGen
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.Digest.ComputedDigest
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package ch.epfl.bluebrain.nexus.delta.plugins.archive
package ch.epfl.bluebrain.nexus.delta.plugins.storage

import akka.http.scaladsl.model.Uri
import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UrlUtils
import ch.epfl.bluebrain.nexus.delta.plugins.archive.FileSelf.ParsingError._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.FileSelf.ParsingError._
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.implicits._
import ch.epfl.bluebrain.nexus.delta.sdk.error.SDKError
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package ch.epfl.bluebrain.nexus.delta.plugins.archive
package ch.epfl.bluebrain.nexus.delta.plugins.storage.files

import ch.epfl.bluebrain.nexus.delta.kernel.utils.UrlUtils.encode
import ch.epfl.bluebrain.nexus.delta.plugins.archive.FileSelf.ParsingError._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.FileSelf
import ch.epfl.bluebrain.nexus.delta.plugins.storage.FileSelf.ParsingError._
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.rdf.implicits._
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
-- Tear down the import-batch (ship) bookkeeping tables so the schema scripts
-- can recreate them from scratch on the next run.
DROP TABLE IF EXISTS public.ship_original_project_context;
DROP TABLE IF EXISTS public.ship_reports;
DROP TABLE IF EXISTS public.global_events;
DROP TABLE IF EXISTS public.global_states;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Saves the original project context (base, vocab, API mappings) so that file
-- selves can be expanded when needed, including across incremental runs of the
-- import batch. One row per (org, project); the context payload is kept as JSONB.
CREATE TABLE IF NOT EXISTS public.ship_original_project_context(
    org     text  NOT NULL,
    project text  NOT NULL,
    context JSONB NOT NULL,
    PRIMARY KEY(org, project)
);

3 changes: 2 additions & 1 deletion ship/src/main/resources/ship-default.conf
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ ship {
}

input {
base-uri = "http://localhost:8080/v1"
original-base-uri = "http://localhost:8080/v1"
target-base-uri = "http://localhost:8080/v1"

event-log {
query-config = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceLoader
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.{contexts => bgContexts}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.{contexts => compositeViewContexts}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.{contexts => esContexts}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.{contexts => storageContext}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.{contexts => fileContext}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.{contexts => storageContext}
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.contexts
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
Expand Down
22 changes: 14 additions & 8 deletions ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ch.epfl.bluebrain.nexus.ship

import cats.effect.{Clock, IO}
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.storage.FileSelf
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.{JsonLdApi, JsonLdJavaApi}
import ch.epfl.bluebrain.nexus.delta.rdf.shacl.ValidateShacl
import ch.epfl.bluebrain.nexus.delta.sdk.organizations.FetchActiveOrganization
Expand All @@ -14,8 +15,9 @@ import ch.epfl.bluebrain.nexus.ship.config.InputConfig
import ch.epfl.bluebrain.nexus.ship.organizations.OrganizationProvider
import ch.epfl.bluebrain.nexus.ship.projects.ProjectProcessor
import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverProcessor
import ch.epfl.bluebrain.nexus.ship.resources.{ResourceProcessor, ResourceWiring}
import ch.epfl.bluebrain.nexus.ship.resources.{DistributionPatcher, ResourceProcessor, ResourceWiring}
import ch.epfl.bluebrain.nexus.ship.schemas.{SchemaProcessor, SchemaWiring}
import ch.epfl.bluebrain.nexus.ship.projects.OriginalProjectContext
import ch.epfl.bluebrain.nexus.ship.views.{BlazegraphViewProcessor, CompositeViewProcessor, ElasticSearchViewProcessor}
import fs2.Stream

Expand All @@ -28,12 +30,14 @@ object RunShip {
implicit val jsonLdApi: JsonLdApi = JsonLdJavaApi.lenient
for {
report <- {
val orgProvider =
val orgProvider =
OrganizationProvider(config.eventLog, config.serviceAccount.value, xas, clock)(uuidF)
val fetchContext = FetchContext(ApiMappings.empty, xas, Quotas.disabled)
val eventLogConfig = config.eventLog
val baseUri = config.baseUri
val projectMapper = ProjectMapper(config.projectMapping)
val fetchContext = FetchContext(ApiMappings.empty, xas, Quotas.disabled)
val originalProjectContext = new OriginalProjectContext(xas)
val eventLogConfig = config.eventLog
val originalBaseUri = config.originalBaseUri
val targetBaseUri = config.targetBaseUri
val projectMapper = ProjectMapper(config.projectMapping)
for {
// Provision organizations
_ <- orgProvider.create(config.organizations.values)
Expand All @@ -48,10 +52,12 @@ object RunShip {
rcr = ContextWiring.resolverContextResolution(fetchResource, fetchContext, remoteContextResolution, eventLogConfig, eventClock, xas)
schemaImports = SchemaWiring.schemaImports(fetchResource, fetchSchema, fetchContext, eventLogConfig, eventClock, xas)
// Processors
projectProcessor <- ProjectProcessor(fetchActiveOrg, fetchContext, rcr, projectMapper, config, eventClock, xas)(baseUri, jsonLdApi)
projectProcessor <- ProjectProcessor(fetchActiveOrg, fetchContext, rcr, originalProjectContext, projectMapper, config, eventClock, xas)(targetBaseUri, jsonLdApi)
resolverProcessor = ResolverProcessor(fetchContext, projectMapper, eventLogConfig, eventClock, xas)
schemaProcessor = SchemaProcessor(schemaLog, fetchContext, schemaImports, rcr, projectMapper, eventClock)
resourceProcessor = ResourceProcessor(resourceLog, rcr, projectMapper, fetchContext, eventClock)
fileSelf = FileSelf(originalProjectContext)(originalBaseUri)
distributionPatcher = new DistributionPatcher(fileSelf, projectMapper, targetBaseUri)
resourceProcessor = ResourceProcessor(resourceLog, rcr, projectMapper, fetchContext, distributionPatcher, eventClock)
esViewsProcessor = ElasticSearchViewProcessor(fetchContext, rcr, projectMapper, eventLogConfig, eventClock, xas)
bgViewsProcessor = BlazegraphViewProcessor(fetchContext, rcr, projectMapper, eventLogConfig, eventClock, xas)
compositeViewsProcessor = CompositeViewProcessor(fetchContext, rcr, projectMapper, eventLogConfig, eventClock, xas)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import pureconfig.error.{CannotConvert, FailureReason}
import pureconfig.generic.semiauto.deriveReader

final case class InputConfig(
baseUri: BaseUri,
originalBaseUri: BaseUri,
targetBaseUri: BaseUri,
eventLog: EventLogConfig,
organizations: OrganizationCreationConfig,
projectMapping: ProjectMapping = Map.empty,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package ch.epfl.bluebrain.nexus.ship.projects

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.{ApiMappings, ProjectContext}
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Identity, ProjectRef}
import doobie.implicits._
import io.circe.generic.semiauto.deriveCodec
import io.circe.syntax.EncoderOps
import io.circe.{Codec, Json}

/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is going in postgres so we can import in multiple sessions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The batch will run in an incremental way so we need to keep track of those contexts in the different runs.
Is that what you are asking?

* Keeps track of the original project context, which may change because of project renaming / patching
* configuration, so that it can be used to expand IRIs in resource payloads
*/
class OriginalProjectContext(xas: Transactors) extends FetchContext {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to implement FetchContext for this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only place it's needed is in FileSelf, so you could just split the interface. I think it's likely that a lot of places it would make sense to just pass onRead rather than all of these methods

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of the methods come from FetchContext, and the import batch is a special use case; if we are to refactor it, that is outside the scope of this PR.


// Codec used to (de)serialize the project context into the JSONB `context` column.
implicit val projectContextCodec: Codec[ProjectContext] = deriveCodec[ProjectContext]

// The import batch replays original contexts verbatim, so no default API mappings apply.
override def defaultApiMappings: ApiMappings = ApiMappings.empty

/**
  * Fetches the original context previously stored via `save` for the given project.
  * Expects exactly one row (`unique`), so reading a project that was never saved fails;
  * decoding failures of the stored JSON are surfaced via `IO.fromEither`.
  */
override def onRead(ref: ProjectRef): IO[ProjectContext] =
sql"""|SELECT context
|FROM public.ship_original_project_context
|WHERE org = ${ref.organization}
|AND project = ${ref.project}""".stripMargin.query[Json].unique.transact(xas.read).flatMap { json =>
IO.fromEither(json.as[ProjectContext])
}

/**
  * This store is read-only from the `FetchContext` point of view during the import
  * batch, so any attempt to use it for project creation is failed eagerly.
  */
override def onCreate(ref: ProjectRef)(implicit subject: Identity.Subject): IO[ProjectContext] = {
  val failure = new IllegalStateException("OnCreate should not be called in this context")
  IO.raiseError(failure)
}

/**
  * This store is read-only from the `FetchContext` point of view during the import
  * batch, so any attempt to use it for project modification is failed eagerly.
  * Fix: the failure message previously said "OnCreate" (copy-paste from `onCreate`);
  * it now correctly reports "OnModify".
  */
override def onModify(ref: ProjectRef)(implicit subject: Identity.Subject): IO[ProjectContext] =
IO.raiseError(new IllegalStateException("OnModify should not be called in this context"))

/**
  * Upserts the original context for the given project: inserts a new row, or
  * overwrites the stored context on conflict of the (org, project) primary key,
  * so incremental runs of the import batch always see the latest original context.
  */
def save(project: ProjectRef, context: ProjectContext): IO[Unit] =
sql"""INSERT INTO public.ship_original_project_context (org, project, context)
|VALUES (
| ${project.organization}, ${project.project}, ${context.asJson}
|)
|ON CONFLICT (org, project)
|DO UPDATE set
| context = EXCLUDED.context;
|""".stripMargin.update.run
.transact(xas.write)
.void
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.organizations.FetchActiveOrganization
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectEvent._
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectRejection.NotFound
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.{ApiMappings, ProjectEvent, ProjectFields, ProjectRejection}
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.{ApiMappings, ProjectBase, ProjectContext, ProjectEvent, ProjectFields, ProjectRejection}
import ch.epfl.bluebrain.nexus.delta.sdk.projects.{FetchContext, Projects, ProjectsImpl, ValidateProjectDeletion}
import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResolverContextResolution
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
Expand All @@ -22,6 +22,7 @@ import io.circe.Decoder

final class ProjectProcessor private (
projects: Projects,
originalProjectContext: OriginalProjectContext,
projectMapper: ProjectMapper,
clock: EventClock,
uuidF: EventUUIDF,
Expand All @@ -46,12 +47,16 @@ final class ProjectProcessor private (

event match {
case ProjectCreated(_, _, _, _, _, description, apiMappings, base, vocab, enforceSchema, _, _) =>
val fields = ProjectFields(description, apiMappings, Some(base), Some(vocab), enforceSchema)
projects.create(projectRef, fields) >>
val fields = ProjectFields(description, apiMappings, Some(base), Some(vocab), enforceSchema)
val context = ProjectContext(apiMappings, ProjectBase.unsafe(base.value), vocab.value, enforceSchema)
originalProjectContext.save(projectRef, context) >>
projects.create(projectRef, fields) >>
scopeInitializer.initializeProject(projectRef)
case ProjectUpdated(_, _, _, _, _, description, apiMappings, base, vocab, enforceSchema, _, _) =>
val fields = ProjectFields(description, apiMappings, Some(base), Some(vocab), enforceSchema)
projects.update(projectRef, cRev, fields)
val fields = ProjectFields(description, apiMappings, Some(base), Some(vocab), enforceSchema)
val context = ProjectContext(apiMappings, ProjectBase.unsafe(base.value), vocab.value, enforceSchema)
originalProjectContext.save(projectRef, context) >>
projects.update(projectRef, cRev, fields)
case _: ProjectDeprecated =>
projects.deprecate(projectRef, cRev)
case _: ProjectUndeprecated =>
Expand All @@ -76,6 +81,7 @@ object ProjectProcessor {
fetchActiveOrg: FetchActiveOrganization,
fetchContext: FetchContext,
rcr: ResolverContextResolution,
originalProjectContext: OriginalProjectContext,
projectMapper: ProjectMapper,
config: InputConfig,
clock: EventClock,
Expand All @@ -98,6 +104,6 @@ object ProjectProcessor {
xas,
clock
)(base, uuidF)
new ProjectProcessor(projects, projectMapper, clock, uuidF, initializer)
new ProjectProcessor(projects, originalProjectContext, projectMapper, clock, uuidF, initializer)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package ch.epfl.bluebrain.nexus.ship.resources

import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.plugins.storage.FileSelf
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.defaultS3StorageId
import ch.epfl.bluebrain.nexus.delta.rdf.utils.UriUtils
import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, ResourceUris}
import ch.epfl.bluebrain.nexus.ship.ProjectMapper
import ch.epfl.bluebrain.nexus.ship.resources.DistributionPatcher._
import io.circe.Json
import io.circe.optics.JsonPath.root
import io.circe.syntax.KeyOps

/**
  * Patches `distribution` sections of imported resource payloads: rewrites file self
  * URLs found under `contentUrl` against the target deployment (applying the project
  * mapping), and points the `atLocation.store` reference at the default S3 storage.
  */
final class DistributionPatcher(fileSelfParser: FileSelf, projectMapper: ProjectMapper, targetBase: BaseUri) {

/**
* Distribution may be defined as an object or as an array in original payloads,
* so each element is patched individually when an array is encountered.
*/
def singleOrArray: Json => IO[Json] = root.distribution.json.modifyA { json =>
json.asArray match {
case Some(array) => array.traverse(single).map(Json.arr(_: _*))
case None => single(json)
}
}(_)

// Patches one distribution object: first rewrite its contentUrl, then point its store to S3.
private[resources] def single: Json => IO[Json] = (json: Json) => fileContentUrl(json).map(toS3Location)

// Replaces `atLocation.store` with the default S3 storage reference; if the path is
// absent, `replace` leaves the payload untouched — TODO confirm this is intended.
private def toS3Location: Json => Json = root.atLocation.store.json.replace(targetStorage)

// Rewrites the `contentUrl` string when it parses as a file self: the project is mapped
// to its target name and a file access URI is rebuilt against the target base.
private def fileContentUrl: Json => IO[Json] = root.contentUrl.string.modifyA { string: String =>
for {
uri <- parseAsUri(string)
fileSelfAttempt <- fileSelfParser.parse(uri).attempt
result <- fileSelfAttempt match {
case Right((project, resourceRef)) =>
val targetProject = projectMapper.map(project)
IO.pure(ResourceUris("files", targetProject, resourceRef.original).accessUri(targetBase).toString())
case Left(error) =>
// We log and keep the value
logger.error(error)(s"'$string' could not be parsed as a file self").as(string)
}
} yield result
}(_)

// Lifts Uri parsing failures (returned as Left[String]) into the IO error channel.
private def parseAsUri(string: String) = IO.fromEither(UriUtils.uri(string).leftMap(new IllegalArgumentException(_)))

}

object DistributionPatcher {
// Used to report file selves that could not be parsed (the original value is then kept).
private val logger = Logger[DistributionPatcher]

// All files are moved to a storage in S3 with a stable id.
// NOTE(review): `_rev` is hard-coded to 1 — presumably the first revision of the
// default S3 storage in the target deployment; confirm this matches the import target.
private val targetStorage = Json.obj("@id" := defaultS3StorageId, "@type" := "S3Storage", "_rev" := 1)

}
Loading