From 3b9dde07892a5ef423f51f29c17ece8cc7d90085 Mon Sep 17 00:00:00 2001 From: ThibaultBee <37510686+ThibaultBee@users.noreply.github.com> Date: Sun, 3 Dec 2023 21:24:22 +0100 Subject: [PATCH] feat(core): make `startStream` and `stopStream` suspendable in case there is a network issue. --- .../testcases/AudioOnlyStreamerTestCase.kt | 12 ++++--- .../testcases/CameraStreamerTestCase.kt | 12 ++++--- .../streamer/testcases/StreamerTestCase.kt | 16 +++++---- .../streampack/internal/encoders/IEncoder.kt | 4 ++- .../internal/endpoints/FakeEndpoint.kt | 10 +++--- .../internal/endpoints/FileWriter.kt | 4 +-- .../internal/endpoints/IEndpoint.kt | 12 +++---- .../internal/interfaces/Streamable.kt | 34 +++++++++++++++---- .../streampack/internal/muxers/IMuxer.kt | 3 +- .../internal/muxers/flv/FlvMuxer.kt | 4 --- .../internal/muxers/mp4/MP4Muxer.kt | 3 -- .../streampack/internal/muxers/ts/TSMuxer.kt | 4 --- .../internal/sources/IFrameSource.kt | 4 ++- .../internal/sources/ISurfaceSource.kt | 4 ++- .../streamers/StreamerLifeCycleObserver.kt | 5 ++- .../streamers/bases/BaseCameraStreamer.kt | 4 ++- .../streamers/bases/BaseStreamer.kt | 9 +++-- .../streamers/interfaces/IStreamer.kt | 2 +- .../services/BaseScreenRecorderService.kt | 5 ++- .../internal/endpoints/FileWriterTest.kt | 33 +++++++++++++----- .../streampack/app/utils/StreamerManager.kt | 5 ++- .../streampack/screenrecorder/MainActivity.kt | 8 +++-- .../DemoScreenRecorderRtmpLiveService.kt | 5 ++- .../DemoScreenRecorderSrtLiveService.kt | 5 ++- .../rtmp/internal/endpoints/RtmpProducer.kt | 24 +++++++------ .../ext/srt/internal/endpoints/SrtProducer.kt | 4 +-- .../srt/streamers/CameraSrtLiveStreamer.kt | 2 +- .../ScreenRecorderSrtLiveStreamer.kt | 2 +- 28 files changed, 152 insertions(+), 87 deletions(-) diff --git a/core/src/androidTest/java/io/github/thibaultbee/streampack/streamer/testcases/AudioOnlyStreamerTestCase.kt b/core/src/androidTest/java/io/github/thibaultbee/streampack/streamer/testcases/AudioOnlyStreamerTestCase.kt index 32e884147..20e720b8b 100644 --- a/core/src/androidTest/java/io/github/thibaultbee/streampack/streamer/testcases/AudioOnlyStreamerTestCase.kt +++ b/core/src/androidTest/java/io/github/thibaultbee/streampack/streamer/testcases/AudioOnlyStreamerTestCase.kt @@ -41,8 +41,8 @@ abstract class AudioOnlyStreamerTestCase : ) runBlocking { streamer.startStream() + streamer.stopStream() } - streamer.stopStream() streamer.release() } catch (e: Exception) { Log.e(TAG, "defaultUsageTest: exception: ", e) @@ -58,8 +58,8 @@ abstract class AudioOnlyStreamerTestCase : ) runBlocking { streamer.startStream() + streamer.stopStream() } - streamer.stopStream() streamer.release() } catch (e: Exception) { Log.e(TAG, "defaultUsageTest2: exception: ", e) @@ -149,7 +149,9 @@ abstract class AudioOnlyStreamerTestCase : streamer.configure( AndroidUtils.fakeValidAudioConfig() ) - streamer.stopStream() + runBlocking { + streamer.stopStream() + } } catch (e: Exception) { Log.e(TAG, "configureStopStreamTest: exception: ", e) fail("Must be possible to configure/stopStream but catches exception: $e") @@ -180,8 +182,8 @@ abstract class AudioOnlyStreamerTestCase : ) runBlocking { streamer.startStream() + streamer.stopStream() } - streamer.stopStream() } catch (e: Exception) { Log.e(TAG, "startStreamStopStreamTest: exception: ", e) fail("Must be possible to startStream/stopStream but catches exception: $e") @@ -197,8 +199,8 @@ abstract class AudioOnlyStreamerTestCase : (0..10).forEach { _ -> runBlocking { streamer.startStream() + streamer.stopStream() } - streamer.stopStream() } } catch (e: Exception) { Log.e(TAG, "multipleStartStreamStopStreamTest: exception: ", e) diff --git a/core/src/androidTest/java/io/github/thibaultbee/streampack/streamer/testcases/CameraStreamerTestCase.kt b/core/src/androidTest/java/io/github/thibaultbee/streampack/streamer/testcases/CameraStreamerTestCase.kt index 0b5687e9b..6420cf782 100644 --- a/core/src/androidTest/java/io/github/thibaultbee/streampack/streamer/testcases/CameraStreamerTestCase.kt +++ b/core/src/androidTest/java/io/github/thibaultbee/streampack/streamer/testcases/CameraStreamerTestCase.kt @@ -58,8 +58,8 @@ abstract class CameraStreamerTestCase : streamer.startPreview(surface) runBlocking { streamer.startStream() + streamer.stopStream() } - streamer.stopStream() streamer.stopPreview() streamer.release() } catch (e: Exception) { @@ -80,8 +80,8 @@ abstract class CameraStreamerTestCase : streamer.startPreview(surface) runBlocking { streamer.startStream() + streamer.stopStream() } - streamer.stopStream() streamer.release() } catch (e: Exception) { Log.e(TAG, "defaultUsageTest2: exception: ", e) @@ -161,7 +161,9 @@ abstract class CameraStreamerTestCase : AndroidUtils.fakeValidVideoConfig() ) streamer.startPreview(surface) - streamer.stopStream() + runBlocking { + streamer.stopStream() + } } catch (e: Exception) { Log.e(TAG, "startPreviewStopStreamTest: exception: ", e) fail("Must be possible to startPreview/stopStream but catches exception: $e") @@ -216,8 +218,8 @@ abstract class CameraStreamerTestCase : streamer.startPreview(surface) runBlocking { streamer.startStream() + streamer.stopStream() } - streamer.stopStream() } catch (e: Exception) { Log.e(TAG, "startStreamStopStreamTest: exception: ", e) fail("Must be possible to startStream/stopStream but catches exception: $e") @@ -252,8 +254,8 @@ abstract class CameraStreamerTestCase : (0..10).forEach { _ -> runBlocking { streamer.startStream() + streamer.stopStream() } - streamer.stopStream() } } catch (e: Exception) { Log.e(TAG, "multipleStartStreamStopStreamTest: exception: ", e) diff --git a/core/src/androidTest/java/io/github/thibaultbee/streampack/streamer/testcases/StreamerTestCase.kt b/core/src/androidTest/java/io/github/thibaultbee/streampack/streamer/testcases/StreamerTestCase.kt index d5ca6d000..d976b0d54 100644 --- a/core/src/androidTest/java/io/github/thibaultbee/streampack/streamer/testcases/StreamerTestCase.kt +++ b/core/src/androidTest/java/io/github/thibaultbee/streampack/streamer/testcases/StreamerTestCase.kt @@ -44,8 +44,8 @@ abstract class StreamerTestCase { ) runBlocking { streamer.startStream() + streamer.stopStream() } - streamer.stopStream() streamer.release() } catch (e: Exception) { Log.e(TAG, "defaultUsageTest: exception: ", e) @@ -64,8 +64,8 @@ abstract class StreamerTestCase { ) runBlocking { streamer.startStream() + streamer.stopStream() } - streamer.stopStream() streamer.release() } catch (e: Exception) { Log.e(TAG, "defaultUsageTest2: exception: ", e) @@ -137,7 +137,9 @@ abstract class StreamerTestCase { @Test fun stopStreamTest() { try { - streamer.stopStream() + runBlocking { + streamer.stopStream() + } } catch (e: Exception) { Log.e(TAG, "stopStreamTest: exception: ", e) fail("Must be possible to only stopStream without exception: $e") @@ -191,7 +193,9 @@ abstract class StreamerTestCase { AndroidUtils.fakeValidAudioConfig(), AndroidUtils.fakeValidVideoConfig() ) - streamer.stopStream() + runBlocking { + streamer.stopStream() + } } catch (e: Exception) { Log.e(TAG, "configureStopStreamTest: exception: ", e) fail("Must be possible to configure/stopStream but catches exception: $e") @@ -224,8 +228,8 @@ abstract class StreamerTestCase { ) runBlocking { streamer.startStream() + streamer.stopStream() } - streamer.stopStream() } catch (e: Exception) { Log.e(TAG, "startStreamStopStreamTest: exception: ", e) fail("Must be possible to startStream/stopStream but catches exception: $e") @@ -258,8 +262,8 @@ abstract class StreamerTestCase { (0..10).forEach { _ -> runBlocking { streamer.startStream() + streamer.stopStream() } - streamer.stopStream() } } catch (e: Exception) { Log.e(TAG, "multipleStartStreamStopStreamTest: exception: ", e) diff --git a/core/src/main/java/io/github/thibaultbee/streampack/internal/encoders/IEncoder.kt b/core/src/main/java/io/github/thibaultbee/streampack/internal/encoders/IEncoder.kt index 7512fe972..63485ad26 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/internal/encoders/IEncoder.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/internal/encoders/IEncoder.kt @@ -15,9 +15,11 @@ */ package io.github.thibaultbee.streampack.internal.encoders +import io.github.thibaultbee.streampack.internal.interfaces.Configurable +import io.github.thibaultbee.streampack.internal.interfaces.Releaseable import io.github.thibaultbee.streampack.internal.interfaces.Streamable -interface IEncoder : Streamable { +interface IEncoder : Streamable, Configurable, Releaseable { /** * Input and output of an async encoder */ diff --git a/core/src/main/java/io/github/thibaultbee/streampack/internal/endpoints/FakeEndpoint.kt b/core/src/main/java/io/github/thibaultbee/streampack/internal/endpoints/FakeEndpoint.kt index 1d5e366f6..34dc92976 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/internal/endpoints/FakeEndpoint.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/internal/endpoints/FakeEndpoint.kt @@ -22,10 +22,6 @@ import io.github.thibaultbee.streampack.logger.Logger * A fake endpoint for test purpose. */ class FakeEndpoint : IEndpoint { - override fun startStream() { - Logger.d(TAG, "startStream called") - } - override fun configure(config: Int) { Logger.d(TAG, "configure called with bitrate = $config") } @@ -34,7 +30,11 @@ class FakeEndpoint : IEndpoint { Logger.d(TAG, "write called (packet size = ${packet.buffer.remaining()})") } - override fun stopStream() { + override suspend fun startStream() { + Logger.d(TAG, "startStream called") + } + + override suspend fun stopStream() { Logger.d(TAG, "stopStream called") } diff --git a/core/src/main/java/io/github/thibaultbee/streampack/internal/endpoints/FileWriter.kt b/core/src/main/java/io/github/thibaultbee/streampack/internal/endpoints/FileWriter.kt index ccadfc19e..3b55d4bf5 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/internal/endpoints/FileWriter.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/internal/endpoints/FileWriter.kt @@ -36,7 +36,7 @@ class FileWriter : IEndpoint { var outputStream: OutputStream? = null - override fun startStream() { + override suspend fun startStream() { if (outputStream == null) { throw UnsupportedOperationException("Set a file before trying to write it") } @@ -49,7 +49,7 @@ class FileWriter : IEndpoint { ?: throw UnsupportedOperationException("Set a file before trying to write it") } - override fun stopStream() { + override suspend fun stopStream() { outputStream?.close() } diff --git a/core/src/main/java/io/github/thibaultbee/streampack/internal/endpoints/IEndpoint.kt b/core/src/main/java/io/github/thibaultbee/streampack/internal/endpoints/IEndpoint.kt index 01ee1ed6b..b1d007896 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/internal/endpoints/IEndpoint.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/internal/endpoints/IEndpoint.kt @@ -16,15 +16,11 @@ package io.github.thibaultbee.streampack.internal.endpoints import io.github.thibaultbee.streampack.internal.data.Packet -import io.github.thibaultbee.streampack.internal.interfaces.Streamable +import io.github.thibaultbee.streampack.internal.interfaces.Configurable +import io.github.thibaultbee.streampack.internal.interfaces.Releaseable +import io.github.thibaultbee.streampack.internal.interfaces.SuspendStreamable -interface IEndpoint : Streamable { - - /** - * Configure endpoint bitrate, mainly for network endpoint. - * @param config bitrate at the beginning of the communication - */ - override fun configure(config: Int) +interface IEndpoint : SuspendStreamable, Configurable, Releaseable { /** * Writes a buffer to endpoint. diff --git a/core/src/main/java/io/github/thibaultbee/streampack/internal/interfaces/Streamable.kt b/core/src/main/java/io/github/thibaultbee/streampack/internal/interfaces/Streamable.kt index c39f39fef..1ef81ecd0 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/internal/interfaces/Streamable.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/internal/interfaces/Streamable.kt @@ -15,25 +15,45 @@ */ package io.github.thibaultbee.streampack.internal.interfaces -interface Streamable { +interface Streamable { /** - * Configure the [Streamable] implementation. - * - * @param config [Streamable] implementation configuration + * Starts frames or data stream generation + * Throws an exception if not ready for live stream */ - fun configure(config: T) + fun startStream() + + /** + * Stops frames or data stream generation + */ + fun stopStream() +} +/** + * Same as [Streamable] but with suspend functions. + */ +interface SuspendStreamable { /** * Starts frames or data stream generation * Throws an exception if not ready for live stream */ - fun startStream() + suspend fun startStream() /** * Stops frames or data stream generation */ - fun stopStream() + suspend fun stopStream() +} + +interface Configurable { + /** + * Configure the [Configurable] implementation. + * + * @param config [Configurable] implementation configuration + */ + fun configure(config: T) +} +interface Releaseable { /** * Closes and releases resources */ diff --git a/core/src/main/java/io/github/thibaultbee/streampack/internal/muxers/IMuxer.kt b/core/src/main/java/io/github/thibaultbee/streampack/internal/muxers/IMuxer.kt index 3ab1ee8d2..5d5f87acf 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/internal/muxers/IMuxer.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/internal/muxers/IMuxer.kt @@ -18,9 +18,10 @@ package io.github.thibaultbee.streampack.internal.muxers import io.github.thibaultbee.streampack.data.Config import io.github.thibaultbee.streampack.internal.data.Frame import io.github.thibaultbee.streampack.internal.interfaces.ISourceOrientationProvider +import io.github.thibaultbee.streampack.internal.interfaces.Releaseable import io.github.thibaultbee.streampack.internal.interfaces.Streamable -interface IMuxer : Streamable { +interface IMuxer : Streamable, Releaseable { val helper: IMuxerHelper var sourceOrientationProvider: ISourceOrientationProvider? diff --git a/core/src/main/java/io/github/thibaultbee/streampack/internal/muxers/flv/FlvMuxer.kt b/core/src/main/java/io/github/thibaultbee/streampack/internal/muxers/flv/FlvMuxer.kt index 5ee20ce4f..f9d17b7af 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/internal/muxers/flv/FlvMuxer.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/internal/muxers/flv/FlvMuxer.kt @@ -93,10 +93,6 @@ class FlvMuxer( return streamMap } - override fun configure(config: Unit) { - // Nothing to configure - } - override fun startStream() { // Header if (writeToFile) { diff --git a/core/src/main/java/io/github/thibaultbee/streampack/internal/muxers/mp4/MP4Muxer.kt b/core/src/main/java/io/github/thibaultbee/streampack/internal/muxers/mp4/MP4Muxer.kt index de905c680..a1bbfeddd 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/internal/muxers/mp4/MP4Muxer.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/internal/muxers/mp4/MP4Muxer.kt @@ -80,9 +80,6 @@ class MP4Muxer( return streamMap } - override fun configure(config: Unit) { - } - override fun startStream() { writeBuffer(FileTypeBox().toByteBuffer()) currentSegment = createNewSegment(MovieBoxFactory(timescale)) diff --git a/core/src/main/java/io/github/thibaultbee/streampack/internal/muxers/ts/TSMuxer.kt b/core/src/main/java/io/github/thibaultbee/streampack/internal/muxers/ts/TSMuxer.kt index 84a18a2ec..a3e4a2179 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/internal/muxers/ts/TSMuxer.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/internal/muxers/ts/TSMuxer.kt @@ -380,10 +380,6 @@ class TSMuxer( service.streams.clear() } - override fun configure(config: Unit) { - // Nothing to configure - } - override fun startStream() { // Nothing to start } diff --git a/core/src/main/java/io/github/thibaultbee/streampack/internal/sources/IFrameSource.kt b/core/src/main/java/io/github/thibaultbee/streampack/internal/sources/IFrameSource.kt index e1bc889a3..a6fd89bc0 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/internal/sources/IFrameSource.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/internal/sources/IFrameSource.kt @@ -16,10 +16,12 @@ package io.github.thibaultbee.streampack.internal.sources import io.github.thibaultbee.streampack.internal.data.Frame +import io.github.thibaultbee.streampack.internal.interfaces.Configurable +import io.github.thibaultbee.streampack.internal.interfaces.Releaseable import io.github.thibaultbee.streampack.internal.interfaces.Streamable import java.nio.ByteBuffer -interface IFrameSource : Streamable { +interface IFrameSource : Streamable, Configurable, Releaseable { /** * Generate a frame from capture device diff --git a/core/src/main/java/io/github/thibaultbee/streampack/internal/sources/ISurfaceSource.kt b/core/src/main/java/io/github/thibaultbee/streampack/internal/sources/ISurfaceSource.kt index d6052ef64..1be1b4947 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/internal/sources/ISurfaceSource.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/internal/sources/ISurfaceSource.kt @@ -17,9 +17,11 @@ package io.github.thibaultbee.streampack.internal.sources import android.view.Surface import io.github.thibaultbee.streampack.data.VideoConfig +import io.github.thibaultbee.streampack.internal.interfaces.Configurable +import io.github.thibaultbee.streampack.internal.interfaces.Releaseable import io.github.thibaultbee.streampack.internal.interfaces.Streamable -interface ISurfaceSource : Streamable { +interface ISurfaceSource : Streamable, Configurable, Releaseable { /** * The offset between source capture time and MONOTONIC clock. It is used to synchronize video * with audio. It is only useful for camera source. diff --git a/core/src/main/java/io/github/thibaultbee/streampack/streamers/StreamerLifeCycleObserver.kt b/core/src/main/java/io/github/thibaultbee/streampack/streamers/StreamerLifeCycleObserver.kt index 085542bdf..9c435be1f 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/streamers/StreamerLifeCycleObserver.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/streamers/StreamerLifeCycleObserver.kt @@ -20,6 +20,7 @@ import androidx.lifecycle.LifecycleOwner import io.github.thibaultbee.streampack.streamers.interfaces.IStreamer import io.github.thibaultbee.streampack.utils.getCameraStreamer import io.github.thibaultbee.streampack.utils.getLiveStreamer +import kotlinx.coroutines.runBlocking /** * Add [DefaultLifecycleObserver] to a streamer. @@ -35,7 +36,9 @@ import io.github.thibaultbee.streampack.utils.getLiveStreamer open class StreamerLifeCycleObserver(var streamer: IStreamer) : DefaultLifecycleObserver { override fun onPause(owner: LifecycleOwner) { streamer.getCameraStreamer()?.stopPreview() - streamer.stopStream() + runBlocking { + streamer.stopStream() + } streamer.getLiveStreamer()?.let { if (it.isConnected) { it.disconnect() diff --git a/core/src/main/java/io/github/thibaultbee/streampack/streamers/bases/BaseCameraStreamer.kt b/core/src/main/java/io/github/thibaultbee/streampack/streamers/bases/BaseCameraStreamer.kt index c5c8843a6..8c53b38bb 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/streamers/bases/BaseCameraStreamer.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/streamers/bases/BaseCameraStreamer.kt @@ -117,7 +117,9 @@ open class BaseCameraStreamer( * @see [startPreview] */ override fun stopPreview() { - stopStream() + runBlocking { + stopStream() + } cameraSource.stopPreview() } diff --git a/core/src/main/java/io/github/thibaultbee/streampack/streamers/bases/BaseStreamer.kt b/core/src/main/java/io/github/thibaultbee/streampack/streamers/bases/BaseStreamer.kt index 8fec040ec..914bfce49 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/streamers/bases/BaseStreamer.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/streamers/bases/BaseStreamer.kt @@ -40,6 +40,7 @@ import io.github.thibaultbee.streampack.streamers.helpers.IConfigurationHelper import io.github.thibaultbee.streampack.streamers.helpers.StreamerConfigurationHelper import io.github.thibaultbee.streampack.streamers.interfaces.IStreamer import io.github.thibaultbee.streampack.streamers.settings.BaseStreamerSettings +import kotlinx.coroutines.runBlocking import java.nio.ByteBuffer @@ -146,7 +147,9 @@ abstract class BaseStreamer( */ private fun onStreamError(error: StreamPackError) { try { - stopStream() + runBlocking { + stopStream() + } } catch (e: Exception) { Logger.e(TAG, "onStreamError: Can't stop stream") } finally { @@ -311,7 +314,7 @@ abstract class BaseStreamer( * * @see [startStream] */ - override fun stopStream() { + override suspend fun stopStream() { stopStreamImpl() // Encoder does not return to CONFIGURED state... so we have to reset everything... @@ -324,7 +327,7 @@ abstract class BaseStreamer( * * @see [stopStream] */ - private fun stopStreamImpl() { + private suspend fun stopStreamImpl() { videoSource?.stopStream() videoEncoder?.stopStream() audioEncoder?.stopStream() diff --git a/core/src/main/java/io/github/thibaultbee/streampack/streamers/interfaces/IStreamer.kt b/core/src/main/java/io/github/thibaultbee/streampack/streamers/interfaces/IStreamer.kt index 1478246be..642c98c4b 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/streamers/interfaces/IStreamer.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/streamers/interfaces/IStreamer.kt @@ -88,7 +88,7 @@ interface IStreamer { * * @see [startStream] */ - fun stopStream() + suspend fun stopStream() /** * Clean and reset the streamer. diff --git a/core/src/main/java/io/github/thibaultbee/streampack/streamers/services/BaseScreenRecorderService.kt b/core/src/main/java/io/github/thibaultbee/streampack/streamers/services/BaseScreenRecorderService.kt index e5105bc74..ca66a1131 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/streamers/services/BaseScreenRecorderService.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/streamers/services/BaseScreenRecorderService.kt @@ -39,6 +39,7 @@ import io.github.thibaultbee.streampack.streamers.bases.BaseScreenRecorderStream import io.github.thibaultbee.streampack.streamers.interfaces.ILiveStreamer import io.github.thibaultbee.streampack.utils.NotificationUtils import io.github.thibaultbee.streampack.utils.getStreamer +import kotlinx.coroutines.runBlocking /** * Foreground service that manages screen recorder streamers. @@ -155,7 +156,9 @@ abstract class BaseScreenRecorderService( override fun onDestroy() { super.onDestroy() - streamer?.stopStream() + runBlocking { + streamer?.stopStream() + } (streamer as ILiveStreamer?)?.disconnect() streamer?.release() streamer = null diff --git a/core/src/test/java/io/github/thibaultbee/streampack/internal/endpoints/FileWriterTest.kt b/core/src/test/java/io/github/thibaultbee/streampack/internal/endpoints/FileWriterTest.kt index d2e4814df..a8bbb26a9 100644 --- a/core/src/test/java/io/github/thibaultbee/streampack/internal/endpoints/FileWriterTest.kt +++ b/core/src/test/java/io/github/thibaultbee/streampack/internal/endpoints/FileWriterTest.kt @@ -19,6 +19,7 @@ import io.github.thibaultbee.streampack.internal.data.Packet import io.github.thibaultbee.streampack.logger.Logger import io.github.thibaultbee.streampack.utils.FakeLogger import io.github.thibaultbee.streampack.utils.Utils +import kotlinx.coroutines.runBlocking import org.junit.After import org.junit.Assert.* import org.junit.Test @@ -46,7 +47,9 @@ class FileWriterTest { @Test fun `startStream with non existing file test`() { try { - filePublisher.startStream() + runBlocking{ + filePublisher.startStream() + } fail("Null file must not be streamable") } catch (_: Exception) { } @@ -56,7 +59,9 @@ class FileWriterTest { fun `write to non existing file test`() { try { val randomArray = Utils.generateRandomArray(1024) - filePublisher.startStream() + runBlocking{ + filePublisher.startStream() + } filePublisher.write( Packet( ByteBuffer.wrap(randomArray), @@ -75,7 +80,9 @@ class FileWriterTest { filePublisher.file = tmpFile val randomArray = Utils.generateRandomArray(1024) try { - filePublisher.startStream() + runBlocking{ + filePublisher.startStream() + } filePublisher.write( Packet( ByteBuffer.wrap(randomArray), @@ -83,7 +90,9 @@ class FileWriterTest { ) ) assertArrayEquals(randomArray, tmpFile.readBytes()) - filePublisher.stopStream() + runBlocking { + filePublisher.stopStream() + } } catch (e: Exception) { fail() } @@ -97,7 +106,9 @@ class FileWriterTest { filePublisher.outputStream = tmpFile.outputStream() val randomArray = Utils.generateRandomArray(1024) try { - filePublisher.startStream() + runBlocking{ + filePublisher.startStream() + } filePublisher.write( Packet( ByteBuffer.wrap(randomArray), @@ -105,7 +116,9 @@ class FileWriterTest { ) ) assertArrayEquals(randomArray, tmpFile.readBytes()) - filePublisher.stopStream() + runBlocking { + filePublisher.stopStream() + } } catch (e: Exception) { fail() } @@ -122,7 +135,9 @@ class FileWriterTest { directBuffer.put(randomArray) directBuffer.rewind() try { - filePublisher.startStream() + runBlocking { + filePublisher.startStream() + } filePublisher.write( Packet( directBuffer, @@ -130,7 +145,9 @@ class FileWriterTest { ) ) assertArrayEquals(randomArray, tmpFile.readBytes()) - filePublisher.stopStream() + runBlocking { + filePublisher.stopStream() + } } catch (e: Exception) { fail() } diff --git a/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/utils/StreamerManager.kt b/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/utils/StreamerManager.kt index 590c9ff30..49678b864 100644 --- a/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/utils/StreamerManager.kt +++ b/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/utils/StreamerManager.kt @@ -28,6 +28,7 @@ import io.github.thibaultbee.streampack.streamers.interfaces.IStreamer import io.github.thibaultbee.streampack.streamers.interfaces.settings.IBaseCameraStreamerSettings import io.github.thibaultbee.streampack.utils.* import io.github.thibaultbee.streampack.views.PreviewView +import kotlinx.coroutines.runBlocking import java.io.File @@ -122,7 +123,9 @@ class StreamerManager( } fun stopStream() { - streamer?.stopStream() + runBlocking { + streamer?.stopStream() + } streamer?.getLiveStreamer()?.disconnect() } diff --git a/demos/screenrecorder/src/main/java/io/github/thibaultbee/streampack/screenrecorder/MainActivity.kt b/demos/screenrecorder/src/main/java/io/github/thibaultbee/streampack/screenrecorder/MainActivity.kt index 1056e0cad..568237f5e 100644 --- a/demos/screenrecorder/src/main/java/io/github/thibaultbee/streampack/screenrecorder/MainActivity.kt +++ b/demos/screenrecorder/src/main/java/io/github/thibaultbee/streampack/screenrecorder/MainActivity.kt @@ -40,7 +40,6 @@ import io.github.thibaultbee.streampack.ext.srt.services.ScreenRecorderSrtLiveSe import io.github.thibaultbee.streampack.ext.srt.streamers.interfaces.ISrtLiveStreamer import io.github.thibaultbee.streampack.internal.encoders.MediaCodecHelper import io.github.thibaultbee.streampack.internal.muxers.ts.data.TsServiceInfo -import io.github.thibaultbee.streampack.utils.getStreamer import io.github.thibaultbee.streampack.screenrecorder.databinding.ActivityMainBinding import io.github.thibaultbee.streampack.screenrecorder.models.EndpointType import io.github.thibaultbee.streampack.screenrecorder.services.DemoScreenRecorderRtmpLiveService @@ -49,6 +48,7 @@ import io.github.thibaultbee.streampack.screenrecorder.settings.SettingsActivity import io.github.thibaultbee.streampack.streamers.bases.BaseScreenRecorderStreamer import io.github.thibaultbee.streampack.streamers.interfaces.ILiveStreamer import io.github.thibaultbee.streampack.streamers.live.BaseScreenRecorderLiveStreamer +import io.github.thibaultbee.streampack.utils.getStreamer import kotlinx.coroutines.runBlocking class MainActivity : AppCompatActivity() { @@ -132,6 +132,7 @@ class MainActivity : AppCompatActivity() { Log.i(TAG, "Service disconnected") }) } + EndpointType.RTMP -> { ScreenRecorderRtmpLiveService.launch( this, @@ -230,7 +231,9 @@ class MainActivity : AppCompatActivity() { } private fun stopService() { - streamer.stopStream() + runBlocking { + streamer.stopStream() + } streamer.disconnect() when (configuration.endpoint.type) { @@ -240,6 +243,7 @@ class MainActivity : AppCompatActivity() { DemoScreenRecorderSrtLiveService::class.java ) ) + EndpointType.RTMP -> stopService( Intent( this, diff --git a/demos/screenrecorder/src/main/java/io/github/thibaultbee/streampack/screenrecorder/services/DemoScreenRecorderRtmpLiveService.kt b/demos/screenrecorder/src/main/java/io/github/thibaultbee/streampack/screenrecorder/services/DemoScreenRecorderRtmpLiveService.kt index 90530ca8e..d2c930da1 100644 --- a/demos/screenrecorder/src/main/java/io/github/thibaultbee/streampack/screenrecorder/services/DemoScreenRecorderRtmpLiveService.kt +++ b/demos/screenrecorder/src/main/java/io/github/thibaultbee/streampack/screenrecorder/services/DemoScreenRecorderRtmpLiveService.kt @@ -22,6 +22,7 @@ import androidx.core.app.NotificationCompat import io.github.thibaultbee.streampack.ext.rtmp.services.ScreenRecorderRtmpLiveService import io.github.thibaultbee.streampack.screenrecorder.R import io.github.thibaultbee.streampack.screenrecorder.models.Actions +import kotlinx.coroutines.runBlocking class DemoScreenRecorderRtmpLiveService : ScreenRecorderRtmpLiveService( @@ -32,7 +33,9 @@ class DemoScreenRecorderRtmpLiveService : ScreenRecorderRtmpLiveService( override fun onStartCommand(intent: Intent, flags: Int, startId: Int): Int { streamer?.let { if (intent.action == Actions.STOP.value) { - streamer?.stopStream() + runBlocking { + streamer?.stopStream() + } } } return super.onStartCommand(intent, flags, startId) diff --git a/demos/screenrecorder/src/main/java/io/github/thibaultbee/streampack/screenrecorder/services/DemoScreenRecorderSrtLiveService.kt b/demos/screenrecorder/src/main/java/io/github/thibaultbee/streampack/screenrecorder/services/DemoScreenRecorderSrtLiveService.kt index 7c93ee6af..da1541427 100644 --- a/demos/screenrecorder/src/main/java/io/github/thibaultbee/streampack/screenrecorder/services/DemoScreenRecorderSrtLiveService.kt +++ b/demos/screenrecorder/src/main/java/io/github/thibaultbee/streampack/screenrecorder/services/DemoScreenRecorderSrtLiveService.kt @@ -22,6 +22,7 @@ import androidx.core.app.NotificationCompat import io.github.thibaultbee.streampack.ext.srt.services.ScreenRecorderSrtLiveService import io.github.thibaultbee.streampack.screenrecorder.R import io.github.thibaultbee.streampack.screenrecorder.models.Actions +import kotlinx.coroutines.runBlocking class DemoScreenRecorderSrtLiveService : ScreenRecorderSrtLiveService( notificationId = 0x4569, @@ -31,7 +32,9 @@ class DemoScreenRecorderSrtLiveService : ScreenRecorderSrtLiveService( override fun onStartCommand(intent: Intent, flags: Int, startId: Int): Int { streamer?.let { if (intent.action == Actions.STOP.value) { - streamer?.stopStream() + runBlocking { + streamer?.stopStream() + } } } return super.onStartCommand(intent, flags, startId) diff --git a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/internal/endpoints/RtmpProducer.kt b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/internal/endpoints/RtmpProducer.kt index 9f15f11ca..25409f074 100644 --- a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/internal/endpoints/RtmpProducer.kt +++ b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/internal/endpoints/RtmpProducer.kt @@ -108,19 +108,23 @@ class RtmpProducer( } } - override fun startStream() { - synchronized(this) { - socket.connectStream() + override suspend fun startStream() { + withContext(coroutineDispatcher) { + synchronized(this) { + socket.connectStream() + } } } - override fun stopStream() { - synchronized(this) { - if (isConnected) { - /** - * deleteStream is blocking, if the connection does not exist yet. - */ - socket.deleteStream() + override suspend fun stopStream() { + withContext(coroutineDispatcher) { + synchronized(this) { + if (isConnected) { + /** + * deleteStream is blocking, if the connection does not exist yet. + */ + socket.deleteStream() + } } } } diff --git a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/internal/endpoints/SrtProducer.kt b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/internal/endpoints/SrtProducer.kt index 0b767d661..32321456f 100644 --- a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/internal/endpoints/SrtProducer.kt +++ b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/internal/endpoints/SrtProducer.kt @@ -170,7 +170,7 @@ class SrtProducer( } } - override fun startStream() { + override suspend fun startStream() { if (!socket.isConnected) { throw ConnectException("SrtEndpoint should be connected at this point") } @@ -179,7 +179,7 @@ class SrtProducer( socket.setSockFlag(SockOpt.INPUTBW, bitrate) } - override fun stopStream() { + override suspend fun stopStream() { } diff --git a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/streamers/CameraSrtLiveStreamer.kt b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/streamers/CameraSrtLiveStreamer.kt index 8daaa2c1b..bcafa2fac 100644 --- a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/streamers/CameraSrtLiveStreamer.kt +++ b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/streamers/CameraSrtLiveStreamer.kt @@ -181,7 +181,7 @@ class CameraSrtLiveStreamer( /** * Same as [BaseCameraLiveStreamer.stopStream] but also stops bitrate regulator. */ - override fun stopStream() { + override suspend fun stopStream() { scheduler.cancel() super.stopStream() } diff --git a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/streamers/ScreenRecorderSrtLiveStreamer.kt b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/streamers/ScreenRecorderSrtLiveStreamer.kt index 3f1bad61f..a6f103877 100644 --- a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/streamers/ScreenRecorderSrtLiveStreamer.kt +++ b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/streamers/ScreenRecorderSrtLiveStreamer.kt @@ -187,7 +187,7 @@ class ScreenRecorderSrtLiveStreamer( /** * Same as [BaseScreenRecorderLiveStreamer.stopStream] but also stops bitrate regulator. */ - override fun stopStream() { + override suspend fun stopStream() { scheduler.cancel() super.stopStream() }