Add the ability to export events for a subset of projects and from a starting offset #4701

Merged (7 commits, Jan 31, 2024)
Changes from 2 commits
11 changes: 10 additions & 1 deletion delta/app/src/main/resources/app.conf
@@ -43,6 +43,14 @@ app {
}
}

# Database export configuration
export {
# Max number of concurrent exports
permits = 1
# Target directory for exports
target = "/tmp"
}

# Fusion configuration
fusion {
# The base url for Fusion
@@ -116,7 +124,8 @@ app {
"storages/write",
"version/read",
"quotas/read",
"supervision/read"
"supervision/read",
"export/run"
]

# permissions applied to the creator of the project
@@ -18,6 +18,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.resources.ResourcesConfig
import ch.epfl.bluebrain.nexus.delta.sdk.schemas.SchemasConfig
import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{DatabaseConfig, ProjectionConfig}
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.ExportConfig
import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions, ConfigResolveOptions}
import pureconfig.generic.semiauto.deriveReader
import pureconfig.{ConfigReader, ConfigSource}
@@ -47,7 +48,8 @@ final case class AppConfig(
serviceAccount: ServiceAccountConfig,
sse: SseConfig,
projections: ProjectionConfig,
fusion: FusionConfig
fusion: FusionConfig,
`export`: ExportConfig
)

object AppConfig {
@@ -0,0 +1,40 @@
package ch.epfl.bluebrain.nexus.delta.routes

import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Route
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress
import ch.epfl.bluebrain.nexus.delta.sdk.circe.CirceUnmarshalling
import ch.epfl.bluebrain.nexus.delta.sdk.directives.AuthDirectives
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives._
import ch.epfl.bluebrain.nexus.delta.sdk.directives.UriDirectives.baseUriPrefix
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.{ExportEventQuery, Exporter}

class ExportRoutes(identities: Identities, aclCheck: AclCheck, exporter: Exporter)(implicit
baseUri: BaseUri,
cr: RemoteContextResolution,
ordering: JsonKeyOrdering
) extends AuthDirectives(identities, aclCheck)
with CirceUnmarshalling {

def routes: Route =
baseUriPrefix(baseUri.prefix) {
pathPrefix("export") {
pathPrefix("events") {
extractCaller { implicit caller =>
(post & pathEndOrSingleSlash & entity(as[ExportEventQuery])) { query =>
authorizeFor(AclAddress.Root, Permissions.exporter.run).apply {
emit(StatusCodes.Accepted, exporter.events(query).start.void)
Contributor: It might be interesting later to have a way of knowing a failure in the export occurred?

Contributor Author: Yes, to be discussed, like reporting in general.

}
}
}
}
}
}

}
@@ -0,0 +1,47 @@
package ch.epfl.bluebrain.nexus.delta.wiring

import cats.effect.{Clock, IO}
import ch.epfl.bluebrain.nexus.delta.Main.pluginsMaxPriority
import ch.epfl.bluebrain.nexus.delta.config.AppConfig
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.routes.ExportRoutes
import ch.epfl.bluebrain.nexus.delta.sdk.PriorityRoute
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.Exporter
import izumi.distage.model.definition.{Id, ModuleDef}

/**
* Export module wiring config.
*/
// $COVERAGE-OFF$
object ExportModule extends ModuleDef {

make[Exporter].fromEffect { (config: AppConfig, clock: Clock[IO], xas: Transactors) =>
Exporter(config.`export`, clock, xas)
}

make[ExportRoutes].from {
(
cfg: AppConfig,
identities: Identities,
aclCheck: AclCheck,
exporter: Exporter,
cr: RemoteContextResolution @Id("aggregate"),
ordering: JsonKeyOrdering
) =>
new ExportRoutes(identities, aclCheck, exporter)(
cfg.http.baseUri,
cr,
ordering
)
}

many[PriorityRoute].add { (route: ExportRoutes) =>
PriorityRoute(pluginsMaxPriority + 1, route.routes, requiresStrictEntity = true)
}

}
// $COVERAGE-ON$
@@ -0,0 +1,63 @@
package ch.epfl.bluebrain.nexus.delta.routes

import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.headers.OAuth2BearerToken
import akka.http.scaladsl.server.Route
import cats.effect.{IO, Ref}
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclSimpleCheck
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress.Root
import ch.epfl.bluebrain.nexus.delta.sdk.identities.IdentitiesDummy
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions
import ch.epfl.bluebrain.nexus.delta.sdk.utils.BaseRouteSpec
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.Exporter.ExportResult
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.{ExportEventQuery, Exporter}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Authenticated, Group}
import fs2.io.file.Path

import java.time.Instant

class ExportRoutesSpec extends BaseRouteSpec {

private val caller = Caller(alice, Set(alice, Anonymous, Authenticated(realm), Group("group", realm)))

private val identities = IdentitiesDummy(caller)

private val asAlice = addCredentials(OAuth2BearerToken("alice"))

private val exportTrigger = Ref.unsafe[IO, Boolean](false)

private val aclCheck = AclSimpleCheck((alice, Root, Set(Permissions.exporter.run))).accepted

private val exporter = new Exporter {
override def events(query: ExportEventQuery): IO[ExportResult] =
exportTrigger.set(true).as(ExportResult(Path("json"), Path("Success"), Instant.EPOCH, Instant.EPOCH))
}

private lazy val routes = Route.seal(
new ExportRoutes(
identities,
aclCheck,
exporter
).routes
)

"The export route" should {
val query =
json"""{ "id": "export-test", "projects": ["org/proj", "org/proj2"], "offset": {"@type": "At", "value": 2} }"""
"fail triggering the export the 'export/run' permission" in {
Post("/v1/export/events", query.toEntity) ~> routes ~> check {
response.shouldBeForbidden
exportTrigger.get.accepted shouldEqual false
}
}

"trigger the 'export/run' permission" in {
Post("/v1/export/events", query.toEntity) ~> asAlice ~> routes ~> check {
response.status shouldEqual StatusCodes.Accepted
exportTrigger.get.accepted shouldEqual true
}
}
}

}
@@ -176,6 +176,13 @@ object Permissions {
final val read: Permission = Permission.unsafe("events/read")
}

/**
* Generic export permissions.
*/
object exporter {
final val run: Permission = Permission.unsafe("export/run")
}

/**
* Generic version permissions.
*/
@@ -0,0 +1,20 @@
package ch.epfl.bluebrain.nexus.delta.sourcing.exporter

import fs2.io.file.Path
import pureconfig.ConfigConvert.catchReadError
import pureconfig.{ConfigConvert, ConfigReader}
import pureconfig.generic.semiauto.deriveReader

import scala.annotation.nowarn

final case class ExportConfig(permits: Int, target: Path)

object ExportConfig {

@nowarn("cat=unused")
implicit final val databaseConfigReader: ConfigReader[ExportConfig] = {
implicit val pathConverter: ConfigReader[Path] = ConfigConvert.viaString(catchReadError(s => Path(s)), _.toString)
deriveReader[ExportConfig]
}

}
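
For illustration, a minimal sketch (assuming it lives in the same package as ExportConfig) of loading a block shaped like the export section added to app.conf above through this reader; the hard-coded HOCON literal and the example object are illustrative only, not part of this change:

object ExportConfigExample extends App {
  import pureconfig.ConfigSource

  // Illustrative only: a block shaped like the `export` section added to app.conf.
  private val source = ConfigSource.string(
    """
      |permits = 1
      |target = "/tmp"
      |""".stripMargin
  )

  // Materialize it with the ConfigReader derived in ExportConfig.
  val exportConfig: ExportConfig = source.loadOrThrow[ExportConfig]
  println(exportConfig) // ExportConfig(1, /tmp)
}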
@@ -0,0 +1,19 @@
package ch.epfl.bluebrain.nexus.delta.sourcing.exporter

import cats.data.NonEmptyList
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import io.circe.Decoder
import io.circe.generic.extras.Configuration
import io.circe.generic.extras.semiauto.deriveConfiguredDecoder

import scala.annotation.nowarn

final case class ExportEventQuery(id: Label, projects: NonEmptyList[ProjectRef], offset: Offset)
Contributor: Is id anything other than the folder containing export files? Might be clearer to name it that.

object ExportEventQuery {

@nowarn("cat=unused")
implicit private val config: Configuration = Configuration.default.withStrictDecoding
implicit val exportQueryDecoder: Decoder[ExportEventQuery] = deriveConfiguredDecoder[ExportEventQuery]
}
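
As a hedged illustration (assuming it sits in the same package, and relying on the same Label, ProjectRef and Offset decoders that the derivation above already needs), the request body exercised by ExportRoutesSpec earlier decodes with this instance as follows:

object ExportEventQueryExample extends App {
  import io.circe.parser.decode

  // Same shape as the payload that ExportRoutesSpec posts to /v1/export/events.
  private val payload =
    """{ "id": "export-test", "projects": ["org/proj", "org/proj2"], "offset": {"@type": "At", "value": 2} }"""

  decode[ExportEventQuery](payload) match {
    case Right(query) => println(s"Export '${query.id}' from ${query.offset} for ${query.projects}")
    case Left(error)  => println(s"Invalid export query: $error")
  }
}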
@@ -0,0 +1,77 @@
package ch.epfl.bluebrain.nexus.delta.sourcing.exporter

import cats.effect.IO
import cats.effect.kernel.Clock
import cats.effect.std.Semaphore
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.Exporter.ExportResult
import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
import doobie.Fragments
import doobie.implicits._
import doobie.util.fragment.Fragment
import fs2.io.file.{Files, Path}

import java.time.Instant

trait Exporter {

def events(query: ExportEventQuery): IO[ExportResult]

}

object Exporter {

private val logger = Logger[Exporter]

final case class ExportResult(json: Path, success: Path, start: Instant, end: Instant)

def apply(config: ExportConfig, clock: Clock[IO], xas: Transactors): IO[Exporter] =
Semaphore[IO](config.permits.toLong).map(new ExporterImpl(config.target, _, clock, xas))

private class ExporterImpl(rootDirectory: Path, semaphore: Semaphore[IO], clock: Clock[IO], xas: Transactors)
extends Exporter {
override def events(query: ExportEventQuery): IO[ExportResult] = {
val projectFilter = Fragments.orOpt(
query.projects.map { project => sql"(org = ${project.organization} and project = ${project.project})" }
)
val q = asJson(sql"""SELECT *
|FROM public.scoped_events
|${Fragments.whereAndOpt(projectFilter, query.offset.asFragment)}
|ORDER BY ordering
|""".stripMargin)

val exportIO = for {
start <- clock.realTimeInstant
_ <- logger.info(s"Starting export for projects ${query.projects} from offset ${query.offset}")
targetDirectory = rootDirectory / query.id.value
_ <- Files[IO].createDirectory(targetDirectory)
exportFile = targetDirectory / s"$start.json"
_ <- exportToFile(q, exportFile)
end <- clock.realTimeInstant
exportSuccess = targetDirectory / s"$start.success"
_ <- Files[IO].createFile(exportSuccess)
Contributor Author: This marker file lets us know that the export succeeded. If we plan to sync those files, it gives us the state without an additional API call.

Contributor: Maybe this file could contain the input payload plus when it started / ended? So that we know what projects are included at a glance.

Contributor Author: Good idea to put the query in it. The starting time is encoded in the name and the end will be the creation time of that file. OK to just include the query for now?

_ <-
logger.info(
s"Export for projects ${query.projects} from offset' ${query.offset}' after ${end.getEpochSecond - start.getEpochSecond} seconds."
)
} yield ExportResult(exportFile, exportSuccess, start, end)

semaphore.permit.use { _ => exportIO }
Contributor Author: We use a semaphore so that exports don't consume all the connections.

}

private def exportToFile(q: Fragment, targetFile: Path) =
q.query[String]
.stream
.intersperse("\n")
.transact(xas.streaming)
.through(Files[IO].writeUtf8(targetFile))
.compile
.drain

private def asJson(query: Fragment) =
sql"""(select row_to_json(t) from ($query) t)"""
}

}
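
Picking up the review suggestion above about making the success marker self-describing, a possible follow-up could write the triggering query into it. This is a sketch only, not part of this diff, and it assumes circe encoders for Label, ProjectRef and Offset are available somewhere in the codebase (this change only derives a decoder for ExportEventQuery):

object SuccessMarkerSketch {
  import cats.effect.IO
  import fs2.Stream
  import fs2.io.file.{Files, Path}
  import io.circe.Encoder
  import io.circe.generic.extras.Configuration
  import io.circe.generic.extras.semiauto.deriveConfiguredEncoder
  import io.circe.syntax._

  // Assumption: encoders for Label, ProjectRef and Offset are in scope; this diff does not add them.
  implicit private val configuration: Configuration          = Configuration.default
  implicit val exportQueryEncoder: Encoder[ExportEventQuery] = deriveConfiguredEncoder[ExportEventQuery]

  // Write the query that triggered the export into the `.success` marker so the
  // export folder is self-describing, reusing the same fs2 file API as exportToFile.
  def write(query: ExportEventQuery, marker: Path): IO[Unit] =
    Stream(query.asJson.spaces2)
      .through(Files[IO].writeUtf8(marker))
      .compile
      .drain
}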
@@ -1,6 +1,7 @@
package ch.epfl.bluebrain.nexus.delta.sourcing

import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.error.ThrowableValue
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.{nxv, rdfs, schemas}
@@ -11,23 +12,39 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestCommand._
import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestEvent.{PullRequestCreated, PullRequestMerged, PullRequestTagged, PullRequestUpdated}
import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestRejection.{AlreadyExists, NotFound, PullRequestAlreadyClosed}
import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestState.{PullRequestActive, PullRequestClosed}
import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.event.Event.ScopedEvent
import ch.epfl.bluebrain.nexus.delta.sourcing.event.ScopedEventStore
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Subject}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ResourceRef.Latest
import ch.epfl.bluebrain.nexus.delta.sourcing.model._
import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshStrategy
import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource
import ch.epfl.bluebrain.nexus.delta.sourcing.state.State.ScopedState
import io.circe.generic.extras.Configuration
import io.circe.generic.extras.semiauto.deriveConfiguredCodec
import io.circe.{Codec, Json}
import doobie.implicits._

import java.time.Instant
import scala.annotation.nowarn

object PullRequest {

type EventStore = ScopedEventStore[Iri, PullRequestEvent]

val entityType: EntityType = EntityType("merge-request")

def eventStore(xas: Transactors, populateWith: PullRequestEvent*): IO[EventStore] = {
val store = ScopedEventStore[Iri, PullRequestEvent](
PullRequest.entityType,
PullRequestEvent.serializer,
QueryConfig(10, RefreshStrategy.Stop),
xas
)
populateWith.traverse(store.unsafeSave).transact(xas.write).as(store)
}

val stateMachine: StateMachine[PullRequestState, PullRequestCommand, PullRequestEvent] =
StateMachine(
None,