Skip to content

Commit

Permalink
refactor(*): introduce streamer pipeline to enable multiple output
Browse files Browse the repository at this point in the history
  • Loading branch information
ThibaultBee committed Feb 7, 2025
1 parent e5e0c1d commit 0288768
Show file tree
Hide file tree
Showing 64 changed files with 4,507 additions and 1,134 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright (C) 2025 Thibault B.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.thibaultbee.streampack.core.elements.endpoints

import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.MediaDescriptor
import io.github.thibaultbee.streampack.core.elements.data.Frame
import io.github.thibaultbee.streampack.core.elements.encoders.CodecConfig
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.runBlocking

class DummyEndpoint : IEndpointInternal {
private val _isOpenFlow = MutableStateFlow(false)
override val isOpenFlow: StateFlow<Boolean> = _isOpenFlow

private val _frameFlow = MutableStateFlow<Frame?>(null)
val frameFlow: StateFlow<Frame?> = _frameFlow

var numOfAudioFramesWritten = 0
private set
var numOfVideoFramesWritten = 0
private set
val numOfFramesWritten: Int
get() = numOfAudioFramesWritten + numOfVideoFramesWritten

private val _isStreamingFlow = MutableStateFlow(false)
val isStreamingFlow: StateFlow<Boolean> = _isStreamingFlow

private val _configFlow = MutableStateFlow<CodecConfig?>(null)
val configFlow: StateFlow<CodecConfig?> = _configFlow

override val info: IEndpoint.IEndpointInfo
get() = TODO("Not yet implemented")

override fun getInfo(type: MediaDescriptor.Type): IEndpoint.IEndpointInfo {
TODO("Not yet implemented")
}

override val metrics: Any
get() = TODO("Not yet implemented")

override suspend fun open(descriptor: MediaDescriptor) {
_isOpenFlow.emit(true)
}

override suspend fun close() {
_isOpenFlow.emit(false)
}

override suspend fun write(frame: Frame, streamPid: Int) {
_frameFlow.emit(frame)
when {
frame.isAudio -> numOfAudioFramesWritten++
frame.isVideo -> numOfVideoFramesWritten++
}
}

override fun addStreams(streamConfigs: List<CodecConfig>): Map<CodecConfig, Int> {
runBlocking {
streamConfigs.forEach { _configFlow.emit(it) }
}
return streamConfigs.associateWith { it.hashCode() }
}

override fun addStream(streamConfig: CodecConfig): Int {
runBlocking {
_configFlow.emit(streamConfig)
}
return streamConfig.hashCode()
}

override suspend fun startStream() {
_isStreamingFlow.emit(true)
}

override suspend fun stopStream() {
_isStreamingFlow.emit(false)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.github.thibaultbee.streampack.core.elements.sources

import android.media.MediaFormat
import io.github.thibaultbee.streampack.core.elements.data.Frame
import io.github.thibaultbee.streampack.core.elements.sources.audio.AudioSourceConfig
import io.github.thibaultbee.streampack.core.elements.sources.audio.IAudioSourceInternal
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import java.nio.ByteBuffer

class StubAudioSource : IAudioSourceInternal {
override var isMuted: Boolean
get() = TODO("Not yet implemented")
set(value) {}

private val _isStreamingFlow = MutableStateFlow(false)
val isStreamingFlow: StateFlow<Boolean> = _isStreamingFlow

private val _configurationFlow = MutableStateFlow<AudioSourceConfig?>(null)
val configurationFlow: StateFlow<AudioSourceConfig?> = _configurationFlow

override fun getAudioFrame(inputBuffer: ByteBuffer?): Frame {
return Frame(
inputBuffer ?: ByteBuffer.allocate(8192),
0,
format = MediaFormat().apply {
setString(
MediaFormat.KEY_MIME,
MediaFormat.MIMETYPE_AUDIO_RAW
)
})
}

override fun startStream() {
_isStreamingFlow.value = true
}

override fun stopStream() {
_isStreamingFlow.value = false
}

override fun configure(config: AudioSourceConfig) {
_configurationFlow.value = config
}


override fun release() {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.github.thibaultbee.streampack.core.elements.sources

import io.github.thibaultbee.streampack.core.elements.processing.video.source.DefaultSourceInfoProvider
import io.github.thibaultbee.streampack.core.elements.processing.video.source.ISourceInfoProvider
import io.github.thibaultbee.streampack.core.elements.sources.video.IVideoSourceInternal
import io.github.thibaultbee.streampack.core.elements.sources.video.VideoSourceConfig
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow

class StubVideoSource : IVideoSourceInternal {
override val infoProviderFlow: StateFlow<ISourceInfoProvider> =
MutableStateFlow(DefaultSourceInfoProvider())

private val _isStreamingFlow = MutableStateFlow(false)
override val isStreamingFlow: StateFlow<Boolean> = _isStreamingFlow

private val _configurationFlow = MutableStateFlow<VideoSourceConfig?>(null)
val configurationFlow: StateFlow<VideoSourceConfig?> = _configurationFlow

override suspend fun startStream() {
_isStreamingFlow.emit(true)
}

override suspend fun stopStream() {
_isStreamingFlow.emit(false)
}

override fun configure(config: VideoSourceConfig) {
_configurationFlow.value = config
}

override fun release() {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (C) 2025 Thibault B.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.thibaultbee.streampack.core.pipelines

import org.junit.Assert.*

class StreamerPipelineTest {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright (C) 2025 Thibault B.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.thibaultbee.streampack.core.pipelines.outputs

import android.util.Size
import io.github.thibaultbee.streampack.core.elements.data.Frame
import io.github.thibaultbee.streampack.core.logger.Logger
import io.github.thibaultbee.streampack.core.utils.SurfaceUtils
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow

class StubAudioAsyncPipelineOutput :
StubPipelineOutput(hasAudio = true, hasVideo = false),
IAudioSyncPipelineOutputInternal {

private val _audioFrameFlow = MutableStateFlow<Frame?>(null)
val audioFrameFlow: StateFlow<Frame?> = _audioFrameFlow

override fun queueAudioFrame(frame: Frame) {
_audioFrameFlow.value = frame
}
}

class StubVideoSurfacePipelineOutput(resolution: Size) :
StubPipelineOutput(hasAudio = false, hasVideo = true),
IVideoSurfacePipelineOutputInternal {

override var targetRotation: Int = 0
private val _surfaceFlow =
MutableStateFlow<SurfaceWithSize?>(
SurfaceWithSize(
SurfaceUtils.createSurface(resolution),
resolution
)
)
override val surfaceFlow: StateFlow<SurfaceWithSize?> = _surfaceFlow
override var videoSourceTimestampOffset: Long = 0L
}

class StubAudioSyncVideoSurfacePipelineOutput(resolution: Size) :
StubPipelineOutput(hasAudio = true, hasVideo = true),
IAudioSyncPipelineOutputInternal, IVideoSurfacePipelineOutputInternal {

override var targetRotation: Int = 0
private val _surfaceFlow =
MutableStateFlow<SurfaceWithSize?>(
SurfaceWithSize(
SurfaceUtils.createSurface(resolution),
resolution
)
)
override val surfaceFlow: StateFlow<SurfaceWithSize?> = _surfaceFlow
override var videoSourceTimestampOffset: Long = 0L

private val _audioFrameFlow = MutableStateFlow<Frame?>(null)
val audioFrameFlow: StateFlow<Frame?> = _audioFrameFlow

override fun queueAudioFrame(frame: Frame) {
_audioFrameFlow.value = frame
}
}

abstract class StubPipelineOutput(override val hasAudio: Boolean, override val hasVideo: Boolean) :
IPipelineOutput {

private val _throwable = MutableStateFlow<Throwable?>(null)
override val throwableFlow: StateFlow<Throwable?> = _throwable

private val _isStreaming = MutableStateFlow(false)
override val isStreamingFlow: StateFlow<Boolean> = _isStreaming

override suspend fun startStream() {
Logger.i(TAG, "Start stream")
_isStreaming.emit(true)
}

override suspend fun stopStream() {
Logger.i(TAG, "Stop stream")
_isStreaming.emit(false)
}

override suspend fun release() {
Logger.i(TAG, "Release")
_isStreaming.emit(false)
}

companion object {
private const val TAG = "DummyPipelineOutput"
}
}

abstract class StubPipelineOutputInternal(hasAudio: Boolean, hasVideo: Boolean) :
StubPipelineOutput(hasAudio, hasVideo),
ISyncStartStreamPipelineOutputInternal {

override var streamerListener: ISyncStartStreamPipelineOutputInternal.Listener? = null

override suspend fun startStream() {
super.startStream()
streamerListener?.onStartStream()
}
}
Loading

0 comments on commit 0288768

Please sign in to comment.