Skip to content

Commit

Permalink
Reuse partition naming, refactor integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dantb committed Sep 29, 2023
1 parent 5f07a09 commit a073776
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import java.time.Instant
/**
* Operations pertaining to managing Access Control Lists.
*/
trait Acls extends PurgeAcl {
trait Acls {

/**
* Fetches the ACL resource for an ''address'' on the current revision.
Expand Down Expand Up @@ -190,7 +190,6 @@ trait Acls extends PurgeAcl {
*/
def subtract(acl: Acl, rev: Int)(implicit caller: Subject): IO[AclRejection, AclResource]

// DTB TODO - is this more of a deprecate than a delete?
/**
* Delete all ''acl'' on the passed ''address''.
*
Expand All @@ -201,18 +200,15 @@ trait Acls extends PurgeAcl {
*/
def delete(address: AclAddress, rev: Int)(implicit caller: Subject): IO[AclRejection, AclResource]

private def filterSelf(resource: AclResource)(implicit caller: Caller): AclResource =
resource.map(_.filter(caller.identities))

}

trait PurgeAcl {

/**
* Hard deletes events and states for the given acl address. This is meant to be used internally for project and
* organization deletion.
*/
def purge(project: AclAddress): UIO[Unit]

private def filterSelf(resource: AclResource)(implicit caller: Caller): AclResource =
resource.map(_.filter(caller.identities))

}

object Acls {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.acls.Acls
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress
import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.OrganizationRejection.OrganizationNonEmpty
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label}
import ch.epfl.bluebrain.nexus.delta.sourcing.{MD5, Transactors}
import ch.epfl.bluebrain.nexus.delta.sourcing.{PartitionInit, Transactors}
import doobie.implicits._
import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
import doobie.util.update.Update0
Expand All @@ -34,12 +34,12 @@ object OrganizationDeleter {

private def deleteAll(id: Label): IO[Unit] =
(for {
_ <- List("scoped_events", "scoped_states").traverse(dropPartition(id, _)).void
_ <- List("scoped_events", "scoped_states").traverse(dropPartition(id, _))
_ <- List("global_events", "global_states").traverse(deleteGlobal(id, _))
} yield ()).transact(xas.writeCE).void

private def dropPartition(id: Label, table: String): ConnectionIO[Unit] =
Update0(s"DROP TABLE IF EXISTS ${table}_${MD5.hash(id.value)}", None).run.void
Update0(s"DROP TABLE IF EXISTS ${PartitionInit.orgPartition(table, id)}", None).run.void

private def deleteGlobal(id: Label, table: String): ConnectionIO[Unit] =
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.sdk
import akka.http.scaladsl.model.Uri
import ch.epfl.bluebrain.nexus.delta.kernel.RetryStrategyConfig
import ch.epfl.bluebrain.nexus.delta.kernel.cache.CacheConfig
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclsConfig
import ch.epfl.bluebrain.nexus.delta.sdk.fusion.FusionConfig
import ch.epfl.bluebrain.nexus.delta.sdk.http.{HttpClientConfig, HttpClientWorthRetry}
import ch.epfl.bluebrain.nexus.delta.sdk.model.search.PaginationConfig
Expand All @@ -20,6 +21,8 @@ trait ConfigFixtures {

def eventLogConfig: EventLogConfig = EventLogConfig(queryConfig, 5.seconds)

def aclsConfig: AclsConfig = AclsConfig(eventLogConfig)

def pagination: PaginationConfig =
PaginationConfig(
defaultSize = 30,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,109 +3,112 @@ package ch.epfl.bluebrain.nexus.delta.sdk.organizations
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.sdk.ConfigFixtures
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclsImpl
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.{Acl, AclAddress}
import ch.epfl.bluebrain.nexus.delta.sdk.generators.ProjectGen.defaultApiMappings
import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.{Organization, OrganizationCommand}
import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.Organization
import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.OrganizationRejection.{OrganizationNonEmpty, OrganizationNotFound}
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions
import ch.epfl.bluebrain.nexus.delta.sdk.projects.Projects.FetchOrganization
import ch.epfl.bluebrain.nexus.delta.sdk.projects.{ProjectsConfig, ProjectsFixture}
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectRejection.WrappedOrganizationRejection
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.{ApiMappings, ProjectFields}
import ch.epfl.bluebrain.nexus.delta.sourcing.{GlobalEventLog, MD5}
import ch.epfl.bluebrain.nexus.delta.sdk.projects.{ProjectsConfig, ProjectsFixture}
import ch.epfl.bluebrain.nexus.delta.sourcing.PartitionInit
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Identity, Label, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.testkit.IOFixedClock
import ch.epfl.bluebrain.nexus.testkit.bio.BioSuite
import doobie.implicits._
import doobie.util.update.Update0
import monix.bio.{IO, Task, UIO}
import munit.AnyFixture

import java.util.UUID

class OrganizationDeleterSuite extends BioSuite with IOFixedClock with ConfigFixtures {

private val org = Label.unsafe("org")
private val orgUuid = UUID.randomUUID()
private val org1 = Label.unsafe("org1")
private val org2 = Label.unsafe("org2")

private def fetchOrg: FetchOrganization = {
case `org` => UIO.pure(Organization(org, orgUuid, None))
case other => IO.raiseError(WrappedOrganizationRejection(OrganizationNotFound(other)))
case `org1` => UIO.pure(Organization(org1, UUID.randomUUID(), None))
case `org2` => UIO.pure(Organization(org2, UUID.randomUUID(), None))
case other => IO.raiseError(WrappedOrganizationRejection(OrganizationNotFound(other)))
}

private val config = ProjectsConfig(eventLogConfig, pagination, cacheConfig, deletionConfig)
private val projectFixture = ProjectsFixture.init(fetchOrg, defaultApiMappings, config)
private val config = ProjectsConfig(eventLogConfig, pagination, cacheConfig, deletionConfig)
private val orgConfig = OrganizationsConfig(eventLogConfig, pagination, cacheConfig)
private lazy val projectFixture = ProjectsFixture.init(fetchOrg, defaultApiMappings, config)

override def munitFixtures: Seq[AnyFixture[_]] = List(projectFixture)

private lazy val (xas, projects) = projectFixture()
private lazy val orgDeleter = OrganizationDeleter(xas)
private val projRef = ProjectRef.unsafe("org", "myproj")
private val projRef = ProjectRef.unsafe(org1.value, "myproj")
private val fields = ProjectFields(None, ApiMappings.empty, None, None)
private lazy val orgs = GlobalEventLog(Organizations.definition, logConfig, xas)
private lazy val orgs = OrganizationsImpl(Set(), orgConfig, xas)
private val permission = Permissions.resources.read
private lazy val acls = AclsImpl(UIO.pure(Set(permission)), _ => IO.unit, Set(), aclsConfig, xas)

implicit val subject: Subject = Identity.User("Bob", Label.unsafe("realm"))
implicit val uuidF: UUIDF = UUIDF.fixed(orgUuid)
implicit val uuidF: UUIDF = UUIDF.fixed(UUID.randomUUID())

test("Fail when trying to delete a non-empty organization") {
val arrange: IO[Any, Unit] = for {
_ <- truncateTables
_ <- orgs.evaluate(org, OrganizationCommand.CreateOrganization(org, None, Identity.Anonymous))
_ <- projects.create(projRef, fields)
for {
_ <- createOrgAndAcl(org1)
_ <- createProj()
result <- deleteOrg(org1)
_ <- assertDeletionFailed(result)
} yield ()

val act: UIO[Either[OrganizationNonEmpty, Unit]] =
IO.from(orgDeleter.delete(org).attemptNarrow[OrganizationNonEmpty]).hideErrors

def assert(result: Either[OrganizationNonEmpty, Unit]): IO[Any, Unit] = for {
eventPartitions <- queryPartitions("scoped_events")
statePartitions <- queryPartitions("scoped_states")
fetchedProject <- projects.fetch(projRef)
globalOrgEvent <- orgs.currentEvents(Offset.Start).compile.to(List).map(_.map(_.value.uuid))
globalOrgState <- orgs.currentStates(Offset.Start).compile.to(List).map(_.map(_.value.uuid))
} yield {
assertEquals(result, Left(OrganizationNonEmpty(org)))
val orgPartition = MD5.hash(org.value)
assertEquals(eventPartitions.headOption, Some(s"scoped_events_$orgPartition"))
assertEquals(statePartitions.headOption, Some(s"scoped_states_$orgPartition"))
assertEquals(fetchedProject.value.ref, projRef)
assertEquals(globalOrgState, List(orgUuid))
assertEquals(globalOrgEvent, List(orgUuid))
}

arrange >> act >>= assert
}

test("Successfully delete an empty organization") {
val arrange: IO[Any, Unit] = for {
_ <- truncateTables
_ <- orgs.evaluate(org, OrganizationCommand.CreateOrganization(org, None, Identity.Anonymous))
for {
_ <- createOrgAndAcl(org2)
result <- deleteOrg(org2)
_ <- assertPartitionsAndDataIsDeleted(result)
} yield ()
}

def createOrgAndAcl(org: Label) = for {
_ <- acls.replace(Acl(AclAddress.fromOrg(org), subject -> Set(permission)), 0)
_ <- orgs.create(org, None)
} yield ()

def createProj() = projects.create(projRef, fields)

def deleteOrg(org: Label): UIO[Either[OrganizationNonEmpty, Unit]] =
IO.from(orgDeleter.delete(org).attemptNarrow[OrganizationNonEmpty]).hideErrors

def assertDeletionFailed(result: Either[OrganizationNonEmpty, Unit]) = for {
eventPartitionDeleted <- orgPartitionIsDeleted("scoped_events", org1) //.map(_.isEmpty)
statePartitionDeleted <- orgPartitionIsDeleted("scoped_states", org1) //.map(_.isEmpty)
fetchedProject <- projects.fetch(projRef)
orgResult <- orgs.fetch(org1).map(_.value.label)
aclExists <- acls.fetch(AclAddress.fromOrg(org1)).attempt.map(_.isRight)
} yield {
assertEquals(result, Left(OrganizationNonEmpty(org1)))
assertEquals(eventPartitionDeleted, false)
assertEquals(statePartitionDeleted, false)
assertEquals(fetchedProject.value.ref, projRef)
assertEquals(orgResult, org1)
assertEquals(aclExists, true)
}

val act: UIO[Either[OrganizationNonEmpty, Unit]] =
IO.from(orgDeleter.delete(org).attemptNarrow[OrganizationNonEmpty]).hideErrors

def assert(result: Either[OrganizationNonEmpty, Unit]): IO[Any, Unit] = for {
globalEventsDeleted <- orgs.currentEvents(Offset.Start).compile.to(List).map(_.isEmpty)
globalStateDeleted <- orgs.currentStates(Offset.Start).compile.to(List).map(_.isEmpty)
eventPartitionDeleted <- queryPartitions("scoped_events").map(_.isEmpty)
statePartitionDeleted <- queryPartitions("scoped_states").map(_.isEmpty)
} yield {
assertEquals(result, Right(()))
assertEquals(eventPartitionDeleted, true)
assertEquals(statePartitionDeleted, true)
assertEquals(globalStateDeleted, true)
assertEquals(globalEventsDeleted, true)
}

arrange >> act >>= assert
def assertPartitionsAndDataIsDeleted(result: Either[OrganizationNonEmpty, Unit]) = for {
orgResult <- orgs.fetch(org2).attempt
eventPartitionDeleted <- orgPartitionIsDeleted("scoped_events", org2)
statePartitionDeleted <- orgPartitionIsDeleted("scoped_states", org2)
aclDeleted <- acls.fetch(AclAddress.fromOrg(org2)).attempt.map(_.isLeft)
} yield {
assertEquals(result, Right(()))
assertEquals(eventPartitionDeleted, true)
assertEquals(statePartitionDeleted, true)
assertEquals(orgResult, Left(OrganizationNotFound(org2)))
assertEquals(aclDeleted, true)
}

def truncateTables: UIO[Unit] =
List("global_events", "global_states", "scoped_events", "scoped_states")
.traverse(t => Update0(s"DELETE FROM $t", None).run.transact(xas.write))
.void
.hideErrors
def orgPartitionIsDeleted(table: String, org: Label): Task[Boolean] =
queryPartitions(table).map(!_.contains(PartitionInit.orgPartition(table, org)))

def queryPartitions(table: String): Task[List[String]] =
sql"""SELECT inhrelid::regclass AS child
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.sourcing
import cats.implicits._
import Transactors.PartitionsCache
import ch.epfl.bluebrain.nexus.delta.sourcing.PartitionInit.{createOrgPartition, createProjectPartition, projectRefHash}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef}
import doobie.Fragment
import doobie.free.connection
import monix.bio.Task
Expand Down Expand Up @@ -73,7 +73,7 @@ object PartitionInit {
*/
def createOrgPartition(mainTable: String, projectRef: ProjectRef): Fragment =
Fragment.const(s"""
| CREATE TABLE IF NOT EXISTS ${orgPartition(mainTable, projectRef)}
| CREATE TABLE IF NOT EXISTS ${orgPartitionFromProj(mainTable, projectRef)}
| PARTITION OF $mainTable FOR VALUES IN ('${projectRef.organization}')
| PARTITION BY LIST (project);
|""".stripMargin)
Expand All @@ -89,7 +89,7 @@ object PartitionInit {
def createProjectPartition(mainTable: String, projectRef: ProjectRef): Fragment =
Fragment.const(s"""
| CREATE TABLE IF NOT EXISTS ${projectRefPartition(mainTable, projectRef)}
| PARTITION OF ${orgPartition(mainTable, projectRef)} FOR VALUES IN ('${projectRef.project}')
| PARTITION OF ${orgPartitionFromProj(mainTable, projectRef)} FOR VALUES IN ('${projectRef.project}')
|""".stripMargin)

def projectRefHash(projectRef: ProjectRef): String =
Expand All @@ -98,10 +98,13 @@ object PartitionInit {
def projectRefPartition(mainTable: String, projectRef: ProjectRef) =
s"${mainTable}_${projectRefHash(projectRef)}"

private def orgHash(projectRef: ProjectRef) =
MD5.hash(projectRef.organization.value)
def orgHash(orgId: Label): String =
MD5.hash(orgId.value)

private def orgPartition(mainTable: String, projectRef: ProjectRef) =
s"${mainTable}_${orgHash(projectRef)}"
def orgPartition(mainTable: String, orgId: Label) =
s"${mainTable}_${orgHash(orgId)}"

private def orgPartitionFromProj(mainTable: String, projectRef: ProjectRef) =
orgPartition(mainTable, projectRef.organization)

}

0 comments on commit a073776

Please sign in to comment.