From 58e1bbf9ca7dafd2e9783a8213fb8a332d82d9a9 Mon Sep 17 00:00:00 2001 From: Simon Date: Wed, 21 Feb 2024 10:45:50 +0100 Subject: [PATCH] Init org provider + project processor (#4742) * Init org provider + project processor --------- Co-authored-by: Simon Dumas --- .github/workflows/ci-delta-plugins.yml | 2 +- .github/workflows/ci-delta-ship.yml | 2 +- build.sbt | 2 +- .../nexus/delta/config/AppConfig.scala | 35 ++------ .../nexus/delta/config/AppConfigError.scala | 24 ------ .../delta/wiring/OrganizationsModule.scala | 2 +- .../nexus/delta/wiring/ProjectsModule.scala | 2 +- .../routes/OrganizationsRoutesSpec.scala | 6 +- .../delta/routes/ProjectsRoutesSpec.scala | 2 +- .../nexus/delta/kernel/config/Configs.scala | 48 +++++++++++ .../nexus/delta/sdk/auth/Credentials.scala | 9 +-- .../sdk/organizations/OrganizationsImpl.scala | 5 +- .../delta/sdk/projects/ProjectsImpl.scala | 5 +- .../AutomaticProvisioningConfig.scala | 4 - .../OrganizationDeleterSuite.scala | 3 +- .../organizations/OrganizationsImplSpec.scala | 6 +- .../delta/sdk/projects/ProjectsFixture.scala | 2 +- .../delta/sdk/projects/ProjectsImplSpec.scala | 6 +- .../ProjectProvisioningSpec.scala | 8 +- .../nexus/delta/sourcing/model/Label.scala | 5 ++ ship/src/main/resources/default.conf | 23 ++++++ .../bluebrain/nexus/ship/EventClock.scala | 30 +++++++ .../bluebrain/nexus/ship/EventProcessor.scala | 36 +++++---- .../bluebrain/nexus/ship/EventUUIDF.scala | 20 +++++ .../ch/epfl/bluebrain/nexus/ship/Main.scala | 72 +++++++++++++---- .../config/OrganizationCreationConfig.scala | 23 ++++++ .../nexus/ship/config/ShipConfig.scala | 38 ++++----- .../nexus/ship/error/ShipError.scala | 16 ++++ .../nexus/ship/model/InputEvent.scala | 19 +++-- .../organizations/OrganizationProvider.scala | 40 +++++++++ .../ship/projects/ProjectProcessor.scala | 81 +++++++++++++++++++ ship/src/test/resources/config/external.conf | 3 + .../nexus/ship/config/ShipConfigSuite.scala | 14 +++- 33 files changed, 440 insertions(+), 153 deletions(-) delete mode 100644 delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/config/AppConfigError.scala create mode 100644 delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/config/Configs.scala create mode 100644 ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventClock.scala create mode 100644 ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventUUIDF.scala create mode 100644 ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/OrganizationCreationConfig.scala create mode 100644 ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/error/ShipError.scala create mode 100644 ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/organizations/OrganizationProvider.scala create mode 100644 ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/projects/ProjectProcessor.scala create mode 100644 ship/src/test/resources/config/external.conf diff --git a/.github/workflows/ci-delta-plugins.yml b/.github/workflows/ci-delta-plugins.yml index 3db5b684d5..93ab0b6e88 100644 --- a/.github/workflows/ci-delta-plugins.yml +++ b/.github/workflows/ci-delta-plugins.yml @@ -2,8 +2,8 @@ name: Delta Plugins unit tests on: pull_request: paths: - - 'ship/**' - 'delta/kernel/**' + - 'delta/plugins/**' - 'delta/rdf/**' - 'delta/sdk/**' - 'delta/sourcing-psql/**' diff --git a/.github/workflows/ci-delta-ship.yml b/.github/workflows/ci-delta-ship.yml index d364d6e060..141b56e2f4 100644 --- a/.github/workflows/ci-delta-ship.yml +++ b/.github/workflows/ci-delta-ship.yml @@ -2,8 +2,8 @@ name: Delta Ship unit tests on: pull_request: paths: + - 'ship/**' - 'delta/kernel/**' - - 'delta/plugins/**' - 'delta/rdf/**' - 'delta/sdk/**' - 'delta/sourcing-psql/**' diff --git a/build.sbt b/build.sbt index e3d7cc3f44..8674a56738 100755 --- a/build.sbt +++ b/build.sbt @@ -845,7 +845,7 @@ lazy val root = project .in(file(".")) .settings(name := "nexus", moduleName := "nexus") .settings(compilation, shared, noPublish) - .aggregate(docs, delta, storage, tests) + .aggregate(docs, delta, ship, storage, tests) lazy val noPublish = Seq( publish / skip := true, diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/config/AppConfig.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/config/AppConfig.scala index 1d4d82adab..ce0235a84e 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/config/AppConfig.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/config/AppConfig.scala @@ -3,6 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.config import cats.effect.IO import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.kernel.cache.CacheConfig +import ch.epfl.bluebrain.nexus.delta.kernel.config.Configs import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApiConfig import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclsConfig import ch.epfl.bluebrain.nexus.delta.sdk.fusion.FusionConfig @@ -20,9 +21,9 @@ import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseConfig import ch.epfl.bluebrain.nexus.delta.sdk.typehierarchy.TypeHierarchyConfig import ch.epfl.bluebrain.nexus.delta.sourcing.config.{DatabaseConfig, ProjectionConfig} import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.ExportConfig -import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions, ConfigResolveOptions} +import com.typesafe.config.Config +import pureconfig.ConfigReader import pureconfig.generic.semiauto.deriveReader -import pureconfig.{ConfigReader, ConfigSource} import java.io.{File, InputStreamReader} import java.nio.charset.StandardCharsets.UTF_8 @@ -56,9 +57,6 @@ final case class AppConfig( object AppConfig { - private val parseOptions = ConfigParseOptions.defaults().setAllowMissing(false) - private val resolverOptions = ConfigResolveOptions.defaults() - /** * Loads the application in two steps, wrapping the error type: * @@ -85,30 +83,13 @@ object AppConfig { pluginsConfigPaths: List[String] = List.empty, accClassLoader: ClassLoader = getClass.getClassLoader ): IO[(AppConfig, Config)] = { - - // Merge configs according to their order - def merge(configs: Config*) = IO.fromEither { - val merged = configs - .foldLeft(ConfigFactory.defaultOverrides())(_ withFallback _) - .withFallback(ConfigFactory.load()) - .resolve(resolverOptions) - ConfigSource.fromConfig(merged).at("app").load[AppConfig].map(_ -> merged).leftMap(AppConfigError(_)) - } - for { - externalConfig <- IO.blocking(externalConfigPath.fold(ConfigFactory.empty()) { p => - ConfigFactory.parseFile(new File(p), parseOptions) - }) - defaultConfig <- IO.blocking(ConfigFactory.parseResources("default.conf", parseOptions)) - pluginConfigs <- IO.blocking { - pluginsConfigPaths.map { string => - ConfigFactory.parseReader( - new InputStreamReader(accClassLoader.getResourceAsStream(string), UTF_8), - parseOptions - ) - } + externalConfig <- Configs.parseFile(externalConfigPath.map(new File(_))) + defaultConfig <- Configs.parseResource("default.conf") + pluginConfigs <- pluginsConfigPaths.traverse { path => + Configs.parseReader(new InputStreamReader(accClassLoader.getResourceAsStream(path), UTF_8)) } - (appConfig, mergedConfig) <- merge(externalConfig :: defaultConfig :: pluginConfigs: _*) + (appConfig, mergedConfig) <- Configs.merge[AppConfig]("app", externalConfig :: defaultConfig :: pluginConfigs: _*) } yield (appConfig, mergedConfig) } diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/config/AppConfigError.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/config/AppConfigError.scala deleted file mode 100644 index 9dbdad876d..0000000000 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/config/AppConfigError.scala +++ /dev/null @@ -1,24 +0,0 @@ -package ch.epfl.bluebrain.nexus.delta.config - -import pureconfig.error.ConfigReaderFailures - -/** - * Error representing issues loading the application configuration. - * @param reason - * the reason why the application configuration could not be loaded - */ -final case class AppConfigError(reason: String) extends Exception { - override def fillInStackTrace(): Throwable = this - override def getMessage: String = reason -} - -object AppConfigError { - - /** - * Constructs an [[AppConfigError]] from a set of [[ConfigReaderFailures]]. - * @param configReaderFailures - * the underlying config read failures - */ - def apply(configReaderFailures: ConfigReaderFailures): AppConfigError = - new AppConfigError(configReaderFailures.prettyPrint()) -} diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/OrganizationsModule.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/OrganizationsModule.scala index 77646899ed..5416de009b 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/OrganizationsModule.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/OrganizationsModule.scala @@ -34,7 +34,7 @@ object OrganizationsModule extends ModuleDef { ) => OrganizationsImpl( scopeInitializer, - config.organizations, + config.organizations.eventLog, xas, clock )(uuidF) diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/ProjectsModule.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/ProjectsModule.scala index aec3c7c283..5533fd4d2a 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/ProjectsModule.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/ProjectsModule.scala @@ -67,7 +67,7 @@ object ProjectsModule extends ModuleDef { ValidateProjectDeletion(xas, config.projects.deletion.enabled), scopeInitializer, mappings.merge, - config.projects, + config.projects.eventLog, xas, clock )(baseUri, uuidF) diff --git a/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/OrganizationsRoutesSpec.scala b/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/OrganizationsRoutesSpec.scala index 23a7d001d4..98d4db0d47 100644 --- a/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/OrganizationsRoutesSpec.scala +++ b/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/OrganizationsRoutesSpec.scala @@ -10,7 +10,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress import ch.epfl.bluebrain.nexus.delta.sdk.generators.OrganizationGen import ch.epfl.bluebrain.nexus.delta.sdk.implicits._ import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.OrganizationRejection.OrganizationNonEmpty -import ch.epfl.bluebrain.nexus.delta.sdk.organizations.{OrganizationDeleter, OrganizationsConfig, OrganizationsImpl} +import ch.epfl.bluebrain.nexus.delta.sdk.organizations.{OrganizationDeleter, OrganizationsImpl} import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.{orgs => orgsPermissions} import ch.epfl.bluebrain.nexus.delta.sdk.utils.BaseRouteSpec import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Subject, User} @@ -29,9 +29,7 @@ class OrganizationsRoutesSpec extends BaseRouteSpec { private val org1 = OrganizationGen.organization("org1", fixedUuid, Some("My description")) private val org2 = OrganizationGen.organization("org2", fixedUuid) - private val config = OrganizationsConfig(eventLogConfig, pagination) - - private lazy val orgs = OrganizationsImpl(ScopeInitializer.noop, config, xas, clock) + private lazy val orgs = OrganizationsImpl(ScopeInitializer.noop, eventLogConfig, xas, clock) private lazy val orgDeleter: OrganizationDeleter = id => IO.raiseWhen(id == org1.label)(OrganizationNonEmpty(id)) private val superUser = User("superUser", Label.unsafe(genString())) diff --git a/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/ProjectsRoutesSpec.scala b/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/ProjectsRoutesSpec.scala index c8ecfc7614..d9589f876b 100644 --- a/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/ProjectsRoutesSpec.scala +++ b/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/ProjectsRoutesSpec.scala @@ -105,7 +105,7 @@ class ProjectsRoutesSpec extends BaseRouteSpec with BeforeAndAfterAll { ) private lazy val projects = - ProjectsImpl(fetchOrg, _ => IO.unit, ScopeInitializer.noop, defaultApiMappings, projectsConfig, xas, clock) + ProjectsImpl(fetchOrg, _ => IO.unit, ScopeInitializer.noop, defaultApiMappings, eventLogConfig, xas, clock) private lazy val provisioning = ProjectProvisioning(aclCheck.append, projects, provisioningConfig) private lazy val routes = Route.seal( diff --git a/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/config/Configs.scala b/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/config/Configs.scala new file mode 100644 index 0000000000..64527c6cbc --- /dev/null +++ b/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/config/Configs.scala @@ -0,0 +1,48 @@ +package ch.epfl.bluebrain.nexus.delta.kernel.config + +import cats.effect.IO +import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions, ConfigResolveOptions} +import cats.syntax.all._ +import pureconfig.error.ConfigReaderException +import pureconfig.{ConfigReader, ConfigSource} + +import java.io.{File, Reader} + +object Configs { + + private val parseOptions = ConfigParseOptions.defaults().setAllowMissing(false) + + private val resolverOptions = ConfigResolveOptions.defaults() + + /** + * Loads the config from the file or return an empty configuration + */ + def parseFile(file: Option[File]): IO[Config] = + IO.blocking(file.fold(ConfigFactory.empty()) { f => + ConfigFactory.parseFile(f, parseOptions) + }) + + /** + * Loads the config from resource + */ + def parseResource(resource: String): IO[Config] = + IO.blocking(ConfigFactory.parseResources(resource, parseOptions)) + + /** + * Loads the config from the reader + */ + def parseReader(reader: Reader): IO[Config] = + IO.blocking(ConfigFactory.parseReader(reader, parseOptions)) + + /** + * Merge the configs in order and load the namespace according to the config reader + */ + def merge[C: ConfigReader](namespace: String, configs: Config*): IO[(C, Config)] = IO.fromEither { + val merged = configs + .foldLeft(ConfigFactory.defaultOverrides())(_ withFallback _) + .withFallback(ConfigFactory.load()) + .resolve(resolverOptions) + ConfigSource.fromConfig(merged).at(namespace).load[C].map(_ -> merged).leftMap(ConfigReaderException(_)) + } + +} diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/auth/Credentials.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/auth/Credentials.scala index bf4a486b39..990c7d3158 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/auth/Credentials.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/auth/Credentials.scala @@ -3,11 +3,8 @@ package ch.epfl.bluebrain.nexus.delta.sdk.auth import ch.epfl.bluebrain.nexus.delta.kernel.Secret import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label import pureconfig.ConfigReader -import pureconfig.error.CannotConvert import pureconfig.generic.semiauto.deriveReader -import scala.annotation.nowarn - /** * Enumerates the different ways to obtain an auth toke for making requests to a remote service */ @@ -37,11 +34,7 @@ object Credentials { */ case class ClientCredentials(user: String, password: Secret[String], realm: Label) extends Credentials object ClientCredentials { - @nowarn("cat=unused") - implicit private val labelConfigReader: ConfigReader[Label] = ConfigReader.fromString(str => - Label(str).left.map(e => CannotConvert(str, classOf[Label].getSimpleName, e.getMessage)) - ) - implicit val configReader: ConfigReader[ClientCredentials] = deriveReader[ClientCredentials] + implicit val configReader: ConfigReader[ClientCredentials] = deriveReader[ClientCredentials] } implicit val configReader: ConfigReader[Credentials] = deriveReader[Credentials] diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/OrganizationsImpl.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/OrganizationsImpl.scala index 95ffe3a67e..2737ac727c 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/OrganizationsImpl.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/OrganizationsImpl.scala @@ -13,6 +13,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.{OrganizationComman import ch.epfl.bluebrain.nexus.delta.sdk.syntax._ import ch.epfl.bluebrain.nexus.delta.sdk.{OrganizationResource, ScopeInitializer} import ch.epfl.bluebrain.nexus.delta.sourcing._ +import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label @@ -85,14 +86,14 @@ object OrganizationsImpl { def apply( scopeInitializer: ScopeInitializer, - config: OrganizationsConfig, + config: EventLogConfig, xas: Transactors, clock: Clock[IO] )(implicit uuidf: UUIDF ): Organizations = new OrganizationsImpl( - GlobalEventLog(Organizations.definition(clock), config.eventLog, xas), + GlobalEventLog(Organizations.definition(clock), config, xas), scopeInitializer ) diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/ProjectsImpl.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/ProjectsImpl.scala index 8902468487..877fbc0d25 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/ProjectsImpl.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/ProjectsImpl.scala @@ -15,6 +15,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectRejection._ import ch.epfl.bluebrain.nexus.delta.sdk.projects.model._ import ch.epfl.bluebrain.nexus.delta.sdk.syntax._ import ch.epfl.bluebrain.nexus.delta.sourcing._ +import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset @@ -112,7 +113,7 @@ object ProjectsImpl { validateDeletion: ValidateProjectDeletion, scopeInitializer: ScopeInitializer, defaultApiMappings: ApiMappings, - config: ProjectsConfig, + config: EventLogConfig, xas: Transactors, clock: Clock[IO] )(implicit @@ -120,7 +121,7 @@ object ProjectsImpl { uuidF: UUIDF ): Projects = new ProjectsImpl( - ScopedEventLog(Projects.definition(fetchAndValidateOrg, validateDeletion, clock), config.eventLog, xas), + ScopedEventLog(Projects.definition(fetchAndValidateOrg, validateDeletion, clock), config, xas), scopeInitializer, defaultApiMappings ) diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/provisioning/AutomaticProvisioningConfig.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/provisioning/AutomaticProvisioningConfig.scala index 3abd44dd18..13940c554d 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/provisioning/AutomaticProvisioningConfig.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/provisioning/AutomaticProvisioningConfig.scala @@ -46,10 +46,6 @@ object AutomaticProvisioningConfig { implicit private val iriConfigReader: ConfigReader[Iri] = ConfigReader.fromString(str => Iri(str).leftMap(err => CannotConvert(str, classOf[Iri].getSimpleName, err))) - implicit private val labelConfigReader: ConfigReader[Label] = ConfigReader.fromString(str => - Label(str).leftMap(e => CannotConvert(str, classOf[Label].getSimpleName, e.getMessage)) - ) - implicit private val mapReader: ConfigReader[Map[Label, Label]] = genericMapReader(str => Label(str).leftMap(e => CannotConvert(str, classOf[Label].getSimpleName, e.getMessage))) diff --git a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/OrganizationDeleterSuite.scala b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/OrganizationDeleterSuite.scala index e0ca775692..77f00b4dc8 100644 --- a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/OrganizationDeleterSuite.scala +++ b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/OrganizationDeleterSuite.scala @@ -35,7 +35,6 @@ class OrganizationDeleterSuite extends NexusSuite with ConfigFixtures with Proje } private val config = ProjectsConfig(eventLogConfig, pagination, deletionConfig) - private val orgConfig = OrganizationsConfig(eventLogConfig, pagination) private lazy val projectFixture = createProjectsFixture(fetchOrg, defaultApiMappings, config, clock) override def munitFixtures: Seq[AnyFixture[_]] = List(projectFixture) @@ -44,7 +43,7 @@ class OrganizationDeleterSuite extends NexusSuite with ConfigFixtures with Proje private lazy val orgDeleter = OrganizationDeleter(xas) private val projRef = ProjectRef.unsafe(org1.value, "myproj") private val fields = ProjectFields(None, ApiMappings.empty, None, None) - private lazy val orgs = OrganizationsImpl(ScopeInitializer.noop, orgConfig, xas, clock) + private lazy val orgs = OrganizationsImpl(ScopeInitializer.noop, eventLogConfig, xas, clock) private val permission = Permissions.resources.read private lazy val acls = AclsImpl(IO.pure(Set(permission)), _ => IO.unit, Set(), aclsConfig, xas, clock) diff --git a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/OrganizationsImplSpec.scala b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/OrganizationsImplSpec.scala index 4329517afb..5687e562cb 100644 --- a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/OrganizationsImplSpec.scala +++ b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/OrganizationsImplSpec.scala @@ -26,8 +26,6 @@ class OrganizationsImplSpec with CancelAfterFailure with ConfigFixtures { - private val config = OrganizationsConfig(eventLogConfig, pagination) - val uuid = UUID.randomUUID() implicit val uuidF: UUIDF = UUIDF.fixed(uuid) @@ -48,7 +46,7 @@ class OrganizationsImplSpec IO.unit } - private lazy val orgs = OrganizationsImpl(ScopeInitializer.noop, config, xas, clock) + private lazy val orgs = OrganizationsImpl(ScopeInitializer.noop, eventLogConfig, xas, clock) "Organizations implementation" should { @@ -109,7 +107,7 @@ class OrganizationsImplSpec "run the initializer upon organization creation" in { val initializerWasExecuted = Ref.unsafe[IO, Boolean](false) - val orgs = OrganizationsImpl(orgInitializer(initializerWasExecuted), config, xas, clock) + val orgs = OrganizationsImpl(orgInitializer(initializerWasExecuted), eventLogConfig, xas, clock) orgs.create(Label.unsafe(genString()), description).accepted initializerWasExecuted.get.accepted shouldEqual true diff --git a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/ProjectsFixture.scala b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/ProjectsFixture.scala index b0828f3a34..4c045a88f3 100644 --- a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/ProjectsFixture.scala +++ b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/ProjectsFixture.scala @@ -41,7 +41,7 @@ trait ProjectsFixture { self: CatsEffectSuite => ResourceSuiteLocalFixture( "projects", Doobie.resource().map { xas => - (xas, ProjectsImpl(fetchOrgs, _ => IO.unit, inits, apiMappings, config, xas, clock)) + (xas, ProjectsImpl(fetchOrgs, _ => IO.unit, inits, apiMappings, config.eventLog, xas, clock)) } ) } diff --git a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/ProjectsImplSpec.scala b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/ProjectsImplSpec.scala index 564f0ab959..9bd82f86ce 100644 --- a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/ProjectsImplSpec.scala +++ b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/ProjectsImplSpec.scala @@ -53,8 +53,6 @@ class ProjectsImplSpec extends CatsEffectSpec with DoobieScalaTestFixture with C private val order = ResourceF.sortBy[Project]("_label").value - private val config = ProjectsConfig(eventLogConfig, pagination, deletionConfig) - private def fetchOrg: FetchOrganization = { case `org1` => IO.pure(Organization(org1, orgUuid, None)) case `org2` => IO.pure(Organization(org2, orgUuid, None)) @@ -82,7 +80,7 @@ class ProjectsImplSpec extends CatsEffectSpec with DoobieScalaTestFixture with C } private lazy val projects = - ProjectsImpl(fetchOrg, validateDeletion, ScopeInitializer.noop, defaultApiMappings, config, xas, clock) + ProjectsImpl(fetchOrg, validateDeletion, ScopeInitializer.noop, defaultApiMappings, eventLogConfig, xas, clock) "The Projects operations bundle" should { "create a project" in { @@ -274,7 +272,7 @@ class ProjectsImplSpec extends CatsEffectSpec with DoobieScalaTestFixture with C val projectRef = ProjectRef.unsafe("org", genString()) val initializerWasExecuted = Ref.unsafe[IO, Boolean](false) // format: off - val projects = ProjectsImpl(fetchOrg, validateDeletion, projectInitializer(initializerWasExecuted), defaultApiMappings, config, xas, clock) + val projects = ProjectsImpl(fetchOrg, validateDeletion, projectInitializer(initializerWasExecuted), defaultApiMappings, eventLogConfig, xas, clock) // format: on projects.create(projectRef, payload)(Identity.Anonymous).accepted diff --git a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/provisioning/ProjectProvisioningSpec.scala b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/provisioning/ProjectProvisioningSpec.scala index 3ee35d9362..fc44d270ec 100644 --- a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/provisioning/ProjectProvisioningSpec.scala +++ b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/provisioning/ProjectProvisioningSpec.scala @@ -3,7 +3,6 @@ package ch.epfl.bluebrain.nexus.delta.sdk.provisioning import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF import ch.epfl.bluebrain.nexus.delta.rdf.syntax.iriStringContextSyntax -import ch.epfl.bluebrain.nexus.delta.sdk.{ConfigFixtures, ScopeInitializer} import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclSimpleCheck import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.{Acl, AclAddress} import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri @@ -11,10 +10,11 @@ import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.Organization import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.OrganizationRejection.OrganizationNotFound import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.resources import ch.epfl.bluebrain.nexus.delta.sdk.projects.Projects.FetchOrganization +import ch.epfl.bluebrain.nexus.delta.sdk.projects.ProjectsImpl import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectRejection.WrappedOrganizationRejection import ch.epfl.bluebrain.nexus.delta.sdk.projects.model._ -import ch.epfl.bluebrain.nexus.delta.sdk.projects.{ProjectsConfig, ProjectsImpl} import ch.epfl.bluebrain.nexus.delta.sdk.provisioning.ProjectProvisioning.InvalidProjectLabel +import ch.epfl.bluebrain.nexus.delta.sdk.{ConfigFixtures, ScopeInitializer} import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Identity, Label, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.DoobieScalaTestFixture @@ -53,14 +53,12 @@ class ProjectProvisioningSpec extends CatsEffectSpec with DoobieScalaTestFixture ) ) - private val config = ProjectsConfig(eventLogConfig, pagination, deletionConfig) - private lazy val projects = ProjectsImpl( fetchOrg, _ => IO.unit, ScopeInitializer.noop, ApiMappings.empty, - config, + eventLogConfig, xas, clock ) diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/Label.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/Label.scala index 0f80e0113e..3be4806408 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/Label.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/Label.scala @@ -7,6 +7,8 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.decoder.JsonLdDecoder import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.decoder.JsonLdDecoderError.ParsingFailure import doobie.{Get, Put} import io.circe.{Decoder, Encoder, KeyDecoder, KeyEncoder} +import pureconfig.ConfigReader +import pureconfig.error.CannotConvert import scala.util.matching.Regex @@ -78,4 +80,7 @@ object Label { (cursor: ExpandedJsonLdCursor) => cursor.get[String].flatMap { Label(_).leftMap { e => ParsingFailure(e.getMessage) } } + implicit val labelConfigReader: ConfigReader[Label] = ConfigReader.fromString(str => + Label(str).leftMap(e => CannotConvert(str, classOf[Label].getSimpleName, e.getMessage)) + ) } diff --git a/ship/src/main/resources/default.conf b/ship/src/main/resources/default.conf index 28e2cfcdbf..fba48d6ebe 100644 --- a/ship/src/main/resources/default.conf +++ b/ship/src/main/resources/default.conf @@ -1,4 +1,6 @@ ship { + base-uri = "http://localhost:8080/v1" + database { read = ${ship.database.access} # Access to database for write access @@ -29,4 +31,25 @@ ship { username = "postgres" password = "postgres" } + + event-log { + query-config = { + batch-size = 30 + refresh-strategy = 3s + } + max-duration = 14 seconds + } + + organizations { + values { + # organization example + #obp = "The Open Brain Platform Organization" + } + } + + # Service account configuration for internal operations + service-account { + subject: "delta" + realm: "internal" + } } \ No newline at end of file diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventClock.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventClock.scala new file mode 100644 index 0000000000..27d74b5326 --- /dev/null +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventClock.scala @@ -0,0 +1,30 @@ +package ch.epfl.bluebrain.nexus.ship + +import cats.Applicative +import cats.effect.{Clock, IO, Ref} + +import java.time.Instant +import java.util.concurrent.TimeUnit +import scala.concurrent.duration.FiniteDuration + +/** + * Mutable clock used for the migration + */ +class EventClock(instant: Ref[IO, Instant]) extends Clock[IO] { + + def setInstant(value: Instant): IO[Unit] = instant.set(value) + + override def applicative: Applicative[IO] = Applicative[IO] + + override def monotonic: IO[FiniteDuration] = toDuration + + override def realTime: IO[FiniteDuration] = toDuration + + private def toDuration: IO[FiniteDuration] = instant.get.map { i => + FiniteDuration(i.toEpochMilli, TimeUnit.MILLISECONDS) + } +} + +object EventClock { + def init(): IO[EventClock] = Ref.of[IO, Instant](Instant.EPOCH).map { ref => new EventClock(ref) } +} diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventProcessor.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventProcessor.scala index fe62471221..22bd91c5c4 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventProcessor.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventProcessor.scala @@ -3,18 +3,18 @@ package ch.epfl.bluebrain.nexus.ship import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.sourcing.event.Event.ScopedEvent -import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label +import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.ship.model.InputEvent import fs2.Stream import io.circe.Decoder /** - * Process events for the defined resource type - */ + * Process events for the defined resource type + */ trait EventProcessor[Event <: ScopedEvent] { - def resourceType: Label + def resourceType: EntityType def decoder: Decoder[Event] @@ -26,21 +26,25 @@ trait EventProcessor[Event <: ScopedEvent] { object EventProcessor { - private val logger = Logger[EventProcessor.type ] + private val logger = Logger[EventProcessor.type] - def run(processors: List[EventProcessor[_]], eventStream: Stream[IO, InputEvent]): IO[Offset] = { - val processorsMap = processors.foldLeft(Map.empty[Label, EventProcessor[_]]) { - (acc, processor) => acc + (processor.resourceType -> processor) + def run(eventStream: Stream[IO, InputEvent], processors: EventProcessor[_]*): IO[Offset] = { + val processorsMap = processors.foldLeft(Map.empty[EntityType, EventProcessor[_]]) { (acc, processor) => + acc + (processor.resourceType -> processor) } - eventStream.evalTap { event => - processorsMap.get(event.`type`) match { - case Some(processor) => processor.evaluate(event) - case None => logger.warn(s"No processor is provided for '${event.`type`}', skipping...") + eventStream + .evalTap { event => + processorsMap.get(event.`type`) match { + case Some(processor) => processor.evaluate(event) + case None => logger.warn(s"No processor is provided for '${event.`type`}', skipping...") + } + } + .scan((0, Offset.start)) { case ((count, _), event) => (count + 1, event.ordering) } + .compile + .lastOrError + .flatMap { case (count, offset) => + logger.info(s"$count events were imported up to offset $offset").as(offset) } - }.scan((0, Offset.start)){ case ((count, _), event) => (count + 1, event.ordering) - }.compile.lastOrError.flatMap { case (count, offset) => - logger.info(s"$count events were imported up to offset $offset").as(offset) - } } } diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventUUIDF.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventUUIDF.scala new file mode 100644 index 0000000000..1a7ecef5f4 --- /dev/null +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventUUIDF.scala @@ -0,0 +1,20 @@ +package ch.epfl.bluebrain.nexus.ship + +import cats.effect.{IO, Ref} +import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF + +import java.util.UUID + +/** + * Mutable uuid generator used for the migration + */ +class EventUUIDF(uuid: Ref[IO, UUID]) extends UUIDF { + + def setUUID(value: UUID): IO[Unit] = uuid.set(value) + + override def apply(): IO[UUID] = uuid.get +} + +object EventUUIDF { + def init(): IO[EventUUIDF] = Ref.of[IO, UUID](UUID.randomUUID()).map { ref => new EventUUIDF(ref) } +} diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala index 5bdef5af0f..393107794c 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala @@ -1,13 +1,16 @@ package ch.epfl.bluebrain.nexus.ship -import cats.effect.{ExitCode, IO} +import cats.effect.{Clock, ExitCode, IO} import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.kernel.Logger +import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF import ch.epfl.bluebrain.nexus.delta.ship.BuildInfo import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.ship.config.ShipConfig import ch.epfl.bluebrain.nexus.ship.model.InputEvent +import ch.epfl.bluebrain.nexus.ship.organizations.OrganizationProvider +import ch.epfl.bluebrain.nexus.ship.projects.ProjectProcessor import com.monovore.decline.Opts import com.monovore.decline.effect.CommandIOApp import fs2.Stream @@ -23,32 +26,67 @@ object Main private val logger = Logger[Main.type] - private val inputFile: Opts[Path] = Opts.option[String]("file", help = "The data file containing the imports.").map(Path(_)) + private val inputFile: Opts[Path] = + Opts.option[String]("file", help = "The data file containing the imports.").map(Path(_)) - private val configFile: Opts[Option[Path]] = Opts.option[String]("config", help = "The configuration file.").map(Path(_)).orNone + private val configFile: Opts[Option[Path]] = + Opts.option[String]("config", help = "The configuration file.").map(Path(_)).orNone private val offset: Opts[Offset] = Opts .option[Long]("offset", help = "To perform an incremental import from the provided offset.") - .map(Offset.at).withDefault(Offset.start) + .map(Offset.at) + .withDefault(Offset.start) - private val runProcess = Opts.subcommand("run", "Run an import") { + private val run = Opts.subcommand("run", "Run an import") { (inputFile, configFile, offset).mapN(Run) } - override def main: Opts[IO[ExitCode]] = runProcess.map { - case Run(file, config, offset) => - for { - _ <- logger.info(s"Running the import with file $file, config $config and from offset $offset") - config <- ShipConfig.load(config) - events = eventStream(file) - _ <- Transactors.init(config.database).use { - _ => EventProcessor.run(List.empty, events) - } - } yield ExitCode.Success + private val showConfig = Opts.subcommand("config", "Show reconciled config") { + configFile.map(ShowConfig) } - private def eventStream(file: Path): Stream[IO, InputEvent] = Files[IO].readUtf8Lines(file).map(decode[InputEvent]).rethrow + override def main: Opts[IO[ExitCode]] = + (run orElse showConfig) + .map { + case Run(file, config, _) => run(file, config) + case ShowConfig(config) => showConfig(config) + } + .map(_.as(ExitCode.Success)) - private final case class Run(file: Path, config: Option[Path], offset: Offset) + private def run(file: Path, config: Option[Path]): IO[Unit] = { + val clock = Clock[IO] + val uuidF = UUIDF.random + for { + _ <- logger.info(s"Running the import with file $file, config $config and from offset $offset") + config <- ShipConfig.load(config) + _ <- Transactors.init(config.database).use { xas => + val orgProvider = OrganizationProvider(config.eventLog, config.serviceAccount.value, xas, clock)(uuidF) + for { + // Provision organizations + _ <- orgProvider.create(config.organizations.values) + events = eventStream(file) + projectProcessor <- + ProjectProcessor(orgProvider.fetchActiveOrganization, config.eventLog, xas)(config.baseUri) + _ <- EventProcessor.run(events, projectProcessor) + } yield () + } + } yield () + } + + private def showConfig(config: Option[Path]) = + for { + _ <- logger.info(s"Showing reconciled config") + config <- ShipConfig.merge(config).map(_._2) + _ <- logger.info(config.root().render()) + } yield () + + private def eventStream(file: Path): Stream[IO, InputEvent] = + Files[IO].readUtf8Lines(file).map(decode[InputEvent]).rethrow + + sealed private trait Command + + final private case class Run(file: Path, config: Option[Path], offset: Offset) extends Command + + final private case class ShowConfig(config: Option[Path]) extends Command } diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/OrganizationCreationConfig.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/OrganizationCreationConfig.scala new file mode 100644 index 0000000000..d8bc3dab19 --- /dev/null +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/OrganizationCreationConfig.scala @@ -0,0 +1,23 @@ +package ch.epfl.bluebrain.nexus.ship.config + +import cats.syntax.all._ +import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label +import pureconfig.ConfigReader +import pureconfig.configurable._ +import pureconfig.error.CannotConvert +import pureconfig.generic.semiauto.deriveReader + +import scala.annotation.nowarn + +final case class OrganizationCreationConfig(values: Map[Label, String]) + +object OrganizationCreationConfig { + + @nowarn("cat=unused") + implicit final val quotasConfigReader: ConfigReader[OrganizationCreationConfig] = { + implicit val mapReader: ConfigReader[Map[Label, String]] = + genericMapReader(str => Label(str).leftMap(e => CannotConvert(str, classOf[Label].getSimpleName, e.getMessage))) + deriveReader[OrganizationCreationConfig] + } + +} diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfig.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfig.scala index 03dd3a12bc..5f5aec7502 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfig.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfig.scala @@ -1,32 +1,34 @@ package ch.epfl.bluebrain.nexus.ship.config import cats.effect.IO -import cats.syntax.all._ -import ch.epfl.bluebrain.nexus.delta.sourcing.config.DatabaseConfig -import com.typesafe.config.{ConfigFactory, ConfigParseOptions, ConfigResolveOptions} +import ch.epfl.bluebrain.nexus.delta.kernel.config.Configs +import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, ServiceAccountConfig} +import ch.epfl.bluebrain.nexus.delta.sourcing.config.{DatabaseConfig, EventLogConfig} +import com.typesafe.config.Config import fs2.io.file.Path -import pureconfig.error.ConfigReaderException +import pureconfig.ConfigReader import pureconfig.generic.semiauto.deriveReader -import pureconfig.{ConfigReader, ConfigSource} -final case class ShipConfig(database: DatabaseConfig) +final case class ShipConfig( + baseUri: BaseUri, + database: DatabaseConfig, + eventLog: EventLogConfig, + organizations: OrganizationCreationConfig, + serviceAccount: ServiceAccountConfig +) object ShipConfig { - private val parseOptions = ConfigParseOptions.defaults().setAllowMissing(false) - private val resolverOptions = ConfigResolveOptions.defaults() - implicit final val shipConfigReader: ConfigReader[ShipConfig] = deriveReader[ShipConfig] - def load(externalConfigPath: Option[Path]) = + def merge(externalConfigPath: Option[Path]): IO[(ShipConfig, Config)] = for { - externalConfig <- IO.blocking(externalConfigPath.fold(ConfigFactory.empty()) { path => - ConfigFactory.parseFile(path.toNioPath.toFile, parseOptions) - }) - defaultConfig <- IO.blocking(ConfigFactory.parseResources("default.conf", parseOptions)) - merged = (externalConfig, defaultConfig).foldLeft(ConfigFactory.defaultOverrides())(_ withFallback _).withFallback(ConfigFactory.load()) - .resolve(resolverOptions) - config <- IO.fromEither(ConfigSource.fromConfig(merged).at("ship").load[ShipConfig].leftMap(ConfigReaderException(_))) - } yield config + externalConfig <- Configs.parseFile(externalConfigPath.map(_.toNioPath.toFile)) + defaultConfig <- Configs.parseResource("default.conf") + result <- Configs.merge[ShipConfig]("ship", externalConfig, defaultConfig) + } yield result + + def load(externalConfigPath: Option[Path]): IO[ShipConfig] = + merge(externalConfigPath).map(_._1) } diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/error/ShipError.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/error/ShipError.scala new file mode 100644 index 0000000000..325f574d5b --- /dev/null +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/error/ShipError.scala @@ -0,0 +1,16 @@ +package ch.epfl.bluebrain.nexus.ship.error + +import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef + +sealed abstract class ShipError(reason: String) extends Exception { self => + override def fillInStackTrace(): Throwable = self + + override def getMessage: String = reason +} + +object ShipError { + + final case class ProjectDeletionIsNotAllowed(project: ProjectRef) + extends ShipError(s"'$project' is not allowed during import.") + +} diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/model/InputEvent.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/model/InputEvent.scala index 70728b3227..3487cce007 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/model/InputEvent.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/model/InputEvent.scala @@ -1,20 +1,23 @@ package ch.epfl.bluebrain.nexus.ship.model import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri -import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import io.circe.{Decoder, Json} import java.time.Instant import scala.annotation.nowarn -final case class InputEvent(ordering: Offset, - `type`: Label, - org: Label, - project: Label, - id: Iri, rev: Int, - value: Json, - instant: Instant) +final case class InputEvent( + ordering: Offset, + `type`: EntityType, + org: Label, + project: Label, + id: Iri, + rev: Int, + value: Json, + instant: Instant +) object InputEvent { diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/organizations/OrganizationProvider.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/organizations/OrganizationProvider.scala new file mode 100644 index 0000000000..7c8d83c0d7 --- /dev/null +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/organizations/OrganizationProvider.scala @@ -0,0 +1,40 @@ +package ch.epfl.bluebrain.nexus.ship.organizations + +import cats.effect.{Clock, IO} +import cats.syntax.all._ +import ch.epfl.bluebrain.nexus.delta.kernel.Logger +import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF +import ch.epfl.bluebrain.nexus.delta.sdk.ScopeInitializer +import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.ServiceAccount +import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.Organization +import ch.epfl.bluebrain.nexus.delta.sdk.organizations.{Organizations, OrganizationsImpl} +import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.OrganizationRejection.OrganizationAlreadyExists +import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors +import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Identity, Label} +import ch.epfl.bluebrain.nexus.ship.organizations.OrganizationProvider.logger + +final class OrganizationProvider(orgs: Organizations, serviceAccount: ServiceAccount) { + + implicit val subject: Identity.Subject = serviceAccount.subject + + def create(values: Map[Label, String]): IO[Unit] = + values.toList.traverse { case (label, description) => + orgs.create(label, Option.when(description.nonEmpty)(description))(subject).recoverWith { + case _: OrganizationAlreadyExists => logger.info(s"Organization '$label' already exists.") + } + }.void + + def fetchActiveOrganization(label: Label): IO[Organization] = orgs.fetchActiveOrganization(label) +} + +object OrganizationProvider { + private val logger = Logger[OrganizationProvider] + + def apply(config: EventLogConfig, serviceAccount: ServiceAccount, xas: Transactors, clock: Clock[IO])(implicit + uuidf: UUIDF + ): OrganizationProvider = { + val orgs = OrganizationsImpl(ScopeInitializer.noop, config, xas, clock) + new OrganizationProvider(orgs, serviceAccount) + } +} diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/projects/ProjectProcessor.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/projects/ProjectProcessor.scala new file mode 100644 index 0000000000..14b91ee1ec --- /dev/null +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/projects/ProjectProcessor.scala @@ -0,0 +1,81 @@ +package ch.epfl.bluebrain.nexus.ship.projects + +import cats.effect.IO +import ch.epfl.bluebrain.nexus.delta.kernel.Logger +import ch.epfl.bluebrain.nexus.delta.sdk.ScopeInitializer +import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri +import ch.epfl.bluebrain.nexus.delta.sdk.projects.Projects.FetchOrganization +import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectEvent._ +import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectRejection.NotFound +import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.{ApiMappings, ProjectEvent, ProjectFields, ProjectRejection} +import ch.epfl.bluebrain.nexus.delta.sdk.projects.{Projects, ProjectsImpl, ValidateProjectDeletion} +import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors +import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig +import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef} +import ch.epfl.bluebrain.nexus.ship.error.ShipError.ProjectDeletionIsNotAllowed +import ch.epfl.bluebrain.nexus.ship.projects.ProjectProcessor.logger +import ch.epfl.bluebrain.nexus.ship.{EventClock, EventProcessor, EventUUIDF} +import io.circe.Decoder + +final class ProjectProcessor private (projects: Projects, clock: EventClock, uuidF: EventUUIDF) + extends EventProcessor[ProjectEvent] { + override def resourceType: EntityType = Projects.entityType + + override def decoder: Decoder[ProjectEvent] = ProjectEvent.serializer.codec + + override def evaluate(event: ProjectEvent): IO[Unit] = { + for { + _ <- clock.setInstant(event.instant) + _ <- uuidF.setUUID(event.uuid) + _ <- evaluateInternal(event) + } yield () + } + + private def evaluateInternal(event: ProjectEvent) = { + implicit val s: Subject = event.subject + val projectRef = event.project + val cRev = event.rev - 1 + event match { + case ProjectCreated(_, _, _, _, _, description, apiMappings, base, vocab, enforceSchema, _, _) => + val fields = ProjectFields(description, apiMappings, Some(base), Some(vocab), enforceSchema) + projects.create(projectRef, fields) + case ProjectUpdated(_, _, _, _, _, description, apiMappings, base, vocab, enforceSchema, _, _) => + val fields = ProjectFields(description, apiMappings, Some(base), Some(vocab), enforceSchema) + projects.update(projectRef, cRev, fields) + case _: ProjectDeprecated => + projects.deprecate(projectRef, cRev) + case _: ProjectUndeprecated => + projects.undeprecate(projectRef, cRev) + case _: ProjectMarkedForDeletion => + IO.raiseError(ProjectDeletionIsNotAllowed(projectRef)) + } + }.recoverWith { + case notFound: NotFound => IO.raiseError(notFound) + case error: ProjectRejection => logger.warn(error)(error.reason) + }.void +} + +object ProjectProcessor { + + private val logger = Logger[ProjectProcessor] + def apply(fetchAndValidateOrg: FetchOrganization, config: EventLogConfig, xas: Transactors)(implicit + base: BaseUri + ): IO[ProjectProcessor] = + for { + clock <- EventClock.init() + uuidF <- EventUUIDF.init() + } yield { + val disableDeletion: ValidateProjectDeletion = (p: ProjectRef) => IO.raiseError(ProjectDeletionIsNotAllowed(p)) + val projects = ProjectsImpl( + fetchAndValidateOrg, + disableDeletion, + ScopeInitializer.noop, + ApiMappings.empty, + config, + xas, + clock + )(base, uuidF) + new ProjectProcessor(projects, clock, uuidF) + } +} diff --git a/ship/src/test/resources/config/external.conf b/ship/src/test/resources/config/external.conf new file mode 100644 index 0000000000..4e610f82fc --- /dev/null +++ b/ship/src/test/resources/config/external.conf @@ -0,0 +1,3 @@ +ship { + base-uri = "https://bbp.epfl.ch/v1" +} \ No newline at end of file diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigSuite.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigSuite.scala index cdec394250..b5b91a248b 100644 --- a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigSuite.scala +++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigSuite.scala @@ -1,10 +1,22 @@ package ch.epfl.bluebrain.nexus.ship.config +import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri +import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite +import fs2.io.file.Path class ShipConfigSuite extends NexusSuite { test("Default configuration should be parsed and loaded") { - ShipConfig.load(None).assert(_ => true) + val expectedBaseUri = BaseUri("http://localhost:8080", Label.unsafe("v1")) + ShipConfig.load(None).map(_.baseUri).assertEquals(expectedBaseUri) + } + + test("Default configuration should be overloaded by the external config") { + val expectedBaseUri = BaseUri("https://bbp.epfl.ch", Label.unsafe("v1")) + for { + externalConfigPath <- loader.absolutePath("config/external.conf") + _ <- ShipConfig.load(Some(Path(externalConfigPath))).map(_.baseUri).assertEquals(expectedBaseUri) + } yield () } }