-
Notifications
You must be signed in to change notification settings - Fork 74
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add the ability to export events for a subset of projects and from a starting offset #4701
Changes from 2 commits
335779c
38c6dc2
2a930af
37c5af9
46b3339
f468410
8df40f3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
package ch.epfl.bluebrain.nexus.delta.routes | ||
|
||
import akka.http.scaladsl.model.StatusCodes | ||
import akka.http.scaladsl.server.Route | ||
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution | ||
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering | ||
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck | ||
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress | ||
import ch.epfl.bluebrain.nexus.delta.sdk.circe.CirceUnmarshalling | ||
import ch.epfl.bluebrain.nexus.delta.sdk.directives.AuthDirectives | ||
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives._ | ||
import ch.epfl.bluebrain.nexus.delta.sdk.directives.UriDirectives.baseUriPrefix | ||
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities | ||
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri | ||
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions | ||
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.{ExportEventQuery, Exporter} | ||
|
||
class ExportRoutes(identities: Identities, aclCheck: AclCheck, exporter: Exporter)(implicit | ||
baseUri: BaseUri, | ||
cr: RemoteContextResolution, | ||
ordering: JsonKeyOrdering | ||
) extends AuthDirectives(identities, aclCheck) | ||
with CirceUnmarshalling { | ||
|
||
def routes: Route = | ||
baseUriPrefix(baseUri.prefix) { | ||
pathPrefix("export") { | ||
pathPrefix("events") { | ||
extractCaller { implicit caller => | ||
(post & pathEndOrSingleSlash & entity(as[ExportEventQuery])) { query => | ||
authorizeFor(AclAddress.Root, Permissions.exporter.run).apply { | ||
emit(StatusCodes.Accepted, exporter.events(query).start.void) | ||
} | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
package ch.epfl.bluebrain.nexus.delta.wiring | ||
|
||
import cats.effect.{Clock, IO} | ||
import ch.epfl.bluebrain.nexus.delta.Main.pluginsMaxPriority | ||
import ch.epfl.bluebrain.nexus.delta.config.AppConfig | ||
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution | ||
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering | ||
import ch.epfl.bluebrain.nexus.delta.routes.ExportRoutes | ||
import ch.epfl.bluebrain.nexus.delta.sdk.PriorityRoute | ||
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck | ||
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities | ||
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors | ||
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.Exporter | ||
import izumi.distage.model.definition.{Id, ModuleDef} | ||
|
||
/** | ||
* Version module wiring config. | ||
*/ | ||
olivergrabinski marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// $COVERAGE-OFF$ | ||
object ExportModule extends ModuleDef { | ||
|
||
make[Exporter].fromEffect { (config: AppConfig, clock: Clock[IO], xas: Transactors) => | ||
Exporter(config.`export`, clock, xas) | ||
} | ||
|
||
make[ExportRoutes].from { | ||
( | ||
cfg: AppConfig, | ||
identities: Identities, | ||
aclCheck: AclCheck, | ||
exporter: Exporter, | ||
cr: RemoteContextResolution @Id("aggregate"), | ||
ordering: JsonKeyOrdering | ||
) => | ||
new ExportRoutes(identities, aclCheck, exporter)( | ||
cfg.http.baseUri, | ||
cr, | ||
ordering | ||
) | ||
} | ||
|
||
many[PriorityRoute].add { (route: ExportRoutes) => | ||
PriorityRoute(pluginsMaxPriority + 1, route.routes, requiresStrictEntity = true) | ||
} | ||
|
||
} | ||
// $COVERAGE-ON$ |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
package ch.epfl.bluebrain.nexus.delta.routes | ||
|
||
import akka.http.scaladsl.model.StatusCodes | ||
import akka.http.scaladsl.model.headers.OAuth2BearerToken | ||
import akka.http.scaladsl.server.Route | ||
import cats.effect.{IO, Ref} | ||
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclSimpleCheck | ||
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress.Root | ||
import ch.epfl.bluebrain.nexus.delta.sdk.identities.IdentitiesDummy | ||
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller | ||
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions | ||
import ch.epfl.bluebrain.nexus.delta.sdk.utils.BaseRouteSpec | ||
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.Exporter.ExportResult | ||
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.{ExportEventQuery, Exporter} | ||
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Authenticated, Group} | ||
import fs2.io.file.Path | ||
|
||
import java.time.Instant | ||
|
||
class ExportRoutesSpec extends BaseRouteSpec { | ||
|
||
private val caller = Caller(alice, Set(alice, Anonymous, Authenticated(realm), Group("group", realm))) | ||
|
||
private val identities = IdentitiesDummy(caller) | ||
|
||
private val asAlice = addCredentials(OAuth2BearerToken("alice")) | ||
|
||
private val exportTrigger = Ref.unsafe[IO, Boolean](false) | ||
|
||
private val aclCheck = AclSimpleCheck((alice, Root, Set(Permissions.exporter.run))).accepted | ||
|
||
private val exporter = new Exporter { | ||
override def events(query: ExportEventQuery): IO[ExportResult] = | ||
exportTrigger.set(true).as(ExportResult(Path("json"), Path("Success"), Instant.EPOCH, Instant.EPOCH)) | ||
} | ||
|
||
private lazy val routes = Route.seal( | ||
new ExportRoutes( | ||
identities, | ||
aclCheck, | ||
exporter | ||
).routes | ||
) | ||
|
||
"The export route" should { | ||
val query = | ||
json"""{ "id": "export-test", "projects": ["org/proj", "org/proj2"], "offset": {"@type": "At", "value": 2} }""" | ||
"fail triggering the export the 'export/run' permission" in { | ||
Post("/v1/export/events", query.toEntity) ~> routes ~> check { | ||
response.shouldBeForbidden | ||
exportTrigger.get.accepted shouldEqual false | ||
} | ||
} | ||
|
||
"trigger the 'export/run' permission" in { | ||
Post("/v1/export/events", query.toEntity) ~> asAlice ~> routes ~> check { | ||
response.status shouldEqual StatusCodes.Accepted | ||
exportTrigger.get.accepted shouldEqual true | ||
} | ||
} | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
package ch.epfl.bluebrain.nexus.delta.sourcing.exporter | ||
|
||
import fs2.io.file.Path | ||
import pureconfig.ConfigConvert.catchReadError | ||
import pureconfig.{ConfigConvert, ConfigReader} | ||
import pureconfig.generic.semiauto.deriveReader | ||
|
||
import scala.annotation.nowarn | ||
|
||
final case class ExportConfig(permits: Int, target: Path) | ||
|
||
object ExportConfig { | ||
|
||
@nowarn("cat=unused") | ||
implicit final val databaseConfigReader: ConfigReader[ExportConfig] = { | ||
implicit val pathConverter: ConfigReader[Path] = ConfigConvert.viaString(catchReadError(s => Path(s)), _.toString) | ||
deriveReader[ExportConfig] | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
package ch.epfl.bluebrain.nexus.delta.sourcing.exporter | ||
|
||
import cats.data.NonEmptyList | ||
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef} | ||
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset | ||
import io.circe.Decoder | ||
import io.circe.generic.extras.Configuration | ||
import io.circe.generic.extras.semiauto.deriveConfiguredDecoder | ||
|
||
import scala.annotation.nowarn | ||
|
||
final case class ExportEventQuery(id: Label, projects: NonEmptyList[ProjectRef], offset: Offset) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is |
||
|
||
object ExportEventQuery { | ||
|
||
@nowarn("cat=unused") | ||
implicit private val config: Configuration = Configuration.default.withStrictDecoding | ||
implicit val exportQueryDecoder: Decoder[ExportEventQuery] = deriveConfiguredDecoder[ExportEventQuery] | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
package ch.epfl.bluebrain.nexus.delta.sourcing.exporter | ||
|
||
import cats.effect.IO | ||
import cats.effect.kernel.Clock | ||
import cats.effect.std.Semaphore | ||
import ch.epfl.bluebrain.nexus.delta.kernel.Logger | ||
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors | ||
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.Exporter.ExportResult | ||
import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._ | ||
import doobie.Fragments | ||
import doobie.implicits._ | ||
import doobie.util.fragment.Fragment | ||
import fs2.io.file.{Files, Path} | ||
|
||
import java.time.Instant | ||
|
||
trait Exporter { | ||
|
||
def events(query: ExportEventQuery): IO[ExportResult] | ||
|
||
} | ||
|
||
object Exporter { | ||
|
||
private val logger = Logger[Exporter] | ||
|
||
final case class ExportResult(json: Path, success: Path, start: Instant, end: Instant) | ||
|
||
def apply(config: ExportConfig, clock: Clock[IO], xas: Transactors): IO[Exporter] = | ||
Semaphore[IO](config.permits.toLong).map(new ExporterImpl(config.target, _, clock, xas)) | ||
|
||
private class ExporterImpl(rootDirectory: Path, semaphore: Semaphore[IO], clock: Clock[IO], xas: Transactors) | ||
extends Exporter { | ||
override def events(query: ExportEventQuery): IO[ExportResult] = { | ||
val projectFilter = Fragments.orOpt( | ||
query.projects.map { project => sql"(org = ${project.organization} and project = ${project.project})" } | ||
) | ||
val q = asJson(sql"""SELECT * | ||
|FROM public.scoped_events | ||
|${Fragments.whereAndOpt(projectFilter, query.offset.asFragment)} | ||
|ORDER BY ordering | ||
|""".stripMargin) | ||
|
||
val exportIO = for { | ||
start <- clock.realTimeInstant | ||
_ <- logger.info(s"Starting export for projects ${query.projects} from offset ${query.offset}") | ||
targetDirectory = rootDirectory / query.id.value | ||
_ <- Files[IO].createDirectory(targetDirectory) | ||
exportFile = targetDirectory / s"$start.json" | ||
_ <- exportToFile(q, exportFile) | ||
end <- clock.realTimeInstant | ||
exportSuccess = targetDirectory / s"$start.success" | ||
_ = println(exportFile) | ||
_ <- Files[IO].createFile(exportSuccess) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This marker file allows to know that the export succeeded. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe this file could contain the input payload + when it started / ended? So that we know what projects are included at a glance There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good idea to put the query in it. |
||
_ <- | ||
logger.info( | ||
s"Export for projects ${query.projects} from offset' ${query.offset}' after ${end.getEpochSecond - start.getEpochSecond} seconds." | ||
) | ||
} yield ExportResult(exportFile, exportSuccess, start, end) | ||
|
||
semaphore.permit.use { _ => exportIO } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We use a semaphore so that exports don't consume all connections |
||
} | ||
|
||
private def exportToFile(q: Fragment, targetFile: Path) = | ||
q.query[String] | ||
.stream | ||
.intersperse("\n") | ||
.transact(xas.streaming) | ||
.through(Files[IO].writeUtf8(targetFile)) | ||
.compile | ||
.drain | ||
|
||
private def asJson(query: Fragment) = | ||
sql"""(select row_to_json(t) from ($query) t)""" | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be interesting later to have a way of knowing a failure in the export occurred?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, to be discussed like reporting in general