Skip to content

Commit

Permalink
Improve HeadlessPlayer safety, use kotlin.time
Browse files Browse the repository at this point in the history
  • Loading branch information
toasterofbread committed May 17, 2024
1 parent 9c55876 commit f100b1f
Show file tree
Hide file tree
Showing 22 changed files with 186 additions and 137 deletions.
5 changes: 4 additions & 1 deletion src/commonMain/kotlin/cinterop/mpv/LibMpvClient.kt.disabled
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ class mpv_event {
val event_id: Int get() = throw NotImplementedError()
}

abstract class LibMpvClient(val headless: Boolean = true): Player {
abstract class LibMpvClient(
val headless: Boolean = true,
playlist_auto_progress: Boolean = true
): Player {
companion object {
fun isAvailable(): Boolean = false
}
Expand Down
9 changes: 8 additions & 1 deletion src/commonMain/kotlin/cinterop/mpv/LibMpvClient.kt.enabled
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import cnames.structs.mpv_handle as MpvHandle
import libmpv.mpv_format as MpvFormat

@OptIn(ExperimentalForeignApi::class)
abstract class LibMpvClient(val headless: Boolean = true): Player {
abstract class LibMpvClient(
val headless: Boolean = true,
playlist_auto_progress: Boolean = true
): Player {
companion object {
fun isAvailable(): Boolean = true
}
Expand All @@ -32,6 +35,10 @@ abstract class LibMpvClient(val headless: Boolean = true): Player {
osd_level.value = 3
mpv_set_option(ctx, "osd-level", MPV_FORMAT_INT64, osd_level.ptr)
}

if (!playlist_auto_progress) {
mpv_set_option_string(ctx, "keep-open", "always")
}
}

val init_result: Int = mpv_initialize(ctx)
Expand Down
2 changes: 1 addition & 1 deletion src/commonMain/kotlin/cinterop/mpv/MpvClientImpl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ inline fun <reified T: CPointed> CPointer<*>?.pointedAs(): T =
this!!.reinterpret<T>().pointed

@OptIn(ExperimentalForeignApi::class)
abstract class MpvClientImpl(headless: Boolean = true): LibMpvClient(headless) {
abstract class MpvClientImpl(headless: Boolean = true, playlist_auto_progress: Boolean = true): LibMpvClient(headless = headless, playlist_auto_progress = playlist_auto_progress) {
companion object {
fun isAvailable(): Boolean = LibMpvClient.isAvailable()
}
Expand Down
5 changes: 3 additions & 2 deletions src/commonMain/kotlin/cinterop/zmq/ZmqRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import kotlinx.cinterop.cstr
import kotlinx.cinterop.toCValues
import libzmq.*
import spms.socketapi.shared.SpMsSocketApi
import kotlin.time.Duration

@OptIn(ExperimentalForeignApi::class)
abstract class ZmqRouter(mem_scope: MemScope) {
Expand Down Expand Up @@ -36,8 +37,8 @@ abstract class ZmqRouter(mem_scope: MemScope) {
socket.release()
}

protected fun recvMultipart(timeout_ms: Long?): Message? {
val parts: List<ByteArray> = socket.recvMultipart(timeout_ms) ?: return null
protected fun recvMultipart(timeout: Duration?): Message? {
val parts: List<ByteArray> = socket.recvMultipart(timeout) ?: return null

var client_id: ByteArray? = null
val message_parts: MutableList<String> = mutableListOf()
Expand Down
9 changes: 5 additions & 4 deletions src/commonMain/kotlin/cinterop/zmq/ZmqSocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import platform.posix.memcpy
import spms.zmqPollerWait
import spms.socketapi.shared.SpMsSocketApi
import spms.socketapi.shared.SPMS_MESSAGE_MAX_SIZE
import kotlin.time.Duration

@OptIn(ExperimentalForeignApi::class)
class ZmqSocket(mem_scope: MemScope, type: Int, val is_binder: Boolean) {
Expand Down Expand Up @@ -88,14 +89,14 @@ class ZmqSocket(mem_scope: MemScope, type: Int, val is_binder: Boolean) {
zmq_ctx_destroy(context)
}

fun recvStringMultipart(timeout_ms: Long?): List<String>? {
val message: List<ByteArray> = recvMultipart(timeout_ms) ?: return null
fun recvStringMultipart(timeout: Duration?): List<String>? {
val message: List<ByteArray> = recvMultipart(timeout) ?: return null
return SpMsSocketApi.decode(message.map { it.decodeToString() })
}

fun recvMultipart(timeout_ms: Long?): List<ByteArray>? = memScoped {
fun recvMultipart(timeout: Duration?): List<ByteArray>? = memScoped {
val event: zmq_poller_event_t = alloc()
zmqPollerWait(poller, event.ptr, timeout_ms ?: ZMQ_NOBLOCK.toLong())
zmqPollerWait(poller, event.ptr, timeout?.inWholeMilliseconds ?: ZMQ_NOBLOCK.toLong())

if (event.events.toInt() != ZMQ_POLLIN) {
return null
Expand Down
3 changes: 2 additions & 1 deletion src/commonMain/kotlin/spms/client/cli/CommandLineClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ import spms.client.cli.modes.Interactive
import spms.client.cli.modes.Poll
import spms.client.cli.modes.Run
import toRed
import kotlin.time.Duration

const val SERVER_REPLY_TIMEOUT_MS: Long = 2000
val SERVER_REPLY_TIMEOUT: Duration = with (Duration) { 2.seconds }

private fun getClientName(): String =
"SpMs CLI"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ abstract class CommandLineClientMode(
)
context.socket.sendStringMultipart(listOf(Json.encodeToString(handshake)))

val reply: List<String>? = context.socket.recvStringMultipart(SERVER_REPLY_TIMEOUT_MS)
val reply: List<String>? = context.socket.recvStringMultipart(SERVER_REPLY_TIMEOUT)

if (reply == null) {
throw SpMsCommandLineClientError(currentContext.loc.cli.errServerDidNotRespond(SERVER_REPLY_TIMEOUT_MS))
throw SpMsCommandLineClientError(currentContext.loc.cli.errServerDidNotRespond(SERVER_REPLY_TIMEOUT))
}

log(currentContext.loc.cli.handshake_reply_received + " " + reply.toString())
Expand Down
15 changes: 8 additions & 7 deletions src/commonMain/kotlin/spms/client/cli/modes/Poll.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import libzmq.ZMQ_NOBLOCK
import spms.client.cli.CommandLineClientMode
import spms.client.cli.SpMsCommandLineClientError
import spms.localisation.loc
import kotlin.system.getTimeMillis
import spms.socketapi.shared.SpMsSocketApi
import kotlinx.cinterop.ExperimentalForeignApi
import kotlin.time.*

private const val SERVER_EVENT_TIMEOUT_MS: Long = 10000
private val SERVER_EVENT_TIMEOUT: Duration = with (Duration) { 10000.milliseconds }
private const val POLL_INTERVAL: Long = 100

@OptIn(ExperimentalForeignApi::class)
Expand All @@ -28,24 +28,25 @@ class Poll: CommandLineClientMode("poll", { "TODO" }) {
while (true) {
delay(POLL_INTERVAL)

val wait_end: Long = getTimeMillis() + SERVER_EVENT_TIMEOUT_MS
val wait_start: TimeMark = TimeSource.Monotonic.markNow()
var events: List<JsonElement>? = null

while (events == null && getTimeMillis() < wait_end) {
val message: List<String>? =
while (events == null && wait_start.elapsedNow() < SERVER_EVENT_TIMEOUT) {
val message: List<String>? = with (Duration) {
socket.recvStringMultipart(
(wait_end - getTimeMillis()).coerceAtLeast(ZMQ_NOBLOCK.toLong())
(SERVER_EVENT_TIMEOUT - wait_start.elapsedNow()).inWholeMilliseconds.coerceAtLeast(ZMQ_NOBLOCK.toLong()).milliseconds
)?.let {
SpMsSocketApi.decode(it)
}
}

events = message?.map {
Json.decodeFromString(it)
}
}

if (events == null) {
throw SpMsCommandLineClientError(currentContext.loc.cli.errServerDidNotSendEvents(SERVER_EVENT_TIMEOUT_MS))
throw SpMsCommandLineClientError(currentContext.loc.cli.errServerDidNotSendEvents(SERVER_EVENT_TIMEOUT))
}

if (events.isNotEmpty()) {
Expand Down
16 changes: 9 additions & 7 deletions src/commonMain/kotlin/spms/client/cli/modes/Run.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import libzmq.ZMQ_NOBLOCK
import spms.socketapi.Action
import spms.socketapi.shared.SpMsSocketApi
import spms.client.cli.CommandLineClientMode
import spms.client.cli.SERVER_REPLY_TIMEOUT_MS
import spms.client.cli.SERVER_REPLY_TIMEOUT
import spms.localisation.loc
import spms.socketapi.shared.SpMsServerHandshake
import spms.socketapi.shared.SpMsActionReply
Expand All @@ -33,6 +33,7 @@ import spms.socketapi.server.ServerAction
import toRed
import kotlin.system.getTimeMillis
import kotlinx.cinterop.ExperimentalForeignApi
import kotlin.time.*

private fun CommandLineClientMode.jsonModeOption() =
option("-j", "--json").flag().help { context.loc.server_actions.option_help_json }
Expand Down Expand Up @@ -120,12 +121,13 @@ class ActionCommandLineClientMode(
val reply: SpMsActionReply? = executeActionOnSocket(
action,
parameter_values,
SERVER_REPLY_TIMEOUT_MS,
currentContext, silent = silent
SERVER_REPLY_TIMEOUT,
currentContext,
silent = silent
)

if (reply == null) {
throw CliktError(currentContext.loc.server_actions.replyNotReceived(SERVER_REPLY_TIMEOUT_MS).toRed())
throw CliktError(currentContext.loc.server_actions.replyNotReceived(SERVER_REPLY_TIMEOUT).toRed())
}
else if (reply.success) {
if (json_mode || parent_json_mode) {
Expand All @@ -152,7 +154,7 @@ class ActionCommandLineClientMode(
private fun executeActionOnSocket(
action: Action,
parameter_values: List<JsonPrimitive>,
reply_timeout_ms: Long?,
reply_timeout: Duration?,
context: Context,
silent: Boolean = false
): SpMsActionReply? {
Expand All @@ -168,7 +170,7 @@ class ActionCommandLineClientMode(
println(context.loc.server_actions.actionSentAndWaitingForReply(action.identifier))
}

val timeout_end: Long? = reply_timeout_ms?.let { getTimeMillis() + it }
val timeout_end: TimeMark? = reply_timeout?.let { TimeSource.Monotonic.markNow() + it }
do {
if (!reply.isNullOrEmpty()) {
if (!silent) {
Expand All @@ -190,7 +192,7 @@ class ActionCommandLineClientMode(
else if (!silent) {
println(context.loc.server_actions.receivedEmptyReplyFromServer(action.identifier))
}
} while (timeout_end == null || getTimeMillis() < timeout_end)
} while (timeout_end == null || timeout_end.hasNotPassedNow())

return null
}
Expand Down
34 changes: 18 additions & 16 deletions src/commonMain/kotlin/spms/client/player/PlayerClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,17 @@ import spms.socketapi.parseSocketMessage
import spms.socketapi.player.PlayerAction
import spms.socketapi.shared.*
import kotlin.system.getTimeMillis
import kotlin.time.*

private const val SERVER_REPLY_TIMEOUT_MS: Long = 2000
private const val SERVER_EVENT_TIMEOUT_MS: Long = 11000
private const val POLL_INTERVAL_MS: Long = 100
private const val CLIENT_REPLY_TIMEOUT_MS: Long = 1000
private val SERVER_REPLY_TIMEOUT: Duration = with (Duration) { 2.seconds }
private val SERVER_EVENT_TIMEOUT: Duration = with (Duration) { 11.seconds }
private val POLL_INTERVAL: Duration = with (Duration) { 100.milliseconds }
private val CLIENT_REPLY_TIMEOUT: Duration = with (Duration) { 1.seconds }

private fun getClientName(): String =
"SpMs Player Client"

private abstract class PlayerImpl(headless: Boolean = true): MpvClientImpl(headless) {
private abstract class PlayerImpl(headless: Boolean = true): MpvClientImpl(headless = headless, playlist_auto_progress = false) {
override fun onEvent(event: SpMsPlayerEvent, clientless: Boolean) {
if (event.type == SpMsPlayerEvent.Type.READY_TO_PLAY) {
onReadyToPlay()
Expand Down Expand Up @@ -183,11 +184,11 @@ class PlayerClient private constructor(): Command(

while (!shutdown) {
try {
delay(POLL_INTERVAL_MS)
delay(POLL_INTERVAL)

// We don't actually care about the client handshake, it's just for consistency with the main server api
// val handshake_message: List<String> =
socket.recvStringMultipart(CLIENT_REPLY_TIMEOUT_MS) ?: continue
socket.recvStringMultipart(CLIENT_REPLY_TIMEOUT) ?: continue

val handshake_reply: SpMsServerHandshake =
SpMsServerHandshake(
Expand All @@ -203,7 +204,7 @@ class PlayerClient private constructor(): Command(
)

val message: List<String> =
socket.recvStringMultipart(CLIENT_REPLY_TIMEOUT_MS) ?: continue
socket.recvStringMultipart(CLIENT_REPLY_TIMEOUT) ?: continue

val reply: List<SpMsActionReply> =
parseSocketMessage(message) { action_name, action_params ->
Expand Down Expand Up @@ -241,9 +242,9 @@ class PlayerClient private constructor(): Command(
)
socket.sendStringMultipart(listOf(json.encodeToString(handshake)))

val reply: List<String>? = socket.recvStringMultipart(SERVER_REPLY_TIMEOUT_MS)
val reply: List<String>? = socket.recvStringMultipart(SERVER_REPLY_TIMEOUT)
if (reply == null) {
throw SpMsCommandLineClientError(currentContext.loc.cli.errServerDidNotRespond(SERVER_REPLY_TIMEOUT_MS))
throw SpMsCommandLineClientError(currentContext.loc.cli.errServerDidNotRespond(SERVER_REPLY_TIMEOUT))
}

val server_handshake: SpMsServerHandshake = Json.decodeFromString(reply.first())
Expand All @@ -258,7 +259,7 @@ class PlayerClient private constructor(): Command(
val message: MutableList<String> = mutableListOf()

while (!shutdown) {
// delay(POLL_INTERVAL_MS)
// delay(POLL_INTERVAL)

val events: List<SpMsPlayerEvent> = socket.pollEvents()
for (event in events) {
Expand All @@ -285,14 +286,15 @@ class PlayerClient private constructor(): Command(
}

private fun ZmqSocket.pollEvents(): List<SpMsPlayerEvent> {
val wait_end: Long = getTimeMillis() + SERVER_EVENT_TIMEOUT_MS
val wait_start: TimeMark = TimeSource.Monotonic.markNow()
var events: List<SpMsPlayerEvent>? = null

while (events == null && getTimeMillis() < wait_end) {
val message: List<String>? =
while (events == null && wait_start.elapsedNow() < SERVER_EVENT_TIMEOUT) {
val message: List<String>? = with (Duration) {
recvStringMultipart(
(wait_end - getTimeMillis()).coerceAtLeast(ZMQ_NOBLOCK.toLong())
(SERVER_EVENT_TIMEOUT - wait_start.elapsedNow()).inWholeMilliseconds.coerceAtLeast(ZMQ_NOBLOCK.toLong()).milliseconds
)
}

events = message?.mapNotNull {
try {
Expand All @@ -306,7 +308,7 @@ class PlayerClient private constructor(): Command(
}

if (events == null) {
throw SpMsCommandLineClientError(currentContext.loc.cli.errServerDidNotSendEvents(SERVER_EVENT_TIMEOUT_MS))
throw SpMsCommandLineClientError(currentContext.loc.cli.errServerDidNotSendEvents(SERVER_EVENT_TIMEOUT))
}

return events.orEmpty()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package spms.localisation.strings

import kotlin.time.Duration

interface CliLocalisation {
val bug_report_notice: String

Expand All @@ -18,8 +20,8 @@ interface CliLocalisation {
val status_key_state: String
val status_key_is_playing: String
val status_key_current_item_index: String
val status_key_current_position_ms: String
val status_key_duration_ms: String
val status_key_current_position: String
val status_key_duration: String
val status_key_repeat_mode: String

fun connectingToSocket(address: String): String
Expand All @@ -30,6 +32,6 @@ interface CliLocalisation {

val poll_polling_server_for_events: String

fun errServerDidNotRespond(timeout_ms: Long): String
fun errServerDidNotSendEvents(timeout_ms: Long): String
fun errServerDidNotRespond(timeout: Duration): String
fun errServerDidNotSendEvents(timeout: Duration): String
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package spms.localisation.strings

import spms.server.BUG_REPORT_URL
import kotlin.time.Duration

class CliLocalisationEn: CliLocalisation {
override val bug_report_notice: String = "Report bugs at $BUG_REPORT_URL"
Expand All @@ -25,8 +26,8 @@ class CliLocalisationEn: CliLocalisation {
override val status_key_state: String = "Playback state"
override val status_key_is_playing: String = "Is playing"
override val status_key_current_item_index: String = "Current item index"
override val status_key_current_position_ms: String = "Item position (ms)"
override val status_key_duration_ms: String = "Item duration (ms)"
override val status_key_current_position: String = "Item position"
override val status_key_duration: String = "Item duration"
override val status_key_repeat_mode: String = "Repeat mode"

override fun connectingToSocket(address: String): String =
Expand All @@ -38,8 +39,8 @@ class CliLocalisationEn: CliLocalisation {

override val poll_polling_server_for_events: String = "Polling server for events..."

override fun errServerDidNotRespond(timeout_ms: Long): String =
"Server did not respond within timeout (${timeout_ms}ms)"
override fun errServerDidNotSendEvents(timeout_ms: Long): String =
"Server did not send events within timeout (${timeout_ms}ms)"
override fun errServerDidNotRespond(timeout: Duration): String =
"Server did not respond within timeout ($timeout)"
override fun errServerDidNotSendEvents(timeout: Duration): String =
"Server did not send events within timeout ($timeout)"
}
Loading

0 comments on commit f100b1f

Please sign in to comment.