Skip to content

Commit

Permalink
Merge pull request #2735 from vasilmkd/run-blocking-carefully
Browse files Browse the repository at this point in the history
Run blocking code on the compute pool more carefully
  • Loading branch information
djspiewak authored Jan 8, 2022
2 parents 5610226 + 97e1776 commit e216853
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ private[effect] sealed abstract class WorkStealingThreadPool private ()
def reportFailure(cause: Throwable): Unit
private[effect] def rescheduleFiber(fiber: IOFiber[_]): Unit
private[effect] def scheduleFiber(fiber: IOFiber[_]): Unit
private[effect] def canExecuteBlockingCode(): Boolean
}
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,20 @@ private[effect] final class WorkStealingThreadPool(
}
}

/**
* Checks if the blocking code can be executed in the current context (only returns true for
* worker threads that belong to this execution context).
*/
private[effect] def canExecuteBlockingCode(): Boolean = {
val thread = Thread.currentThread()
if (thread.isInstanceOf[WorkerThread]) {
val worker = thread.asInstanceOf[WorkerThread]
worker.canExecuteBlockingCodeOn(this)
} else {
false
}
}

/**
* Schedules a fiber for execution on this thread pool originating from an external thread (a
* thread which is not owned by this thread pool).
Expand Down
15 changes: 15 additions & 0 deletions core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,21 @@ private final class WorkerThread(
def isOwnedBy(threadPool: WorkStealingThreadPool): Boolean =
(pool eq threadPool) && !blocking

/**
* Checks whether this [[WorkerThread]] operates within the [[WorkStealingThreadPool]]
* provided as an argument to this method. The implementation checks whether the provided
* [[WorkStealingThreadPool]] matches the reference of the pool provided when this
* [[WorkerThread]] was constructed.
*
* @param threadPool
* a work stealing thread pool reference
* @return
* `true` if this worker thread is owned by the provided work stealing thread pool, `false`
* otherwise
*/
def canExecuteBlockingCodeOn(threadPool: WorkStealingThreadPool): Boolean =
pool eq threadPool

/**
* Registers a suspended fiber.
*
Expand Down
53 changes: 31 additions & 22 deletions core/shared/src/main/scala/cats/effect/IOFiber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -943,30 +943,26 @@ private final class IOFiber[A](
if (cur.hint eq IOFiber.TypeBlocking) {
val ec = currentCtx
if (ec.isInstanceOf[WorkStealingThreadPool]) {
var error: Throwable = null
val r =
try {
scala.concurrent.blocking(cur.thunk())
} catch {
case NonFatal(t) =>
error = t
case t: Throwable =>
onFatalFailure(t)
}

val next = if (error eq null) succeeded(r, 0) else failed(error, 0)
runLoop(next, nextCancelation, nextAutoCede)
} else {
resumeTag = BlockingR
resumeIO = cur
val wstp = ec.asInstanceOf[WorkStealingThreadPool]
if (wstp.canExecuteBlockingCode()) {
var error: Throwable = null
val r =
try {
scala.concurrent.blocking(cur.thunk())
} catch {
case NonFatal(t) =>
error = t
case t: Throwable =>
onFatalFailure(t)
}

if (isStackTracing) {
val handle = monitor()
objectState.push(handle)
val next = if (error eq null) succeeded(r, 0) else failed(error, 0)
runLoop(next, nextCancelation, nextAutoCede)
} else {
blockingFallback(cur)
}

val ec = runtime.blocking
scheduleOnForeignEC(ec, this)
} else {
blockingFallback(cur)
}
} else {
runLoop(interruptibleImpl(cur), nextCancelation, nextAutoCede)
Expand All @@ -985,6 +981,19 @@ private final class IOFiber[A](
}
}

private[this] def blockingFallback(cur: Blocking[Any]): Unit = {
resumeTag = BlockingR
resumeIO = cur

if (isStackTracing) {
val handle = monitor()
objectState.push(handle)
}

val ec = runtime.blocking
scheduleOnForeignEC(ec, this)
}

/*
* Only the owner of the run-loop can invoke this.
* Should be invoked at most once per fiber before termination.
Expand Down

0 comments on commit e216853

Please sign in to comment.