Skip to content

Commit

Permalink
fix: Release semaphore after mapping.csv (DEV-3435) (#198)
Browse files Browse the repository at this point in the history
1. `getBulkIngestMappingCsv` wasn't releasing the semaphore, because it
wasn't using `withSemaphoreFork`, because it required the result, so it
was rewritten to `withSemaphore` and usable in both forking and
non-forking cases, so the release code is now shared and called in all
cases.
2. Testcase for releasing locks has been added (test indeed fails with
timeout pre-fix).
3. `.unsome.flatMap { case None => ZIO.fail(e); case Some =>
ZIO.succeed(a) }` should be equivalent to `.mapBoth(e, a)`
  • Loading branch information
siers authored Apr 8, 2024
1 parent 3aea9d2 commit 3bd76b3
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 48 deletions.
28 changes: 12 additions & 16 deletions src/main/scala/swiss/dasch/api/ProjectsEndpointsHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -87,23 +87,21 @@ final case class ProjectsEndpointsHandler(
code =>
bulkIngestService
.startBulkIngest(code)
.unsome
.flatMap {
case None => ZIO.fail(failBulkIngestInProgress(code))
case Some(_) => ZIO.succeed(ProjectResponse.from(code))
},
.mapBoth(
_ => failBulkIngestInProgress(code),
_ => ProjectResponse.from(code),
),
)

private val postBulkIngestEndpointFinalize: ZServerEndpoint[Any, Any] = projectEndpoints.postBulkIngestFinalize
.serverLogic(_ =>
code =>
bulkIngestService
.finalizeBulkIngest(code)
.unsome
.flatMap {
case None => ZIO.fail(failBulkIngestInProgress(code))
case Some(_) => ZIO.succeed(ProjectResponse.from(code))
},
.mapBoth(
_ => failBulkIngestInProgress(code),
_ => ProjectResponse.from(code),
),
)

private val getBulkIngestMappingCsvEndpoint: ZServerEndpoint[Any, Any] =
Expand All @@ -114,13 +112,11 @@ final case class ProjectsEndpointsHandler(
bulkIngestService
.getBulkIngestMappingCsv(code)
.mapError {
case None => failBulkIngestInProgress(code)
case Some(ioException) => InternalServerError(ioException)
}
.flatMap {
case None => ZIO.fail(NotFound(code))
case Some(str) => ZIO.succeed(str)
case None => failBulkIngestInProgress(code)
case Some(ioException: IOException) => InternalServerError(ioException)
}
.some
.mapError(_ => NotFound(code))
},
)

Expand Down
57 changes: 30 additions & 27 deletions src/main/scala/swiss/dasch/domain/BulkIngestService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,26 @@ final case class BulkIngestService(
config: IngestConfig,
semaphoresPerProject: TMap[ProjectShortcode, TSemaphore],
) {

private def getSemaphore(key: ProjectShortcode) =
semaphoresPerProject.getOrElseSTM(key, TSemaphore.make(1)).tap(semaphoresPerProject.put(key, _)).commit

private def acquireWithTimeout(sem: TSemaphore) =
sem.acquire.as(sem).commit.timeout(Duration.fromMillis(400)).some

private def withSemaphoreFork[E, A](
key: ProjectShortcode,
zio: ProjectShortcode => IO[E, A],
): IO[Option[Nothing], Fiber.Runtime[E, A]] =
private def getSemaphore(key: ProjectShortcode): UIO[TSemaphore] =
(for {
semaphore <- semaphoresPerProject.getOrElseSTM(key, TSemaphore.make(1))
_ <- semaphoresPerProject.put(key, semaphore)
} yield semaphore).commit

private def acquireWithTimeout(sem: TSemaphore): UIO[Unit] =
sem.acquire.commit.timeout(Duration.fromMillis(400)).as(())

private def withSemaphore[E, A](key: ProjectShortcode)(
zio: IO[E, A],
): IO[Option[E], A] =
getSemaphore(key)
.flatMap(acquireWithTimeout)
.flatMap(sem => zio.apply(key).logError.ensuring(sem.release.commit).forkDaemon)
.tap(acquireWithTimeout(_).asSomeError)
.flatMap(sem => zio.logError.ensuring(sem.release.commit).asSomeError)

def startBulkIngest(project: ProjectShortcode): IO[Option[Nothing], Fiber.Runtime[IOException, IngestResult]] =
withSemaphoreFork(project, doBulkIngest)
def startBulkIngest(shortcode: ProjectShortcode): IO[Option[Nothing], Fiber.Runtime[IOException, IngestResult]] =
withSemaphore(shortcode) {
doBulkIngest(shortcode).forkDaemon
}

private def doBulkIngest(project: ProjectShortcode) =
for {
Expand Down Expand Up @@ -106,7 +109,9 @@ final case class BulkIngestService(
}

def finalizeBulkIngest(shortcode: ProjectShortcode): IO[Option[Nothing], Fiber.Runtime[IOException, Unit]] =
withSemaphoreFork(shortcode, doFinalize)
withSemaphore(shortcode) {
doFinalize(shortcode).forkDaemon
}

private def doFinalize(shortcode: ProjectShortcode): ZIO[Any, IOException, Unit] =
for {
Expand All @@ -119,17 +124,15 @@ final case class BulkIngestService(
} yield ()

def getBulkIngestMappingCsv(shortcode: ProjectShortcode): IO[Option[IOException], Option[String]] =
for {
_ <- getSemaphore(shortcode).flatMap(acquireWithTimeout)
importDir <- getImportFolder(shortcode).asSomeError
mappingCsv = getMappingCsvFile(importDir, shortcode)
mapping <- ZIO
.ifZIO(Files.exists(mappingCsv))(
Files.readAllLines(mappingCsv).map(it => Some(it.mkString("\n"))),
ZIO.none,
)
.asSomeError
} yield mapping
withSemaphore(shortcode) {
for {
importDir <- getImportFolder(shortcode)
mappingCsv = getMappingCsvFile(importDir, shortcode)
mapping <- ZIO.whenZIO(Files.exists(mappingCsv)) {
Files.readAllLines(mappingCsv).map(_.mkString("\n"))
}
} yield mapping
}
}

object BulkIngestService {
Expand Down
25 changes: 20 additions & 5 deletions src/test/scala/swiss/dasch/domain/BulkIngestServiceSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,19 @@ import swiss.dasch.infrastructure.CommandExecutorMock
import swiss.dasch.test.SpecConfigurations
import zio.nio.file.Files
import zio.test.{ZIOSpecDefault, assertTrue}
import zio.{Fiber, ZIO}
import zio.*

import java.io.IOException
import zio.test.TestAspect

object BulkIngestServiceSpec extends ZIOSpecDefault {

// accessor functions for testing
def finalizeBulkIngest(
private def finalizeBulkIngest(
shortcode: ProjectShortcode,
): ZIO[BulkIngestService, Option[Nothing], Fiber.Runtime[IOException, Unit]] =
ZIO.serviceWithZIO[BulkIngestService](_.finalizeBulkIngest(shortcode))

def getBulkIngestMappingCsv(
private def getBulkIngestMappingCsv(
shortcode: ProjectShortcode,
): ZIO[BulkIngestService, Option[IOException], Option[String]] =
ZIO.serviceWithZIO[BulkIngestService](_.getBulkIngestMappingCsv(shortcode))
Expand Down Expand Up @@ -65,9 +65,24 @@ object BulkIngestServiceSpec extends ZIOSpecDefault {
} yield assertTrue(mappingCsvFileExists && mappingCsv.contains("1,2,3"))
})

private val checkSemaphoresReleased = suite("check semaphores released")(test("check semaphores") {
for {
shortcode <- ZIO.succeed(ProjectShortcode.unsafeFrom("0001"))
importDir <- StorageService.getTempFolder().map(_ / "import" / shortcode.value).tap(Files.createDirectories(_))
_ <- Files.createFile(importDir.parent.head / s"mapping-$shortcode.csv")

_ <- getBulkIngestMappingCsv(shortcode)
_ <- finalizeBulkIngest(shortcode)
_ <- getBulkIngestMappingCsv(shortcode)
_ <- finalizeBulkIngest(shortcode)

} yield assertTrue(true)
})

val spec = suite("BulkIngestServiceLive")(
finalizeBulkIngestSuite,
getBulkIngestMappingCsvSuite,
checkSemaphoresReleased,
).provide(
AssetInfoServiceLive.layer,
BulkIngestService.layer,
Expand All @@ -81,5 +96,5 @@ object BulkIngestServiceSpec extends ZIOSpecDefault {
SpecConfigurations.storageConfigLayer,
StillImageService.layer,
StorageServiceLive.layer,
)
) @@ TestAspect.timeout(1.second)
}

0 comments on commit 3bd76b3

Please sign in to comment.