Skip to content

Commit

Permalink
Delegate S3 file validation and creation endpoints (#4998)
Browse files Browse the repository at this point in the history
* Delegate file operation

* Add delegate file route

* JWS token issuing / verification logic

* Parse RSA private key, generate public, wire up route

* Use custom header for signature, provide storage id optionally, add integration test

* Add delegation to AWS config

* Add/test endpoint for registration using delegation path

* RSA config docs, tidy errors

* Refactor
  • Loading branch information
dantb authored Jun 6, 2024
1 parent 1d8805c commit 4839266
Show file tree
Hide file tree
Showing 22 changed files with 513 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage

import akka.actor.typed.ActorSystem
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.model.{HttpResponse, StatusCodes, Uri}
import akka.http.scaladsl.server.{Route, RouteResult}
import cats.effect.{Clock, IO}
import ch.epfl.bluebrain.nexus.delta.kernel.utils.{ClasspathResourceLoader, TransactionalFileCopier, UUIDF}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient
Expand All @@ -10,7 +11,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.Files.FilesLog
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.batch.{BatchCopy, BatchFiles}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.contexts.{files => fileCtxId}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.routes.{BatchFilesRoutes, FilesRoutes}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.routes.{BatchFilesRoutes, DelegateFilesRoutes, FilesRoutes, TokenIssuer}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.schemas.{files => filesSchemaId}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.{FileAttributesUpdateStream, Files}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.{ShowFileLocation, StorageTypeConfig}
Expand All @@ -20,8 +21,8 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.FileOpe
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.disk.{DiskFileOperations, DiskStorageCopyFiles}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.{RemoteDiskFileOperations, RemoteDiskStorageCopyFiles}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{S3FileOperations, S3LocationGenerator}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{S3FileOperations, S3LocationGenerator}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.routes.StoragesRoutes
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.schemas.{storage => storagesSchemaId}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.{StorageDeletionTask, StoragePermissionProviderImpl, Storages, StoragesStatistics}
Expand Down Expand Up @@ -51,6 +52,8 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.{ScopedEventLog, Transactors}
import com.typesafe.config.Config
import izumi.distage.model.definition.{Id, ModuleDef}

import scala.concurrent.Future

/**
* Storages and Files wiring
*/
Expand Down Expand Up @@ -271,6 +274,33 @@ class StoragePluginModule(priority: Int) extends ModuleDef {
)
}

make[Option[DelegateFilesRoutes]].from {
(
cfg: StorageTypeConfig,
identities: Identities,
aclCheck: AclCheck,
files: Files,
schemeDirectives: DeltaSchemeDirectives,
indexingAction: AggregateIndexingAction,
shift: File.Shift,
baseUri: BaseUri,
cr: RemoteContextResolution @Id("aggregate"),
ordering: JsonKeyOrdering,
showLocation: ShowFileLocation
) =>
cfg.amazon.flatMap(_.delegation).map { delegationCfg =>
val tokenIssuer = new TokenIssuer(delegationCfg.rsaKey, delegationCfg.tokenDuration)
new DelegateFilesRoutes(
identities,
aclCheck,
files,
tokenIssuer,
indexingAction(_, _, _)(shift),
schemeDirectives
)(baseUri, cr, ordering, showLocation)
}
}

make[BatchFilesRoutes].from {
(
showLocation: ShowFileLocation,
Expand Down Expand Up @@ -363,4 +393,14 @@ class StoragePluginModule(priority: Int) extends ModuleDef {
many[PriorityRoute].add { (batchFileRoutes: BatchFilesRoutes) =>
PriorityRoute(priority, batchFileRoutes.routes, requiresStrictEntity = false)
}

many[PriorityRoute].add { (maybeDelegationRoutes: Option[DelegateFilesRoutes]) =>
PriorityRoute(
priority,
maybeDelegationRoutes
.map(_.routes)
.getOrElse[Route](_ => Future.successful(RouteResult.Complete(HttpResponse(StatusCodes.NotFound)))),
requiresStrictEntity = false
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileCommand._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileEvent._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileRejection._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.routes.DelegateFilesRoutes.DelegationResponse
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.schemas.{files => fileSchema}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageRejection.{StorageFetchRejection, StorageIsDeprecated}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.{DigestAlgorithm, Storage, StorageRejection, StorageType}
Expand Down Expand Up @@ -146,6 +147,19 @@ final class Files(
} yield res
}.span("createLink")

def delegate(projectRef: ProjectRef, description: FileDescription, storageId: Option[IdSegment])(implicit
caller: Caller
): IO[DelegationResponse] = {
for {
pc <- fetchContext.onCreate(projectRef)
iri <- generateId(pc)
_ <-
test(CreateFile(iri, projectRef, testStorageRef, testStorageType, testAttributes, caller.subject, tag = None))
(_, storage) <- fetchAndValidateActiveStorage(storageId, projectRef, pc)
metadata <- fileOperations.delegate(storage, description.filename)
} yield DelegationResponse(metadata.bucket, iri, metadata.path, description.metadata)
}.span("delegate")

/**
* Create a new file linking it from an existing file in a storage
*
Expand Down Expand Up @@ -496,13 +510,15 @@ final class Files(
} yield ResourceRef.Revision(storage.id, storage.rev) -> storage.value
case None =>
for {
storage <- storages.fetchDefault(ref).adaptError { case e: StorageRejection =>
WrappedStorageRejection(e)
}
storage <- fetchDefaultStorage(ref)
_ <- validateAuth(ref, storage.value.storageValue.writePermission)
} yield ResourceRef.Revision(storage.id, storage.rev) -> storage.value
}

private def fetchDefaultStorage(ref: ProjectRef) = storages.fetchDefault(ref).adaptError { case e: StorageRejection =>
WrappedStorageRejection(e)
}

private def validateAuth(project: ProjectRef, permission: Permission)(implicit c: Caller): IO[Unit] =
aclCheck.authorizeForOr(project, permission)(AuthorizationFailed(project, permission))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.syntax.httpResponseFieldsSyntax
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import io.circe.syntax._
import io.circe.{Encoder, JsonObject}
import io.circe.{Encoder, Json, JsonObject}

/**
* Enumeration of File rejection types.
Expand Down Expand Up @@ -231,6 +231,10 @@ object FileRejection {
s"Linking or registering a file cannot be performed without a 'filename' or a 'path' that does not end with a filename."
)

final case object InvalidJWSPayload extends FileRejection("Signature missing, flattened JWS format expected")

final case class JWSSignatureExpired(payload: Json) extends FileRejection(s"Token expired for payload: $payload")

final case class CopyRejection(
sourceProj: ProjectRef,
destProject: ProjectRef,
Expand Down Expand Up @@ -281,6 +285,7 @@ object FileRejection {
case FetchRejection(_, _, FetchFileRejection.FileNotFound(_)) => (StatusCodes.InternalServerError, Seq.empty)
case SaveRejection(_, _, SaveFileRejection.ResourceAlreadyExists(_)) => (StatusCodes.Conflict, Seq.empty)
case SaveRejection(_, _, SaveFileRejection.BucketAccessDenied(_, _, _)) => (StatusCodes.Forbidden, Seq.empty)
case JWSSignatureExpired(_) => (StatusCodes.Forbidden, Seq.empty)
case CopyRejection(_, _, _, rejection) => (rejection.status, Seq.empty)
case FetchRejection(_, _, _) => (StatusCodes.InternalServerError, Seq.empty)
case SaveRejection(_, _, _) => (StatusCodes.InternalServerError, Seq.empty)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.files.routes

import akka.http.scaladsl.model.StatusCodes.{Created, OK}
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.server._
import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileRejection._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.routes.DelegateFilesRoutes._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.{FileResource, Files}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.ShowFileLocation
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.sdk.{IndexingAction, IndexingMode}
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.circe.{CirceMarshalling, CirceUnmarshalling}
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives._
import ch.epfl.bluebrain.nexus.delta.sdk.directives.{AuthDirectives, DeltaSchemeDirectives, ResponseToJsonLd}
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, IdSegment}
import io.circe.generic.extras.Configuration
import io.circe.generic.extras.semiauto.deriveConfiguredDecoder
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
import io.circe.syntax.EncoderOps
import io.circe.{Decoder, Encoder, Json}
import ch.epfl.bluebrain.nexus.delta.sdk.implicits._
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef

final class DelegateFilesRoutes(
identities: Identities,
aclCheck: AclCheck,
files: Files,
tokenIssuer: TokenIssuer,
index: IndexingAction.Execute[File],
schemeDirectives: DeltaSchemeDirectives
)(implicit
baseUri: BaseUri,
cr: RemoteContextResolution,
ordering: JsonKeyOrdering,
showLocation: ShowFileLocation
) extends AuthDirectives(identities, aclCheck)
with CirceUnmarshalling
with CirceMarshalling { self =>

import schemeDirectives._

def routes: Route =
baseUriPrefix(baseUri.prefix) {
pathPrefix("delegate" / "files") {
extractCaller { implicit caller =>
projectRef { project =>
concat(
pathPrefix("validate") {
(pathEndOrSingleSlash & post) {
parameter("storage".as[IdSegment].?) { storageId =>
entity(as[FileDescription]) { desc =>
emit(OK, validateFileDetails(project, storageId, desc).attemptNarrow[FileRejection])
}
}
}
},
(pathEndOrSingleSlash & post) {
(parameter("storage".as[IdSegment].?) & indexingMode) { (storageId, mode) =>
entity(as[Json]) { jwsPayload =>
emit(
Created,
registerDelegatedFile(jwsPayload, project, storageId, mode)
.attemptNarrow[FileRejection]: ResponseToJsonLd
)
}
}
}
)
}
}
}
}

private def validateFileDetails(project: ProjectRef, storageId: Option[IdSegment], desc: FileDescription)(implicit
c: Caller
) =
for {
delegationResp <- files.delegate(project, desc, storageId)
jwsPayload <- tokenIssuer.issueJWSPayload(delegationResp.asJson)
} yield jwsPayload

private def registerDelegatedFile(
jwsPayload: Json,
project: ProjectRef,
storageId: Option[IdSegment],
mode: IndexingMode
)(implicit c: Caller): IO[FileResource] =
for {
originalPayload <- tokenIssuer.verifyJWSPayload(jwsPayload)
delegationResponse <- IO.fromEither(originalPayload.as[DelegationResponse])
fileId = FileId(delegationResponse.id, project)
fileResource <-
files.registerFile(
fileId,
storageId,
delegationResponse.metadata,
delegationResponse.path.path,
None,
None
)
_ <- index(project, fileResource, mode)
} yield fileResource

}

object DelegateFilesRoutes {

final case class DelegationResponse(bucket: String, id: Iri, path: Uri, metadata: Option[FileCustomMetadata])

object DelegationResponse {
implicit val enc: Encoder[DelegationResponse] = deriveEncoder
implicit val dec: Decoder[DelegationResponse] = deriveDecoder
}

implicit private val config: Configuration = Configuration.default
implicit val dec: Decoder[FileDescription] = deriveConfiguredDecoder[FileDescription]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.files.routes

import cats.effect.{Clock, IO}
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileRejection.{InvalidJWSPayload, JWSSignatureExpired}
import com.nimbusds.jose.crypto.{RSASSASigner, RSASSAVerifier}
import com.nimbusds.jose.jwk.RSAKey
import com.nimbusds.jose.util.JSONObjectUtils
import com.nimbusds.jose.{JWSAlgorithm, JWSHeader, JWSObjectJSON, Payload}
import io.circe.{parser, Json, Printer}

import java.security.KeyFactory
import java.security.interfaces.{RSAPrivateCrtKey, RSAPublicKey}
import java.security.spec.{PKCS8EncodedKeySpec, RSAPublicKeySpec}
import java.util.Base64
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters.ListHasAsScala
import scala.util.Try

class TokenIssuer(key: RSAKey, tokenValidity: FiniteDuration)(implicit clock: Clock[IO]) {
private val signer = new RSASSASigner(key)
private val verifier = new RSASSAVerifier(key.toPublicJWK)
private val TokenValiditySeconds = tokenValidity.toSeconds
private val log = Logger[TokenIssuer]

def issueJWSPayload(payloadToSign: Json): IO[Json] =
for {
now <- clock.realTimeInstant
jwsObject = mkJWSObject(payloadToSign)
_ <- IO.delay(jwsObject.sign(mkJWSHeader(now.getEpochSecond + TokenValiditySeconds), signer))
serialized <- IO.delay(jwsObject.serializeFlattened())
json <- IO.fromEither(parser.parse(serialized))
} yield json

def verifyJWSPayload(payload: Json): IO[Json] =
for {
jwsObject <- IO.delay(JWSObjectJSON.parse(payload.toString()))
sig <- IO.fromOption(jwsObject.getSignatures.asScala.headOption)(InvalidJWSPayload)
_ <- IO.delay(sig.verify(verifier))
objectPayload = jwsObject.getPayload.toString
originalPayload <- IO.fromEither(parser.parse(objectPayload))
_ <- log.info(s"Original payload parsed for token: $originalPayload")
now <- clock.realTimeInstant
exp <- IO.delay(sig.getHeader.getCustomParam("exp").asInstanceOf[Long])
_ <- IO.raiseWhen(now.getEpochSecond > exp)(JWSSignatureExpired(originalPayload))
} yield originalPayload

private def mkJWSHeader(expSeconds: Long): JWSHeader =
new JWSHeader.Builder(JWSAlgorithm.RS256).keyID(key.getKeyID).customParam("exp", expSeconds).build()

private def mkJWSObject(payload: Json) = new JWSObjectJSON(mkPayload(payload))

private def mkPayload(raw: Json) = {
val jsonObjectMap = JSONObjectUtils.parse(raw.printWith(Printer.noSpacesSortKeys))
new Payload(jsonObjectMap)
}
}

object TokenIssuer {

def generateRSAKeyFromPrivate(privateKey: RSAPrivateCrtKey): RSAKey = {
val publicKeySpec: RSAPublicKeySpec = new RSAPublicKeySpec(privateKey.getModulus, privateKey.getPublicExponent)
val kf = KeyFactory.getInstance("RSA")
val publicKey = kf.generatePublic(publicKeySpec).asInstanceOf[RSAPublicKey]
new RSAKey.Builder(publicKey).privateKey(privateKey).build()
}

def parseRSAPrivateKey(raw: String): Try[RSAPrivateCrtKey] = Try {
val keyStripped = raw
.replace("-----END PRIVATE KEY-----", "")
.replace("-----BEGIN PRIVATE KEY-----", "")
.replace("\n", "")
val keyStrippedDecoded = Base64.getDecoder.decode(keyStripped)

val keySpec = new PKCS8EncodedKeySpec(keyStrippedDecoded)
val kf = KeyFactory.getInstance("RSA")
kf.generatePrivate(keySpec).asInstanceOf[RSAPrivateCrtKey]
}

}
Loading

0 comments on commit 4839266

Please sign in to comment.