-
Notifications
You must be signed in to change notification settings - Fork 74
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Patch distributions in import batch #4880
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
-- Holds one JSON context per project: the primary key is the (org, project) pair | ||
-- and the context itself is stored as JSONB. | ||
CREATE TABLE IF NOT EXISTS public.ship_original_project_context( | ||
org text NOT NULL, | ||
project text NOT NULL, | ||
context JSONB NOT NULL, | ||
PRIMARY KEY(org, project) | ||
) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
package ch.epfl.bluebrain.nexus.ship.projects | ||
|
||
import cats.effect.IO | ||
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext | ||
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.{ApiMappings, ProjectContext} | ||
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors | ||
import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._ | ||
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Identity, ProjectRef} | ||
import doobie.implicits._ | ||
import io.circe.generic.semiauto.deriveCodec | ||
import io.circe.syntax.EncoderOps | ||
import io.circe.{Codec, Json} | ||
|
||
/** | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I guess this is going into Postgres so that we can import over multiple sessions? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The batch will run incrementally, so we need to keep track of those contexts across the different runs. |
||
* Keeps track of the original project contexts, which change because of project renaming / configuration | ||
* patching, so that they can still be used to expand IRIs in resource payloads | ||
*/ | ||
class OriginalProjectContext(xas: Transactors) extends FetchContext { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You don't need to implement FetchContext for this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The only place it's needed is in FileSelf, so you could just split the interface. I think it's likely that a lot of places it would make sense to just pass There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Most of the methods come from |
||
|
||
/** Codec used to write a [[ProjectContext]] as json when saving it and to decode it when reading it back */ | ||
implicit val projectContextCodec: Codec[ProjectContext] = deriveCodec[ProjectContext] | ||
|
||
/** This implementation does not provide any default api mappings */ | ||
override def defaultApiMappings: ApiMappings = ApiMappings.empty | ||
|
||
/** | ||
  * Fetches the context saved for the given project and decodes it from its json representation. | ||
  * The returned IO fails when the stored json can not be decoded as a [[ProjectContext]]. | ||
  * NOTE(review): the query is run with `unique`, so a project without a saved row is presumably | ||
  * reported as a failure too - confirm that callers expect this. | ||
  */ | ||
override def onRead(ref: ProjectRef): IO[ProjectContext] = { | ||
  val selectContext = | ||
    sql"""|SELECT context | ||
          |FROM public.ship_original_project_context | ||
          |WHERE org = ${ref.organization} | ||
          |AND project = ${ref.project}""".stripMargin.query[Json].unique | ||
  for { | ||
    stored  <- selectContext.transact(xas.read) | ||
    context <- IO.fromEither(stored.as[ProjectContext]) | ||
  } yield context | ||
} | ||
|
||
/** Creation is not supported here: the returned IO always fails with an [[IllegalStateException]] */ | ||
override def onCreate(ref: ProjectRef)(implicit subject: Identity.Subject): IO[ProjectContext] = { | ||
  val unsupported = new IllegalStateException("OnCreate should not be called in this context") | ||
  IO.raiseError(unsupported) | ||
} | ||
|
||
/** | ||
  * Modification is not supported here: the returned IO always fails with an [[IllegalStateException]]. | ||
  * The message names `OnModify` so that a failure can be traced back to the method actually called. | ||
  */ | ||
override def onModify(ref: ProjectRef)(implicit subject: Identity.Subject): IO[ProjectContext] = | ||
  IO.raiseError(new IllegalStateException("OnModify should not be called in this context")) | ||
|
||
/** | ||
  * Saves the context of the given project as json. | ||
  * When a row already exists for the same org / project, its context is replaced by the new one. | ||
  */ | ||
def save(project: ProjectRef, context: ProjectContext): IO[Unit] = { | ||
  val upsert = | ||
    sql"""INSERT INTO public.ship_original_project_context (org, project, context) | ||
         |VALUES ( | ||
         | ${project.organization}, ${project.project}, ${context.asJson} | ||
         |) | ||
         |ON CONFLICT (org, project) | ||
         |DO UPDATE set | ||
         | context = EXCLUDED.context; | ||
         |""".stripMargin.update.run | ||
  upsert.transact(xas.write).void | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
package ch.epfl.bluebrain.nexus.ship.resources | ||
|
||
import cats.effect.IO | ||
import cats.syntax.all._ | ||
import ch.epfl.bluebrain.nexus.delta.kernel.Logger | ||
import ch.epfl.bluebrain.nexus.delta.plugins.storage.FileSelf | ||
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.defaultS3StorageId | ||
import ch.epfl.bluebrain.nexus.delta.rdf.utils.UriUtils | ||
import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, ResourceUris} | ||
import ch.epfl.bluebrain.nexus.ship.ProjectMapper | ||
import ch.epfl.bluebrain.nexus.ship.resources.DistributionPatcher._ | ||
import io.circe.Json | ||
import io.circe.optics.JsonPath.root | ||
import io.circe.syntax.KeyOps | ||
|
||
final class DistributionPatcher(fileSelfParser: FileSelf, projectMapper: ProjectMapper, targetBase: BaseUri) { | ||
|
||
  /** | ||
    * Patches the `distribution` field of a payload. | ||
    * Original payloads define it either as a single object or as an array of objects: | ||
    * an array is patched entry by entry, anything else is patched as a single distribution. | ||
    */ | ||
  def singleOrArray: Json => IO[Json] = root.distribution.json.modifyA { distribution => | ||
    distribution.asArray.fold(single(distribution)) { entries => | ||
      entries.traverse(single).map(patched => Json.arr(patched: _*)) | ||
    } | ||
  }(_) | ||
|
||
  /** Patches one distribution: first its `contentUrl`, then the storage referenced in `atLocation.store` */ | ||
  private[resources] def single: Json => IO[Json] = fileContentUrl(_).map(toS3Location) | ||
|
||
  /** Replaces the value found at `atLocation.store` with the target storage */ | ||
  private def toS3Location: Json => Json = root.atLocation.store.json.replace(targetStorage) | ||
|
||
  /** | ||
    * Rewrites the `contentUrl` string so that it points to the file in the mapped project under the target base. | ||
    * A value which is not a valid uri fails the returned IO, whereas a uri which can not be parsed | ||
    * as a file self is logged and left untouched. | ||
    */ | ||
  private def fileContentUrl: Json => IO[Json] = root.contentUrl.string.modifyA { contentUrl: String => | ||
    parseAsUri(contentUrl).flatMap { uri => | ||
      fileSelfParser.parse(uri).attempt.flatMap { | ||
        case Left(cause) => | ||
          // The original value is kept, the failure is only reported in the logs | ||
          logger.error(cause)(s"'$contentUrl' could not be parsed as a file self").as(contentUrl) | ||
        case Right((sourceProject, resourceRef)) => | ||
          val mappedProject = projectMapper.map(sourceProject) | ||
          val patchedUrl    = ResourceUris("files", mappedProject, resourceRef.original).accessUri(targetBase) | ||
          IO.pure(patchedUrl.toString()) | ||
      } | ||
    } | ||
  }(_) | ||
|
||
  /** Parses the string as a uri, a parsing error being raised as an [[IllegalArgumentException]] in the IO */ | ||
  private def parseAsUri(string: String) = { | ||
    val parsed = UriUtils.uri(string).leftMap(reason => new IllegalArgumentException(reason)) | ||
    IO.fromEither(parsed) | ||
  } | ||
|
||
} | ||
|
||
object DistributionPatcher { | ||
  private val logger = Logger[DistributionPatcher] | ||
|
||
  // Storage written into every patched distribution: | ||
  // all files are moved to a storage in S3 with a stable id | ||
  private val targetStorage = Json.obj( | ||
    "@id"   := defaultS3StorageId, | ||
    "@type" := "S3Storage", | ||
    "_rev"  := 1 | ||
  ) | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To save the original project context to expand selfs when needed