Skip to content

Commit

Permalink
Fix flaky Node.js tests (#354)
Browse files Browse the repository at this point in the history
  • Loading branch information
05nelsonm authored Apr 9, 2024
1 parent 363ca04 commit 1c661c3
Show file tree
Hide file tree
Showing 10 changed files with 122 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import kotlin.time.Duration.Companion.seconds
@OptIn(ExperimentalStdlibApi::class)
abstract class PortUtilBaseTest {

protected abstract fun openServerSocket(
protected open val isNodeJs: Boolean = false

protected abstract suspend fun openServerSocket(
ipAddress: IPAddress,
port: Int,
): AutoCloseable
Expand All @@ -54,7 +56,7 @@ abstract class PortUtilBaseTest {
fun givenFindAvailable_whenCoroutineCancelled_thenHandlesCancellationProperly() = runTest(timeout = 120.seconds) {
val port = Port.Proxy.MIN.toPortProxy()
val host = LocalHost.IPv4
val limit = 750
val limit = if (isNodeJs) 250 else 750
val i = port.iterator(limit)

var count = 0
Expand Down Expand Up @@ -84,7 +86,7 @@ abstract class PortUtilBaseTest {

// Ensure any exceptions/results are propagated
withContext(Dispatchers.Default) {
delay(5_000.milliseconds)
delay(2_500.milliseconds)
}

// If it threw an IOException, that would be propagated
Expand Down Expand Up @@ -114,11 +116,6 @@ abstract class PortUtilBaseTest {
socket.close()
} catch (_: Throwable) {}
}
withContext(Dispatchers.Default) {
// Need to switch context here for an actual delay
// b/c JS needs to establish the connection
delay(10.milliseconds)
}
return socket to portProxy
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@ internal external fun net_createServer(connectionListener: (socket: dynamic) ->
@OptIn(InternalProcessApi::class)
internal external class net_Server: events_EventEmitter {
fun close()
fun listen(port: Int, host: String, backlog: Int, callback: () -> Unit)
fun listen(options: dynamic, callback: () -> Unit)
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,3 @@ internal inline fun net_Server.onError(
) {
on("error", callback)
}

@Suppress("NOTHING_TO_INLINE")
@OptIn(InternalProcessApi::class)
internal inline fun net_Server.onListening(
noinline callback: () -> Unit,
) {
on("listening", callback)
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import io.matthewnelson.kmp.tor.runtime.core.internal.*
import io.matthewnelson.kmp.tor.runtime.core.internal.PortProxyIterator.Companion.iterator
import io.matthewnelson.kmp.tor.runtime.core.internal.net_createServer
import io.matthewnelson.kmp.tor.runtime.core.internal.onError
import io.matthewnelson.kmp.tor.runtime.core.internal.onListening
import kotlinx.coroutines.*
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.TimeSource

Expand All @@ -40,7 +40,7 @@ import kotlin.time.TimeSource
// @Throws(IOException::class, CancellationException::class)
public actual suspend fun Port.isAvailableAsync(
host: LocalHost,
): Boolean = host.resolve().isPortAvailable(value)
): Boolean = host.resolve().isPortAvailable(value, timeout = 100.milliseconds)

/**
* Finds an available TCP port on [LocalHost] starting with the current
Expand All @@ -64,16 +64,35 @@ public actual suspend fun Port.Proxy.findAvailableAsync(
val ipAddress = host.resolve()

val ctx = currentCoroutineContext()
while (ctx.isActive && i.hasNext()) {
if (!ipAddress.isPortAvailable(i.next())) continue
val maxTimeouts = 5
var timeouts = 0
while (ctx.isActive && i.hasNext() && timeouts < maxTimeouts) {
try {
val isAvailable = ipAddress.isPortAvailable(i.next())
timeouts = 0
if (!isAvailable) continue
} catch (_: IOException) {
timeouts++
continue
}

return i.toPortProxy()
}

throw ctx.cancellationExceptionOr { i.unavailableException(ipAddress) }
throw ctx.cancellationExceptionOr {
if (timeouts >= maxTimeouts) {
IOException("$maxTimeouts successive timeouts occurred when checking availability")
} else {
i.unavailableException(ipAddress)
}
}
}

// @Throws(IOException::class, CancellationException::class)
private suspend fun IPAddress.isPortAvailable(port: Int): Boolean {
private suspend fun IPAddress.isPortAvailable(
port: Int,
timeout: Duration = 42.milliseconds
): Boolean {
val timeMark = TimeSource.Monotonic.markNow()
val ctx = currentCoroutineContext()
val latch = Job(ctx[Job])
Expand All @@ -87,11 +106,6 @@ private suspend fun IPAddress.isPortAvailable(port: Int): Boolean {

latch.invokeOnCompletion { server.close() }

server.onListening {
isAvailable = true
latch.complete()
}

server.onError { err ->
if ((err.code as String) == "EADDRINUSE") {
isAvailable = false
Expand All @@ -101,13 +115,16 @@ private suspend fun IPAddress.isPortAvailable(port: Int): Boolean {
latch.complete()
}

server.listen(port, ipAddress, 1) {
val options = js("{}")
options["port"] = port
options["host"] = ipAddress
options["backlog"] = 1

server.listen(options) {
isAvailable = true
latch.complete()
}

val waitTime = (if (IsUnixLikeHost) 42 else 84).milliseconds

withContext(NonCancellable) {
while (
ctx.isActive
Expand All @@ -116,7 +133,7 @@ private suspend fun IPAddress.isPortAvailable(port: Int): Boolean {
&& error == null
) {
delay(5.milliseconds)
if (timeMark.elapsedNow() > waitTime) break
if (timeMark.elapsedNow() > timeout) break
}
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,29 @@ package io.matthewnelson.kmp.tor.runtime.core.util
import io.matthewnelson.kmp.tor.runtime.core.address.IPAddress
import io.matthewnelson.kmp.tor.runtime.core.internal.net_createServer
import io.matthewnelson.kmp.tor.runtime.core.internal.onError
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.withContext
import kotlin.test.fail
import kotlin.time.Duration.Companion.milliseconds

@OptIn(ExperimentalStdlibApi::class)
class PortUtilJsUnitTest: PortUtilBaseTest() {

override fun openServerSocket(
override val isNodeJs: Boolean = true

override suspend fun openServerSocket(
ipAddress: IPAddress,
port: Int,
): AutoCloseable {
val server = net_createServer { it.destroy(); Unit }
server.onError { err -> fail(err.toString()) }
server.listen(port, ipAddress.value, 1) {}
val options = js("{}")
options["port"] = port
options["host"] = ipAddress.value
options["backlog"] = 1
server.listen(options) {}
withContext(Dispatchers.Default) { delay(10.milliseconds) }
return object : AutoCloseable {
override fun close() { server.close() }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import io.matthewnelson.kmp.tor.runtime.core.internal.ServerSocketProducer.Compa
@OptIn(ExperimentalStdlibApi::class)
class PortUtilNonJsUnitTest: PortUtilBaseTest() {

override fun openServerSocket(
override suspend fun openServerSocket(
ipAddress: IPAddress,
port: Int,
): AutoCloseable = ipAddress.toServerSocketProducer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public object TestUtils {
val dataDir = homeDir.resolve("data")
val cacheDir = homeDir.resolve("cache")

withContext(Dispatchers.Default) { delay(250.milliseconds) }
withContext(Dispatchers.Default) { delay(350.milliseconds) }

val p = Process.Builder(paths.tor)
.args("--DataDirectory")
Expand All @@ -64,6 +64,8 @@ public object TestUtils {
.args("1")
.args("--RunAsDaemon")
.args("0")
.args("__OwningControllerProcess")
.args(Process.Current.pid().toString())
.destroySignal(Signal.SIGTERM)
.environment("HOME", homeDir.path)
.stdin(Stdio.Null)
Expand All @@ -73,7 +75,7 @@ public object TestUtils {

currentCoroutineContext().job.invokeOnCompletion { p.destroy() }

withContext(Dispatchers.Default) { delay(250.milliseconds) }
withContext(Dispatchers.Default) { delay(350.milliseconds) }

return p
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class TorCtrlFactoryUnitTest {
p.destroy()
}

withContext(Dispatchers.Default) { delay(250.milliseconds) }
withContext(Dispatchers.Default) { delay(350.milliseconds) }

assertTrue(ctrl.isDestroyed())
}
Expand All @@ -102,7 +102,7 @@ class TorCtrlFactoryUnitTest {
)

val host = resolve()
val port = startPort.findAvailableAsync(1_000, this)
val port = startPort.findAvailableAsync(100, this)

val address = ProxyAddress(host, port)

Expand All @@ -114,7 +114,7 @@ class TorCtrlFactoryUnitTest {

block(process, ctrl)

withContext(Dispatchers.Default) { delay(250.milliseconds) }
withContext(Dispatchers.Default) { delay(500.milliseconds) }

assertEquals(1, invocationDestroy)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
**/
@file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING")
@file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING", "KotlinRedundantDiagnosticSuppress")

package io.matthewnelson.kmp.tor.runtime.ctrl

Expand Down Expand Up @@ -122,8 +122,12 @@ public actual interface TorCtrl : Destroyable, TorEvent.Processor, TorCmd.Privil
* */
@Throws(CancellationException::class, IOException::class)
public actual suspend fun connectAsync(address: ProxyAddress): TorCtrl {
return connect { context ->
withContext(context) { address.connect() }
return withDelayedReturnAsync {
connect { context ->
withContext(context) {
address.connect()
}
}
}
}

Expand All @@ -138,8 +142,12 @@ public actual interface TorCtrl : Destroyable, TorEvent.Processor, TorCmd.Privil
public actual suspend fun connectAsync(path: File): TorCtrl {
path.checkUnixSockedSupport()

return connect { context ->
withContext(context) { path.connect() }
return withDelayedReturnAsync {
connect { context ->
withContext(context) {
path.connect()
}
}
}
}

Expand All @@ -153,7 +161,9 @@ public actual interface TorCtrl : Destroyable, TorEvent.Processor, TorCmd.Privil
* */
@Throws(IOException::class)
public fun connect(address: ProxyAddress): TorCtrl {
return connect { address.connect() }
return withDelayedReturn {
connect { address.connect() }
}
}

/**
Expand All @@ -170,10 +180,13 @@ public actual interface TorCtrl : Destroyable, TorEvent.Processor, TorCmd.Privil
public fun connect(path: File): TorCtrl {
path.checkUnixSockedSupport()

return connect { path.connect() }
return withDelayedReturn {
connect { path.connect() }
}
}

@Throws(IOException::class)
@Suppress("NOTHING_TO_INLINE")
@OptIn(ExperimentalContracts::class)
private inline fun connect(
connect: (context: CoroutineContext) -> CtrlConnection,
Expand All @@ -182,21 +195,32 @@ public actual interface TorCtrl : Destroyable, TorEvent.Processor, TorCmd.Privil
callsInPlace(connect, InvocationKind.EXACTLY_ONCE)
}

@OptIn(ExperimentalCoroutinesApi::class)
val dispatcher = Dispatchers.IO.limitedParallelism(2)
val dispatcher = Dispatchers.IO

val connection = try {
connect(dispatcher)
} catch (t: Throwable) {
throw t.wrapIOException()
}

val ctrl = RealTorCtrl.of(this, dispatcher, connection)
return RealTorCtrl.of(this, dispatcher, connection)
}

/**
* A slight delay is needed before returning in order
* to ensure that the coroutine starts before able
* to call destroy on it.
* */
@Suppress("NOTHING_TO_INLINE")
@OptIn(ExperimentalContracts::class)
private inline fun withDelayedReturn(block: () -> TorCtrl): TorCtrl {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}

val ctrl = block()

try {
// A slight delay is needed before returning in order
// to ensure that the coroutine starts before able
// to call destroy on it.
Blocking.threadSleep(25.milliseconds)
} catch (e: InterruptedException) {
ctrl.destroy()
Expand All @@ -206,6 +230,30 @@ public actual interface TorCtrl : Destroyable, TorEvent.Processor, TorCmd.Privil
return ctrl
}

/**
* A slight delay is needed before returning in order
* to ensure that the coroutine starts before able
* to call destroy on it.
* */
@Suppress("NOTHING_TO_INLINE")
@OptIn(ExperimentalContracts::class)
private suspend inline fun withDelayedReturnAsync(block: () -> TorCtrl): TorCtrl {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}

val ctrl = block()

try {
delay(25.milliseconds)
} catch (e: CancellationException) {
ctrl.destroy()
throw e
}

return ctrl
}

@InternalKmpTorApi
public actual fun tempQueue(): TempTorCmdQueue = TempTorCmdQueue.of(handler)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ class TorCtrlFactoryTest {
.args("1")
.args("--RunAsDaemon")
.args("0")
.args("__OwningControllerProcess")
.args(Process.Current.pid().toString())
.destroySignal(Signal.SIGTERM)
.environment("HOME", homeDir.path)
.stdin(Stdio.Null)
Expand Down

0 comments on commit 1c661c3

Please sign in to comment.