Skip to content

Commit

Permalink
Pass UncaughtException.Handler as CoroutineContext to OnEvent.Executor
Browse files Browse the repository at this point in the history
  • Loading branch information
05nelsonm committed Apr 11, 2024
1 parent 4a16918 commit ae24d45
Show file tree
Hide file tree
Showing 17 changed files with 229 additions and 36 deletions.
7 changes: 4 additions & 3 deletions library/runtime-core/api/runtime-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,18 @@ public abstract interface class io/matthewnelson/kmp/tor/runtime/core/OnEvent :
}

public abstract interface class io/matthewnelson/kmp/tor/runtime/core/OnEvent$Executor {
public abstract fun execute (Lio/matthewnelson/kmp/tor/runtime/core/ItBlock;)V
public abstract fun execute (Lkotlin/coroutines/CoroutineContext;Lio/matthewnelson/kmp/tor/runtime/core/ItBlock;)V
}

public final class io/matthewnelson/kmp/tor/runtime/core/OnEvent$Executor$Main : io/matthewnelson/kmp/tor/runtime/core/OnEvent$Executor {
public static final field INSTANCE Lio/matthewnelson/kmp/tor/runtime/core/OnEvent$Executor$Main;
public fun execute (Lio/matthewnelson/kmp/tor/runtime/core/ItBlock;)V
public fun execute (Lkotlin/coroutines/CoroutineContext;Lio/matthewnelson/kmp/tor/runtime/core/ItBlock;)V
public fun toString ()Ljava/lang/String;
}

public final class io/matthewnelson/kmp/tor/runtime/core/OnEvent$Executor$Unconfined : io/matthewnelson/kmp/tor/runtime/core/OnEvent$Executor {
public static final field INSTANCE Lio/matthewnelson/kmp/tor/runtime/core/OnEvent$Executor$Unconfined;
public fun execute (Lio/matthewnelson/kmp/tor/runtime/core/ItBlock;)V
public fun execute (Lkotlin/coroutines/CoroutineContext;Lio/matthewnelson/kmp/tor/runtime/core/ItBlock;)V
public fun toString ()Ljava/lang/String;
}

Expand Down Expand Up @@ -627,6 +627,7 @@ public class io/matthewnelson/kmp/tor/runtime/core/TorEvent$Observer {
public final field tag Ljava/lang/String;
public fun <init> (Lio/matthewnelson/kmp/tor/runtime/core/TorEvent;Ljava/lang/String;Lio/matthewnelson/kmp/tor/runtime/core/OnEvent$Executor;Lio/matthewnelson/kmp/tor/runtime/core/OnEvent;)V
public final fun notify (Lio/matthewnelson/kmp/tor/runtime/core/OnEvent$Executor;Ljava/lang/String;)V
public final fun notify (Lkotlin/coroutines/CoroutineContext;Lio/matthewnelson/kmp/tor/runtime/core/OnEvent$Executor;Ljava/lang/String;)V
public final fun toString ()Ljava/lang/String;
public final fun toString (Z)Ljava/lang/String;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package io.matthewnelson.kmp.tor.runtime.core
import io.matthewnelson.kmp.tor.runtime.core.internal.ExecutorMainInternal
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.MainCoroutineDispatcher
import kotlin.coroutines.CoroutineContext
import kotlin.jvm.JvmField

/**
Expand All @@ -26,7 +27,7 @@ import kotlin.jvm.JvmField
*
* **NOTE:** Exceptions should not be thrown
* within the [OnSuccess] lambda. If [OnSuccess] is
* being utilized with TorRuntime APIs, it will be
* being utilized with `TorRuntime` APIs, it will be
* treated as an [UncaughtException] and dispatched
* to [io.matthewnelson.kmp.tor.runtime.RuntimeEvent.LOG.ERROR]
* observers.
Expand All @@ -39,7 +40,7 @@ public typealias OnSuccess<T> = ItBlock<T>
*
* **NOTE:** The exception should not be re-thrown
* within the [OnFailure] lambda. If [OnFailure] is
* being utilized with TorRuntime APIs, it will be
* being utilized with `TorRuntime` APIs, it will be
* treated as an [UncaughtException] and dispatched
* to [io.matthewnelson.kmp.tor.runtime.RuntimeEvent.LOG.ERROR]
* observers.
Expand All @@ -49,6 +50,14 @@ public typealias OnFailure = ItBlock<Throwable>
/**
* A callback for dispatching events.
*
* Implementations of [OnEvent] should not throw exception,
* be fast, and non-blocking.
*
* **NOTE:** If [OnEvent] is being utilized with `TorRuntime`
* APIs, it will be treated as an [UncaughtException] and dispatched
* to [io.matthewnelson.kmp.tor.runtime.RuntimeEvent.LOG.ERROR]
* observers.
*
* @see [OnEvent.Executor]
* */
public fun interface OnEvent<in It: Any>: ItBlock<It> {
Expand Down Expand Up @@ -79,9 +88,11 @@ public fun interface OnEvent<in It: Any>: ItBlock<It> {
/**
* Execute [block] in desired context.
*
* @param [handler] The [UncaughtException.Handler] wrapped as
* [CoroutineContext] element to pipe exceptions.
* @param [block] to be invoked in desired context.
* */
public fun execute(block: ItBlock<Unit>)
public fun execute(handler: CoroutineContext, block: ItBlock<Unit>)

/**
* Utilizes [Dispatchers.Main] under the hood to transition events
Expand Down Expand Up @@ -110,7 +121,7 @@ public fun interface OnEvent<in It: Any>: ItBlock<It> {
* confines of its lambda.
* */
public object Unconfined: Executor {
override fun execute(block: ItBlock<Unit>) { block(Unit) }
override fun execute(handler: CoroutineContext, block: ItBlock<Unit>) { block(Unit) }

override fun toString(): String = "OnEvent.Executor.Unconfined"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package io.matthewnelson.kmp.tor.runtime.core

import kotlinx.coroutines.CoroutineExceptionHandler
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.jvm.JvmField

/**
Expand Down Expand Up @@ -299,17 +302,30 @@ public enum class TorEvent {
* back to if [executor] was not defined for this observer.
* */
public fun notify(default: OnEvent.Executor, event: String) {
(executor ?: default).execute { onEvent(event) }
notify(EmptyCoroutineContext, default, event)
}

/**
* Invokes [OnEvent] for the given [event] string
*
* @param [handler] Optional ability to pass [UncaughtException.Handler]
* wrapped as [CoroutineExceptionHandler]
* @param [default] the default [OnEvent.Executor] to fall
* back to if [executor] was not defined for this observer.
* */
public fun notify(handler: CoroutineContext, default: OnEvent.Executor, event: String) {
(executor ?: default).execute(handler) { onEvent(event) }
}


public final override fun toString(): String = toString(isStatic = false)

public fun toString(isStatic: Boolean): String = buildString {
val tag = if (tag != null && isStatic) "STATIC" else tag

append("TorEvent.Observer[tag=")
append(tag.toString())
append(",event=")
append(", event=")
append(event.name)

when (executor) {
Expand All @@ -318,7 +334,7 @@ public enum class TorEvent {
OnEvent.Executor.Unconfined -> executor.toString()
else -> "Custom"
}.let {
append(",executor=")
append(", executor=")
append(it)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ import io.matthewnelson.kmp.tor.runtime.core.OnEvent
import kotlin.coroutines.CoroutineContext

internal expect object ExecutorMainInternal: OnEvent.Executor {
override fun execute(block: ItBlock<Unit>)
override fun execute(handler: CoroutineContext, block: ItBlock<Unit>)
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.matthewnelson.kmp.tor.runtime.core.internal

import io.matthewnelson.kmp.tor.runtime.core.OnEvent
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.test.Test
import kotlin.test.assertFailsWith

Expand All @@ -24,7 +25,7 @@ class MainExecutorJvmUnitTest {
@Test
fun givenExecute_whenNoDispatchersMain_thenThrowsException() {
assertFailsWith<IllegalStateException> {
OnEvent.Executor.Main.execute { }
OnEvent.Executor.Main.execute(EmptyCoroutineContext) { }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import io.matthewnelson.kmp.tor.runtime.core.ItBlock
import io.matthewnelson.kmp.tor.runtime.core.OnEvent
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Runnable
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

internal actual object ExecutorMainInternal: OnEvent.Executor {

actual override fun execute(block: ItBlock<Unit>) {
Main.dispatch(EmptyCoroutineContext, Runnable { block(Unit) })
actual override fun execute(handler: CoroutineContext, block: ItBlock<Unit>) {
Main.dispatch(handler, Runnable { block(Unit) })
}

private val Main by lazy {
Expand Down
20 changes: 19 additions & 1 deletion library/runtime-ctrl/api/runtime-ctrl.api
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ public abstract class io/matthewnelson/kmp/tor/runtime/ctrl/AbstractTorEventProc
protected final fun destroyed ()Z
protected fun getDebug ()Z
protected final fun getDefaultExecutor ()Lio/matthewnelson/kmp/tor/runtime/core/OnEvent$Executor;
protected abstract fun getHandler ()Lio/matthewnelson/kmp/tor/runtime/core/UncaughtException$Handler;
protected abstract fun getHandler ()Lio/matthewnelson/kmp/tor/runtime/ctrl/AbstractTorEventProcessor$HandlerWithContext;
protected final fun isStaticTag (Ljava/lang/String;)Z
protected final fun notifyObservers (Lio/matthewnelson/kmp/tor/runtime/core/TorEvent;Ljava/lang/String;)V
protected fun onDestroy ()Z
Expand All @@ -21,6 +21,24 @@ public abstract class io/matthewnelson/kmp/tor/runtime/ctrl/AbstractTorEventProc
protected final class io/matthewnelson/kmp/tor/runtime/ctrl/AbstractTorEventProcessor$Companion {
}

protected final class io/matthewnelson/kmp/tor/runtime/ctrl/AbstractTorEventProcessor$HandlerWithContext : kotlin/coroutines/AbstractCoroutineContextElement, io/matthewnelson/kmp/tor/runtime/core/UncaughtException$Handler, kotlinx/coroutines/CoroutineExceptionHandler {
public final field delegate Lio/matthewnelson/kmp/tor/runtime/core/UncaughtException$Handler;
public fun <init> (Lio/matthewnelson/kmp/tor/runtime/core/UncaughtException$Handler;)V
public fun handleException (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Throwable;)V
public fun invoke (Lio/matthewnelson/kmp/tor/runtime/core/UncaughtException;)V
public synthetic fun invoke (Ljava/lang/Object;)V
}

protected final class io/matthewnelson/kmp/tor/runtime/ctrl/AbstractTorEventProcessor$ObserverNameContext : kotlin/coroutines/AbstractCoroutineContextElement {
public static final field Key Lio/matthewnelson/kmp/tor/runtime/ctrl/AbstractTorEventProcessor$ObserverNameContext$Key;
public final field context Ljava/lang/String;
public fun <init> (Ljava/lang/String;)V
public final fun toString ()Ljava/lang/String;
}

public final class io/matthewnelson/kmp/tor/runtime/ctrl/AbstractTorEventProcessor$ObserverNameContext$Key : kotlin/coroutines/CoroutineContext$Key {
}

public final class io/matthewnelson/kmp/tor/runtime/ctrl/TempTorCmdQueue : io/matthewnelson/kmp/tor/runtime/core/Destroyable, io/matthewnelson/kmp/tor/runtime/core/ctrl/TorCmd$Unprivileged$Processor {
public synthetic fun <init> (Lio/matthewnelson/kmp/tor/runtime/core/UncaughtException$Handler;Lkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun attach (Lio/matthewnelson/kmp/tor/runtime/ctrl/TorCtrl;)V
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ import io.matthewnelson.kmp.tor.runtime.core.OnEvent
import io.matthewnelson.kmp.tor.runtime.core.TorEvent
import io.matthewnelson.kmp.tor.runtime.core.UncaughtException
import io.matthewnelson.kmp.tor.runtime.core.UncaughtException.Handler.Companion.tryCatch
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlin.concurrent.Volatile
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.cancellation.CancellationException
import kotlin.jvm.JvmField
import kotlin.jvm.JvmName
import kotlin.jvm.JvmStatic

Expand All @@ -47,7 +52,7 @@ protected constructor(
@get:JvmName("destroyed")
protected val destroyed: Boolean get() = _destroyed
protected open val debug: Boolean = true
protected abstract val handler: UncaughtException.Handler
protected abstract val handler: HandlerWithContext

init {
observers.addAll(initialObservers)
Expand Down Expand Up @@ -132,8 +137,10 @@ protected constructor(
if (isEmpty()) return@withObservers null
mapNotNull { if (it.event == event) it else null }
}?.forEach { observer ->
handler.tryCatch(observer.toString(isStatic = observer.tag.isStaticTag())) {
observer.notify(defaultExecutor, output)
val ctx = ObserverNameContext(observer.toString(isStatic = observer.tag.isStaticTag()))

handler.tryCatch(ctx) {
observer.notify(handler + ctx, defaultExecutor, output)
}
}
}
Expand All @@ -152,7 +159,7 @@ protected constructor(
return wasDestroyed
}

private fun <T: Any?> withObservers(
private fun <T : Any?> withObservers(
block: MutableSet<TorEvent.Observer>.() -> T,
): T {
if (_destroyed) return block(noOpMutableSet())
Expand All @@ -169,11 +176,41 @@ protected constructor(
@JvmStatic
@InternalKmpTorApi
@Suppress("UNCHECKED_CAST")
protected fun <T: Any> noOpMutableSet(): MutableSet<T> = NoOpMutableSet as MutableSet<T>
protected fun <T : Any> noOpMutableSet(): MutableSet<T> = NoOpMutableSet as MutableSet<T>
}

// testing
protected open fun registered(): Int = synchronized(lock) { observers.size }

// Handler that also implements CoroutineExceptionHandler
protected class HandlerWithContext(
@JvmField
public val delegate: UncaughtException.Handler
) : AbstractCoroutineContextElement(CoroutineExceptionHandler),
UncaughtException.Handler by delegate,
CoroutineExceptionHandler
{

override fun handleException(context: CoroutineContext, exception: Throwable) {
if (exception is CancellationException) return
if (exception is UncaughtException) {
invoke(exception)
} else {
val ctx = context[ObserverNameContext]?.context ?: "EventProcessor"
tryCatch(ctx) { throw exception }
}
}
}

// For passing observer name as context
protected class ObserverNameContext(
@JvmField
public val context: String,
): AbstractCoroutineContextElement(ObserverNameContext) {
public companion object Key: CoroutineContext.Key<ObserverNameContext>

final override fun toString(): String = context
}
}

private object NoOpMutableSet: MutableSet<Any> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ internal abstract class AbstractTorCmdQueue internal constructor(
staticTag: String?,
initialObservers: Set<TorEvent.Observer>,
defaultExecutor: OnEvent.Executor,
protected final override val handler: UncaughtException.Handler,
handler: UncaughtException.Handler,
): AbstractTorEventProcessor(staticTag, initialObservers, defaultExecutor),
Destroyable,
TorCmd.Privileged.Processor
Expand All @@ -45,6 +45,7 @@ internal abstract class AbstractTorCmdQueue internal constructor(
@Volatile
@Suppress("PropertyName")
protected open var LOG: Debugger? = null
protected final override val handler: HandlerWithContext = HandlerWithContext(handler)

public final override fun isDestroyed(): Boolean = destroyed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ internal class RealTorCtrl private constructor(
LOG = null
}
} finally {
(handler as CloseableExceptionHandler).close()
(handler.delegate as CloseableExceptionHandler).close()
}

return true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@ import io.matthewnelson.kmp.tor.core.api.annotation.InternalKmpTorApi
import io.matthewnelson.kmp.tor.runtime.core.OnEvent
import io.matthewnelson.kmp.tor.runtime.core.TorEvent
import io.matthewnelson.kmp.tor.runtime.core.UncaughtException
import kotlinx.coroutines.*
import kotlinx.coroutines.test.runTest
import kotlin.test.*

@OptIn(InternalKmpTorApi::class)
class AbstractTorEventProcessorUnitTest {

private class TestProcessor: AbstractTorEventProcessor("static", emptySet(), OnEvent.Executor.Unconfined) {
override val handler: UncaughtException.Handler = UncaughtException.Handler.THROW
private class TestProcessor(
handler: UncaughtException.Handler = UncaughtException.Handler.THROW
): AbstractTorEventProcessor("static", emptySet(), OnEvent.Executor.Unconfined) {
override val handler = HandlerWithContext(handler)
val size: Int get() = registered()
fun notify(event: TorEvent, output: String) { event.notifyObservers(output) }
fun destroy() { onDestroy() }
Expand Down Expand Up @@ -185,4 +189,28 @@ class AbstractTorEventProcessorUnitTest {
assertFailsWith<NoSuchElementException> { iterator.next() }
assertFailsWith<IllegalStateException> { iterator.remove() }
}

@Test
fun givenHandler_whenPassedAsCoroutineContext_thenObserverNameContextIsPassed() = runTest {
val exceptions = mutableListOf<UncaughtException>()
val processor = TestProcessor(handler = { exceptions.add(it) })

val expectedTag = "Expected Tag"
var invocationEvent = 0
val latch = Job()
processor.subscribe(TorEvent.BW.observer(
tag = expectedTag,
executor = { handler, _ ->
@OptIn(DelicateCoroutinesApi::class)
GlobalScope.launch(handler) { throw IllegalStateException() }
.invokeOnCompletion { latch.cancel() }
},
onEvent = { invocationEvent++ }
))
processor.notify(TorEvent.BW, "")
latch.join()
assertEquals(1, exceptions.size)
assertEquals(0, invocationEvent)
assertTrue(exceptions.first().context.contains(expectedTag))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package io.matthewnelson.kmp.tor.runtime.mobile
import io.matthewnelson.kmp.tor.runtime.core.OnEvent
import kotlinx.coroutines.Job
import kotlinx.coroutines.test.runTest
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.test.Test
import kotlin.test.assertTrue

Expand All @@ -26,7 +27,7 @@ class OnEventExecutorMainTest {
@Test
fun givenAndroid_whenExecutorMain_thenUsesDispatchersImmediate() = runTest {
val job = Job()
OnEvent.Executor.Main.execute { job.complete() }
OnEvent.Executor.Main.execute(EmptyCoroutineContext) { job.complete() }
job.join()
assertTrue(job.isCompleted)
}
Expand Down
Loading

0 comments on commit ae24d45

Please sign in to comment.