Skip to content

Commit

Permalink
Refactor ship before introducing save summary feature
Browse files Browse the repository at this point in the history
  • Loading branch information
Simon Dumas committed Apr 15, 2024
1 parent 7470269 commit 9c5f811
Show file tree
Hide file tree
Showing 21 changed files with 349 additions and 296 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,27 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgori
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.permissions
import ch.epfl.bluebrain.nexus.testkit.minio.LocalStackS3
import fs2.aws.s3.models.Models.BucketName
import fs2.io.file.Path
import io.laserdisc.pure.s3.tagless.S3AsyncClientOp
import munit.CatsEffectSuite
import munit.catseffect.IOFixture
import org.testcontainers.containers.localstack.LocalStackContainer.Service
import software.amazon.awssdk.services.s3.model.{CreateBucketRequest, PutObjectRequest, PutObjectResponse}

import java.nio.file.Paths

object LocalStackS3StorageClient {
val ServiceType = Service.S3

def uploadFileToS3(s3Client: S3AsyncClientOp[IO], bucket: BucketName, path: Path): IO[PutObjectResponse] = {
s3Client.createBucket(CreateBucketRequest.builder().bucket(bucket.value.value).build) >>
s3Client.putObject(
PutObjectRequest.builder.bucket(bucket.value.value).key(path.toString).build,
Paths.get(getClass.getResource(path.toString).toURI)
)
}

def s3StorageClientResource(): Resource[IO, (S3StorageClient, S3AsyncClientOp[IO], S3StorageConfig)] =
LocalStackS3.localstackS3().flatMap { localstack =>
LocalStackS3.fs2ClientFromLocalstack(localstack).map { client =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.delta.sourcing.{Execute, Transactors}
import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
import ch.epfl.bluebrain.nexus.testkit.postgres.PostgresContainer
import doobie.Fragment
import doobie.implicits._
import doobie.postgres.sqlstate
import munit.Location
import munit.catseffect.IOFixture
import munit.{catseffect, Location}
import munit.catseffect.ResourceFixture.FixtureNotInstantiatedException
import org.postgresql.util.PSQLException

object Doobie {
Expand All @@ -34,9 +36,38 @@ object Doobie {
transactors(PostgresContainer.resource(user, pass), user, pass)

trait Fixture { self: NexusSuite =>
val doobie: catseffect.IOFixture[Transactors] =

val doobie: IOFixture[Transactors] =
ResourceSuiteLocalFixture("doobie", resource(PostgresUser, PostgresPassword))

val doobieTruncateAfterTest: IOFixture[Transactors] = new IOFixture[Transactors]("doobie") {
@volatile var value: Option[(Transactors, IO[Unit])] = None

def apply(): Transactors = value match {
case Some(v) => v._1
case None => throw new FixtureNotInstantiatedException(fixtureName)
}

def xas: Transactors = apply()

override def beforeAll(): IO[Unit] = resource(PostgresUser, PostgresPassword).allocated.flatMap { value =>
IO(this.value = Some(value))
}

override def afterAll(): IO[Unit] = value.fold(IO.unit)(_._2)

override def afterEach(context: AfterEach): IO[Unit] =
for {
allTables <- sql"""SELECT table_name from information_schema.tables WHERE table_schema = 'public'"""
.query[String]
.to[List]
.transact(xas.read)
_ <- allTables
.traverse { table => Fragment.const(s"""TRUNCATE $table""").update.run.transact(xas.write) }
.onError(IO.println)
} yield ()
}

def doobieInject[A](f: Transactors => IO[A]): IOFixture[(Transactors, A)] =
ResourceSuiteLocalFixture(
s"doobie",
Expand Down
53 changes: 28 additions & 25 deletions ship/src/main/resources/ship-default.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
ship {
base-uri = "http://localhost:8080/v1"

database {
read = ${ship.database.access}
Expand Down Expand Up @@ -39,36 +38,40 @@ ship {
import-bucket = "nexus-ship-import"
}

event-log {
query-config = {
batch-size = 30
refresh-strategy = 3s
}
max-duration = 14 seconds
}
input {
base-uri = "http://localhost:8080/v1"

organizations {
values {
# organization example
#obp = "The Open Brain Platform Organization"
event-log {
query-config = {
batch-size = 30
refresh-strategy = 3s
}
max-duration = 14 seconds
}
}

view-defaults {
elasticsearch {
name = "Default Elasticsearch view"
description = "An Elasticsearch view of all resources in the project."
view-defaults {
elasticsearch {
name = "Default Elasticsearch view"
description = "An Elasticsearch view of all resources in the project."
}

blazegraph {
name = "Default Sparql view"
description = "A Sparql view of all resources in the project."
}
}

blazegraph {
name = "Default Sparql view"
description = "A Sparql view of all resources in the project."
organizations {
values {
# organization example
#obp = "The Open Brain Platform Organization"
}
}
}

# Service account configuration for internal operations
service-account {
subject: "delta"
realm: "internal"
# Service account configuration for internal operations
service-account {
subject: "delta"
realm: "internal"
}
}
}
32 changes: 32 additions & 0 deletions ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/InitShip.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package ch.epfl.bluebrain.nexus.ship

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.RowEvent
import ch.epfl.bluebrain.nexus.ship.ShipCommand.RunCommand
import ch.epfl.bluebrain.nexus.ship.config.ShipConfig
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider

object InitShip {

def apply(run: RunCommand): IO[(ShipConfig, fs2.Stream[IO, RowEvent])] = run.mode match {
case RunMode.Local =>
val eventsStream = EventStreamer.localStreamer.stream(run.path, run.offset)
ShipConfig.load(run.config).map(_ -> eventsStream)
case RunMode.S3 =>
for {
localConfig <- ShipConfig.load(None)
s3Config = localConfig.s3
(config, eventsStream) <-
S3StorageClient.resource(s3Config.endpoint, DefaultCredentialsProvider.create()).use { client =>
val eventsStream = EventStreamer.s3eventStreamer(client, s3Config.importBucket).stream(run.path, run.offset)
val config = run.config match {
case Some(configPath) => ShipConfig.loadFromS3(client, s3Config.importBucket, configPath)
case None => IO.pure(localConfig)
}
config.map(_ -> eventsStream)
}
} yield (config, eventsStream)
}

}
44 changes: 19 additions & 25 deletions ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package ch.epfl.bluebrain.nexus.ship
import cats.effect.{ExitCode, IO}
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.ship.BuildInfo
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.ship.ShipCommand._
import ch.epfl.bluebrain.nexus.ship.config.ShipConfig
import com.monovore.decline.Opts
import com.monovore.decline.effect.CommandIOApp
import fs2.io.file.Path
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider

object Main
extends CommandIOApp(
Expand All @@ -34,46 +34,40 @@ object Main
.map(Offset.at)
.withDefault(Offset.start)

private val s3: Opts[Boolean] =
Opts.flag("s3", help = "Run the import from an S3 bucket").orFalse
private val runMode: Opts[RunMode] =
Opts.flag("s3", help = "Run the import from an S3 bucket").orFalse.map {
case true => RunMode.S3
case false => RunMode.Local
}

private val run = Opts.subcommand("run", "Run an import") {
(inputPath, configFile, offset, s3).mapN(Run)
(inputPath, configFile, offset, runMode).mapN(RunCommand)
}

private val showConfig = Opts.subcommand("config", "Show reconciled config") {
configFile.map(ShowConfig)
configFile.map(ShowConfigCommand)
}

override def main: Opts[IO[ExitCode]] =
(run orElse showConfig)
.map {
case Run(path, config, offset, false) =>
RunShip.localShip.run(path, config, offset)
case Run(path, config, offset, true) =>
ShipConfig.load(config).flatMap { cfg =>
s3client(cfg).use { client =>
RunShip.s3Ship(client, cfg.S3.importBucket).run(path, config, offset)
}
}
case ShowConfig(config) => showConfig(config)
case r: RunCommand => run(r)
case ShowConfigCommand(config) => showConfig(config)
}
.map(_.as(ExitCode.Success))

private[ship] def run(r: RunCommand) =
for {
(config, eventsStream) <- InitShip(r)
_ <- Transactors.init(config.database).use { xas =>
RunShip(eventsStream, config.input, xas)
}
} yield ()

private[ship] def showConfig(config: Option[Path]) =
for {
_ <- logger.info(s"Showing reconciled config")
config <- ShipConfig.merge(config).map(_._2)
_ <- logger.info(config.root().render())
} yield ()

private def s3client(config: ShipConfig) =
S3StorageClient.resource(config.S3.endpoint, DefaultCredentialsProvider.create())

sealed private trait Command

final private case class Run(path: Path, config: Option[Path], offset: Offset, s3: Boolean) extends Command

final private case class ShowConfig(config: Option[Path]) extends Command

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package ch.epfl.bluebrain.nexus.ship

import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.ship.config.ShipConfig.ProjectMapping
import ch.epfl.bluebrain.nexus.ship.config.InputConfig.ProjectMapping

trait ProjectMapper {
def map(project: ProjectRef): ProjectRef
Expand Down
11 changes: 11 additions & 0 deletions ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunMode.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package ch.epfl.bluebrain.nexus.ship

sealed trait RunMode extends Product with Serializable

object RunMode {

final case object Local extends RunMode

final case object S3 extends RunMode

}
Loading

0 comments on commit 9c5f811

Please sign in to comment.