Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Suspend fixes #93

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions .idea/modules.xml

This file was deleted.

4 changes: 2 additions & 2 deletions kzmq-benchmarks/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ benchmark {
}
configurations {
val main by getting {
warmups = 3
iterations = 3
warmups = 5
iterations = 10
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ open class PullPushBenchmark() {
@Param("jeromq", "cio")
var engineName = ""

@Param("inproc", "ipc", "tcp")
@Param("inproc", "tcp")
var transport = "tcp"

private lateinit var address: String

@Param("10", "100", "1000", "10000", "100000")
var messageSize = 10

private lateinit var address: String
private lateinit var messageData: ByteString

private lateinit var scope: CoroutineScope
Expand Down
15 changes: 0 additions & 15 deletions kzmq-core/src/commonMain/kotlin/org/zeromq/ReceiveSocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,3 @@ public interface ReceiveSocket {
*/
public var receiveTimeout: Int
}

public suspend inline fun <T> ReceiveSocket.receive(crossinline block: ReadScope.() -> T): T =
receive().let { it.checkingNoRemainingFrames { block() } }

public suspend inline fun <T> ReceiveSocket.receiveCatching(crossinline block: ReadScope.() -> T): SocketResult<T> =
receiveCatching().map { it.checkingNoRemainingFrames { block() } }

public inline fun <T> ReceiveSocket.tryReceive(crossinline block: ReadScope.() -> T): SocketResult<T> =
tryReceive().map { it.checkingNoRemainingFrames { block() } }

public inline fun <T> Message.checkingNoRemainingFrames(crossinline block: ReadScope.() -> T): T {
val result = block()
ensureNoRemainingFrames()
return result
}
15 changes: 15 additions & 0 deletions kzmq-core/src/commonMain/kotlin/org/zeromq/ReceiveSocketOps.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,21 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

public suspend inline fun <T> ReceiveSocket.receive(crossinline block: ReadScope.() -> T): T =
receive().let { it.checkingNoRemainingFrames { block() } }

public suspend inline fun <T> ReceiveSocket.receiveCatching(crossinline block: ReadScope.() -> T): SocketResult<T> =
receiveCatching().map { it.checkingNoRemainingFrames { block() } }

public inline fun <T> ReceiveSocket.tryReceive(crossinline block: ReadScope.() -> T): SocketResult<T> =
tryReceive().map { it.checkingNoRemainingFrames { block() } }

public inline fun <T> Message.checkingNoRemainingFrames(crossinline block: ReadScope.() -> T): T {
val result = block()
ensureNoRemainingFrames()
return result
}

/**
* Experimental API. Implementation is subject to change.
*/
Expand Down
10 changes: 0 additions & 10 deletions kzmq-core/src/commonMain/kotlin/org/zeromq/SendSocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,3 @@ public interface SendSocket {
*/
public var sendTimeout: Int
}

public suspend fun SendSocket.send(sender: WriteScope.() -> Unit) {
send(Message(listOf<ByteString>()).apply { sender() })
}

public suspend fun SendSocket.sendCatching(sender: WriteScope.() -> Unit): SocketResult<Unit> =
sendCatching(Message(listOf<ByteString>()).apply { sender() })

public fun SendSocket.trySend(sender: WriteScope.() -> Unit): SocketResult<Unit> =
trySend(Message(listOf<ByteString>()).apply { sender() })
10 changes: 10 additions & 0 deletions kzmq-core/src/commonMain/kotlin/org/zeromq/SendSocketOps.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ package org.zeromq

import kotlinx.coroutines.flow.*

public suspend fun SendSocket.send(sender: WriteScope.() -> Unit) {
send(buildMessage { sender() })
}

public suspend fun SendSocket.sendCatching(sender: WriteScope.() -> Unit): SocketResult<Unit> =
sendCatching(buildMessage { sender() })

public fun SendSocket.trySend(sender: WriteScope.() -> Unit): SocketResult<Unit> =
trySend(buildMessage { sender() })

/**
* Experimental API. Implementation is subject to change.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,32 @@ internal class CIODealerSocket(
}

internal class DealerSocketHandler : SocketHandler {
private val mailboxes = CircularQueue<PeerMailbox>()
private val outgoingMailboxes = CircularQueue<PeerMailbox>()
private val incomingMailboxes = CircularQueue<PeerMailbox>()

override suspend fun handle(peerEvents: ReceiveChannel<PeerEvent>) = coroutineScope {
while (isActive) {
mailboxes.update(peerEvents.receive())
val event = peerEvents.receive()
outgoingMailboxes.updateOnAdditionRemoval(event)
incomingMailboxes.updateOnAdditionRemoval(event)
}
}

override suspend fun send(message: Message) {
mailboxes.sendToFirstAvailable(message)
outgoingMailboxes.sendToFirstAvailable(message)
}

override fun trySend(message: Message): Unit? {
return outgoingMailboxes.trySendToFirstAvailable(message)?.let {}
}

override suspend fun receive(): Message {
val (_, message) = mailboxes.receiveFromFirst()
val (_, message) = incomingMailboxes.receiveFromFirst()
return message
}

override fun tryReceive(): Message? {
val maybeMailboxAndMessage = incomingMailboxes.tryReceiveFromFirst()
return maybeMailboxAndMessage?.let { (_, message) -> message }
}
}
53 changes: 35 additions & 18 deletions kzmq-engine-cio/src/commonMain/kotlin/org/zeromq/CIOPairSocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,6 @@ internal class CIOPairSocket(
internal class PairSocketHandler : SocketHandler {
private val mailbox = atomic<PeerMailbox?>(null)

private suspend fun awaitCurrentPeer() {
var counter = 0
while (mailbox.value == null) {
if (counter++ < 100) println("in awaitCurrentPeer: ${mailbox.value}")
yield()
}
}

override suspend fun handle(peerEvents: ReceiveChannel<PeerEvent>) = coroutineScope {
while (isActive) {
val (kind, peerMailbox) = peerEvents.receive()
Expand All @@ -82,18 +74,43 @@ internal class PairSocketHandler : SocketHandler {
}

override suspend fun send(message: Message) {
awaitCurrentPeer()
val mailbox = mailbox.value!!
logger.v { "Sending $message to $mailbox" }
mailbox.sendChannel.send(CommandOrMessage(message))
while (true) {
val result = trySend(message)
if (result != null) return
yield()
}
}

override fun trySend(message: Message): Unit? {
val maybeMailbox = mailbox.value
if (maybeMailbox != null) {
val result = maybeMailbox.sendChannel.trySend(CommandOrMessage(message))
if (result.isSuccess) {
logger.v { "Sent message to $maybeMailbox" }
return Unit
}
}
return null
}

override suspend fun receive(): Message {
awaitCurrentPeer()
val mailbox = mailbox.value!!
val commandOrMessage = mailbox.receiveChannel.receive()
val message = commandOrMessage.messageOrThrow()
logger.v { "Receiving $message from $mailbox" }
return message
while (true) {
val result = tryReceive()
if (result != null) return result
yield()
}
}

override fun tryReceive(): Message? {
val mailbox = mailbox.value
if (mailbox != null) {
val result = mailbox.receiveChannel.tryReceive()
if (result.isSuccess) {
val message = result.getOrThrow().messageOrThrow()
logger.v { "Receiving $message from $mailbox" }
return message
}
}
return null
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,17 @@ internal class PullSocketHandler : SocketHandler {

override suspend fun handle(peerEvents: ReceiveChannel<PeerEvent>) = coroutineScope {
while (isActive) {
mailboxes.update(peerEvents.receive())
mailboxes.updateOnAdditionRemoval(peerEvents.receive())
}
}

override suspend fun receive(): Message {
val (_, message) = mailboxes.receiveFromFirst()
return message
}

override fun tryReceive(): Message? {
val maybeMailboxAndMessage = mailboxes.tryReceiveFromFirst()
return maybeMailboxAndMessage?.let { (_, message) -> message }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,15 @@ internal class PushSocketHandler : SocketHandler {

override suspend fun handle(peerEvents: ReceiveChannel<PeerEvent>) = coroutineScope {
while (isActive) {
mailboxes.update(peerEvents.receive())
mailboxes.updateOnAdditionRemoval(peerEvents.receive())
}
}

override suspend fun send(message: Message) {
mailboxes.sendToFirstAvailable(message)
}

override fun trySend(message: Message): Unit? {
return mailboxes.trySendToFirstAvailable(message)?.let {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package org.zeromq

import kotlinx.coroutines.channels.*
import org.zeromq.internal.*

internal interface CIOReceiveSocket : ReceiveSocket {
Expand All @@ -22,7 +21,9 @@ internal interface CIOReceiveSocket : ReceiveSocket {
}

override fun tryReceive(): SocketResult<Message> {
TODO()
val maybeMessage = handler.tryReceive()
return if (maybeMessage != null) SocketResult.success(maybeMessage)
else SocketResult.failure()
}

override val onReceive get() = TODO()
Expand Down
36 changes: 24 additions & 12 deletions kzmq-engine-cio/src/commonMain/kotlin/org/zeromq/CIOReplySocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package org.zeromq
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.sync.*
import kotlinx.io.bytestring.*
import org.zeromq.internal.*
import org.zeromq.internal.utils.*
Expand Down Expand Up @@ -73,7 +72,6 @@ internal class CIOReplySocket(
internal class ReplySocketHandler : SocketHandler {
private val mailboxes = CircularQueue<PeerMailbox>()
private var state = atomic<ReplySocketState>(ReplySocketState.Idle)
private val requestReplyLock = Mutex()

private suspend fun awaitState(predicate: (ReplySocketState?) -> Boolean) {
while (!predicate(state.value)) yield()
Expand All @@ -82,28 +80,42 @@ internal class ReplySocketHandler : SocketHandler {
override suspend fun handle(peerEvents: ReceiveChannel<PeerEvent>) = coroutineScope {
while (isActive) {
val event = peerEvents.receive()
mailboxes.update(event)
mailboxes.updateOnAdditionRemoval(event)
}
}

override suspend fun receive(): Message {
awaitState { it is ReplySocketState.Idle }
requestReplyLock.withLock {
val (mailbox, message) = mailboxes.receiveFromFirst()
val (mailbox, message) = mailboxes.receiveFromFirst()
state.value = ReplySocketState.ProcessingRequest(mailbox, message.popPrefixAddress())
return message
}

override fun tryReceive(): Message? {
if (state.value !is ReplySocketState.Idle) return null
val maybeMailboxAndMessage = mailboxes.tryReceiveFromFirst()
return maybeMailboxAndMessage?.let { (mailbox, message) ->
state.value = ReplySocketState.ProcessingRequest(mailbox, message.popPrefixAddress())
return message
message
}
}

override suspend fun send(message: Message) {
awaitState { it is ReplySocketState.ProcessingRequest }
requestReplyLock.withLock {
val (peer, address) = state.value as ReplySocketState.ProcessingRequest
val (mailbox, address) = state.value as ReplySocketState.ProcessingRequest

message.pushPrefixAddress(address)
peer.sendChannel.send(CommandOrMessage(message))
state.value = ReplySocketState.Idle
}
message.pushPrefixAddress(address)
mailbox.sendChannel.send(CommandOrMessage(message))
state.value = ReplySocketState.Idle
}

override fun trySend(message: Message): Unit? {
if (state.value !is ReplySocketState.ProcessingRequest) return null
val (mailbox, address) = state.value as ReplySocketState.ProcessingRequest

message.pushPrefixAddress(address)
val result = mailbox.sendChannel.trySend(CommandOrMessage(message))
return result.getOrNull()
}
}

Expand Down
Loading