From 9c5f811bce72cef923d1355be54aca539b6ae320 Mon Sep 17 00:00:00 2001 From: Simon Dumas Date: Mon, 15 Apr 2024 18:29:29 +0200 Subject: [PATCH] Refactor ship before introducing save summary feature --- .../s3/LocalStackS3StorageClient.scala | 13 ++ .../delta/sourcing/postgres/Doobie.scala | 35 ++++- ship/src/main/resources/ship-default.conf | 53 ++++---- .../epfl/bluebrain/nexus/ship/InitShip.scala | 32 +++++ .../ch/epfl/bluebrain/nexus/ship/Main.scala | 44 +++---- .../bluebrain/nexus/ship/ProjectMapper.scala | 2 +- .../epfl/bluebrain/nexus/ship/RunMode.scala | 11 ++ .../epfl/bluebrain/nexus/ship/RunShip.scala | 94 ++++---------- .../bluebrain/nexus/ship/ShipCommand.scala | 14 ++ .../nexus/ship/config/InputConfig.scala | 32 +++++ .../nexus/ship/config/ShipConfig.scala | 36 ++---- .../ship/projects/ProjectProcessor.scala | 4 +- .../nexus/ship/views/ViewWiring.scala | 6 +- ship/src/test/resources/config/external.conf | 4 +- .../config/project-mapping-sscx.conf | 5 - .../resources/config/project-mapping.conf | 6 +- .../bluebrain/nexus/ship/RunShipSuite.scala | 120 ++++++------------ .../bluebrain/nexus/ship/S3RunShipSuite.scala | 69 +++------- .../nexus/ship/ShipIntegrationSpec.scala | 5 +- .../ship/config/ShipConfigFixtures.scala | 29 +++++ .../nexus/ship/config/ShipConfigSuite.scala | 31 ++++- 21 files changed, 349 insertions(+), 296 deletions(-) create mode 100644 ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/InitShip.scala create mode 100644 ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunMode.scala create mode 100644 ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ShipCommand.scala create mode 100644 ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/InputConfig.scala delete mode 100644 ship/src/test/resources/config/project-mapping-sscx.conf create mode 100644 ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigFixtures.scala diff --git a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/LocalStackS3StorageClient.scala b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/LocalStackS3StorageClient.scala index 6804d4892d..41778c02c3 100644 --- a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/LocalStackS3StorageClient.scala +++ b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/LocalStackS3StorageClient.scala @@ -8,14 +8,27 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgori import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.permissions import ch.epfl.bluebrain.nexus.testkit.minio.LocalStackS3 +import fs2.aws.s3.models.Models.BucketName +import fs2.io.file.Path import io.laserdisc.pure.s3.tagless.S3AsyncClientOp import munit.CatsEffectSuite import munit.catseffect.IOFixture import org.testcontainers.containers.localstack.LocalStackContainer.Service +import software.amazon.awssdk.services.s3.model.{CreateBucketRequest, PutObjectRequest, PutObjectResponse} + +import java.nio.file.Paths object LocalStackS3StorageClient { val ServiceType = Service.S3 + def uploadFileToS3(s3Client: S3AsyncClientOp[IO], bucket: BucketName, path: Path): IO[PutObjectResponse] = { + s3Client.createBucket(CreateBucketRequest.builder().bucket(bucket.value.value).build) >> + s3Client.putObject( + PutObjectRequest.builder.bucket(bucket.value.value).key(path.toString).build, + Paths.get(getClass.getResource(path.toString).toURI) + ) + } + def s3StorageClientResource(): Resource[IO, (S3StorageClient, S3AsyncClientOp[IO], S3StorageConfig)] = LocalStackS3.localstackS3().flatMap { localstack => LocalStackS3.fs2ClientFromLocalstack(localstack).map { client => diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/postgres/Doobie.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/postgres/Doobie.scala index a87e5cb068..44ed28670c 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/postgres/Doobie.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/postgres/Doobie.scala @@ -6,10 +6,12 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef import ch.epfl.bluebrain.nexus.delta.sourcing.{Execute, Transactors} import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite import ch.epfl.bluebrain.nexus.testkit.postgres.PostgresContainer +import doobie.Fragment import doobie.implicits._ import doobie.postgres.sqlstate +import munit.Location import munit.catseffect.IOFixture -import munit.{catseffect, Location} +import munit.catseffect.ResourceFixture.FixtureNotInstantiatedException import org.postgresql.util.PSQLException object Doobie { @@ -34,9 +36,38 @@ object Doobie { transactors(PostgresContainer.resource(user, pass), user, pass) trait Fixture { self: NexusSuite => - val doobie: catseffect.IOFixture[Transactors] = + + val doobie: IOFixture[Transactors] = ResourceSuiteLocalFixture("doobie", resource(PostgresUser, PostgresPassword)) + val doobieTruncateAfterTest: IOFixture[Transactors] = new IOFixture[Transactors]("doobie") { + @volatile var value: Option[(Transactors, IO[Unit])] = None + + def apply(): Transactors = value match { + case Some(v) => v._1 + case None => throw new FixtureNotInstantiatedException(fixtureName) + } + + def xas: Transactors = apply() + + override def beforeAll(): IO[Unit] = resource(PostgresUser, PostgresPassword).allocated.flatMap { value => + IO(this.value = Some(value)) + } + + override def afterAll(): IO[Unit] = value.fold(IO.unit)(_._2) + + override def afterEach(context: AfterEach): IO[Unit] = + for { + allTables <- sql"""SELECT table_name from information_schema.tables WHERE table_schema = 'public'""" + .query[String] + .to[List] + .transact(xas.read) + _ <- allTables + .traverse { table => Fragment.const(s"""TRUNCATE $table""").update.run.transact(xas.write) } + .onError(IO.println) + } yield () + } + def doobieInject[A](f: Transactors => IO[A]): IOFixture[(Transactors, A)] = ResourceSuiteLocalFixture( s"doobie", diff --git a/ship/src/main/resources/ship-default.conf b/ship/src/main/resources/ship-default.conf index d233502cf9..4563409c96 100644 --- a/ship/src/main/resources/ship-default.conf +++ b/ship/src/main/resources/ship-default.conf @@ -1,5 +1,4 @@ ship { - base-uri = "http://localhost:8080/v1" database { read = ${ship.database.access} @@ -39,36 +38,40 @@ ship { import-bucket = "nexus-ship-import" } - event-log { - query-config = { - batch-size = 30 - refresh-strategy = 3s - } - max-duration = 14 seconds - } + input { + base-uri = "http://localhost:8080/v1" - organizations { - values { - # organization example - #obp = "The Open Brain Platform Organization" + event-log { + query-config = { + batch-size = 30 + refresh-strategy = 3s + } + max-duration = 14 seconds } - } - view-defaults { - elasticsearch { - name = "Default Elasticsearch view" - description = "An Elasticsearch view of all resources in the project." + view-defaults { + elasticsearch { + name = "Default Elasticsearch view" + description = "An Elasticsearch view of all resources in the project." + } + + blazegraph { + name = "Default Sparql view" + description = "A Sparql view of all resources in the project." + } } - blazegraph { - name = "Default Sparql view" - description = "A Sparql view of all resources in the project." + organizations { + values { + # organization example + #obp = "The Open Brain Platform Organization" + } } - } - # Service account configuration for internal operations - service-account { - subject: "delta" - realm: "internal" + # Service account configuration for internal operations + service-account { + subject: "delta" + realm: "internal" + } } } \ No newline at end of file diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/InitShip.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/InitShip.scala new file mode 100644 index 0000000000..fc418742e1 --- /dev/null +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/InitShip.scala @@ -0,0 +1,32 @@ +package ch.epfl.bluebrain.nexus.ship + +import cats.effect.IO +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient +import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.RowEvent +import ch.epfl.bluebrain.nexus.ship.ShipCommand.RunCommand +import ch.epfl.bluebrain.nexus.ship.config.ShipConfig +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider + +object InitShip { + + def apply(run: RunCommand): IO[(ShipConfig, fs2.Stream[IO, RowEvent])] = run.mode match { + case RunMode.Local => + val eventsStream = EventStreamer.localStreamer.stream(run.path, run.offset) + ShipConfig.load(run.config).map(_ -> eventsStream) + case RunMode.S3 => + for { + localConfig <- ShipConfig.load(None) + s3Config = localConfig.s3 + (config, eventsStream) <- + S3StorageClient.resource(s3Config.endpoint, DefaultCredentialsProvider.create()).use { client => + val eventsStream = EventStreamer.s3eventStreamer(client, s3Config.importBucket).stream(run.path, run.offset) + val config = run.config match { + case Some(configPath) => ShipConfig.loadFromS3(client, s3Config.importBucket, configPath) + case None => IO.pure(localConfig) + } + config.map(_ -> eventsStream) + } + } yield (config, eventsStream) + } + +} diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala index 2feee3ae3b..745d341610 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala @@ -3,14 +3,14 @@ package ch.epfl.bluebrain.nexus.ship import cats.effect.{ExitCode, IO} import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.kernel.Logger -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient import ch.epfl.bluebrain.nexus.delta.ship.BuildInfo +import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import ch.epfl.bluebrain.nexus.ship.ShipCommand._ import ch.epfl.bluebrain.nexus.ship.config.ShipConfig import com.monovore.decline.Opts import com.monovore.decline.effect.CommandIOApp import fs2.io.file.Path -import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider object Main extends CommandIOApp( @@ -34,46 +34,40 @@ object Main .map(Offset.at) .withDefault(Offset.start) - private val s3: Opts[Boolean] = - Opts.flag("s3", help = "Run the import from an S3 bucket").orFalse + private val runMode: Opts[RunMode] = + Opts.flag("s3", help = "Run the import from an S3 bucket").orFalse.map { + case true => RunMode.S3 + case false => RunMode.Local + } private val run = Opts.subcommand("run", "Run an import") { - (inputPath, configFile, offset, s3).mapN(Run) + (inputPath, configFile, offset, runMode).mapN(RunCommand) } private val showConfig = Opts.subcommand("config", "Show reconciled config") { - configFile.map(ShowConfig) + configFile.map(ShowConfigCommand) } override def main: Opts[IO[ExitCode]] = (run orElse showConfig) .map { - case Run(path, config, offset, false) => - RunShip.localShip.run(path, config, offset) - case Run(path, config, offset, true) => - ShipConfig.load(config).flatMap { cfg => - s3client(cfg).use { client => - RunShip.s3Ship(client, cfg.S3.importBucket).run(path, config, offset) - } - } - case ShowConfig(config) => showConfig(config) + case r: RunCommand => run(r) + case ShowConfigCommand(config) => showConfig(config) } .map(_.as(ExitCode.Success)) + private[ship] def run(r: RunCommand) = + for { + (config, eventsStream) <- InitShip(r) + _ <- Transactors.init(config.database).use { xas => + RunShip(eventsStream, config.input, xas) + } + } yield () + private[ship] def showConfig(config: Option[Path]) = for { _ <- logger.info(s"Showing reconciled config") config <- ShipConfig.merge(config).map(_._2) _ <- logger.info(config.root().render()) } yield () - - private def s3client(config: ShipConfig) = - S3StorageClient.resource(config.S3.endpoint, DefaultCredentialsProvider.create()) - - sealed private trait Command - - final private case class Run(path: Path, config: Option[Path], offset: Offset, s3: Boolean) extends Command - - final private case class ShowConfig(config: Option[Path]) extends Command - } diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ProjectMapper.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ProjectMapper.scala index c3fc1f4629..d6a9b709f0 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ProjectMapper.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ProjectMapper.scala @@ -1,7 +1,7 @@ package ch.epfl.bluebrain.nexus.ship import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef -import ch.epfl.bluebrain.nexus.ship.config.ShipConfig.ProjectMapping +import ch.epfl.bluebrain.nexus.ship.config.InputConfig.ProjectMapping trait ProjectMapper { def map(project: ProjectRef): ProjectRef diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunMode.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunMode.scala new file mode 100644 index 0000000000..31d97d1ae9 --- /dev/null +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunMode.scala @@ -0,0 +1,11 @@ +package ch.epfl.bluebrain.nexus.ship + +sealed trait RunMode extends Product with Serializable + +object RunMode { + + final case object Local extends RunMode + + final case object S3 extends RunMode + +} diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala index 34038bab37..1324a30e48 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala @@ -1,9 +1,7 @@ package ch.epfl.bluebrain.nexus.ship import cats.effect.{Clock, IO} -import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.{JsonLdApi, JsonLdJavaApi} import ch.epfl.bluebrain.nexus.delta.rdf.shacl.ValidateShacl import ch.epfl.bluebrain.nexus.delta.sdk.organizations.FetchActiveOrganization @@ -12,46 +10,34 @@ import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ApiMappings import ch.epfl.bluebrain.nexus.delta.sdk.quotas.Quotas import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.RowEvent -import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset -import ch.epfl.bluebrain.nexus.ship.config.ShipConfig +import ch.epfl.bluebrain.nexus.ship.config.InputConfig import ch.epfl.bluebrain.nexus.ship.organizations.OrganizationProvider import ch.epfl.bluebrain.nexus.ship.projects.ProjectProcessor import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverProcessor import ch.epfl.bluebrain.nexus.ship.resources.{ResourceProcessor, ResourceWiring} import ch.epfl.bluebrain.nexus.ship.schemas.{SchemaProcessor, SchemaWiring} import ch.epfl.bluebrain.nexus.ship.views.{BlazegraphViewProcessor, CompositeViewProcessor, ElasticSearchViewProcessor} -import eu.timepit.refined.types.string.NonEmptyString import fs2.Stream -import fs2.aws.s3.models.Models.{BucketName, FileKey} -import fs2.io.file.Path -trait RunShip { - - private val logger = Logger[RunShip] - - def loadConfig(config: Option[Path]): IO[ShipConfig] - - def eventsStream(path: Path, fromOffset: Offset): Stream[IO, RowEvent] +object RunShip { - def run(path: Path, config: Option[Path], fromOffset: Offset = Offset.start): IO[ImportReport] = { + def apply(eventsStream: Stream[IO, RowEvent], config: InputConfig, xas: Transactors): IO[ImportReport] = { val clock = Clock[IO] val uuidF = UUIDF.random // Resources may have been created with different configurations so we adopt the lenient one for the import implicit val jsonLdApi: JsonLdApi = JsonLdJavaApi.lenient for { - _ <- logger.info(s"Running the import with file $path, config $config") - config <- loadConfig(config) - report <- Transactors.init(config.database).use { xas => - val orgProvider = - OrganizationProvider(config.eventLog, config.serviceAccount.value, xas, clock)(uuidF) - val fetchContext = FetchContext(ApiMappings.empty, xas, Quotas.disabled) - val eventLogConfig = config.eventLog - val baseUri = config.baseUri - val projectMapper = ProjectMapper(config.projectMapping) - for { - // Provision organizations - _ <- orgProvider.create(config.organizations.values) - fetchActiveOrg = FetchActiveOrganization(xas) + report <- { + val orgProvider = + OrganizationProvider(config.eventLog, config.serviceAccount.value, xas, clock)(uuidF) + val fetchContext = FetchContext(ApiMappings.empty, xas, Quotas.disabled) + val eventLogConfig = config.eventLog + val baseUri = config.baseUri + val projectMapper = ProjectMapper(config.projectMapping) + for { + // Provision organizations + _ <- orgProvider.create(config.organizations.values) + fetchActiveOrg = FetchActiveOrganization(xas) // format: off // Wiring eventClock <- EventClock.init() @@ -70,46 +56,20 @@ trait RunShip { bgViewsProcessor = BlazegraphViewProcessor(fetchContext, rcr, projectMapper, eventLogConfig, eventClock, xas) compositeViewsProcessor = CompositeViewProcessor(fetchContext, rcr, projectMapper, eventLogConfig, eventClock, xas) // format: on - report <- EventProcessor - .run( - eventsStream(path, fromOffset), - projectProcessor, - resolverProcessor, - schemaProcessor, - resourceProcessor, - esViewsProcessor, - bgViewsProcessor, - compositeViewsProcessor - ) - } yield report - } + report <- EventProcessor + .run( + eventsStream, + projectProcessor, + resolverProcessor, + schemaProcessor, + resourceProcessor, + esViewsProcessor, + bgViewsProcessor, + compositeViewsProcessor + ) + } yield report + } } yield report } } - -object RunShip { - - def localShip = new RunShip { - override def loadConfig(config: Option[Path]): IO[ShipConfig] = - ShipConfig.load(config) - - override def eventsStream(path: Path, fromOffset: Offset): Stream[IO, RowEvent] = - EventStreamer.localStreamer.stream(path, fromOffset) - } - - def s3Ship(client: S3StorageClient, bucket: BucketName) = new RunShip { - override def loadConfig(config: Option[Path]): IO[ShipConfig] = config match { - case Some(configPath) => - val configStream = client.readFile(bucket, FileKey(NonEmptyString.unsafeFrom(configPath.toString))) - ShipConfig.load(configStream) - case None => ShipConfig.load(None) - } - - override def eventsStream(path: Path, fromOffset: Offset): Stream[IO, RowEvent] = - EventStreamer - .s3eventStreamer(client, bucket) - .stream(path, fromOffset) - } - -} diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ShipCommand.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ShipCommand.scala new file mode 100644 index 0000000000..03274734b5 --- /dev/null +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ShipCommand.scala @@ -0,0 +1,14 @@ +package ch.epfl.bluebrain.nexus.ship + +import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import fs2.io.file.Path + +sealed trait ShipCommand extends Product with Serializable + +object ShipCommand { + + final case class RunCommand(path: Path, config: Option[Path], offset: Offset, mode: RunMode) extends ShipCommand + + final case class ShowConfigCommand(config: Option[Path]) extends ShipCommand + +} diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/InputConfig.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/InputConfig.scala new file mode 100644 index 0000000000..c32591a239 --- /dev/null +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/InputConfig.scala @@ -0,0 +1,32 @@ +package ch.epfl.bluebrain.nexus.ship.config + +import cats.syntax.all._ +import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, ServiceAccountConfig} +import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig +import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef +import ch.epfl.bluebrain.nexus.ship.config.InputConfig.ProjectMapping +import pureconfig.ConfigReader +import pureconfig.configurable.genericMapReader +import pureconfig.error.CannotConvert +import pureconfig.generic.semiauto.deriveReader + +final case class InputConfig( + baseUri: BaseUri, + eventLog: EventLogConfig, + organizations: OrganizationCreationConfig, + projectMapping: ProjectMapping = Map.empty, + viewDefaults: ViewDefaults, + serviceAccount: ServiceAccountConfig +) + +object InputConfig { + + type ProjectMapping = Map[ProjectRef, ProjectRef] + + implicit val mapReader: ConfigReader[ProjectMapping] = + genericMapReader(str => + ProjectRef.parse(str).leftMap(e => CannotConvert(str, classOf[ProjectRef].getSimpleName, e)) + ) + + implicit final val runConfigReader: ConfigReader[InputConfig] = deriveReader[InputConfig] +} diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfig.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfig.scala index eee6113a39..45cb4d6126 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfig.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfig.scala @@ -3,41 +3,24 @@ package ch.epfl.bluebrain.nexus.ship.config import cats.effect.IO import cats.implicits._ import ch.epfl.bluebrain.nexus.delta.kernel.config.Configs -import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, ServiceAccountConfig} -import ch.epfl.bluebrain.nexus.delta.sourcing.config.{DatabaseConfig, EventLogConfig} -import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef -import ch.epfl.bluebrain.nexus.ship.config.ShipConfig.ProjectMapping +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient +import ch.epfl.bluebrain.nexus.delta.sourcing.config.DatabaseConfig import com.typesafe.config.Config +import eu.timepit.refined.types.string.NonEmptyString import fs2.Stream +import fs2.aws.s3.models.Models.{BucketName, FileKey} import fs2.io.file.Path import pureconfig.ConfigReader import pureconfig.backend.ConfigFactoryWrapper -import pureconfig.configurable.genericMapReader -import pureconfig.error.{CannotConvert, ConfigReaderException} +import pureconfig.error.ConfigReaderException import pureconfig.generic.semiauto.deriveReader import java.nio.charset.StandardCharsets.UTF_8 -final case class ShipConfig( - baseUri: BaseUri, - database: DatabaseConfig, - S3: S3Config, - eventLog: EventLogConfig, - organizations: OrganizationCreationConfig, - projectMapping: ProjectMapping = Map.empty, - viewDefaults: ViewDefaults, - serviceAccount: ServiceAccountConfig -) +final case class ShipConfig(database: DatabaseConfig, s3: S3Config, input: InputConfig) object ShipConfig { - type ProjectMapping = Map[ProjectRef, ProjectRef] - - implicit val mapReader: ConfigReader[ProjectMapping] = - genericMapReader(str => - ProjectRef.parse(str).leftMap(e => CannotConvert(str, classOf[ProjectRef].getSimpleName, e)) - ) - implicit final val shipConfigReader: ConfigReader[ShipConfig] = { deriveReader[ShipConfig] } @@ -61,9 +44,10 @@ object ShipConfig { def load(externalConfigPath: Option[Path]): IO[ShipConfig] = merge(externalConfigPath).map(_._1) - def load(externalConfigStream: Stream[IO, Byte]): IO[ShipConfig] = { - merge(externalConfigStream).map(_._1) - } + def loadFromS3(client: S3StorageClient, bucket: BucketName, path: Path): IO[ShipConfig] = { + val configStream = client.readFile(bucket, FileKey(NonEmptyString.unsafeFrom(path.toString))) + configFromStream(configStream).flatMap(mergeFromConfig) + }.map(_._1) /** * Loads a config from a stream. Taken from diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/projects/ProjectProcessor.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/projects/ProjectProcessor.scala index 4b5c1d896b..ca1fffee22 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/projects/ProjectProcessor.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/projects/ProjectProcessor.scala @@ -15,7 +15,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef} import ch.epfl.bluebrain.nexus.ship._ -import ch.epfl.bluebrain.nexus.ship.config.ShipConfig +import ch.epfl.bluebrain.nexus.ship.config.InputConfig import ch.epfl.bluebrain.nexus.ship.error.ShipError.ProjectDeletionIsNotAllowed import ch.epfl.bluebrain.nexus.ship.projects.ProjectProcessor.logger import ch.epfl.bluebrain.nexus.ship.views.ViewWiring @@ -78,7 +78,7 @@ object ProjectProcessor { fetchContext: FetchContext, rcr: ResolverContextResolution, projectMapper: ProjectMapper, - config: ShipConfig, + config: InputConfig, clock: EventClock, xas: Transactors )(implicit diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/views/ViewWiring.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/views/ViewWiring.scala index c198b36d3d..0b624f7b99 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/views/ViewWiring.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/views/ViewWiring.scala @@ -16,7 +16,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.{ScopeInitialization, ScopeInitializer} import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig import ch.epfl.bluebrain.nexus.ship.EventClock -import ch.epfl.bluebrain.nexus.ship.config.ShipConfig +import ch.epfl.bluebrain.nexus.ship.config.InputConfig import java.util.UUID import scala.concurrent.duration.DurationInt @@ -102,7 +102,7 @@ object ViewWiring { def viewInitializer( fetchContext: FetchContext, rcr: ResolverContextResolution, - config: ShipConfig, + config: InputConfig, clock: EventClock, xas: Transactors )(implicit jsonLdApi: JsonLdApi): IO[ScopeInitializer] = { @@ -115,7 +115,7 @@ object ViewWiring { private def viewInitializer( esViews: ElasticSearchViews, bgViews: BlazegraphViews, - config: ShipConfig, + config: InputConfig, clock: EventClock, xas: Transactors ): ScopeInitializer = { diff --git a/ship/src/test/resources/config/external.conf b/ship/src/test/resources/config/external.conf index 4e610f82fc..6a1757372a 100644 --- a/ship/src/test/resources/config/external.conf +++ b/ship/src/test/resources/config/external.conf @@ -1,3 +1,5 @@ ship { - base-uri = "https://bbp.epfl.ch/v1" + input { + base-uri = "https://bbp.epfl.ch/v1" + } } \ No newline at end of file diff --git a/ship/src/test/resources/config/project-mapping-sscx.conf b/ship/src/test/resources/config/project-mapping-sscx.conf deleted file mode 100644 index ebab053fdc..0000000000 --- a/ship/src/test/resources/config/project-mapping-sscx.conf +++ /dev/null @@ -1,5 +0,0 @@ -ship { - project-mapping = { - "public/sscx": "obp/somato" - } -} \ No newline at end of file diff --git a/ship/src/test/resources/config/project-mapping.conf b/ship/src/test/resources/config/project-mapping.conf index bf33645629..2c4be718d6 100644 --- a/ship/src/test/resources/config/project-mapping.conf +++ b/ship/src/test/resources/config/project-mapping.conf @@ -1,5 +1,7 @@ ship { - project-mapping = { - "private/mmb": "obp/reference" + input { + project-mapping = { + "private/mmb": "obp/reference" + } } } \ No newline at end of file diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/RunShipSuite.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/RunShipSuite.scala index ea5a2142cc..10b1bba931 100644 --- a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/RunShipSuite.scala +++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/RunShipSuite.scala @@ -1,94 +1,79 @@ package ch.epfl.bluebrain.nexus.ship -import cats.effect.{IO, Resource} -import cats.syntax.all._ -import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceLoader +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv import ch.epfl.bluebrain.nexus.delta.sdk.projects.Projects import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.Resolvers import ch.epfl.bluebrain.nexus.delta.sdk.resources.Resources import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors -import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset -import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie.{transactors, PostgresPassword, PostgresUser} +import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie import ch.epfl.bluebrain.nexus.ship.ImportReport.Count -import ch.epfl.bluebrain.nexus.ship.RunShipSuite.{checkFor, clearDB, expectedImportReport, getDistinctOrgProjects} -import ch.epfl.bluebrain.nexus.testkit.config.SystemPropertyOverride +import ch.epfl.bluebrain.nexus.ship.RunShipSuite.{checkFor, expectedImportReport, getDistinctOrgProjects} +import ch.epfl.bluebrain.nexus.ship.config.ShipConfigFixtures import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite -import ch.epfl.bluebrain.nexus.testkit.postgres.PostgresContainer import doobie.implicits._ import fs2.io.file.Path -import munit.catseffect.IOFixture -import munit.{AnyFixture, CatsEffectSuite} -import org.scalatest.time.SpanSugar.convertIntToGrainOfTime +import munit.AnyFixture import java.time.Instant -import scala.concurrent.duration.Duration -class RunShipSuite extends NexusSuite with RunShipSuite.Fixture { +class RunShipSuite extends NexusSuite with Doobie.Fixture with ShipConfigFixtures { - override def munitIOTimeout: Duration = 60.seconds + override def munitFixtures: Seq[AnyFixture[_]] = List(doobieTruncateAfterTest) + private lazy val xas = doobieTruncateAfterTest() - override def munitFixtures: Seq[AnyFixture[_]] = List(mainFixture) - private lazy val xas = mainFixture() + private def asPath(path: String): IO[Path] = loader.absolutePath(path).map(Path(_)) - override def beforeEach(context: BeforeEach): Unit = { - super.beforeEach(context) - clearDB(xas).accepted - () - } + private def eventsStream(path: String, offset: Offset = Offset.start) = + asPath(path).map { path => + EventStreamer.localStreamer.stream(path, offset) + } test("Run import by providing the path to a file") { for { - importFile <- asPath("import/import.json") - _ <- RunShip.localShip.run(importFile, None).assertEquals(expectedImportReport) - } yield () - } - - test("Run import and check for views") { - for { - importFile <- asPath("import/import.json") - _ <- RunShip.localShip.run(importFile, None).assertEquals(expectedImportReport) - _ <- checkFor("elasticsearch", nxv + "defaultElasticSearchIndex", xas).assertEquals(1) - _ <- checkFor("blazegraph", nxv + "defaultSparqlIndex", xas).assertEquals(1) + events <- eventsStream("import/import.json") + _ <- RunShip(events, inputConfig, xas).assertEquals(expectedImportReport) + _ <- checkFor("elasticsearch", nxv + "defaultElasticSearchIndex", xas).assertEquals(1) + _ <- checkFor("blazegraph", nxv + "defaultSparqlIndex", xas).assertEquals(1) } yield () } test("Run import by providing the path to a directory") { for { - importDirectory <- asPath("import/multi-part-import") - _ <- RunShip.localShip.run(importDirectory, None).assertEquals(expectedImportReport) + events <- eventsStream("import/multi-part-import") + _ <- RunShip(events, inputConfig, xas).assertEquals(expectedImportReport) } yield () } test("Test the increment") { + val start = Offset.at(2) for { - importFileWithTwoProjects <- asPath("import/two-projects.json") - startFrom = Offset.at(2) - _ <- RunShip.localShip.run(importFileWithTwoProjects, None, startFrom).map { report => - assert(report.offset == Offset.at(2L)) - assert(thereIsOneProjectEventIn(report)) - } + events <- eventsStream("import/two-projects.json", offset = start) + _ <- RunShip(events, inputConfig, xas).map { report => + assert(report.offset == Offset.at(2L)) + assert(thereIsOneProjectEventIn(report)) + } } yield () } test("Import and map public/sscx to obp/somato") { + val original = ProjectRef.unsafe("public", "sscx") + val target = ProjectRef.unsafe("obp", "somato") + val configWithProjectMapping = inputConfig.copy( + projectMapping = Map(original -> target) + ) for { - externalConfigPath <- loader.absolutePath("config/project-mapping-sscx.conf").map(x => Some(Path(x))) - importFileWithTwoProjects <- asPath("import/import.json") - _ <- RunShip.localShip.run(importFileWithTwoProjects, externalConfigPath, Offset.start) - _ <- getDistinctOrgProjects(xas).map { projects => - assert(projects.size == 1) - assert(projects.contains(("obp", "somato"))) - } + events <- eventsStream("import/import.json") + _ <- RunShip(events, configWithProjectMapping, xas) + _ <- getDistinctOrgProjects(xas).map { project => + assertEquals(project, target) + } } yield () } - private def asPath(path: String): IO[Path] = { - ClasspathResourceLoader().absolutePath(path).map(Path(_)) - } - private def thereIsOneProjectEventIn(report: ImportReport) = report.progress == Map(Projects.entityType -> Count(1L, 0L)) @@ -96,15 +81,12 @@ class RunShipSuite extends NexusSuite with RunShipSuite.Fixture { object RunShipSuite { - def clearDB(xas: Transactors): IO[Unit] = - sql""" - | DELETE FROM scoped_events; DELETE FROM scoped_states; - |""".stripMargin.update.run.void.transact(xas.write) - - def getDistinctOrgProjects(xas: Transactors): IO[List[(String, String)]] = + def getDistinctOrgProjects(xas: Transactors): IO[ProjectRef] = sql""" | SELECT DISTINCT org, project FROM scoped_events; - """.stripMargin.query[(String, String)].to[List].transact(xas.read) + """.stripMargin.query[(Label, Label)].unique.transact(xas.read).map { case (org, proj) => + ProjectRef(org, proj) + } def checkFor(entityType: String, id: Iri, xas: Transactors): IO[Int] = sql""" @@ -125,26 +107,4 @@ object RunShipSuite { ) ) - trait Fixture { self: CatsEffectSuite => - - private def initConfig(postgres: PostgresContainer) = - Map( - "ship.database.access.host" -> postgres.getHost, - "ship.database.access.port" -> postgres.getMappedPort(5432).toString, - "ship.database.tables-autocreate" -> "true", - "ship.organizations.values.public" -> "The public organization", - "ship.organizations.values.obp" -> "The OBP organization" - ) - - private val resource: Resource[IO, Transactors] = transactors( - PostgresContainer.resource(PostgresUser, PostgresPassword).flatTap { pg => - SystemPropertyOverride(initConfig(pg)).void - }, - PostgresUser, - PostgresPassword - ) - - val mainFixture: IOFixture[Transactors] = ResourceSuiteLocalFixture("main", resource) - } - } diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/S3RunShipSuite.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/S3RunShipSuite.scala index 1ddcb87dbb..3341fc5f9a 100644 --- a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/S3RunShipSuite.scala +++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/S3RunShipSuite.scala @@ -1,78 +1,51 @@ package ch.epfl.bluebrain.nexus.ship -import cats.effect.IO -import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.LocalStackS3StorageClient -import ch.epfl.bluebrain.nexus.ship.RunShipSuite.{clearDB, expectedImportReport} -import ch.epfl.bluebrain.nexus.ship.S3RunShipSuite.uploadFileToS3 +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.LocalStackS3StorageClient.uploadFileToS3 +import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie +import ch.epfl.bluebrain.nexus.ship.RunShipSuite.expectedImportReport +import ch.epfl.bluebrain.nexus.ship.config.ShipConfigFixtures import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite import eu.timepit.refined.types.string.NonEmptyString import fs2.aws.s3.models.Models.BucketName import fs2.io.file.Path -import io.laserdisc.pure.s3.tagless.S3AsyncClientOp import munit.AnyFixture -import org.scalatest.time.SpanSugar.convertIntToGrainOfTime -import software.amazon.awssdk.services.s3.model.{CreateBucketRequest, PutObjectRequest, PutObjectResponse} -import java.nio.file.Paths -import scala.concurrent.duration.Duration +import scala.concurrent.duration.{Duration, DurationInt} -class S3RunShipSuite extends NexusSuite with RunShipSuite.Fixture with LocalStackS3StorageClient.Fixture { +class S3RunShipSuite + extends NexusSuite + with Doobie.Fixture + with LocalStackS3StorageClient.Fixture + with ShipConfigFixtures { override def munitIOTimeout: Duration = 60.seconds - override def munitFixtures: Seq[AnyFixture[_]] = List(mainFixture, localStackS3Client) - private lazy val xas = mainFixture() + override def munitFixtures: Seq[AnyFixture[_]] = List(doobieTruncateAfterTest, localStackS3Client) + private lazy val xas = doobieTruncateAfterTest() private lazy val (s3Client, fs2S3client, _) = localStackS3Client() private val bucket = BucketName(NonEmptyString.unsafeFrom("bucket")) - override def beforeEach(context: BeforeEach): Unit = { - super.beforeEach(context) - clearDB(xas).accepted - () - } - test("Run import from S3 providing a single file") { val importFilePath = Path("/import/import.json") for { - _ <- uploadFileToS3(fs2S3client, bucket, importFilePath) - _ <- RunShip.s3Ship(s3Client, bucket).run(importFilePath, None).assertEquals(expectedImportReport) - } yield () - } - - test("Succeed in overloading the default config with an external config in S3") { - val configPath = Path("/config/external.conf") - for { - _ <- uploadFileToS3(fs2S3client, bucket, configPath) - shipConfig <- RunShip.s3Ship(s3Client, bucket).loadConfig(configPath.some) - _ = assertEquals(shipConfig.baseUri.toString, "https://bbp.epfl.ch/v1") + _ <- uploadFileToS3(fs2S3client, bucket, importFilePath) + events = EventStreamer.s3eventStreamer(s3Client, bucket).stream(importFilePath, Offset.start) + _ <- RunShip(events, inputConfig, xas).assertEquals(expectedImportReport) } yield () } test("Run import from S3 providing a directory") { val directoryPath = Path("/import/multi-part-import") for { - _ <- uploadFileToS3(fs2S3client, bucket, Path("/import/multi-part-import/2024-04-05T14:38:31.165389Z.json")) - _ <- uploadFileToS3(fs2S3client, bucket, Path("/import/multi-part-import/2024-04-05T14:38:31.165389Z.success")) - _ <- uploadFileToS3(fs2S3client, bucket, Path("/import/multi-part-import/2024-04-06T11:34:31.165389Z.json")) - _ <- RunShip - .s3Ship(s3Client, bucket) - .run(directoryPath, None) - .assertEquals(expectedImportReport) + _ <- uploadFileToS3(fs2S3client, bucket, Path("/import/multi-part-import/2024-04-05T14:38:31.165389Z.json")) + _ <- uploadFileToS3(fs2S3client, bucket, Path("/import/multi-part-import/2024-04-05T14:38:31.165389Z.success")) + _ <- uploadFileToS3(fs2S3client, bucket, Path("/import/multi-part-import/2024-04-06T11:34:31.165389Z.json")) + events = EventStreamer.s3eventStreamer(s3Client, bucket).stream(directoryPath, Offset.start) + _ <- RunShip(events, inputConfig, xas).assertEquals(expectedImportReport) } yield () } } - -object S3RunShipSuite { - - def uploadFileToS3(s3Client: S3AsyncClientOp[IO], bucket: BucketName, path: Path): IO[PutObjectResponse] = { - s3Client.createBucket(CreateBucketRequest.builder().bucket(bucket.value.value).build) >> - s3Client.putObject( - PutObjectRequest.builder.bucket(bucket.value.value).key(path.toString).build, - Paths.get(getClass.getResource(path.toString).toURI) - ) - } - -} diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/ShipIntegrationSpec.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/ShipIntegrationSpec.scala index 07fecd0164..4ee2a4d995 100644 --- a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/ShipIntegrationSpec.scala +++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/ShipIntegrationSpec.scala @@ -9,6 +9,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.ExportEventQuery import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import ch.epfl.bluebrain.nexus.ship.ShipCommand.RunCommand import ch.epfl.bluebrain.nexus.tests.Identity.writer import ch.epfl.bluebrain.nexus.tests.admin.ProjectPayload import ch.epfl.bluebrain.nexus.tests.iam.types.Permission @@ -348,8 +349,8 @@ class ShipIntegrationSpec extends BaseIntegrationSpec { val folder = s"/tmp/ship/${project.project.value}/" val folderPath = Paths.get(folder) val file = Files.newDirectoryStream(folderPath, "*.json").iterator().asScala.toList.head - - RunShip.localShip.run(fs2.io.file.Path.fromNioPath(file), None).accepted + val r = RunCommand(fs2.io.file.Path.fromNioPath(file), None, Offset.start, RunMode.Local) + Main.run(r).accepted () } diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigFixtures.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigFixtures.scala new file mode 100644 index 0000000000..81c7194661 --- /dev/null +++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigFixtures.scala @@ -0,0 +1,29 @@ +package ch.epfl.bluebrain.nexus.ship.config + +import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.ServiceAccount +import ch.epfl.bluebrain.nexus.delta.sdk.{ConfigFixtures, Defaults} +import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, ServiceAccountConfig} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.User +import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label + +trait ShipConfigFixtures extends ConfigFixtures { + + private val baseUri = BaseUri("http://localhost:8080", Label.unsafe("v1")) + + private val organizationsCreation = OrganizationCreationConfig( + Map(Label.unsafe("public") -> "The public organization", Label.unsafe("obp") -> "The OBP organization") + ) + + private val viewDefaults = ViewDefaults( + Defaults("Default ES View", "Description ES View"), + Defaults("Default EBG View", "Description BG View") + ) + + private val serviceAccount: ServiceAccountConfig = ServiceAccountConfig( + ServiceAccount(User("internal", Label.unsafe("sa"))) + ) + + def inputConfig: InputConfig = + InputConfig(baseUri, eventLogConfig, organizationsCreation, Map.empty, viewDefaults, serviceAccount) + +} diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigSuite.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigSuite.scala index b89fde37ae..117fb8616d 100644 --- a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigSuite.scala +++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigSuite.scala @@ -1,6 +1,8 @@ package ch.epfl.bluebrain.nexus.ship.config import ch.epfl.bluebrain.nexus.delta.sdk.Defaults +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.LocalStackS3StorageClient +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.LocalStackS3StorageClient.uploadFileToS3 import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef} import ch.epfl.bluebrain.nexus.ship.config.ShipConfigSuite.{defaultBgValues, defaultEsValues} @@ -8,27 +10,33 @@ import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite import eu.timepit.refined.types.string.NonEmptyString import fs2.aws.s3.models.Models.BucketName import fs2.io.file.Path +import munit.AnyFixture import java.net.URI -class ShipConfigSuite extends NexusSuite { +class ShipConfigSuite extends NexusSuite with LocalStackS3StorageClient.Fixture { + + override def munitFixtures: Seq[AnyFixture[_]] = List(localStackS3Client) + + private lazy val (s3Client, fs2S3client, _) = localStackS3Client() + private val bucket = BucketName(NonEmptyString.unsafeFrom("bucket")) test("Default configuration should be parsed and loaded") { val expectedBaseUri = BaseUri("http://localhost:8080", Label.unsafe("v1")) - ShipConfig.load(None).map(_.baseUri).assertEquals(expectedBaseUri) + ShipConfig.load(None).map(_.input.baseUri).assertEquals(expectedBaseUri) } test("The defaults (name/description) for views should be correct") { val config = ShipConfig.load(None) - config.map(_.viewDefaults.elasticsearch).assertEquals(defaultEsValues) >> - config.map(_.viewDefaults.blazegraph).assertEquals(defaultBgValues) + config.map(_.input.viewDefaults.elasticsearch).assertEquals(defaultEsValues) >> + config.map(_.input.viewDefaults.blazegraph).assertEquals(defaultBgValues) } test("Default configuration should be overloaded by the external config") { val expectedBaseUri = BaseUri("https://bbp.epfl.ch", Label.unsafe("v1")) for { externalConfigPath <- loader.absolutePath("config/external.conf") - _ <- ShipConfig.load(Some(Path(externalConfigPath))).map(_.baseUri).assertEquals(expectedBaseUri) + _ <- ShipConfig.load(Some(Path(externalConfigPath))).map(_.input.baseUri).assertEquals(expectedBaseUri) } yield () } @@ -39,7 +47,7 @@ class ShipConfigSuite extends NexusSuite { for { externalConfigPath <- loader.absolutePath("config/project-mapping.conf") - mapping = ShipConfig.load(Some(Path(externalConfigPath))).map(_.projectMapping) + mapping = ShipConfig.load(Some(Path(externalConfigPath))).map(_.input.projectMapping) _ <- mapping.assertEquals(expected) } yield () } @@ -49,11 +57,20 @@ class ShipConfigSuite extends NexusSuite { val expected = S3Config(new URI("http://my-s3-endpoint.com"), bucket) for { externalConfigPath <- loader.absolutePath("config/s3.conf") - s3Config = ShipConfig.load(Some(Path(externalConfigPath))).map(_.S3) + s3Config = ShipConfig.load(Some(Path(externalConfigPath))).map(_.s3) _ <- s3Config.assertEquals(expected) } yield () } + test("Succeed in overloading the default config with an external config in S3") { + val configPath = Path("/config/external.conf") + for { + _ <- uploadFileToS3(fs2S3client, bucket, configPath) + shipConfig <- ShipConfig.loadFromS3(s3Client, bucket, configPath) + _ = assertEquals(shipConfig.input.baseUri.toString, "https://bbp.epfl.ch/v1") + } yield () + } + } object ShipConfigSuite {