diff --git a/README.md b/README.md index c9ee441..7e0bee7 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,10 @@ +# This project is DEPRECATED + +With the release of Kotlin 1.7.20-Beta (https://blog.jetbrains.com/kotlin/2022/08/kotlin-1-7-20-beta/) and the new +Memory Model for Kotlin Native becoming the default, there's no longer a need for this library. Version 0.9.0 helps +transition over to the new memory model without having to make any changes related to CoroutineWorker code. The latest +version of CoroutineWorker uses the same implementation for all platforms. + # CoroutineWorker [![Build Status](https://github.com/autodesk/coroutineworker/workflows/build/badge.svg)](https://github.com/autodesk/coroutineworker/actions?query=workflow%3Abuild) @@ -18,7 +25,7 @@ kotlin { sourceSets { commonMain { dependencies { - implementation "com.autodesk:coroutineworker:0.8.3" + implementation "com.autodesk:coroutineworker:0.9.0" } } } diff --git a/gradle.properties b/gradle.properties index 34592a1..a8eb4fb 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,5 @@ -VERSION=0.8.3 +VERSION=0.9.0 kotlin.mpp.enableCompatibilityMetadataVariant=true -kotlin.mpp.stability.nowarn=true \ No newline at end of file +kotlin.mpp.stability.nowarn=true +kotlin.native.binary.memoryModel=experimental \ No newline at end of file diff --git a/src/commonMain/kotlin/com/autodesk/coroutineworker/CoroutineWorker.kt b/src/commonMain/kotlin/com/autodesk/coroutineworker/CoroutineWorker.kt index e1af59d..39b373d 100644 --- a/src/commonMain/kotlin/com/autodesk/coroutineworker/CoroutineWorker.kt +++ b/src/commonMain/kotlin/com/autodesk/coroutineworker/CoroutineWorker.kt @@ -2,41 +2,38 @@ package com.autodesk.coroutineworker import kotlinx.coroutines.CoroutineScope import kotlin.coroutines.CoroutineContext +import kotlinx.coroutines.Job +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.launch -/** - * Encapsulates performing background work. On JVM, this use coroutines outright. - * In native, this uses Worker threads and manages its mutability/concurrency issues. - */ -public expect class CoroutineWorker internal constructor() { - - /** Cancels the underlying Job */ - public fun cancel() +public class CoroutineWorker : CoroutineScope { + private val job = Job() /** - * Cancel the underlying job/worker and waits for it - * to receive its cancellation message or complete - * */ - public suspend fun cancelAndJoin() + * The context of this scope. See [CoroutineScope.coroutineContext] + */ + override val coroutineContext: CoroutineContext = job + + public fun cancel() { + job.cancel() + } + + public suspend fun cancelAndJoin() { + job.cancelAndJoin() + } public companion object { - /** - * Enqueues the background work [block] to run and returns a reference to the worker, which can be cancelled - */ - public fun execute(block: suspend CoroutineScope.() -> Unit): CoroutineWorker - - /** - * Performs [block] in another [CoroutineContext] ([jvmContext]) and waits for it to complete. - * This is similar to [kotlinx.coroutines.withContext] with some caveats: - * - * - Native: we cannot use [kotlinx.coroutines.withContext] because - * the global Dispatchers do not work properly. Therefore, - * the context argument is ignored, and the block is always - * run on some other Worker in the Worker pool. This is the - * most similar to switching contexts on JVM. - * - * - JVM: This has the same behavior as calling [kotlinx.coroutines.withContext] in the JVM - */ - public suspend fun withContext(jvmContext: CoroutineContext, block: suspend CoroutineScope.() -> T): T + public fun execute(block: suspend CoroutineScope.() -> Unit): CoroutineWorker { + return CoroutineWorker().also { + it.launch(block = block) + } + } + + public suspend fun withContext(jvmContext: CoroutineContext, block: suspend CoroutineScope.() -> T): T { + return kotlinx.coroutines.withContext(jvmContext) { + block() + } + } } } diff --git a/src/commonMain/kotlin/com/autodesk/coroutineworker/Utils.kt b/src/commonMain/kotlin/com/autodesk/coroutineworker/Utils.kt index 0604674..4c19818 100644 --- a/src/commonMain/kotlin/com/autodesk/coroutineworker/Utils.kt +++ b/src/commonMain/kotlin/com/autodesk/coroutineworker/Utils.kt @@ -1,11 +1,21 @@ package com.autodesk.coroutineworker + +import kotlinx.coroutines.suspendCancellableCoroutine + /** - * Bridges a platform's callback-based async method to coroutines, - * ensuring that the coroutine is resumed on a thread appropriate - * for the platform. - * * [startAsync] is called to start the work. It calls the passed [CompletionLambda] * when complete, and returns a [CancellationLambda] that can be called to cancel the * async work * */ -public expect suspend fun threadSafeSuspendCallback(startAsync: (CompletionLambda) -> CancellationLambda): T +public suspend fun threadSafeSuspendCallback(startAsync: (CompletionLambda) -> CancellationLambda): T { + return suspendCancellableCoroutine { cont -> + val cancellable = startAsync { + if (cont.isActive) { + cont.resumeWith(it) + } + } + cont.invokeOnCancellation { + cancellable() + } + } +} \ No newline at end of file diff --git a/src/jsMain/kotlin/com/autodesk/coroutineworker/CoroutineWorker.kt b/src/jsMain/kotlin/com/autodesk/coroutineworker/CoroutineWorker.kt deleted file mode 100644 index 14d5dbf..0000000 --- a/src/jsMain/kotlin/com/autodesk/coroutineworker/CoroutineWorker.kt +++ /dev/null @@ -1,40 +0,0 @@ -package com.autodesk.coroutineworker - -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job -import kotlinx.coroutines.cancelAndJoin -import kotlinx.coroutines.launch -import kotlin.coroutines.CoroutineContext - -public actual class CoroutineWorker internal actual constructor() : CoroutineScope { - - private val job = Job() - - /** - * The context of this scope. See [CoroutineScope.coroutineContext] - */ - override val coroutineContext: CoroutineContext = job - - public actual fun cancel() { - job.cancel() - } - - public actual suspend fun cancelAndJoin() { - job.cancelAndJoin() - } - - public actual companion object { - - public actual fun execute(block: suspend CoroutineScope.() -> Unit): CoroutineWorker { - return CoroutineWorker().also { - it.launch(block = block) - } - } - - public actual suspend fun withContext(jvmContext: CoroutineContext, block: suspend CoroutineScope.() -> T): T { - return kotlinx.coroutines.withContext(jvmContext) { - block() - } - } - } -} diff --git a/src/jsMain/kotlin/com/autodesk/coroutineworker/Utils.kt b/src/jsMain/kotlin/com/autodesk/coroutineworker/Utils.kt deleted file mode 100644 index 233a823..0000000 --- a/src/jsMain/kotlin/com/autodesk/coroutineworker/Utils.kt +++ /dev/null @@ -1,16 +0,0 @@ -package com.autodesk.coroutineworker - -import kotlinx.coroutines.suspendCancellableCoroutine - -public actual suspend fun threadSafeSuspendCallback(startAsync: (CompletionLambda) -> CancellationLambda): T { - return suspendCancellableCoroutine { cont -> - val cancellable = startAsync { - if (!cont.isCancelled) { - cont.resumeWith(it) - } - } - cont.invokeOnCancellation { - cancellable() - } - } -} diff --git a/src/jvmMain/kotlin/com/autodesk/coroutineworker/CoroutineWorker.kt b/src/jvmMain/kotlin/com/autodesk/coroutineworker/CoroutineWorker.kt deleted file mode 100644 index 14d5dbf..0000000 --- a/src/jvmMain/kotlin/com/autodesk/coroutineworker/CoroutineWorker.kt +++ /dev/null @@ -1,40 +0,0 @@ -package com.autodesk.coroutineworker - -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job -import kotlinx.coroutines.cancelAndJoin -import kotlinx.coroutines.launch -import kotlin.coroutines.CoroutineContext - -public actual class CoroutineWorker internal actual constructor() : CoroutineScope { - - private val job = Job() - - /** - * The context of this scope. See [CoroutineScope.coroutineContext] - */ - override val coroutineContext: CoroutineContext = job - - public actual fun cancel() { - job.cancel() - } - - public actual suspend fun cancelAndJoin() { - job.cancelAndJoin() - } - - public actual companion object { - - public actual fun execute(block: suspend CoroutineScope.() -> Unit): CoroutineWorker { - return CoroutineWorker().also { - it.launch(block = block) - } - } - - public actual suspend fun withContext(jvmContext: CoroutineContext, block: suspend CoroutineScope.() -> T): T { - return kotlinx.coroutines.withContext(jvmContext) { - block() - } - } - } -} diff --git a/src/jvmMain/kotlin/com/autodesk/coroutineworker/Utils.kt b/src/jvmMain/kotlin/com/autodesk/coroutineworker/Utils.kt deleted file mode 100644 index 233a823..0000000 --- a/src/jvmMain/kotlin/com/autodesk/coroutineworker/Utils.kt +++ /dev/null @@ -1,16 +0,0 @@ -package com.autodesk.coroutineworker - -import kotlinx.coroutines.suspendCancellableCoroutine - -public actual suspend fun threadSafeSuspendCallback(startAsync: (CompletionLambda) -> CancellationLambda): T { - return suspendCancellableCoroutine { cont -> - val cancellable = startAsync { - if (!cont.isCancelled) { - cont.resumeWith(it) - } - } - cont.invokeOnCancellation { - cancellable() - } - } -} diff --git a/src/jvmTest/kotlin/com/autodesk/coroutineworker/CoroutineWorkerJVMTest.kt b/src/jvmTest/kotlin/com/autodesk/coroutineworker/CoroutineWorkerJVMTest.kt deleted file mode 100644 index 4614d67..0000000 --- a/src/jvmTest/kotlin/com/autodesk/coroutineworker/CoroutineWorkerJVMTest.kt +++ /dev/null @@ -1,26 +0,0 @@ -package com.autodesk.coroutineworker - -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.launch -import org.junit.Test -import kotlin.test.assertNotSame -import kotlin.test.assertTrue - -class CoroutineWorkerJVMTest { - - @Test - fun `test withContext changes contexts`() { - testRunBlocking { - var called = false - val job = launch { - val context = coroutineContext - CoroutineWorker.withContext(Dispatchers.IO) { - assertNotSame(context, coroutineContext) - called = true - } - } - job.join() - assertTrue(called) - } - } -} diff --git a/src/nativeMain/kotlin/com/autodesk/coroutineworker/BackgroundCoroutineWorkQueueExecutor.kt b/src/nativeMain/kotlin/com/autodesk/coroutineworker/BackgroundCoroutineWorkQueueExecutor.kt deleted file mode 100644 index 03771c1..0000000 --- a/src/nativeMain/kotlin/com/autodesk/coroutineworker/BackgroundCoroutineWorkQueueExecutor.kt +++ /dev/null @@ -1,230 +0,0 @@ -package com.autodesk.coroutineworker - -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.async -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking -import kotlin.native.concurrent.AtomicInt -import kotlin.native.concurrent.AtomicReference -import kotlin.native.concurrent.SharedImmutable -import kotlin.native.concurrent.TransferMode -import kotlin.native.concurrent.Worker -import kotlin.native.concurrent.WorkerBoundReference -import kotlin.native.concurrent.ensureNeverFrozen -import kotlin.native.concurrent.freeze - -/** - * Holds the hook for handling background uncaught exceptions - */ -@SharedImmutable -private val UNHANDLED_EXCEPTION_HOOK = AtomicReference<((Throwable) -> Unit)?>(null) - -/** - * Interface for a work item that can be queued to run in - * a BackgroundCoroutineWorkQueueExecutor - */ -internal interface CoroutineWorkItem { - /** The block to execute via a Worker */ - val work: suspend CoroutineScope.() -> Unit -} - -/** - * An executor that runs blocks in a [kotlinx.coroutines.CoroutineScope] on a background - * Worker in a pool with [numWorkers] workers. - */ -internal class BackgroundCoroutineWorkQueueExecutor(private val numWorkers: Int) { - - /** - * The pool, on which blocks are executed - */ - private val pool = WorkerPool(numWorkers) - - /** - * Protects access to the queue on a single thread - */ - private val queueThread = Worker.start() - - /** - * Special worker for IO work - */ - private val ioWorker = Worker.start(name = ioWorkerName) - - /** - * The wrapped (allow freezing and mutable access on single thread) queue of WorkItems - */ - private val wrappedQueue: WorkerBoundReference> by lazy { - WorkerBoundReference(WorkQueue()).freeze() - } - - /** - * The wrapped queue of WorkItems - */ - private val queue: WorkQueue - get() = wrappedQueue.value - - /** - * The number of workers actively processing blocks - */ - private val _numActiveWorkers = AtomicInt(0) - - /** - * Getter for _numActiveWorkers; useful for preventing leakage in tests - */ - val numActiveWorkers: Int - get() = _numActiveWorkers.value - - /** - * Returns the next work item to process, if any - */ - private fun dequeueWork(): WorkItem? = queueThread.execute(TransferMode.SAFE, { this }) { - with(it) { - queue.dequeue().also { - if (it == null) { - // worker is going to become inactive - _numActiveWorkers.decrement() - } - } - } - }.result - - /** - * Queues an item to be executed in the general worker pool - */ - fun enqueueWork(item: WorkItem, isIoWork: Boolean) { - if (isIoWork) { - ioWorker.executeAfter( - operation = { - runBlocking { - this.ensureNeverFrozen() - performWorkHandlingExceptions(item, this) - } - }.freeze() - ) - } else { - queueThread.executeAfter( - operation = { - queue.enqueue(item) - // start a worker if we have more workers to start - val activeWorkerCount = _numActiveWorkers.value - if (activeWorkerCount < numWorkers) { - pool.performWork { - runBlocking { - // error if we accidentally freeze coroutine internals - this.ensureNeverFrozen() - processWorkItems(this) - } - } - _numActiveWorkers.increment() - } - }.freeze() - ) - } - } - - private suspend fun processWorkItems(scope: CoroutineScope) { - val workItem = dequeueWork() ?: return - - performWorkHandlingExceptions(workItem, scope) - - // execute a coroutine to attempt to process the next work item, if possible - scope.launch { processWorkItems(scope) } - } - - private suspend fun performWorkHandlingExceptions(workItem: WorkItem, scope: CoroutineScope) { - // Execute the work in a job that can be cancelled - try { - scope.async { - workItem.work(this) - }.await() - } catch (_: CancellationException) { - // ignore cancellation - } catch (e: Throwable) { - val handler = UNHANDLED_EXCEPTION_HOOK.value - if (handler != null) { - handler(e) - } else { - throw e - } - } - } - - init { freeze() } - - companion object { - /** - * Sets the handler for uncaught exceptions encountered in work items - */ - internal fun setUnhandledExceptionHook(handler: (Throwable) -> Unit) { - UNHANDLED_EXCEPTION_HOOK.value = handler.freeze() - } - - /** - * The name of the IO worker - */ - private const val ioWorkerName = "com.autodesk.coroutineworker.ioworker" - - /** - * Returns whether we're already running on the IO thread - */ - internal fun shouldPerformIoWorkInline() = Worker.current.name == ioWorkerName - } -} - -/** - * Set [handler] for exceptions that would - * be bubbled up to the underlying Worker - */ -public fun setUnhandledExceptionHook(handler: (Throwable) -> Unit) { - BackgroundCoroutineWorkQueueExecutor.setUnhandledExceptionHook(handler) -} - -/** - * Queue of workItems - */ -private class WorkQueue { - - /** - * Node in the queue of work items - */ - private class Node(val workItem: WorkItem) { - /** - * Pointer to the next node - */ - var next: Node? = null - } - - /** - * The head of the queue of WorkItems to execute - * */ - private var queueHead: Node? = null - - /** - * The tail of the queue of WorkItems to execute - */ - private var queueTail: Node? = null - - /** - * Enqueues a WorkItem - */ - fun enqueue(item: WorkItem) { - val curTail = queueTail - queueTail = Node(item) - curTail?.next = queueTail - if (queueHead == null) { - queueHead = queueTail - } - } - - /** - * De-queues a WorkItem - */ - fun dequeue(): WorkItem? { - val curHead = queueHead ?: return null - queueHead = curHead.next - if (queueHead == null) { - queueTail = null - } - return curHead.workItem - } -} diff --git a/src/nativeMain/kotlin/com/autodesk/coroutineworker/CoroutineUtils.kt b/src/nativeMain/kotlin/com/autodesk/coroutineworker/CoroutineUtils.kt deleted file mode 100644 index 3377501..0000000 --- a/src/nativeMain/kotlin/com/autodesk/coroutineworker/CoroutineUtils.kt +++ /dev/null @@ -1,80 +0,0 @@ -package com.autodesk.coroutineworker - -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.CoroutineDispatcher -import kotlinx.coroutines.Delay -import kotlinx.coroutines.Runnable -import kotlinx.coroutines.delay -import kotlin.coroutines.ContinuationInterceptor -import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.coroutineContext -import kotlin.native.concurrent.AtomicInt -import kotlin.native.concurrent.AtomicReference -import kotlin.native.concurrent.freeze - -internal suspend fun waitAndDelayForCondition(condition: () -> Boolean) { - do { - delay(50) - } while (!condition()) -} - -@OptIn(kotlinx.coroutines.InternalCoroutinesApi::class) -public actual suspend fun threadSafeSuspendCallback(startAsync: (CompletionLambda) -> CancellationLambda): T { - check(coroutineContext[ContinuationInterceptor] is Delay) { - """threadSafeSuspendCallback only works for CoroutineDispatchers that implement Delay. - |Implement Delay for your dispatcher or use runBlocking. - """.trimMargin() - } - - // this will contain the future result of the async work - val futureResult = AtomicReference?>(null).freeze() - - // keep track of cancelled state, so that we - // can avoid updating the future when cancelled - val isCancelled = AtomicInt(0) - - // create a frozen completion handler for the async work - val completion = { result: Result -> - initRuntimeIfNeeded() - if (isCancelled.value == 0) { - // store the result in the AtomicReference, which - // signals that the work is complete - futureResult.value = result.freeze() - } - }.freeze() - - // start the async work and pass it a completion handler - // it returns a closure to call if we get cancelled - val cancellable = startAsync(completion) - - try { - // wait for the result to appear, which signals that the - // work on the other thread is done - waitAndDelayForCondition { futureResult.value != null } - - val result = futureResult.value - // Ensure we don't leak memory: https://github.com/JetBrains/kotlin-native/blob/master/runtime/src/main/kotlin/kotlin/native/concurrent/Atomics.kt#L206 - futureResult.value = null - if (result == null) { - throw IllegalStateException("Future should have a result; found null") - } - return result.getOrThrow() - } catch (e: CancellationException) { - // we were cancelled. cancel the work we - // were waiting on too - isCancelled.value = 1 - cancellable() - throw e - } -} - -/** - * Symbolic dispatcher used to trigger IO-bound-thread-like behavior on native - */ -public object IODispatcher : CoroutineDispatcher() { - override fun dispatch(context: CoroutineContext, block: Runnable) { - throw UnsupportedOperationException( - "This dispatcher is symbolic and should not be used to run anything." - ) - } -} diff --git a/src/nativeMain/kotlin/com/autodesk/coroutineworker/CoroutineWorker.kt b/src/nativeMain/kotlin/com/autodesk/coroutineworker/CoroutineWorker.kt deleted file mode 100644 index ad27cec..0000000 --- a/src/nativeMain/kotlin/com/autodesk/coroutineworker/CoroutineWorker.kt +++ /dev/null @@ -1,180 +0,0 @@ -package com.autodesk.coroutineworker - -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.cancel -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.launch -import kotlin.coroutines.CoroutineContext -import kotlin.native.concurrent.AtomicInt -import kotlin.native.concurrent.freeze - -public actual class CoroutineWorker internal actual constructor() { - - /** - * True, if the job was cancelled; false, otherwise. - */ - private val state = CoroutineWorkerState() - - public actual fun cancel() { - cancelIfRunning() - } - - private fun cancelIfRunning(): Boolean { - if (state.completed) { - return false - } - // signal that this job should cancel - state.cancelled = true - return true - } - - public actual suspend fun cancelAndJoin() { - if (!cancelIfRunning()) { - return - } - // repeated check and wait for the job to complete - waitAndDelayForCondition { state.completed } - } - - public actual companion object { - - /** - * Gets the number of active workers running in the underlying WorkerPool. - * This is useful when testing, to ensure you don't leave workers running - * across tests. - */ - public val numActiveWorkers: Int - get() = executor.numActiveWorkers - - /** The executor used for all BackgroundJobs */ - private val executor = BackgroundCoroutineWorkQueueExecutor(4) - - public actual fun execute(block: suspend CoroutineScope.() -> Unit): CoroutineWorker { - return executeInternal(false, block) - } - - public actual suspend fun withContext(jvmContext: CoroutineContext, block: suspend CoroutineScope.() -> T): T { - val isIoWork = jvmContext == IODispatcher - if (isIoWork && BackgroundCoroutineWorkQueueExecutor.shouldPerformIoWorkInline()) { - return coroutineScope(block) - } - return threadSafeSuspendCallback { completion -> - val job = executeInternal(isIoWork) { - val result = runCatching { - block() - } - completion(result) - } - return@threadSafeSuspendCallback { job.cancel() } - } - } - - private fun executeInternal(isIoWork: Boolean, block: suspend CoroutineScope.() -> Unit): CoroutineWorker { - return CoroutineWorker().also { - val state = it.state - executor.enqueueWork( - WorkItem( - { state.cancelled }, - { state.completed = true }, - block - ), - isIoWork - ) - } - } - - /** CoroutineWorker's CoroutineWorkItem class that listens for cancellation */ - private class WorkItem( - val cancelled: () -> Boolean, - val notifyCompletion: () -> Unit, - val block: suspend CoroutineScope.() -> Unit - ) : CoroutineWorkItem { - override val work: suspend CoroutineScope.() -> Unit - init { - work = { - var completed = false - try { - repeatedlyCheckForCancellation(this.coroutineContext, cancelled) { completed } - // inside a new CoroutineScope, so that child jobs are cancelled - coroutineScope { - block() - } - } finally { - completed = true - notifyCompletion() - } - } - } - - // repeatedly checks if the scope has been cancelled and cancels the scope if needed; bails out, when the job completes - private fun CoroutineScope.repeatedlyCheckForCancellation(context: CoroutineContext, cancelled: () -> Boolean, completedGetter: () -> Boolean) { - launch { - waitAndDelayForCondition { - val cancelledValue = cancelled() - if (cancelledValue) { - context.cancel() - } - completedGetter() || cancelledValue - } - } - } - } - } - - init { freeze() } -} - -private class CoroutineWorkerState { - - /** - * The backing store for the state - */ - private val value = AtomicInt(0) - - /** - * True, if the job was cancelled; false, otherwise. - */ - var cancelled: Boolean - get() = isSet(cancelledBit) - set(value) = updateValue(cancelledBit, value) - - /** - * True, if the job finished; false, otherwise. - */ - var completed: Boolean - get() = isSet(completedBit) - set(value) = updateValue(completedBit, value) - - /** - * Updates the value with the bit, setting or un-setting it - */ - private fun updateValue(bit: Int, set: Boolean) { - do { - val old = value.value - val new = if (set) { - old or bit - } else { - old and bit.inv() - } - } while (!value.compareAndSet(old, new)) - } - - /** - * Returns whether or not the bit is set - */ - private fun isSet(bit: Int) = (value.value and bit) == bit - - companion object { - /** - * Cancelled bit - */ - private const val cancelledBit = 1 - - /** - * Completed bit - */ - private const val completedBit = 2 - } - - init { freeze() } -} diff --git a/src/nativeMain/kotlin/com/autodesk/coroutineworker/WorkerPool.kt b/src/nativeMain/kotlin/com/autodesk/coroutineworker/WorkerPool.kt deleted file mode 100644 index 13b0587..0000000 --- a/src/nativeMain/kotlin/com/autodesk/coroutineworker/WorkerPool.kt +++ /dev/null @@ -1,74 +0,0 @@ -package com.autodesk.coroutineworker - -import kotlin.native.concurrent.AtomicLong -import kotlin.native.concurrent.Worker -import kotlin.native.concurrent.freeze - -/** - * Keeps track of last time (via a sequence number) the worker - * was used, and the number of blocks queued for the worker - */ -private class WeightedWorker( - val worker: Worker -) { - /** Set to the pool's current sequence, each time it's used */ - val lastSequence = AtomicLong(0) - - /** Number of blocks queued on this worker */ - val numBlocksQueued = AtomicLong(0) - - companion object { - val comparator = compareBy( - { it.numBlocksQueued.value }, - { it.lastSequence.value } - ) - } - - init { freeze() } -} - -/** - * A pool of Worker instances, which are used in order of least busy - * and then least recently used. The pool will have [numWorkers] workers. - */ -internal class WorkerPool(private val numWorkers: Int) { - /** The available workers */ - private val workers = (0 until numWorkers).map { WeightedWorker(Worker.start()) } - - /** The current sequence, which is incremented each time performWork is called */ - private val currentSequence = AtomicLong(0) - - private fun nextWorker(): WeightedWorker { - var next: WeightedWorker? = null - while (next == null) { - next = workers.minWithOrNull(comparator = WeightedWorker.comparator)!!.takeIf { - val currentValue = it.numBlocksQueued.value - // try again, if numBlocksQueue was modified - it.numBlocksQueued.compareAndSet(currentValue, currentValue + 1) - } - } - return next.apply { - lastSequence.value = currentSequence.addAndGet(1) - } - } - - fun performWork(work: () -> Unit) { - // get the next worker - val worker = nextWorker() - - // prepare the block to update state, when the worker is finished - val workerCompleteBlock: () -> Unit = { - worker.numBlocksQueued.decrement() - } - val workerOperation: () -> Unit = { - try { - work() - } finally { - workerCompleteBlock() - } - }.freeze() - worker.worker.executeAfter(operation = workerOperation) - } - - init { freeze() } -} diff --git a/src/nativeTest/kotlin/com/autodesk/coroutineworker/CoroutineUtilsTest.kt b/src/nativeTest/kotlin/com/autodesk/coroutineworker/CoroutineUtilsTest.kt deleted file mode 100644 index 48821b2..0000000 --- a/src/nativeTest/kotlin/com/autodesk/coroutineworker/CoroutineUtilsTest.kt +++ /dev/null @@ -1,59 +0,0 @@ -package com.autodesk.coroutineworker - -import kotlinx.atomicfu.atomic -import kotlinx.coroutines.Dispatchers -import kotlin.native.concurrent.isFrozen -import kotlin.test.Test -import kotlin.test.assertFalse -import kotlin.test.assertTrue - -class CoroutineUtilsTest { - - @Test - fun `threadSafeSuspendCallback completion is frozen`() { - testRunBlocking { - var called = false - threadSafeSuspendCallback { completion -> - called = true - assertTrue(completion.isFrozen) - completion(Result.success(Unit)) - return@threadSafeSuspendCallback { Unit } - } - assertTrue(called) - } - } - - @Test - fun `withContext with special dispatcher uses special thread`() { - testRunBlocking { - CoroutineWorker.withContext(IODispatcher) { - // should return true because it's on the special thread - assertTrue(BackgroundCoroutineWorkQueueExecutor.shouldPerformIoWorkInline()) - } - } - } - - @Test - fun `withContext without special dispatcher does not use special thread`() { - testRunBlocking { - CoroutineWorker.withContext(Dispatchers.Default) { - // should return false because it's not on the special thread - assertFalse(BackgroundCoroutineWorkQueueExecutor.shouldPerformIoWorkInline()) - } - } - } - - @Test - fun `multiple withContext on IODispatcher works without timeout`() { - testRunBlocking { - val ran = atomic(false) - CoroutineWorker.withContext(IODispatcher) { - // should return false because it's not on the special thread - CoroutineWorker.withContext(IODispatcher) { - ran.value = true - } - } - assertTrue(ran.value) - } - } -}