Skip to content

Commit

Permalink
Merge branch 'master' into 4102-introduce-resources-practice-routes
Browse files Browse the repository at this point in the history
  • Loading branch information
imsdu authored Sep 21, 2023
2 parents d84c0c1 + 80f591b commit 0a85d8b
Show file tree
Hide file tree
Showing 118 changed files with 935 additions and 1,059 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/ci-integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ jobs:
run: sbt -Dsbt.color=always -Dsbt.supershell=false "project tests" test
- name: Stop & clean Docker
if: ${{ always() }}
run: docker-compose -f tests/docker/docker-compose.yml down --rmi "local" --volumes
run: |
docker-compose -f tests/docker/docker-compose.yml down --rmi "local" --volumes
docker system prune --force --volumes
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ native
target/
.bloop
.metals
metals.sbt
project/.bloop
.bsp

Expand Down
10 changes: 5 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ val akkaHttpCirceVersion = "1.39.2"
val akkaCorsVersion = "1.2.0"
val akkaVersion = "2.6.21"
val alpakkaVersion = "3.0.4"
val apacheCompressVersion = "1.23.0"
val apacheCompressVersion = "1.24.0"
val apacheIoVersion = "1.3.2"
val awsSdkVersion = "2.17.184"
val byteBuddyAgentVersion = "1.10.17"
Expand All @@ -29,7 +29,7 @@ val caffeineVersion = "3.1.8"
val catsEffectVersion = "2.5.5"
val catsRetryVersion = "2.1.1"
val catsVersion = "2.10.0"
val circeVersion = "0.14.5"
val circeVersion = "0.14.6"
val circeOpticsVersion = "0.14.1"
val circeExtrasVersions = "0.14.3"
val classgraphVersion = "4.8.162"
Expand All @@ -46,16 +46,16 @@ val kanelaAgentVersion = "1.0.17"
val kindProjectorVersion = "0.13.2"
val log4catsVersion = "1.7.0"
val logbackVersion = "1.4.11"
val magnoliaVersion = "1.1.4"
val mockitoVersion = "1.17.14"
val magnoliaVersion = "1.1.6"
val mockitoVersion = "1.17.22"
val monixVersion = "3.4.1"
val monixBioVersion = "1.2.0"
val munitVersion = "1.0.0-M8"
val nimbusJoseJwtVersion = "9.31"
val postgresJdbcVersion = "42.6.0"
val pureconfigVersion = "0.17.4"
val scalaLoggingVersion = "3.9.5"
val scalaTestVersion = "3.2.16"
val scalaTestVersion = "3.2.17"
val scalaXmlVersion = "2.1.0"
val topBraidVersion = "1.3.2" // 1.4.1 fails to validate some test schemas
val testContainersVersion = "1.19.0"
Expand Down
20 changes: 11 additions & 9 deletions delta/app/src/main/resources/app.conf
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ app {
"schemas/write",
"files/write",
"storages/write",
"archives/write",
"version/read",
"quotas/read",
"supervision/read"
Expand All @@ -139,7 +138,6 @@ app {
"schemas/write",
"files/write",
"storages/write",
"archives/write",
"version/read",
"quotas/read"
]
Expand Down Expand Up @@ -279,12 +277,6 @@ app {
}
}

# secrets encryption configuration
encryption {
password = "changeme"
salt = "salt"
}

# projection configuration
projections {
cluster {
Expand All @@ -310,7 +302,7 @@ app {
}

defaults {
http-client {
http-client-compression {
# the retry strategy for the http client
retry = ${app.defaults.constant-retry-strategy}
# the strategy to decide if it is worth retrying when an Http error occurs.
Expand All @@ -320,6 +312,16 @@ app {
compression = true
}

http-client-no-compression {
# the retry strategy for the http client
retry = ${app.defaults.constant-retry-strategy}
# the strategy to decide if it is worth retrying when an Http error occurs.
# allowed strategies are 'always', 'never' or 'onServerError'.
is-worth-retrying = "onServerError"
# Flag to decide whether or not to support compression
compression = false
}

# default query configuration
query {
batch-size = 30
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ch.epfl.bluebrain.nexus.delta.config
import ch.epfl.bluebrain.nexus.delta.kernel.cache.CacheConfig
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApiConfig
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclsConfig
import ch.epfl.bluebrain.nexus.delta.sdk.crypto.EncryptionConfig
import ch.epfl.bluebrain.nexus.delta.sdk.fusion.FusionConfig
import ch.epfl.bluebrain.nexus.delta.sdk.model.ServiceAccountConfig
import ch.epfl.bluebrain.nexus.delta.sdk.organizations.OrganizationsConfig
Expand Down Expand Up @@ -47,7 +46,6 @@ final case class AppConfig(
schemas: SchemasConfig,
serviceAccount: ServiceAccountConfig,
sse: SseConfig,
encryption: EncryptionConfig,
projections: ProjectionConfig,
fusion: FusionConfig
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class JsonLayout extends LayoutBase[ILoggingEvent] {
.fromFields(
Map(
"@timestamp" := event.getInstant,
"message" := event.getMessage,
"message" := event.getFormattedMessage,
"log.level" := event.getLevel.toString,
"log.logger" := event.getLoggerName
) ++ stackTraceFields
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import ch.epfl.bluebrain.nexus.delta.routes.ErrorRoutes
import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction.AggregateIndexingAction
import ch.epfl.bluebrain.nexus.delta.sdk._
import ch.epfl.bluebrain.nexus.delta.sdk.acls.Acls
import ch.epfl.bluebrain.nexus.delta.sdk.crypto.Crypto
import ch.epfl.bluebrain.nexus.delta.sdk.fusion.FusionConfig
import ch.epfl.bluebrain.nexus.delta.sdk.http.StrictEntity
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.ServiceAccount
Expand Down Expand Up @@ -61,7 +60,6 @@ class DeltaModule(appCfg: AppConfig, config: Config)(implicit classLoader: Class
make[BaseUri].from { appCfg.http.baseUri }
make[StrictEntity].from { appCfg.http.strictEntityTimeout }
make[ServiceAccount].from { appCfg.serviceAccount.value }
make[Crypto].from { appCfg.encryption.crypto }

make[Transactors].fromResource {
Transactors.init(appCfg.database)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@ import akka.http.scaladsl.model.{HttpRequest, Uri}
import ch.epfl.bluebrain.nexus.delta.Main.pluginsMaxPriority
import ch.epfl.bluebrain.nexus.delta.config.AppConfig
import ch.epfl.bluebrain.nexus.delta.kernel.cache.CacheConfig
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination.FromPagination
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.contexts
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.routes.IdentitiesRoutes
import ch.epfl.bluebrain.nexus.delta.sdk.PriorityRoute
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.auth.{AuthTokenProvider, OpenIdAuthService}
import ch.epfl.bluebrain.nexus.delta.sdk.http.{HttpClient, HttpClientError}
import ch.epfl.bluebrain.nexus.delta.sdk.identities.{Identities, IdentitiesImpl}
import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination.FromPagination
import ch.epfl.bluebrain.nexus.delta.sdk.model.search.SearchParams.RealmSearchParams
import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, ResourceF}
import ch.epfl.bluebrain.nexus.delta.sdk.realms.Realms
Expand All @@ -22,7 +24,6 @@ import io.circe.Json
import izumi.distage.model.definition.{Id, ModuleDef}
import monix.bio.{IO, UIO}
import monix.execution.Scheduler
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._

/**
* Identities module wiring config.
Expand All @@ -46,6 +47,14 @@ object IdentitiesModule extends ModuleDef {
IdentitiesImpl(findActiveRealm, getUserInfo, config)
}

make[OpenIdAuthService].from { (httpClient: HttpClient @Id("realm"), realms: Realms) =>
new OpenIdAuthService(httpClient, realms)
}

make[AuthTokenProvider].fromEffect { (authService: OpenIdAuthService) =>
AuthTokenProvider(authService)
}

many[RemoteContextResolution].addEffect(ContextValue.fromFile("contexts/identities.json").map { ctx =>
RemoteContextResolution.fixed(contexts.identities -> ctx)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ object RealmsModule extends ModuleDef {
}

make[HttpClient].named("realm").from { (as: ActorSystem[Nothing], sc: Scheduler) =>
HttpClient.noRetry()(as.classicSystem, sc)
HttpClient.noRetry(compression = false)(as.classicSystem, sc)
}

many[SseEncoder[_]].add { base: BaseUri => RealmEvent.sseEncoder(base) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import monix.execution.Scheduler

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import akka.stream.alpakka.file.ArchiveMetadata

/**
* Archive download functionality.
Expand All @@ -55,10 +56,9 @@ trait ArchiveDownload {
* @param caller
* the caller to be used for checking for access
*/
def apply[M](
def apply(
value: ArchiveValue,
project: ProjectRef,
format: ArchiveFormat[M],
ignoreNotFound: Boolean
)(implicit caller: Caller, scheduler: Scheduler): IO[ArchiveRejection, AkkaSource]

Expand Down Expand Up @@ -96,18 +96,17 @@ object ArchiveDownload {
private val printer = Printer.spaces2.copy(dropNullValues = true)
private val sourcePrinter = Printer.spaces2.copy(dropNullValues = false)

override def apply[M](
override def apply(
value: ArchiveValue,
project: ProjectRef,
format: ArchiveFormat[M],
ignoreNotFound: Boolean
)(implicit caller: Caller, scheduler: Scheduler): IO[ArchiveRejection, AkkaSource] = {
for {
references <- value.resources.toList.traverse(toFullReference)
_ <- checkResourcePermissions(references, project)
contentStream <- resolveReferencesAsStream(references, project, ignoreNotFound, format)
contentStream <- resolveReferencesAsStream(references, project, ignoreNotFound)
} yield {
Source.fromGraph(StreamConverter(contentStream)).via(format.writeFlow)
Source.fromGraph(StreamConverter(contentStream)).via(Zip.writeFlow)
}
}

Expand All @@ -124,34 +123,29 @@ object ArchiveDownload {
}
}

private def resolveReferencesAsStream[M](
private def resolveReferencesAsStream(
references: List[FullArchiveReference],
project: ProjectRef,
ignoreNotFound: Boolean,
format: ArchiveFormat[M]
)(implicit caller: Caller): IO[ArchiveRejection, Stream[Task, (M, AkkaSource)]] = {
ignoreNotFound: Boolean
)(implicit caller: Caller): IO[ArchiveRejection, Stream[Task, (ArchiveMetadata, AkkaSource)]] = {
references
.traverseFilter {
case ref: FileReference => fileEntry(ref, project, format, ignoreNotFound)
case ref: ResourceReference => resourceEntry(ref, project, format, ignoreNotFound)
case ref: FileReference => fileEntry(ref, project, ignoreNotFound)
case ref: ResourceReference => resourceEntry(ref, project, ignoreNotFound)
}
.map(sortWith(format))
.map(sortWith)
.map(asStream)
}

private def sortWith[M](
format: ArchiveFormat[M]
)(list: List[(M, Task[AkkaSource])]): List[(M, Task[AkkaSource])] = {
list.sortBy { case (entry, _) =>
entry
}(format.ordering)
}
private def sortWith(list: List[(ArchiveMetadata, Task[AkkaSource])]): List[(ArchiveMetadata, Task[AkkaSource])] =
list.sortBy { case (entry, _) => entry }(Zip.ordering)

private def asStream[M](list: List[(M, Task[AkkaSource])]) = {
Stream.iterable(list).evalMap[Task, (M, AkkaSource)] { case (metadata, source) =>
private def asStream(
list: List[(ArchiveMetadata, Task[AkkaSource])]
): Stream[Task, (ArchiveMetadata, AkkaSource)] =
Stream.iterable(list).evalMap { case (metadata, source) =>
source.map(metadata -> _)
}
}

private def checkResourcePermissions(
refs: List[FullArchiveReference],
Expand All @@ -166,14 +160,13 @@ object ArchiveDownload {
)
.void

private def fileEntry[Metadata](
private def fileEntry(
ref: FileReference,
project: ProjectRef,
format: ArchiveFormat[Metadata],
ignoreNotFound: Boolean
)(implicit
caller: Caller
): IO[ArchiveRejection, Option[(Metadata, Task[AkkaSource])]] = {
): IO[ArchiveRejection, Option[(ArchiveMetadata, Task[AkkaSource])]] = {
val refProject = ref.project.getOrElse(project)
// the required permissions are checked for each file content fetch
val entry = fetchFileContent(ref.ref, refProject, caller)
Expand All @@ -184,21 +177,19 @@ object ArchiveDownload {
case FileRejection.AuthorizationFailed(addr, perm) => AuthorizationFailed(addr, perm)
case other => WrappedFileRejection(other)
}
.flatMap { case FileResponse(fileMetadata, content) =>
IO.fromEither(
pathOf(ref, project, format, fileMetadata.filename).map { path =>
val archiveMetadata = format.metadata(path, fileMetadata.bytes)
val contentTask: Task[AkkaSource] = content
.tapError(response =>
UIO.delay(
logger
.error(s"Error streaming file '${fileMetadata.filename}' for archive: ${response.value.value}")
)
)
.mapError(response => ArchiveDownloadError(fileMetadata.filename, response))
Some((archiveMetadata, contentTask))
}
)
.map { case FileResponse(fileMetadata, content) =>
val path = pathOf(ref, project, fileMetadata.filename)
val archiveMetadata = Zip.metadata(path)
val contentTask: Task[AkkaSource] = content
.tapError(response =>
UIO.delay(
logger
.error(s"Error streaming file '${fileMetadata.filename}' for archive: ${response.value.value}")
)
)
.mapError(response => ArchiveDownloadError(fileMetadata.filename, response))
Some((archiveMetadata, contentTask))

}
if (ignoreNotFound) entry.onErrorRecover { case _: ResourceNotFound => None }
else entry
Expand All @@ -207,27 +198,21 @@ object ArchiveDownload {
private def pathOf(
ref: FileReference,
project: ProjectRef,
format: ArchiveFormat[_],
filename: String
): Either[FilenameTooLong, String] =
ref.path.map { p => Right(p.value.toString) }.getOrElse {
): String =
ref.path.map(_.value.toString).getOrElse {
val p = ref.project.getOrElse(project)
Either.cond(
format != ArchiveFormat.Tar || filename.length < 100,
s"$p/file/$filename",
FilenameTooLong(ref.ref.original, p, filename)
)
s"$p/file/$filename"
}

private def resourceEntry[Metadata](
private def resourceEntry(
ref: ResourceReference,
project: ProjectRef,
format: ArchiveFormat[Metadata],
ignoreNotFound: Boolean
): IO[ArchiveRejection, Option[(Metadata, Task[AkkaSource])]] = {
): IO[ArchiveRejection, Option[(ArchiveMetadata, Task[AkkaSource])]] = {
val archiveEntry = resourceRefToByteString(ref, project).map { content =>
val path = pathOf(ref, project)
val metadata = format.metadata(path, content.length.toLong)
val metadata = Zip.metadata(path)
Some((metadata, Task.pure(Source.single(content))))
}
if (ignoreNotFound) archiveEntry.onErrorHandle { _: ResourceNotFound => None }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,12 @@ class Archives(
def download(
id: IdSegment,
project: ProjectRef,
format: ArchiveFormat[_],
ignoreNotFound: Boolean
)(implicit caller: Caller, scheduler: Scheduler): IO[ArchiveRejection, AkkaSource] =
(for {
resource <- fetch(id, project)
value = resource.value
source <- archiveDownload(value.value, project, format, ignoreNotFound)
source <- archiveDownload(value.value, project, ignoreNotFound)
} yield source).span("downloadArchive")

private def eval(cmd: CreateArchive): IO[ArchiveRejection, ArchiveResource] =
Expand Down
Loading

0 comments on commit 0a85d8b

Please sign in to comment.