Skip to content

Commit

Permalink
Add the ability to export events for a subset of projects and from a starting offset (#4701)
Browse files Browse the repository at this point in the history

* Add the ability to export events for a subset of projects and from a starting offset

---------

Co-authored-by: Simon Dumas <[email protected]>
  • Loading branch information
imsdu and Simon Dumas authored Jan 31, 2024
1 parent e1522f9 commit afd36df
Show file tree
Hide file tree
Showing 16 changed files with 452 additions and 28 deletions.
11 changes: 10 additions & 1 deletion delta/app/src/main/resources/app.conf
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ app {
}
}

# Database export configuration
export {
  # Max number of exports allowed to run concurrently; further requests wait for a free permit
  permits = 1
  # Root directory for exports; each export writes into a sub-directory named after its 'output' field
  target = "/tmp"
}

# Fusion configuration
fusion {
# The base url for Fusion
Expand Down Expand Up @@ -116,7 +124,8 @@ app {
"storages/write",
"version/read",
"quotas/read",
"supervision/read"
"supervision/read",
"export/run"
]

# permissions applied to the creator of the project
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.resources.ResourcesConfig
import ch.epfl.bluebrain.nexus.delta.sdk.schemas.SchemasConfig
import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{DatabaseConfig, ProjectionConfig}
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.ExportConfig
import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions, ConfigResolveOptions}
import pureconfig.generic.semiauto.deriveReader
import pureconfig.{ConfigReader, ConfigSource}
Expand Down Expand Up @@ -47,7 +48,8 @@ final case class AppConfig(
serviceAccount: ServiceAccountConfig,
sse: SseConfig,
projections: ProjectionConfig,
fusion: FusionConfig
fusion: FusionConfig,
`export`: ExportConfig
)

object AppConfig {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package ch.epfl.bluebrain.nexus.delta.routes

import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Route
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress
import ch.epfl.bluebrain.nexus.delta.sdk.circe.CirceUnmarshalling
import ch.epfl.bluebrain.nexus.delta.sdk.directives.AuthDirectives
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives._
import ch.epfl.bluebrain.nexus.delta.sdk.directives.UriDirectives.baseUriPrefix
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.{ExportEventQuery, Exporter}

class ExportRoutes(identities: Identities, aclCheck: AclCheck, exporter: Exporter)(implicit
    baseUri: BaseUri,
    cr: RemoteContextResolution,
    ordering: JsonKeyOrdering
) extends AuthDirectives(identities, aclCheck)
    with CirceUnmarshalling {

  /**
    * Route triggering a database export of events:
    * {{{POST /v1/export/events}}}
    * Requires the 'export/run' permission on the root ACL. The export is started in the
    * background ('.start.void') and the request is acknowledged immediately with 202 Accepted.
    */
  def routes: Route =
    baseUriPrefix(baseUri.prefix) {
      pathPrefix("export") {
        pathPrefix("events") {
          extractCaller { implicit caller =>
            post {
              pathEndOrSingleSlash {
                entity(as[ExportEventQuery]) { exportQuery =>
                  authorizeFor(AclAddress.Root, Permissions.exporter.run).apply {
                    emit(StatusCodes.Accepted, exporter.events(exportQuery).start.void)
                  }
                }
              }
            }
          }
        }
      }
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package ch.epfl.bluebrain.nexus.delta.wiring

import cats.effect.{Clock, IO}
import ch.epfl.bluebrain.nexus.delta.Main.pluginsMaxPriority
import ch.epfl.bluebrain.nexus.delta.config.AppConfig
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.routes.ExportRoutes
import ch.epfl.bluebrain.nexus.delta.sdk.PriorityRoute
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.Exporter
import izumi.distage.model.definition.{Id, ModuleDef}

// $COVERAGE-OFF$
/**
  * Wiring for the database export feature: the Exporter implementation and its HTTP route.
  */
object ExportModule extends ModuleDef {

  // 'fromEffect' because Exporter.apply is effectful (it allocates the semaphore
  // bounding concurrent exports)
  make[Exporter].fromEffect { (config: AppConfig, clock: Clock[IO], xas: Transactors) =>
    Exporter(config.`export`, clock, xas)
  }

  make[ExportRoutes].from {
    (
      cfg: AppConfig,
      identities: Identities,
      aclCheck: AclCheck,
      exporter: Exporter,
      cr: RemoteContextResolution @Id("aggregate"),
      ordering: JsonKeyOrdering
    ) =>
      new ExportRoutes(identities, aclCheck, exporter)(
        cfg.http.baseUri,
        cr,
        ordering
      )
  }

  // Register the route; a strict entity is required so the JSON payload can be unmarshalled
  many[PriorityRoute].add { (route: ExportRoutes) =>
    PriorityRoute(pluginsMaxPriority + 1, route.routes, requiresStrictEntity = true)
  }

}
// $COVERAGE-ON$
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package ch.epfl.bluebrain.nexus.delta.routes

import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.headers.OAuth2BearerToken
import akka.http.scaladsl.server.Route
import cats.effect.{IO, Ref}
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclSimpleCheck
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress.Root
import ch.epfl.bluebrain.nexus.delta.sdk.identities.IdentitiesDummy
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions
import ch.epfl.bluebrain.nexus.delta.sdk.utils.BaseRouteSpec
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.Exporter.ExportResult
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.{ExportEventQuery, Exporter}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Authenticated, Group}
import fs2.io.file.Path

import java.time.Instant

class ExportRoutesSpec extends BaseRouteSpec {

  private val caller = Caller(alice, Set(alice, Anonymous, Authenticated(realm), Group("group", realm)))

  private val identities = IdentitiesDummy(caller)

  private val asAlice = addCredentials(OAuth2BearerToken("alice"))

  // Flipped to true when the exporter stub runs, so tests can assert whether an export was triggered
  private val exportTrigger = Ref.unsafe[IO, Boolean](false)

  // Only alice holds the 'export/run' permission on the root ACL
  private val aclCheck = AclSimpleCheck((alice, Root, Set(Permissions.exporter.run))).accepted

  // Stub exporter recording the invocation and returning a dummy result
  private val exporter = new Exporter {
    override def events(query: ExportEventQuery): IO[ExportResult] =
      exportTrigger.set(true).as(ExportResult(Path("json"), Path("Success"), Instant.EPOCH, Instant.EPOCH))
  }

  private lazy val routes = Route.seal(
    new ExportRoutes(
      identities,
      aclCheck,
      exporter
    ).routes
  )

  "The export route" should {
    val query =
      json"""{ "output": "export-test", "projects": ["org/proj", "org/proj2"], "offset": {"@type": "At", "value": 2} }"""
    // Anonymous request: must be rejected and the export must NOT run
    "fail to trigger the export without the 'export/run' permission" in {
      Post("/v1/export/events", query.toEntity) ~> routes ~> check {
        response.shouldBeForbidden
        exportTrigger.get.accepted shouldEqual false
      }
    }

    // Authorized request: accepted asynchronously and the export must run
    "trigger the export with the 'export/run' permission" in {
      Post("/v1/export/events", query.toEntity) ~> asAlice ~> routes ~> check {
        response.status shouldEqual StatusCodes.Accepted
        exportTrigger.get.accepted shouldEqual true
      }
    }
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ object Permissions {
final val read: Permission = Permission.unsafe("events/read")
}

/**
  * Database export permissions.
  */
object exporter {
  // Grants the ability to trigger a database export of events
  final val run: Permission = Permission.unsafe("export/run")
}

/**
* Generic version permissions.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package ch.epfl.bluebrain.nexus.delta.sourcing.exporter

import fs2.io.file.Path
import pureconfig.ConfigConvert.catchReadError
import pureconfig.{ConfigConvert, ConfigReader}
import pureconfig.generic.semiauto.deriveReader

import scala.annotation.nowarn

/**
  * Configuration for database exports.
  *
  * @param permits
  *   the maximum number of exports allowed to run concurrently
  * @param target
  *   the root directory export files are written to
  */
final case class ExportConfig(permits: Int, target: Path)

object ExportConfig {

  @nowarn("cat=unused")
  // Renamed from 'databaseConfigReader' (copy-paste leftover): this reader derives ExportConfig.
  // Implicits are resolved by type, so the rename is safe for summoning call sites.
  implicit final val exportConfigReader: ConfigReader[ExportConfig] = {
    // fs2 Path has no built-in pureconfig reader; read it from its string form
    implicit val pathConverter: ConfigReader[Path] = ConfigConvert.viaString(catchReadError(s => Path(s)), _.toString)
    deriveReader[ExportConfig]
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package ch.epfl.bluebrain.nexus.delta.sourcing.exporter

import cats.data.NonEmptyList
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import io.circe.Codec
import io.circe.generic.extras.Configuration
import io.circe.generic.extras.semiauto.deriveConfiguredCodec

import scala.annotation.nowarn

/**
  * Query describing an export of events.
  *
  * @param output
  *   name of the sub-directory the export files are written to
  * @param projects
  *   the projects whose events are to be exported
  * @param offset
  *   the offset to start the export from
  */
final case class ExportEventQuery(output: Label, projects: NonEmptyList[ProjectRef], offset: Offset)

object ExportEventQuery {

  // Strict decoding rejects unknown JSON fields in incoming queries
  @nowarn("cat=unused")
  implicit private val config: Configuration = Configuration.default.withStrictDecoding
  implicit val exportQueryCodec: Codec[ExportEventQuery] = deriveConfiguredCodec[ExportEventQuery]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package ch.epfl.bluebrain.nexus.delta.sourcing.exporter

import cats.effect.IO
import cats.effect.kernel.Clock
import cats.effect.std.Semaphore
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.Exporter.ExportResult
import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
import doobie.Fragments
import doobie.implicits._
import doobie.util.fragment.Fragment
import fs2.io.file.{Files, Path}
import fs2.Stream
import io.circe.syntax.EncoderOps

import java.time.Instant

/**
  * Exports events from the database to files.
  */
trait Exporter {

  /** Runs an export for the projects and starting offset described by `query`. */
  def events(query: ExportEventQuery): IO[ExportResult]

}

object Exporter {

  private val logger = Logger[Exporter]

  /**
    * Outcome of a successful export.
    *
    * @param json
    *   path of the file containing the exported events (newline-separated JSON rows)
    * @param success
    *   path of the marker file written once the export completed
    * @param start
    *   instant the export started
    * @param end
    *   instant the export completed
    */
  final case class ExportResult(json: Path, success: Path, start: Instant, end: Instant)

  /** Creates an [[Exporter]] bounding concurrent exports to `config.permits` via a semaphore. */
  def apply(config: ExportConfig, clock: Clock[IO], xas: Transactors): IO[Exporter] =
    Semaphore[IO](config.permits.toLong).map(new ExporterImpl(config.target, _, clock, xas))

  private class ExporterImpl(rootDirectory: Path, semaphore: Semaphore[IO], clock: Clock[IO], xas: Transactors)
      extends Exporter {
    override def events(query: ExportEventQuery): IO[ExportResult] = {
      // Restrict the export to the requested projects; omitted entirely when empty
      val projectFilter = Fragments.orOpt(
        query.projects.map { project => sql"(org = ${project.organization} and project = ${project.project})" }
      )
      // Each selected row is rendered as a single JSON object via row_to_json
      val q = asJson(sql"""SELECT *
                          |FROM public.scoped_events
                          |${Fragments.whereAndOpt(projectFilter, query.offset.asFragment)}
                          |ORDER BY ordering
                          |""".stripMargin)

      val exportIO = for {
        start          <- clock.realTimeInstant
        _              <- logger.info(s"Starting export for projects ${query.projects} from offset ${query.offset}")
        targetDirectory = rootDirectory / query.output.value
        // NOTE(review): createDirectory fails if the directory already exists — presumably intentional
        // so a re-run with the same output name cannot clobber a previous export; confirm.
        _              <- Files[IO].createDirectory(targetDirectory)
        exportFile      = targetDirectory / s"$start.json"
        _              <- exportToFile(q, exportFile)
        end            <- clock.realTimeInstant
        exportSuccess   = targetDirectory / s"$start.success"
        _              <- writeSuccessFile(query, exportSuccess)
        // Fixed log message: stray quotes removed and missing verb ("completed") added
        _              <- logger.info(
                            s"Export for projects ${query.projects} from offset ${query.offset} completed after ${end.getEpochSecond - start.getEpochSecond} seconds."
                          )
      } yield ExportResult(exportFile, exportSuccess, start, end)

      // Serialize/bound exports with the configured number of permits
      semaphore.permit.use { _ => exportIO }
    }

    // Streams the query results to `targetFile`, one JSON row per line, on the streaming pool
    private def exportToFile(q: Fragment, targetFile: Path) =
      q.query[String]
        .stream
        .intersperse("\n")
        .transact(xas.streaming)
        .through(Files[IO].writeUtf8(targetFile))
        .compile
        .drain

    // Writes the original query as JSON into the success marker file
    private def writeSuccessFile(query: ExportEventQuery, targetFile: Path) =
      Stream(query.asJson.toString()).through(Files[IO].writeUtf8(targetFile)).compile.drain

    // Wraps `query` so each returned row is serialized to JSON by Postgres
    private def asJson(query: Fragment) =
      sql"""(select row_to_json(t) from ($query) t)"""
  }

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ch.epfl.bluebrain.nexus.delta.sourcing

import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.error.ThrowableValue
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.{nxv, rdfs, schemas}
Expand All @@ -11,23 +12,39 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestCommand._
import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestEvent.{PullRequestCreated, PullRequestMerged, PullRequestTagged, PullRequestUpdated}
import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestRejection.{AlreadyExists, NotFound, PullRequestAlreadyClosed}
import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestState.{PullRequestActive, PullRequestClosed}
import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.event.Event.ScopedEvent
import ch.epfl.bluebrain.nexus.delta.sourcing.event.ScopedEventStore
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Subject}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ResourceRef.Latest
import ch.epfl.bluebrain.nexus.delta.sourcing.model._
import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshStrategy
import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource
import ch.epfl.bluebrain.nexus.delta.sourcing.state.State.ScopedState
import io.circe.generic.extras.Configuration
import io.circe.generic.extras.semiauto.deriveConfiguredCodec
import io.circe.{Codec, Json}
import doobie.implicits._

import java.time.Instant
import scala.annotation.nowarn

object PullRequest {

type EventStore = ScopedEventStore[Iri, PullRequestEvent]

val entityType: EntityType = EntityType("merge-request")

  // Builds a scoped event store for pull-request events and pre-populates it with
  // `populateWith`, so callers can start from a known set of stored events.
  def eventStore(xas: Transactors, populateWith: PullRequestEvent*): IO[EventStore] = {
    val store = ScopedEventStore[Iri, PullRequestEvent](
      PullRequest.entityType,
      PullRequestEvent.serializer,
      QueryConfig(10, RefreshStrategy.Stop),
      xas
    )
    // Run all inserts in a single transaction on the write pool, then return the store
    populateWith.traverse(store.unsafeSave).transact(xas.write).as(store)
  }

val stateMachine: StateMachine[PullRequestState, PullRequestCommand, PullRequestEvent] =
StateMachine(
None,
Expand Down
Loading

0 comments on commit afd36df

Please sign in to comment.