Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flaky Node.js tests #354

Merged
merged 5 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import kotlin.time.Duration.Companion.seconds
@OptIn(ExperimentalStdlibApi::class)
abstract class PortUtilBaseTest {

protected abstract fun openServerSocket(
protected open val isNodeJs: Boolean = false

protected abstract suspend fun openServerSocket(
ipAddress: IPAddress,
port: Int,
): AutoCloseable
Expand All @@ -54,7 +56,7 @@ abstract class PortUtilBaseTest {
fun givenFindAvailable_whenCoroutineCancelled_thenHandlesCancellationProperly() = runTest(timeout = 120.seconds) {
val port = Port.Proxy.MIN.toPortProxy()
val host = LocalHost.IPv4
val limit = 750
val limit = if (isNodeJs) 250 else 750
val i = port.iterator(limit)

var count = 0
Expand Down Expand Up @@ -84,7 +86,7 @@ abstract class PortUtilBaseTest {

// Ensure any exceptions/results are propagated
withContext(Dispatchers.Default) {
delay(5_000.milliseconds)
delay(2_500.milliseconds)
}

// If it threw an IOException, that would be propagated
Expand Down Expand Up @@ -114,11 +116,6 @@ abstract class PortUtilBaseTest {
socket.close()
} catch (_: Throwable) {}
}
withContext(Dispatchers.Default) {
// Need to switch context here for an actual delay
// b/c JS needs to establish the connection
delay(10.milliseconds)
}
return socket to portProxy
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@ internal external fun net_createServer(connectionListener: (socket: dynamic) ->
@OptIn(InternalProcessApi::class)
internal external class net_Server: events_EventEmitter {
fun close()
fun listen(port: Int, host: String, backlog: Int, callback: () -> Unit)
fun listen(options: dynamic, callback: () -> Unit)
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,3 @@ internal inline fun net_Server.onError(
) {
on("error", callback)
}

@Suppress("NOTHING_TO_INLINE")
@OptIn(InternalProcessApi::class)
internal inline fun net_Server.onListening(
noinline callback: () -> Unit,
) {
on("listening", callback)
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import io.matthewnelson.kmp.tor.runtime.core.internal.*
import io.matthewnelson.kmp.tor.runtime.core.internal.PortProxyIterator.Companion.iterator
import io.matthewnelson.kmp.tor.runtime.core.internal.net_createServer
import io.matthewnelson.kmp.tor.runtime.core.internal.onError
import io.matthewnelson.kmp.tor.runtime.core.internal.onListening
import kotlinx.coroutines.*
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.TimeSource

Expand All @@ -40,7 +40,7 @@ import kotlin.time.TimeSource
// @Throws(IOException::class, CancellationException::class)
public actual suspend fun Port.isAvailableAsync(
host: LocalHost,
): Boolean = host.resolve().isPortAvailable(value)
): Boolean = host.resolve().isPortAvailable(value, timeout = 100.milliseconds)

/**
* Finds an available TCP port on [LocalHost] starting with the current
Expand All @@ -64,16 +64,35 @@ public actual suspend fun Port.Proxy.findAvailableAsync(
val ipAddress = host.resolve()

val ctx = currentCoroutineContext()
while (ctx.isActive && i.hasNext()) {
if (!ipAddress.isPortAvailable(i.next())) continue
val maxTimeouts = 5
var timeouts = 0
while (ctx.isActive && i.hasNext() && timeouts < maxTimeouts) {
try {
val isAvailable = ipAddress.isPortAvailable(i.next())
timeouts = 0
if (!isAvailable) continue
} catch (_: IOException) {
timeouts++
continue
}

return i.toPortProxy()
}

throw ctx.cancellationExceptionOr { i.unavailableException(ipAddress) }
throw ctx.cancellationExceptionOr {
if (timeouts >= maxTimeouts) {
IOException("$maxTimeouts successive timeouts occurred when checking availability")
} else {
i.unavailableException(ipAddress)
}
}
}

// @Throws(IOException::class, CancellationException::class)
private suspend fun IPAddress.isPortAvailable(port: Int): Boolean {
private suspend fun IPAddress.isPortAvailable(
port: Int,
timeout: Duration = 42.milliseconds
): Boolean {
val timeMark = TimeSource.Monotonic.markNow()
val ctx = currentCoroutineContext()
val latch = Job(ctx[Job])
Expand All @@ -87,11 +106,6 @@ private suspend fun IPAddress.isPortAvailable(port: Int): Boolean {

latch.invokeOnCompletion { server.close() }

server.onListening {
isAvailable = true
latch.complete()
}

server.onError { err ->
if ((err.code as String) == "EADDRINUSE") {
isAvailable = false
Expand All @@ -101,13 +115,16 @@ private suspend fun IPAddress.isPortAvailable(port: Int): Boolean {
latch.complete()
}

server.listen(port, ipAddress, 1) {
val options = js("{}")
options["port"] = port
options["host"] = ipAddress
options["backlog"] = 1

server.listen(options) {
isAvailable = true
latch.complete()
}

val waitTime = (if (IsUnixLikeHost) 42 else 84).milliseconds

withContext(NonCancellable) {
while (
ctx.isActive
Expand All @@ -116,7 +133,7 @@ private suspend fun IPAddress.isPortAvailable(port: Int): Boolean {
&& error == null
) {
delay(5.milliseconds)
if (timeMark.elapsedNow() > waitTime) break
if (timeMark.elapsedNow() > timeout) break
}
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,29 @@ package io.matthewnelson.kmp.tor.runtime.core.util
import io.matthewnelson.kmp.tor.runtime.core.address.IPAddress
import io.matthewnelson.kmp.tor.runtime.core.internal.net_createServer
import io.matthewnelson.kmp.tor.runtime.core.internal.onError
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.withContext
import kotlin.test.fail
import kotlin.time.Duration.Companion.milliseconds

@OptIn(ExperimentalStdlibApi::class)
class PortUtilJsUnitTest: PortUtilBaseTest() {

override fun openServerSocket(
override val isNodeJs: Boolean = true

override suspend fun openServerSocket(
ipAddress: IPAddress,
port: Int,
): AutoCloseable {
val server = net_createServer { it.destroy(); Unit }
server.onError { err -> fail(err.toString()) }
server.listen(port, ipAddress.value, 1) {}
val options = js("{}")
options["port"] = port
options["host"] = ipAddress.value
options["backlog"] = 1
server.listen(options) {}
withContext(Dispatchers.Default) { delay(10.milliseconds) }
return object : AutoCloseable {
override fun close() { server.close() }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import io.matthewnelson.kmp.tor.runtime.core.internal.ServerSocketProducer.Compa
@OptIn(ExperimentalStdlibApi::class)
class PortUtilNonJsUnitTest: PortUtilBaseTest() {

override fun openServerSocket(
override suspend fun openServerSocket(
ipAddress: IPAddress,
port: Int,
): AutoCloseable = ipAddress.toServerSocketProducer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public object TestUtils {
val dataDir = homeDir.resolve("data")
val cacheDir = homeDir.resolve("cache")

withContext(Dispatchers.Default) { delay(250.milliseconds) }
withContext(Dispatchers.Default) { delay(350.milliseconds) }

val p = Process.Builder(paths.tor)
.args("--DataDirectory")
Expand All @@ -64,6 +64,8 @@ public object TestUtils {
.args("1")
.args("--RunAsDaemon")
.args("0")
.args("__OwningControllerProcess")
.args(Process.Current.pid().toString())
.destroySignal(Signal.SIGTERM)
.environment("HOME", homeDir.path)
.stdin(Stdio.Null)
Expand All @@ -73,7 +75,7 @@ public object TestUtils {

currentCoroutineContext().job.invokeOnCompletion { p.destroy() }

withContext(Dispatchers.Default) { delay(250.milliseconds) }
withContext(Dispatchers.Default) { delay(350.milliseconds) }

return p
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class TorCtrlFactoryUnitTest {
p.destroy()
}

withContext(Dispatchers.Default) { delay(250.milliseconds) }
withContext(Dispatchers.Default) { delay(350.milliseconds) }

assertTrue(ctrl.isDestroyed())
}
Expand All @@ -102,7 +102,7 @@ class TorCtrlFactoryUnitTest {
)

val host = resolve()
val port = startPort.findAvailableAsync(1_000, this)
val port = startPort.findAvailableAsync(100, this)

val address = ProxyAddress(host, port)

Expand All @@ -114,7 +114,7 @@ class TorCtrlFactoryUnitTest {

block(process, ctrl)

withContext(Dispatchers.Default) { delay(250.milliseconds) }
withContext(Dispatchers.Default) { delay(500.milliseconds) }

assertEquals(1, invocationDestroy)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
**/
@file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING")
@file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING", "KotlinRedundantDiagnosticSuppress")

package io.matthewnelson.kmp.tor.runtime.ctrl

Expand Down Expand Up @@ -122,8 +122,12 @@ public actual interface TorCtrl : Destroyable, TorEvent.Processor, TorCmd.Privil
* */
@Throws(CancellationException::class, IOException::class)
public actual suspend fun connectAsync(address: ProxyAddress): TorCtrl {
return connect { context ->
withContext(context) { address.connect() }
return withDelayedReturnAsync {
connect { context ->
withContext(context) {
address.connect()
}
}
}
}

Expand All @@ -138,8 +142,12 @@ public actual interface TorCtrl : Destroyable, TorEvent.Processor, TorCmd.Privil
public actual suspend fun connectAsync(path: File): TorCtrl {
path.checkUnixSockedSupport()

return connect { context ->
withContext(context) { path.connect() }
return withDelayedReturnAsync {
connect { context ->
withContext(context) {
path.connect()
}
}
}
}

Expand All @@ -153,7 +161,9 @@ public actual interface TorCtrl : Destroyable, TorEvent.Processor, TorCmd.Privil
* */
@Throws(IOException::class)
public fun connect(address: ProxyAddress): TorCtrl {
return connect { address.connect() }
return withDelayedReturn {
connect { address.connect() }
}
}

/**
Expand All @@ -170,10 +180,13 @@ public actual interface TorCtrl : Destroyable, TorEvent.Processor, TorCmd.Privil
public fun connect(path: File): TorCtrl {
path.checkUnixSockedSupport()

return connect { path.connect() }
return withDelayedReturn {
connect { path.connect() }
}
}

@Throws(IOException::class)
@Suppress("NOTHING_TO_INLINE")
@OptIn(ExperimentalContracts::class)
private inline fun connect(
connect: (context: CoroutineContext) -> CtrlConnection,
Expand All @@ -182,21 +195,32 @@ public actual interface TorCtrl : Destroyable, TorEvent.Processor, TorCmd.Privil
callsInPlace(connect, InvocationKind.EXACTLY_ONCE)
}

@OptIn(ExperimentalCoroutinesApi::class)
val dispatcher = Dispatchers.IO.limitedParallelism(2)
val dispatcher = Dispatchers.IO

val connection = try {
connect(dispatcher)
} catch (t: Throwable) {
throw t.wrapIOException()
}

val ctrl = RealTorCtrl.of(this, dispatcher, connection)
return RealTorCtrl.of(this, dispatcher, connection)
}

/**
* A slight delay is needed before returning in order
* to ensure that the coroutine starts before able
* to call destroy on it.
* */
@Suppress("NOTHING_TO_INLINE")
@OptIn(ExperimentalContracts::class)
private inline fun withDelayedReturn(block: () -> TorCtrl): TorCtrl {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}

val ctrl = block()

try {
// A slight delay is needed before returning in order
// to ensure that the coroutine starts before able
// to call destroy on it.
Blocking.threadSleep(25.milliseconds)
} catch (e: InterruptedException) {
ctrl.destroy()
Expand All @@ -206,6 +230,30 @@ public actual interface TorCtrl : Destroyable, TorEvent.Processor, TorCmd.Privil
return ctrl
}

/**
* A slight delay is needed before returning in order
* to ensure that the coroutine starts before able
* to call destroy on it.
* */
@Suppress("NOTHING_TO_INLINE")
@OptIn(ExperimentalContracts::class)
private suspend inline fun withDelayedReturnAsync(block: () -> TorCtrl): TorCtrl {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}

val ctrl = block()

try {
delay(25.milliseconds)
} catch (e: CancellationException) {
ctrl.destroy()
throw e
}

return ctrl
}

@InternalKmpTorApi
public actual fun tempQueue(): TempTorCmdQueue = TempTorCmdQueue.of(handler)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ class TorCtrlFactoryTest {
.args("1")
.args("--RunAsDaemon")
.args("0")
.args("__OwningControllerProcess")
.args(Process.Current.pid().toString())
.destroySignal(Signal.SIGTERM)
.environment("HOME", homeDir.path)
.stdin(Stdio.Null)
Expand Down
Loading