Skip to content

Commit

Permalink
Use heartbeat-based API model for server
Browse files Browse the repository at this point in the history
  • Loading branch information
toasterofbread committed Jun 24, 2024
1 parent 35275d8 commit fe62a0e
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import dev.toastbits.spms.client.cli.CommandLineClientMode
import dev.toastbits.spms.client.cli.SpMsCommandLineClientError
import dev.toastbits.spms.localisation.loc
import dev.toastbits.spms.socketapi.shared.SpMsSocketApi
import dev.toastbits.spms.server.CLIENT_HEARTBEAT_TARGET_PERIOD
import kotlin.time.*

private val SERVER_EVENT_TIMEOUT: Duration = with (Duration) { 10000.milliseconds }
Expand All @@ -22,35 +23,27 @@ class Poll: CommandLineClientMode("poll", { "TODO" }) {

log(currentContext.loc.cli.poll_polling_server_for_events)

var last_heartbeat: TimeMark = TimeSource.Monotonic.markNow()

while (true) {
delay(POLL_INTERVAL)

val wait_start: TimeMark = TimeSource.Monotonic.markNow()
var events: List<JsonElement>? = null

while (events == null && wait_start.elapsedNow() < SERVER_EVENT_TIMEOUT) {
val message: List<String>? = with (Duration) {
socket.recvStringMultipart(
(SERVER_EVENT_TIMEOUT - wait_start.elapsedNow()).inWholeMilliseconds.coerceAtLeast(1L).milliseconds
)?.let {
SpMsSocketApi.decode(it)
}
val message: List<String>? =
socket.recvStringMultipart(null)?.let {
SpMsSocketApi.decode(it)
}

events = message?.map {
Json.decodeFromString(it)
val events: List<JsonElement> =
message.orEmpty().map { part ->
Json.decodeFromString(part)
}
}

if (events == null) {
throw SpMsCommandLineClientError(currentContext.loc.cli.errServerDidNotSendEvents(SERVER_EVENT_TIMEOUT))
}

if (events.isNotEmpty()) {
println(events)
}

socket.sendStringMultipart(listOf(""))
if (last_heartbeat.elapsedNow() > CLIENT_HEARTBEAT_TARGET_PERIOD) {
socket.sendStringMultipart(listOf(""))
last_heartbeat = TimeSource.Monotonic.markNow()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import dev.toastbits.spms.client.cli.SpMsCommandLineClientError
import dev.toastbits.spms.localisation.loc
import dev.toastbits.spms.server.PlayerOptions
import dev.toastbits.spms.server.SpMs
import dev.toastbits.spms.server.CLIENT_HEARTBEAT_TARGET_PERIOD
import dev.toastbits.spms.socketapi.parseSocketMessage
import dev.toastbits.spms.socketapi.player.PlayerAction
import dev.toastbits.spms.socketapi.shared.*
Expand All @@ -24,8 +25,6 @@ import kotlin.time.*
import gen.libmpv.LibMpv

private val SERVER_REPLY_TIMEOUT: Duration = with (Duration) { 2.seconds }
private val SERVER_EVENT_TIMEOUT: Duration = with (Duration) { 11.seconds }
private val POLL_INTERVAL: Duration = with (Duration) { 100.milliseconds }
private val CLIENT_REPLY_TIMEOUT: Duration = with (Duration) { 1.seconds }

private fun getClientName(): String =
Expand Down Expand Up @@ -198,8 +197,6 @@ class PlayerClient private constructor(val libmpv: LibMpv): Command(

while (!shutdown) {
try {
delay(POLL_INTERVAL)

// We don't actually care about the client handshake, it's just for consistency with the main server api
// val handshake_message: List<String> =
socket.recvStringMultipart(CLIENT_REPLY_TIMEOUT) ?: continue
Expand Down Expand Up @@ -271,10 +268,9 @@ class PlayerClient private constructor(val libmpv: LibMpv): Command(
log("Initial state: ${server_handshake.server_state}")

val message: MutableList<String> = mutableListOf()
var last_heartbeat: TimeMark = TimeSource.Monotonic.markNow()

while (!shutdown) {
// delay(POLL_INTERVAL)

val events: List<SpMsPlayerEvent> = socket.pollEvents()
for (event in events) {
println("Processing event $event")
Expand All @@ -287,44 +283,33 @@ class PlayerClient private constructor(val libmpv: LibMpv): Command(
message.add(Json.encodeToString(queued.second))
}
queued_messages.clear()

log("Sending messages: $message")
}
else {
else if (last_heartbeat.elapsedNow() >= CLIENT_HEARTBEAT_TARGET_PERIOD) {
message.add(" ")
}
else {
continue
}

log("Sending messages: $message")
socket.sendStringMultipart(message)
message.clear()

last_heartbeat = TimeSource.Monotonic.markNow()
}
}

private fun ZmqSocket.pollEvents(): List<SpMsPlayerEvent> {
val wait_start: TimeMark = TimeSource.Monotonic.markNow()
var events: List<SpMsPlayerEvent>? = null

while (events == null && wait_start.elapsedNow() < SERVER_EVENT_TIMEOUT) {
val message: List<String>? = with (Duration) {
recvStringMultipart(
(SERVER_EVENT_TIMEOUT - wait_start.elapsedNow()).inWholeMilliseconds.coerceAtLeast(1L).milliseconds
)
val events: List<SpMsPlayerEvent>? = recvStringMultipart(null)?.mapNotNull {
try {
Json.decodeFromString<SpMsPlayerEvent?>(it)
}

events = message?.mapNotNull {
try {
Json.decodeFromString<SpMsPlayerEvent?>(it)
}
catch (e: Throwable) {
RuntimeException("Parsing SpMsPlayerEvent failed $it", e).printStackTrace()
return emptyList()
}
catch (e: Throwable) {
RuntimeException("Parsing SpMsPlayerEvent failed $it", e).printStackTrace()
return emptyList()
}
}

if (events == null) {
throw SpMsCommandLineClientError(currentContext.loc.cli.errServerDidNotSendEvents(SERVER_EVENT_TIMEOUT))
}

return events.orEmpty()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,4 @@ interface CliLocalisation {
val poll_polling_server_for_events: String

fun errServerDidNotRespond(timeout: Duration): String
fun errServerDidNotSendEvents(timeout: Duration): String
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,4 @@ class CliLocalisationEn: CliLocalisation {

override fun errServerDidNotRespond(timeout: Duration): String =
"Server did not respond within timeout ($timeout)"
override fun errServerDidNotSendEvents(timeout: Duration): String =
"Server did not send events within timeout ($timeout)"
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,4 @@ class CliLocalisationJa: CliLocalisation {

override fun errServerDidNotRespond(timeout: Duration): String =
"タイムアウト($timeout)の内にサーバーから返信が来ませんでした"
override fun errServerDidNotSendEvents(timeout: Duration): String =
"タイムアウト($timeout)の内にサーバーからイベントが来ませんでした"
}
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ clikt.version=4.4.0
mediasession.version=0.1.1
ytm.version=0.2.1
ktor.version=3.0.0-beta-1
kjna.version=3b0e9cb762
kjna.version=0.0.4

# Nix
org.gradle.java.installations.fromEnv=JAVA_21_HOME,JAVA_22_HOME
Loading

0 comments on commit fe62a0e

Please sign in to comment.