Skip to content

Commit

Permalink
Use duration from client
Browse files Browse the repository at this point in the history
  • Loading branch information
toasterofbread committed Dec 4, 2023
1 parent 29aa8dc commit 890d746
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 108 deletions.
2 changes: 1 addition & 1 deletion src/nativeMain/kotlin/spms/client/player/PlayerClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ class PlayerClient private constructor(): Command(
}

override fun onReadyToPlay() {
queued_messages.add("readyToPlay" to listOf(JsonPrimitive(current_item_index), JsonPrimitive(getItem())))
queued_messages.add("readyToPlay" to listOf(JsonPrimitive(current_item_index), JsonPrimitive(getItem()), JsonPrimitive(duration_ms)))
}

val stream_url: String get() = "https://www.youtube.com/watch?v="//stream_provider_server.getStreamUrl()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,5 @@ interface ServerActionLocalisation {
val ready_to_play_help: String
val ready_to_play_param_item_index: String
val ready_to_play_param_item_id: String
val ready_to_play_param_item_duration_ms: String
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ class ServerActionLocalisationEn: ServerActionLocalisation {
override val status_help: String = "Get detailed information about the server's current status"
override val status_output_start: String = "Server status"

override val ready_to_play_name: String = "サーバーに再生準備完了と知らせる"
override val ready_to_play_help: String = "クライアントが再生を始める準備ができたとサーバーに知らせる"
override val ready_to_play_param_item_index: String = "再生準備が完了したアイテムのインデックス"
override val ready_to_play_param_item_id: String = "再生準備が完了したアイテムのID"
override val ready_to_play_name: String = "Notify ready to play"
override val ready_to_play_help: String = "Notify server that client is ready to play the current item"
override val ready_to_play_param_item_index: String = "Index of the current item"
override val ready_to_play_param_item_id: String = "ID of the current item"
override val ready_to_play_param_item_duration_ms: String = "Duration (ms) of the current item"
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,9 @@ class ServerActionLocalisationJa: ServerActionLocalisation {
override val status_help: String = "サーバーの現在の状態を表示"
override val status_output_start: String = "サーバー状態"

override val ready_to_play_name: String = ""
override val ready_to_play_help: String
get() = TODO("Not yet implemented")
override val ready_to_play_param_item_index: String
get() = TODO("Not yet implemented")
override val ready_to_play_param_item_id: String
get() = TODO("Not yet implemented")
override val ready_to_play_name: String = "サーバーに再生準備完了と知らせる"
override val ready_to_play_help: String = "クライアントが再生を始める準備ができたとサーバーに知らせる"
override val ready_to_play_param_item_index: String = "再生準備が完了したアイテムのインデックス"
override val ready_to_play_param_item_id: String = "再生準備が完了したアイテムのID"
override val ready_to_play_param_item_duration_ms: String = "再生準備が完了したアイテムの長さ(ミリ秒)"
}
119 changes: 27 additions & 92 deletions src/nativeMain/kotlin/spms/player/HeadlessPlayer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,22 @@ package spms.player
import kotlinx.atomicfu.locks.ReentrantLock
import kotlinx.atomicfu.locks.withLock
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.IO
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.serialization.json.JsonPrimitive
import kotlin.system.getTimeNanos

private const val ITEMS_TO_LOAD_IN_ADVANCE: Int = 0

abstract class HeadlessPlayer(private val enable_logging: Boolean = true): Player {
protected abstract fun getCachedItemDuration(item_id: String): Long?
protected abstract suspend fun loadItemDuration(item_id: String): Long
fun onDurationLoaded(item_id: String, item_duration_ms: Long) {
if (item_id == getItem()) {
onEvent(PlayerEvent.PropertyChanged("duration_ms", JsonPrimitive(item_duration_ms)))
}
}

override fun release() {}

private var _state: Player.State = Player.State.IDLE
Expand All @@ -39,7 +41,7 @@ abstract class HeadlessPlayer(private val enable_logging: Boolean = true): Playe
get() =
if (is_playing) (getTimeNanos() - playback_mark) / 1000000
else playback_mark
final override val duration_ms: Long get() = queue.getOrNull(current_item_index)?.let { item_durations[it] } ?: 0
final override val duration_ms: Long get() = queue.getOrNull(current_item_index)?.let { getCachedItemDuration(it) } ?: 0
final override var repeat_mode: Player.RepeatMode = Player.RepeatMode.NONE
private set
final override var volume: Double = 1.0
Expand All @@ -50,10 +52,6 @@ abstract class HeadlessPlayer(private val enable_logging: Boolean = true): Playe
private val playback_scope: CoroutineScope = CoroutineScope(Dispatchers.Default)
private var playback_mark: Long = 0

private val load_scope: CoroutineScope = CoroutineScope(SupervisorJob())
private val duration_loaders: MutableMap<String, Deferred<Long>> = mutableMapOf()
private val item_durations: MutableMap<String, Long> = mutableMapOf()

private val playback_lock: ReentrantLock = ReentrantLock()
private inline fun <T> withLock(block: () -> T): T = playback_lock.withLock(block)

Expand Down Expand Up @@ -84,70 +82,11 @@ abstract class HeadlessPlayer(private val enable_logging: Boolean = true): Playe

val item_id: String = queue.getOrNull(to) ?: return

val duration: Long? = item_durations[item_id]
val duration: Long? = getCachedItemDuration(item_id)
onEvent(PlayerEvent.PropertyChanged("duration_ms", JsonPrimitive(duration ?: 0)))
}

@Suppress("DeferredResultUnused")
private fun onQueueOrPositionChanged() {
// log("onQueueOrPositionChanged() start")
//
// val load_range: IntRange = current_item_index .. minOf(current_item_index + ITEMS_TO_LOAD_IN_ADVANCE, queue.size - 1)
//
// for ((i, item_id) in queue.withIndex()) {
// if (item_streams[item_id] != null) {
// continue
// }
//
// val loader: Job? = stream_loaders[item_id]
// if (i in load_range) {
// if (loader == null) {
// loadDuration(item_id)
// }
// }
// else {
// if (loader != null) {
// loader.cancel()
// stream_loaders.remove(item_id)
// }
// }
// }
//
// log("onQueueOrPositionChanged() end")
}

private fun loadDuration(item_id: String): Deferred<Long> {
var loader: Deferred<Long>? = duration_loaders[item_id]

if (loader == null) {
loader = load_scope.async(Dispatchers.IO) {
log("Loading duration for item '$item_id'")
val duration: Long
try {
duration = VideoInfoProvider.getVideoDuration(item_id)
}
catch (e: Throwable) {
val exception = RuntimeException("Exception during getVideoDuration($item_id)", e)
exception.printStackTrace()
throw exception
}

log("Loaded duration for item '$item_id': $duration")
withLock {
@Suppress("DeferredResultUnused")
duration_loaders.remove(item_id)

item_durations[item_id] = duration

if (item_id == queue.getOrNull(current_item_index)) {
onEvent(PlayerEvent.PropertyChanged("duration_ms", JsonPrimitive(duration)), clientless = true)
}
}
return@async duration
}
duration_loaders[item_id] = loader
}
return loader
}

override fun play() {
Expand All @@ -171,15 +110,12 @@ abstract class HeadlessPlayer(private val enable_logging: Boolean = true): Playe
playback_mark = getTimeNanos() - current_position

val item_id: String = queue[current_item_index]
val existing_duration: Long? = item_durations[item_id]
var duration: Long? = getCachedItemDuration(item_id)

val duration_job: Deferred<Long>?
if (existing_duration == null) {
duration_job = loadDuration(item_id)
if (duration == null) {
state = Player.State.BUFFERING
}
else {
duration_job = null
state = Player.State.READY
is_playing = true
onEvent(PlayerEvent.PropertyChanged("is_playing", JsonPrimitive(true)))
Expand All @@ -188,10 +124,18 @@ abstract class HeadlessPlayer(private val enable_logging: Boolean = true): Playe
log("play() $item_id: Launching timer")

playback_scope.launch {
val duration: Long?
if (duration_job != null) {
log("play() $item_id: Awaiting duration job")
duration = duration_job.await()
if (duration == null) {
log("play() $item_id: loadItemDuration()")

duration = loadItemDuration(item_id)

if (duration == null) {
state = Player.State.IDLE
is_playing = false
playback_mark = 0
println("play(): Duration is null, cannot play")
TODO()
}

withLock {
state = Player.State.READY
Expand All @@ -201,25 +145,16 @@ abstract class HeadlessPlayer(private val enable_logging: Boolean = true): Playe
}
else {
log("play() $item_id: Using existing duration")
duration = existing_duration
}

if (duration == null) {
state = Player.State.IDLE
is_playing = false
playback_mark = 0
println("play(): Duration is null, cannot play")
TODO()
}

log("play() will wait for ${duration - current_position}ms (${duration} $current_position)")
delay(duration - current_position)
log("play() will wait for ${duration!! - current_position}ms (${duration} $current_position)")
delay(duration!! - current_position)
log("play() resumed after delay")

withLock {
is_playing = false
state = Player.State.IDLE
playback_mark = duration
playback_mark = duration!!
onEvent(PlayerEvent.PropertyChanged("is_playing", JsonPrimitive(false)))
onItemPlaybackEnded()
}
Expand Down
26 changes: 25 additions & 1 deletion src/nativeMain/kotlin/spms/server/SpMs.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import cinterop.mpv.getCurrentStatusJson
import cinterop.zmq.ZmqRouter
import kotlinx.cinterop.ExperimentalForeignApi
import kotlinx.cinterop.MemScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.serialization.Serializable
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
Expand All @@ -27,9 +30,23 @@ class SpMs(mem_scope: MemScope, headless: Boolean = false, enable_gui: Boolean):
@Serializable
data class ActionReply(val success: Boolean, val error: String? = null, val error_cause: String? = null, val result: JsonElement? = null)

private var item_durations: MutableMap<String, Long> = mutableMapOf()
private val item_durations_channel: Channel<Unit> = Channel()

val player: Player =
if (headless)
object : HeadlessPlayer() {
override fun getCachedItemDuration(item_id: String): Long? = item_durations[item_id]

override suspend fun loadItemDuration(item_id: String): Long {
var cached: Long? = item_durations[item_id]
while (cached == null) {
item_durations_channel.receive()
cached = item_durations[item_id]
}
return cached
}

override fun onEvent(event: PlayerEvent, clientless: Boolean) = onPlayerEvent(event, clientless)
override fun onShutdown() = onPlayerShutdown()
}
Expand Down Expand Up @@ -173,7 +190,7 @@ class SpMs(mem_scope: MemScope, headless: Boolean = false, enable_gui: Boolean):
return !player_shut_down
}

fun onClientReadyToPlay(client_id: SpMsClientID, item_index: Int, item_id: String) {
fun onClientReadyToPlay(client_id: SpMsClientID, item_index: Int, item_id: String, item_duration_ms: Long) {
if (!playback_waiting_for_clients) {
return
}
Expand All @@ -185,6 +202,13 @@ class SpMs(mem_scope: MemScope, headless: Boolean = false, enable_gui: Boolean):
return
}

item_durations[item_id] = item_duration_ms
if (player is HeadlessPlayer) {
player.onDurationLoaded(item_id, item_duration_ms)
}

item_durations_channel.trySend(Unit)

if (ready_client.ready_to_play) {
return
}
Expand Down
13 changes: 10 additions & 3 deletions src/nativeMain/kotlin/spms/serveraction/ServerActionReadyToPlay.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package spms.serveraction

import kotlinx.serialization.json.JsonElement
import kotlinx.serialization.json.int
import kotlinx.serialization.json.jsonPrimitive
import kotlinx.serialization.json.long
import spms.server.SpMs

class ServerActionReadyToPlay: ServerAction(
Expand All @@ -22,14 +22,21 @@ class ServerActionReadyToPlay: ServerAction(
true,
"item_id",
{ server_actions.ready_to_play_param_item_id }
),
Parameter(
Parameter.Type.Int,
true,
"item_duration_ms",
{ server_actions.ready_to_play_param_item_duration_ms }
)
)
) {
override fun execute(server: SpMs, context: ActionContext): JsonElement? {
server.onClientReadyToPlay(
context.client,
context.getParameterValue("item_index")!!.jsonPrimitive.int,
context.getParameterValue("item_id")!!.jsonPrimitive.content
context.getParameterValue("item_index")!!.int,
context.getParameterValue("item_id")!!.content,
context.getParameterValue("item_duration_ms")!!.long
)
return null
}
Expand Down

0 comments on commit 890d746

Please sign in to comment.