Skip to content

Commit

Permalink
Make migration in coordinator explicit
Browse files Browse the repository at this point in the history
  • Loading branch information
dantb committed Oct 12, 2023
1 parent bd726cc commit 301fc6d
Showing 1 changed file with 13 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink
import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration
import fs2.Stream
import org.typelevel.log4cats.{Logger => Log4CatsLogger}

Expand Down Expand Up @@ -71,7 +71,7 @@ object GraphAnalyticsCoordinator {
private def start(project: ProjectRef): IO[Unit] =
for {
compiled <- compile(project)
status <- supervisor.describe(compiled.metadata.name): IO[Option[SupervisedDescription]]
status <- migration.toCatsIO(supervisor.describe(compiled.metadata.name))
_ <- status match {
case Some(value) if value.status == ExecutionStatus.Running =>
logger.info(s"Graph analysis of '$project' is already running.")
Expand All @@ -82,17 +82,17 @@ object GraphAnalyticsCoordinator {
private def startGraphAnalysis(compiled: CompiledProjection, project: ProjectRef): IO[ExecutionStatus] =
for {
_ <- logger.info(s"Starting graph analysis of '$project'...")
status <- supervisor.run(compiled, createIndex(project))
status <- migration.toCatsIO(supervisor.run(compiled, migration.toMonixBIO(createIndex(project))))
} yield status
// Destroy the analysis for the given project and deletes the related Elasticsearch index
private def destroy(project: ProjectRef): IO[Unit] = {
logger.info(s"Project '$project' has been marked as deleted, stopping the graph analysis...") >>
supervisor
migration.toCatsIO(supervisor
.destroy(
projectionName(project),
deleteIndex(project)
migration.toMonixBIO(deleteIndex(project))
)
.void
.void)
}
}
final val id = nxv + "graph-analytics"
Expand Down Expand Up @@ -128,7 +128,7 @@ object GraphAnalyticsCoordinator {
): IO[GraphAnalyticsCoordinator] =
if (config.indexingEnabled) {
val coordinator = apply(
projects.states(_).map(_.map { p => ProjectDef(p.project, p.markedForDeletion) }).translate(taskToIoK),
projects.states(_).map(_.map { p => ProjectDef(p.project, p.markedForDeletion) }).translate(migration.taskToIoK),
analyticsStream,
supervisor,
ref =>
Expand All @@ -140,14 +140,14 @@ object GraphAnalyticsCoordinator {
),
ref =>
graphAnalyticsMappings.flatMap { mappings =>
client.createIndex(index(config.prefix, ref), Some(mappings), None)
migration.toCatsIO(client.createIndex(index(config.prefix, ref), Some(mappings), None))
}.void,
ref => client.deleteIndex(index(config.prefix, ref)).void
ref => migration.toCatsIO(client.deleteIndex(index(config.prefix, ref)).void)
)

for {
script <- scriptContent
_ <- client.createScript(updateRelationshipsScriptId, script)
_ <- migration.toCatsIO(client.createScript(updateRelationshipsScriptId, script))
c <- coordinator
} yield c
} else {
Expand All @@ -163,14 +163,14 @@ object GraphAnalyticsCoordinator {
): IO[GraphAnalyticsCoordinator] = {
val coordinator =
new Active(fetchProjects, analyticsStream, supervisor, sink, createIndex, deleteIndex)
supervisor
migration.toCatsIO(supervisor
.run(
CompiledProjection.fromStream(
metadata,
ExecutionStrategy.EveryNode,
coordinator.run(_).translate(ioToUioK)
coordinator.run(_).translate(migration.ioToTaskK)
)
)
.as(coordinator)
.as(coordinator))
}
}

0 comments on commit 301fc6d

Please sign in to comment.