diff --git a/app/src/commonMain/kotlin/dev/toastbits/spms/client/cli/modes/Poll.kt b/app/src/commonMain/kotlin/dev/toastbits/spms/client/cli/modes/Poll.kt index b224607..f4b166f 100644 --- a/app/src/commonMain/kotlin/dev/toastbits/spms/client/cli/modes/Poll.kt +++ b/app/src/commonMain/kotlin/dev/toastbits/spms/client/cli/modes/Poll.kt @@ -8,6 +8,7 @@ import dev.toastbits.spms.client.cli.CommandLineClientMode import dev.toastbits.spms.client.cli.SpMsCommandLineClientError import dev.toastbits.spms.localisation.loc import dev.toastbits.spms.socketapi.shared.SpMsSocketApi +import dev.toastbits.spms.server.CLIENT_HEARTBEAT_TARGET_PERIOD import kotlin.time.* private val SERVER_EVENT_TIMEOUT: Duration = with (Duration) { 10000.milliseconds } @@ -22,35 +23,27 @@ class Poll: CommandLineClientMode("poll", { "TODO" }) { log(currentContext.loc.cli.poll_polling_server_for_events) + var last_heartbeat: TimeMark = TimeSource.Monotonic.markNow() + while (true) { - delay(POLL_INTERVAL) - - val wait_start: TimeMark = TimeSource.Monotonic.markNow() - var events: List? = null - - while (events == null && wait_start.elapsedNow() < SERVER_EVENT_TIMEOUT) { - val message: List? = with (Duration) { - socket.recvStringMultipart( - (SERVER_EVENT_TIMEOUT - wait_start.elapsedNow()).inWholeMilliseconds.coerceAtLeast(1L).milliseconds - )?.let { - SpMsSocketApi.decode(it) - } + val message: List? = + socket.recvStringMultipart(null)?.let { + SpMsSocketApi.decode(it) } - events = message?.map { - Json.decodeFromString(it) + val events: List = + message.orEmpty().map { part -> + Json.decodeFromString(part) } - } - - if (events == null) { - throw SpMsCommandLineClientError(currentContext.loc.cli.errServerDidNotSendEvents(SERVER_EVENT_TIMEOUT)) - } if (events.isNotEmpty()) { println(events) } - socket.sendStringMultipart(listOf("")) + if (last_heartbeat.elapsedNow() > CLIENT_HEARTBEAT_TARGET_PERIOD) { + socket.sendStringMultipart(listOf("")) + last_heartbeat = TimeSource.Monotonic.markNow() + } } } diff --git a/app/src/commonMain/kotlin/dev/toastbits/spms/client/player/PlayerClient.kt b/app/src/commonMain/kotlin/dev/toastbits/spms/client/player/PlayerClient.kt index e502694..cdd40b2 100644 --- a/app/src/commonMain/kotlin/dev/toastbits/spms/client/player/PlayerClient.kt +++ b/app/src/commonMain/kotlin/dev/toastbits/spms/client/player/PlayerClient.kt @@ -14,6 +14,7 @@ import dev.toastbits.spms.client.cli.SpMsCommandLineClientError import dev.toastbits.spms.localisation.loc import dev.toastbits.spms.server.PlayerOptions import dev.toastbits.spms.server.SpMs +import dev.toastbits.spms.server.CLIENT_HEARTBEAT_TARGET_PERIOD import dev.toastbits.spms.socketapi.parseSocketMessage import dev.toastbits.spms.socketapi.player.PlayerAction import dev.toastbits.spms.socketapi.shared.* @@ -24,8 +25,6 @@ import kotlin.time.* import gen.libmpv.LibMpv private val SERVER_REPLY_TIMEOUT: Duration = with (Duration) { 2.seconds } -private val SERVER_EVENT_TIMEOUT: Duration = with (Duration) { 11.seconds } -private val POLL_INTERVAL: Duration = with (Duration) { 100.milliseconds } private val CLIENT_REPLY_TIMEOUT: Duration = with (Duration) { 1.seconds } private fun getClientName(): String = @@ -198,8 +197,6 @@ class PlayerClient private constructor(val libmpv: LibMpv): Command( while (!shutdown) { try { - delay(POLL_INTERVAL) - // We don't actually care about the client handshake, it's just for consistency with the main server api // val handshake_message: List = socket.recvStringMultipart(CLIENT_REPLY_TIMEOUT) ?: continue @@ -271,10 +268,9 @@ class PlayerClient private constructor(val libmpv: LibMpv): Command( log("Initial state: ${server_handshake.server_state}") val message: MutableList = mutableListOf() + var last_heartbeat: TimeMark = TimeSource.Monotonic.markNow() while (!shutdown) { -// delay(POLL_INTERVAL) - val events: List = socket.pollEvents() for (event in events) { println("Processing event $event") @@ -287,44 +283,33 @@ class PlayerClient private constructor(val libmpv: LibMpv): Command( message.add(Json.encodeToString(queued.second)) } queued_messages.clear() - - log("Sending messages: $message") } - else { + else if (last_heartbeat.elapsedNow() >= CLIENT_HEARTBEAT_TARGET_PERIOD) { message.add(" ") } + else { + continue + } + log("Sending messages: $message") socket.sendStringMultipart(message) message.clear() + + last_heartbeat = TimeSource.Monotonic.markNow() } } private fun ZmqSocket.pollEvents(): List { - val wait_start: TimeMark = TimeSource.Monotonic.markNow() - var events: List? = null - - while (events == null && wait_start.elapsedNow() < SERVER_EVENT_TIMEOUT) { - val message: List? = with (Duration) { - recvStringMultipart( - (SERVER_EVENT_TIMEOUT - wait_start.elapsedNow()).inWholeMilliseconds.coerceAtLeast(1L).milliseconds - ) + val events: List? = recvStringMultipart(null)?.mapNotNull { + try { + Json.decodeFromString(it) } - - events = message?.mapNotNull { - try { - Json.decodeFromString(it) - } - catch (e: Throwable) { - RuntimeException("Parsing SpMsPlayerEvent failed $it", e).printStackTrace() - return emptyList() - } + catch (e: Throwable) { + RuntimeException("Parsing SpMsPlayerEvent failed $it", e).printStackTrace() + return emptyList() } } - if (events == null) { - throw SpMsCommandLineClientError(currentContext.loc.cli.errServerDidNotSendEvents(SERVER_EVENT_TIMEOUT)) - } - return events.orEmpty() } } diff --git a/app/src/commonMain/kotlin/dev/toastbits/spms/localisation/strings/CliLocalisation.kt b/app/src/commonMain/kotlin/dev/toastbits/spms/localisation/strings/CliLocalisation.kt index e365bfe..8a446a4 100644 --- a/app/src/commonMain/kotlin/dev/toastbits/spms/localisation/strings/CliLocalisation.kt +++ b/app/src/commonMain/kotlin/dev/toastbits/spms/localisation/strings/CliLocalisation.kt @@ -28,5 +28,4 @@ interface CliLocalisation { val poll_polling_server_for_events: String fun errServerDidNotRespond(timeout: Duration): String - fun errServerDidNotSendEvents(timeout: Duration): String } diff --git a/app/src/commonMain/kotlin/dev/toastbits/spms/localisation/strings/CliLocalisationEn.kt b/app/src/commonMain/kotlin/dev/toastbits/spms/localisation/strings/CliLocalisationEn.kt index 9dc5fd3..dce01cd 100644 --- a/app/src/commonMain/kotlin/dev/toastbits/spms/localisation/strings/CliLocalisationEn.kt +++ b/app/src/commonMain/kotlin/dev/toastbits/spms/localisation/strings/CliLocalisationEn.kt @@ -36,6 +36,4 @@ class CliLocalisationEn: CliLocalisation { override fun errServerDidNotRespond(timeout: Duration): String = "Server did not respond within timeout ($timeout)" - override fun errServerDidNotSendEvents(timeout: Duration): String = - "Server did not send events within timeout ($timeout)" } diff --git a/app/src/commonMain/kotlin/dev/toastbits/spms/localisation/strings/CliLocalisationJa.kt b/app/src/commonMain/kotlin/dev/toastbits/spms/localisation/strings/CliLocalisationJa.kt index 190c373..9c4a250 100644 --- a/app/src/commonMain/kotlin/dev/toastbits/spms/localisation/strings/CliLocalisationJa.kt +++ b/app/src/commonMain/kotlin/dev/toastbits/spms/localisation/strings/CliLocalisationJa.kt @@ -36,6 +36,4 @@ class CliLocalisationJa: CliLocalisation { override fun errServerDidNotRespond(timeout: Duration): String = "タイムアウト($timeout)の内にサーバーから返信が来ませんでした" - override fun errServerDidNotSendEvents(timeout: Duration): String = - "タイムアウト($timeout)の内にサーバーからイベントが来ませんでした" } diff --git a/gradle.properties b/gradle.properties index c64fd4f..ae473f3 100644 --- a/gradle.properties +++ b/gradle.properties @@ -14,7 +14,7 @@ clikt.version=4.4.0 mediasession.version=0.1.1 ytm.version=0.2.1 ktor.version=3.0.0-beta-1 -kjna.version=3b0e9cb762 +kjna.version=0.0.4 # Nix org.gradle.java.installations.fromEnv=JAVA_21_HOME,JAVA_22_HOME diff --git a/library/src/commonMain/kotlin/dev/toastbits/spms/server/SpMs.kt b/library/src/commonMain/kotlin/dev/toastbits/spms/server/SpMs.kt index b1dbdc7..6cbc5f9 100644 --- a/library/src/commonMain/kotlin/dev/toastbits/spms/server/SpMs.kt +++ b/library/src/commonMain/kotlin/dev/toastbits/spms/server/SpMs.kt @@ -22,11 +22,13 @@ import dev.toastbits.spms.localisation.SpMsLocalisation import dev.toastbits.spms.getMachineId import dev.toastbits.spms.getDeviceName import dev.toastbits.spms.createLibMpv +import dev.toastbits.spms.ReentrantLock import kotlin.experimental.ExperimentalNativeApi import kotlin.time.* import gen.libmpv.LibMpv -private val CLIENT_REPLY_TIMEOUT: Duration = with (Duration) { 100.milliseconds } +val CLIENT_HEARTBEAT_MAX_PERIOD: Duration = with (Duration) { 10.seconds } +val CLIENT_HEARTBEAT_TARGET_PERIOD: Duration = with (Duration) { 5.seconds } open class SpMs( val headless: Boolean = !LibMpv.isAvailable(), @@ -34,10 +36,12 @@ open class SpMs( ): ZmqRouter() { private var item_durations: MutableMap = mutableMapOf() private val item_durations_channel: Channel = Channel() + private val time_source: TimeSource = TimeSource.Monotonic private var executing_client_id: Int? = null private var player_event_inc: Int = 0 private val player_events: MutableList = mutableListOf() + private val player_events_lock: ReentrantLock = ReentrantLock() private val clients: MutableList = mutableListOf() private var playback_waiting_for_clients: Boolean = false @@ -88,10 +92,10 @@ open class SpMs( fun poll(client_reply_attempts: Int) { - // Process stray messages (hopefully client handshakes) + // Process stray messages (client handshakes or events) while (true) { val message: ZmqMessage = recvMultipart(null) ?: break - println("Got stray message before polling") + // println("Got stray message before polling") onClientMessage(message) } @@ -100,75 +104,74 @@ open class SpMs( while (client_i >= 0) { val client: SpMsClient = clients[client_i--] - val message_parts: MutableList = mutableListOf() - val events: List = getEventsForClient(client) - if (events.isEmpty()) { - message_parts.add("null") - } - else { - // Add events to message, then consume - for (event in events) { - message_parts.add(Json.encodeToString(event)) + if (events.isNotEmpty()) { + val message_parts: List = events.map { event -> + val encoded_event: String = Json.encodeToString(event) client.event_head = maxOf(event.event_id, client.event_head) event.onConsumedByClient() if (event.pending_client_amount <= 0) { - player_events.remove(event) + player_events_lock.withLock { + player_events.remove(event) + } } - } - } - - sendMultipart(client.createMessage(message_parts)) - - if (events.isNotEmpty()) { - println("Sent events $events to client $client") - } - - // Wait for client to reply - val wait_start: TimeMark = TimeSource.Monotonic.markNow() - var client_reply: ZmqMessage? = null - while (true) { - val remaining: Duration = CLIENT_REPLY_TIMEOUT - wait_start.elapsedNow() - if (remaining <= Duration.ZERO) { - break + return@map encoded_event } - val message: ZmqMessage = recvMultipart(remaining) ?: continue - - if (message.client_id.contentHashCode() == client.id) { - client_reply = message - break - } - else { - // Handle connections from other clients - println("Got stray message during polling") - onClientMessage(message) - } - } - - // Client did not reply to message within timeout - if (client_reply == null) { - if (++client.failed_connection_attempts >= client_reply_attempts) { - onClientTimedOut(client, client_reply_attempts) - } - continue + sendMultipart(client.createMessage(message_parts)) + println("Sent events $events to client $client") } - client.failed_connection_attempts = 0 - - check(executing_client_id == null) - executing_client_id = client.id - - try { - processClientMessage(client_reply, client) - } - catch (e: Throwable) { - RuntimeException("Exception while processing reply from $client", e).printStackTrace() + if (client.last_heartbeat.elapsedNow() > CLIENT_HEARTBEAT_MAX_PERIOD) { + onClientTimedOut(client, CLIENT_HEARTBEAT_MAX_PERIOD) } - executing_client_id = null + // // Wait for client to reply + // val wait_start: TimeMark = time_source.markNow() + // var client_reply: ZmqMessage? = null + + // while (true) { + // val remaining: Duration = CLIENT_REPLY_TIMEOUT - wait_start.elapsedNow() + // if (remaining <= Duration.ZERO) { + // break + // } + + // val message: ZmqMessage = recvMultipart(remaining) ?: continue + + // if (message.client_id.contentHashCode() == client.id) { + // client_reply = message + // break + // } + // else { + // // Handle connections from other clients + // println("Got stray message during polling") + // onClientMessage(message) + // } + // } + + // // Client did not reply to message within timeout + // if (client_reply == null) { + // if (++client.failed_connection_attempts >= client_reply_attempts) { + // onClientTimedOut(client, client_reply_attempts) + // } + // continue + // } + + // client.failed_connection_attempts = 0 + + // check(executing_client_id == null) + // executing_client_id = client.id + + // try { + // processClientMessage(client_reply, client) + // } + // catch (e: Throwable) { + // RuntimeException("Exception while processing reply from $client", e).printStackTrace() + // } + + // executing_client_id = null } } @@ -198,9 +201,14 @@ open class SpMs( } private fun processClientMessage(message: ZmqMessage, client: SpMsClient) { + println("GOT MSG FROM CLIENT ${message.parts.toList()}") val reply: List = processClientActions(message.parts, client) - if (reply.isNotEmpty()) { + println("Sending reply to client: $reply") + if (reply.isEmpty()) { + sendMultipart(client.createMessage(listOf("REPLY TO ${message.parts.toList()}"))) + } + else { sendMultipart(client.createMessage(listOf(Json.encodeToString(reply)))) } } @@ -291,15 +299,17 @@ open class SpMs( client_amount = clients.count { it.type.receivesEvents() } ) - val i: MutableIterator = player_events.iterator() - while (i.hasNext()) { - val other: SpMsPlayerEvent = i.next() - if (event.overrides(other)) { - i.remove() + player_events_lock.withLock { + val i: MutableIterator = player_events.iterator() + while (i.hasNext()) { + val other: SpMsPlayerEvent = i.next() + if (event.overrides(other)) { + i.remove() + } } - } - player_events.add(event) + player_events.add(event) + } } protected open fun onPlayerShutdown() {} @@ -321,7 +331,18 @@ open class SpMs( var client: SpMsClient? = clients.firstOrNull { it.id == id } if (client != null) { - println("Got stray message from connected client $client, ignoring: ${message.parts.toList()}") + check(executing_client_id == null) + executing_client_id = client.id + client.last_heartbeat = time_source.markNow() + + try { + processClientMessage(message, client) + } + catch (e: Throwable) { + RuntimeException("Exception while processing reply from $client", e).printStackTrace() + } + + executing_client_id = null return } else if (content == null) { @@ -347,7 +368,8 @@ open class SpMs( machine_id = client_handshake.machine_id, player_port = client_handshake.player_port ), - player_event_inc + player_event_inc, + last_heartbeat = time_source.markNow() ) val action_replies: List? @@ -390,9 +412,9 @@ open class SpMs( } } - private fun onClientTimedOut(client: SpMsClient, attempts: Int) { + private fun onClientTimedOut(client: SpMsClient, timeout: Duration) { clients.remove(client) - println("$client failed to respond after $attempts attempts") + println("$client failed to provide heartbeat after $timeout") if ( client.type.playsAudio() diff --git a/library/src/commonMain/kotlin/dev/toastbits/spms/server/SpMsClient.kt b/library/src/commonMain/kotlin/dev/toastbits/spms/server/SpMsClient.kt index 239f056..4ac9d28 100644 --- a/library/src/commonMain/kotlin/dev/toastbits/spms/server/SpMsClient.kt +++ b/library/src/commonMain/kotlin/dev/toastbits/spms/server/SpMsClient.kt @@ -4,11 +4,13 @@ import dev.toastbits.spms.socketapi.shared.SpMsClientID import dev.toastbits.spms.socketapi.shared.SpMsClientInfo import dev.toastbits.spms.socketapi.shared.SpMsClientType import dev.toastbits.spms.socketapi.shared.SpMsLanguage +import kotlin.time.TimeMark internal class SpMsClient( val id_bytes: ByteArray, val info: SpMsClientInfo, - var event_head: Int + var event_head: Int, + var last_heartbeat: TimeMark ) { val name: String get() = info.name val type: SpMsClientType get() = info.type @@ -16,8 +18,7 @@ internal class SpMsClient( val id: SpMsClientID = id_bytes.contentHashCode() var ready_to_play: Boolean = false - var failed_connection_attempts: Int = 0 override fun toString(): String = - "Client(id=$id, name=$name, type=$type, language=$language, event_head=$event_head)" + "Client(id=$id, name=$name, type=$type, language=$language, event_head=$event_head, last_heartbeat=$last_heartbeat)" }