When running nexus ship with s3 flag, read config from s3 (#4866)
olivergrabinski authored Apr 12, 2024
1 parent 59ea896 commit ef1c14e
Showing 4 changed files with 134 additions and 53 deletions.
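The changes below make RunShip polymorphic in where its external configuration comes from: the local ship keeps reading it from the filesystem, while the S3 ship now fetches it from the import bucket. As a rough sketch of how a caller might pick a ship based on the s3 flag (the CLI wiring is not part of this commit, so the method name, flag plumbing, and bucket name below are assumptions):

  import cats.effect.IO
  import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
  import eu.timepit.refined.types.string.NonEmptyString
  import fs2.aws.s3.models.Models.BucketName
  import fs2.io.file.Path

  // Illustrative glue only: RunShip.localShip and RunShip.s3Ship are the real entry
  // points from this commit; `runImport`, the `s3` flag, and the bucket name are
  // hypothetical. S3StorageClient and ImportReport come from the ship/delta modules.
  def runImport(importPath: Path, configPath: Option[Path], s3: Boolean, client: S3StorageClient): IO[ImportReport] = {
    val ship =
      if (s3) RunShip.s3Ship(client, BucketName(NonEmptyString.unsafeFrom("nexus-import")))
      else RunShip.localShip
    ship.run(importPath, configPath, Offset.start)
  }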
17 changes: 15 additions & 2 deletions ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala
@@ -20,14 +20,17 @@ import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverProcessor
import ch.epfl.bluebrain.nexus.ship.resources.{ResourceProcessor, ResourceWiring}
import ch.epfl.bluebrain.nexus.ship.schemas.{SchemaProcessor, SchemaWiring}
import ch.epfl.bluebrain.nexus.ship.views.{BlazegraphViewProcessor, CompositeViewProcessor, ElasticSearchViewProcessor}
+ import eu.timepit.refined.types.string.NonEmptyString
import fs2.Stream
- import fs2.aws.s3.models.Models.BucketName
+ import fs2.aws.s3.models.Models.{BucketName, FileKey}
import fs2.io.file.Path

trait RunShip {

private val logger = Logger[RunShip]

+ def loadConfig(config: Option[Path]): IO[ShipConfig]

def eventsStream(path: Path, fromOffset: Offset): Stream[IO, RowEvent]

def run(path: Path, config: Option[Path], fromOffset: Offset = Offset.start): IO[ImportReport] = {
@@ -37,7 +40,7 @@
implicit val jsonLdApi: JsonLdApi = JsonLdJavaApi.lenient
for {
_ <- logger.info(s"Running the import with file $path, config $config")
- config <- ShipConfig.load(config)
+ config <- loadConfig(config)
report <- Transactors.init(config.database).use { xas =>
val orgProvider =
OrganizationProvider(config.eventLog, config.serviceAccount.value, xas, clock)(uuidF)
@@ -88,11 +91,21 @@
object RunShip {

def localShip = new RunShip {
+ override def loadConfig(config: Option[Path]): IO[ShipConfig] =
+   ShipConfig.load(config)

override def eventsStream(path: Path, fromOffset: Offset): Stream[IO, RowEvent] =
EventStreamer.localStreamer.stream(path, fromOffset)
}

def s3Ship(client: S3StorageClient, bucket: BucketName) = new RunShip {
+ override def loadConfig(config: Option[Path]): IO[ShipConfig] = config match {
+   case Some(configPath) =>
+     val configStream = client.readFile(bucket, FileKey(NonEmptyString.unsafeFrom(configPath.toString)))
+     ShipConfig.load(configStream)
+   case None => ShipConfig.load(None)
+ }

override def eventsStream(path: Path, fromOffset: Offset): Stream[IO, RowEvent] =
EventStreamer
.s3eventStreamer(client, bucket)
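In effect, the optional config path that the local ship reads from disk is reinterpreted by the S3 ship as an object key inside the bucket. A hypothetical invocation (bucket and key names made up):

  // The config is streamed out of S3 via client.readFile and parsed in memory,
  // instead of being read from the local filesystem.
  val bucket = BucketName(NonEmptyString.unsafeFrom("nexus-import"))
  val loaded: IO[ShipConfig] =
    RunShip.s3Ship(s3Client, bucket).loadConfig(Some(Path("config/external.conf")))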
ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfig.scala
@@ -8,12 +8,16 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.config.{DatabaseConfig, EventLogCo
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.ship.config.ShipConfig.ProjectMapping
import com.typesafe.config.Config
+ import fs2.Stream
import fs2.io.file.Path
import pureconfig.ConfigReader
+ import pureconfig.backend.ConfigFactoryWrapper
import pureconfig.configurable.genericMapReader
- import pureconfig.error.CannotConvert
+ import pureconfig.error.{CannotConvert, ConfigReaderException}
import pureconfig.generic.semiauto.deriveReader

+ import java.nio.charset.StandardCharsets.UTF_8

final case class ShipConfig(
baseUri: BaseUri,
database: DatabaseConfig,
@@ -37,13 +41,39 @@ object ShipConfig {
deriveReader[ShipConfig]
}

- def merge(externalConfigPath: Option[Path]): IO[(ShipConfig, Config)] =
+ def merge(externalConfigPath: Option[Path]): IO[(ShipConfig, Config)] = {
+   val externalConfig = Configs.parseFile(externalConfigPath.map(_.toNioPath.toFile))
+   externalConfig.flatMap(mergeFromConfig)
+ }

+ def merge(externalConfigStream: Stream[IO, Byte]): IO[(ShipConfig, Config)] = {
+   val externalConfig = configFromStream(externalConfigStream)
+   externalConfig.flatMap(mergeFromConfig)
+ }

+ private def mergeFromConfig(externalConfig: Config): IO[(ShipConfig, Config)] =
for {
- externalConfig <- Configs.parseFile(externalConfigPath.map(_.toNioPath.toFile))
- defaultConfig <- Configs.parseResource("ship-default.conf")
- result <- Configs.merge[ShipConfig]("ship", externalConfig, defaultConfig)
+ defaultConfig <- Configs.parseResource("ship-default.conf")
+ result <- Configs.merge[ShipConfig]("ship", externalConfig, defaultConfig)
} yield result

def load(externalConfigPath: Option[Path]): IO[ShipConfig] =
merge(externalConfigPath).map(_._1)

+ def load(externalConfigStream: Stream[IO, Byte]): IO[ShipConfig] = {
+   merge(externalConfigStream).map(_._1)
+ }

+ /**
+   * Loads a config from a stream. Taken from
+   * https://github.com/pureconfig/pureconfig/tree/master/modules/fs2/src/main/scala/pureconfig/module/fs2
+   */
+ private def configFromStream(configStream: Stream[IO, Byte]): IO[Config] =
+   for {
+     bytes         <- configStream.compile.to(Array)
+     string         = new String(bytes, UTF_8)
+     configOrError <- IO.delay(ConfigFactoryWrapper.parseString(string))
+     config        <- IO.fromEither(configOrError.leftMap(ConfigReaderException[Config]))
+   } yield config

}
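Because the new overload only requires a Stream[IO, Byte], any byte source can feed the loader, S3 being just one of them. A minimal sketch relating the two entry points (the local path is hypothetical):

  import fs2.io.file.{Files, Path}

  // Parses the same file that load(Some(path)) would, but routed through the
  // stream-based overload added in this commit.
  val viaStream: IO[ShipConfig] =
    ShipConfig.load(Files[IO].readAll(Path("/tmp/external.conf")))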
ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/RunShipSuite.scala
@@ -3,71 +3,40 @@ package ch.epfl.bluebrain.nexus.ship
import cats.effect.{IO, Resource}
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceLoader
- import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.LocalStackS3StorageClient
import ch.epfl.bluebrain.nexus.delta.sdk.projects.Projects
import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.Resolvers
import ch.epfl.bluebrain.nexus.delta.sdk.resources.Resources
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
- import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie.{transactors, PostgresPassword, PostgresUser}
+ import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie.{PostgresPassword, PostgresUser, transactors}
import ch.epfl.bluebrain.nexus.ship.ImportReport.Count
- import ch.epfl.bluebrain.nexus.ship.RunShipSuite.{clearDB, expectedImportReport, getDistinctOrgProjects, uploadImportFileToS3}
+ import ch.epfl.bluebrain.nexus.ship.RunShipSuite.{clearDB, expectedImportReport, getDistinctOrgProjects}
import ch.epfl.bluebrain.nexus.testkit.config.SystemPropertyOverride
import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
import ch.epfl.bluebrain.nexus.testkit.postgres.PostgresContainer
import doobie.implicits._
- import eu.timepit.refined.types.string.NonEmptyString
- import fs2.aws.s3.models.Models.BucketName
import fs2.io.file.Path
- import io.laserdisc.pure.s3.tagless.S3AsyncClientOp
import munit.catseffect.IOFixture
import munit.{AnyFixture, CatsEffectSuite}
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
- import software.amazon.awssdk.services.s3.model.{CreateBucketRequest, PutObjectRequest, PutObjectResponse}

- import java.nio.file.Paths
import java.time.Instant
import scala.concurrent.duration.Duration

- class RunShipSuite extends NexusSuite with RunShipSuite.Fixture with LocalStackS3StorageClient.Fixture {
+ class RunShipSuite extends NexusSuite with RunShipSuite.Fixture {

override def munitIOTimeout: Duration = 60.seconds

- override def munitFixtures: Seq[AnyFixture[_]] = List(mainFixture, localStackS3Client)
+ override def munitFixtures: Seq[AnyFixture[_]] = List(mainFixture)
private lazy val xas = mainFixture()
- private lazy val (s3Client, fs2S3Client, _) = localStackS3Client()

override def beforeEach(context: BeforeEach): Unit = {
super.beforeEach(context)
clearDB(xas).accepted
()
}

test("Run import from S3 providing a single file") {
val path = Path("/import/import.json")
val bucket = BucketName(NonEmptyString.unsafeFrom("bucket"))
for {
_ <- uploadImportFileToS3(fs2S3Client, bucket, path)
_ <- RunShip.s3Ship(s3Client, bucket).run(path, None).assertEquals(expectedImportReport)
} yield ()
}

test("Run import from S3 providing a directory") {
val directoryPath = Path("/import/multi-part-import")
val bucket = BucketName(NonEmptyString.unsafeFrom("bucket"))
for {
_ <- uploadImportFileToS3(fs2S3Client, bucket, Path("/import/multi-part-import/2024-04-05T14:38:31.165389Z.json"))
_ <-
uploadImportFileToS3(fs2S3Client, bucket, Path("/import/multi-part-import/2024-04-05T14:38:31.165389Z.success"))
_ <- uploadImportFileToS3(fs2S3Client, bucket, Path("/import/multi-part-import/2024-04-06T11:34:31.165389Z.json"))
_ <- RunShip
.s3Ship(s3Client, bucket)
.run(directoryPath, None)
.assertEquals(expectedImportReport)
} yield ()
}

test("Run import by providing the path to a file") {
for {
importFile <- asPath("import/import.json")
@@ -116,12 +85,12 @@ class RunShipSuite extends NexusSuite with RunShipSuite.Fixture with LocalStackS

object RunShipSuite {

- def clearDB(xas: Transactors) =
+ def clearDB(xas: Transactors): IO[Unit] =
sql"""
| DELETE FROM scoped_events; DELETE FROM scoped_states;
|""".stripMargin.update.run.void.transact(xas.write)

- def getDistinctOrgProjects(xas: Transactors) =
+ def getDistinctOrgProjects(xas: Transactors): IO[List[(String, String)]] =
sql"""
| SELECT DISTINCT org, project FROM scoped_events;
""".stripMargin.query[(String, String)].to[List].transact(xas.read)
@@ -138,15 +107,6 @@
)
)

- def uploadImportFileToS3(s3Client: S3AsyncClientOp[IO], bucket: BucketName, path: Path): IO[PutObjectResponse] = {
-   s3Client.createBucket(CreateBucketRequest.builder().bucket(bucket.value.value).build) >>
-     s3Client.putObject(
-       PutObjectRequest.builder.bucket(bucket.value.value).key(path.toString).build,
-       Paths.get(getClass.getResource(path.toString).toURI)
-     )
- }

trait Fixture { self: CatsEffectSuite =>

private def initConfig(postgres: PostgresContainer) =
ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/S3RunShipSuite.scala (new file)
@@ -0,0 +1,78 @@
package ch.epfl.bluebrain.nexus.ship

import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.LocalStackS3StorageClient
import ch.epfl.bluebrain.nexus.ship.RunShipSuite.{clearDB, expectedImportReport}
import ch.epfl.bluebrain.nexus.ship.S3RunShipSuite.uploadFileToS3
import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
import eu.timepit.refined.types.string.NonEmptyString
import fs2.aws.s3.models.Models.BucketName
import fs2.io.file.Path
import io.laserdisc.pure.s3.tagless.S3AsyncClientOp
import munit.AnyFixture
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import software.amazon.awssdk.services.s3.model.{CreateBucketRequest, PutObjectRequest, PutObjectResponse}

import java.nio.file.Paths
import scala.concurrent.duration.Duration

class S3RunShipSuite extends NexusSuite with RunShipSuite.Fixture with LocalStackS3StorageClient.Fixture {

override def munitIOTimeout: Duration = 60.seconds

override def munitFixtures: Seq[AnyFixture[_]] = List(mainFixture, localStackS3Client)
private lazy val xas = mainFixture()
private lazy val (s3Client, fs2S3client, _) = localStackS3Client()

private val bucket = BucketName(NonEmptyString.unsafeFrom("bucket"))

override def beforeEach(context: BeforeEach): Unit = {
super.beforeEach(context)
clearDB(xas).accepted
()
}

test("Run import from S3 providing a single file") {
val importFilePath = Path("/import/import.json")
for {
_ <- uploadFileToS3(fs2S3client, bucket, importFilePath)
_ <- RunShip.s3Ship(s3Client, bucket).run(importFilePath, None).assertEquals(expectedImportReport)
} yield ()
}

test("Succeed in overloading the default config with an external config in S3") {
val configPath = Path("/config/external.conf")
for {
_ <- uploadFileToS3(fs2S3client, bucket, configPath)
shipConfig <- RunShip.s3Ship(s3Client, bucket).loadConfig(configPath.some)
_ = assertEquals(shipConfig.baseUri.toString, "https://bbp.epfl.ch/v1")
} yield ()
}

test("Run import from S3 providing a directory") {
val directoryPath = Path("/import/multi-part-import")
for {
_ <- uploadFileToS3(fs2S3client, bucket, Path("/import/multi-part-import/2024-04-05T14:38:31.165389Z.json"))
_ <- uploadFileToS3(fs2S3client, bucket, Path("/import/multi-part-import/2024-04-05T14:38:31.165389Z.success"))
_ <- uploadFileToS3(fs2S3client, bucket, Path("/import/multi-part-import/2024-04-06T11:34:31.165389Z.json"))
_ <- RunShip
.s3Ship(s3Client, bucket)
.run(directoryPath, None)
.assertEquals(expectedImportReport)
} yield ()
}

}

object S3RunShipSuite {

def uploadFileToS3(s3Client: S3AsyncClientOp[IO], bucket: BucketName, path: Path): IO[PutObjectResponse] = {
s3Client.createBucket(CreateBucketRequest.builder().bucket(bucket.value.value).build) >>
s3Client.putObject(
PutObjectRequest.builder.bucket(bucket.value.value).key(path.toString).build,
Paths.get(getClass.getResource(path.toString).toURI)
)
}

}
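For the external-config test above to pass, the fixture at /config/external.conf only needs to override the base URI. The fixture itself is not shown in this diff; a HOCON file along these lines would satisfy the assertion, assuming the `ship` namespace that Configs.merge reads and pureconfig's kebab-case keys:

  # Hypothetical content of /config/external.conf
  ship {
    base-uri = "https://bbp.epfl.ch/v1"
  }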
