From 833fb87f8f1b96818c487bfdc9a595a16bedb1d6 Mon Sep 17 00:00:00 2001 From: ThibaultBee <37510686+ThibaultBee@users.noreply.github.com> Date: Tue, 25 Jun 2024 15:03:43 +0200 Subject: [PATCH 1/2] feat(*): add a coroutine SRT socket --- example/build.gradle | 2 +- .../srtdroid/example/MainViewModel.kt | 148 +-- .../thibaultbee/srtdroid/example/Utils.kt | 6 - .../srtdroid/example/tests/RecvFile.kt | 77 +- .../srtdroid/example/tests/SendFile.kt | 97 +- .../srtdroid/example/tests/Test.kt | 33 +- .../srtdroid/example/tests/TestClient.kt | 53 +- .../srtdroid/example/tests/TestServer.kt | 63 +- settings.gradle | 1 + srtdroid-ktx/.gitignore | 1 + srtdroid-ktx/build.gradle | 41 + srtdroid-ktx/consumer-rules.pro | 0 srtdroid-ktx/proguard-rules.pro | 21 + srtdroid-ktx/src/main/AndroidManifest.xml | 4 + .../srtdroid/ktx/CoroutineSocket.kt | 855 ++++++++++++++++++ .../extensions/CoroutineSocketExtensions.kt | 159 ++++ 16 files changed, 1336 insertions(+), 225 deletions(-) create mode 100644 srtdroid-ktx/.gitignore create mode 100644 srtdroid-ktx/build.gradle create mode 100644 srtdroid-ktx/consumer-rules.pro create mode 100644 srtdroid-ktx/proguard-rules.pro create mode 100644 srtdroid-ktx/src/main/AndroidManifest.xml create mode 100644 srtdroid-ktx/src/main/java/io/github/thibaultbee/srtdroid/ktx/CoroutineSocket.kt create mode 100644 srtdroid-ktx/src/main/java/io/github/thibaultbee/srtdroid/ktx/extensions/CoroutineSocketExtensions.kt diff --git a/example/build.gradle b/example/build.gradle index 9595f1b9..9589bce5 100644 --- a/example/build.gradle +++ b/example/build.gradle @@ -41,7 +41,7 @@ android { } dependencies { - implementation project(':srtdroid') + implementation project(':srtdroid-ktx') implementation fileTree(dir: 'libs', include: ['*.jar']) implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk7:$kotlin_version" implementation 'androidx.appcompat:appcompat:1.6.1' diff --git a/example/src/main/java/io/github/thibaultbee/srtdroid/example/MainViewModel.kt b/example/src/main/java/io/github/thibaultbee/srtdroid/example/MainViewModel.kt index b8311a66..a73636af 100644 --- a/example/src/main/java/io/github/thibaultbee/srtdroid/example/MainViewModel.kt +++ b/example/src/main/java/io/github/thibaultbee/srtdroid/example/MainViewModel.kt @@ -2,21 +2,23 @@ package io.github.thibaultbee.srtdroid.example import android.app.Application import android.content.Context +import android.util.Log import androidx.lifecycle.AndroidViewModel import androidx.lifecycle.MutableLiveData +import androidx.lifecycle.viewModelScope import io.github.thibaultbee.srtdroid.example.tests.RecvFile import io.github.thibaultbee.srtdroid.example.tests.SendFile import io.github.thibaultbee.srtdroid.example.tests.TestClient import io.github.thibaultbee.srtdroid.example.tests.TestServer -import java.util.concurrent.Executors +import kotlinx.coroutines.Job +import kotlinx.coroutines.cancelChildren +import kotlinx.coroutines.launch class MainViewModel(application: Application) : AndroidViewModel(application) { private val configuration = Configuration(getApplication()) private val sendFileName = "MyFileToSend" private val recvFileName = "RecvFile" - private val executor = Executors.newFixedThreadPool(4) - val error = MutableLiveData() val success = MutableLiveData() @@ -26,56 +28,24 @@ class MainViewModel(application: Application) : AndroidViewModel(application) { val sendFileCompletion = MutableLiveData() private val testServer = - TestServer(executor, - { msg -> - testServerCompletion.postValue(true) - success.postValue(msg) - }, - { msg -> - testClientCompletion.postValue(true) - error.postValue(msg) - } - ) + TestServer() private val testClient = - TestClient( - executor, - { msg -> - testClientCompletion.postValue(true) - success.postValue(msg) - }, - { msg -> - testClientCompletion.postValue(true) - error.postValue(msg) - } - ) + TestClient() private val recvFile = - RecvFile(sendFileName, + RecvFile( + sendFileName, recvFileName, - (getApplication() as Context).filesDir, - executor, - { msg -> - recvFileCompletion.postValue(true) - success.postValue(msg) - }, - { msg -> - recvFileCompletion.postValue(true) - error.postValue(msg) - } + (getApplication() as Context).filesDir ) - private val sendFile = - SendFile((getApplication() as Context).filesDir, - executor, - { msg -> - sendFileCompletion.postValue(true) - success.postValue(msg) - }, - { msg -> - sendFileCompletion.postValue(true) - error.postValue(msg) - } - ) + SendFile((getApplication() as Context).filesDir) + + + private var testClientJob: Job? = null + private var testServerJob: Job? = null + private var recvFileJob: Job? = null + private var sendFileJob: Job? = null /** * Sends multiple messages to a SRT server. @@ -83,11 +53,27 @@ class MainViewModel(application: Application) : AndroidViewModel(application) { * Same as SRT examples/test-c-client.c */ fun launchTestClient() { - testClient.launch(configuration.clientIP, configuration.clientPort) + if (testClientJob?.isActive == true) { + Log.w(TAG, "Test client job is already running") + return + } + testClientJob = viewModelScope.launch { + try { + testClient.run(configuration.clientIP, configuration.clientPort) + success.postValue("Client success!") + } catch (e: Exception) { + Log.e(TAG, "Client error: ${e.message}", e) + error.postValue(e.message) + } finally { + testClientCompletion.postValue(true) + } + } } fun cancelTestClient() { - testClient.cancel() + Log.i(TAG, "Canceling test client job") + testClientJob?.cancelChildren() + testClientJob = null } /** @@ -96,11 +82,27 @@ class MainViewModel(application: Application) : AndroidViewModel(application) { * Same as SRT examples/test-c-server.c */ fun launchTestServer() { - testServer.launch(configuration.serverIP, configuration.serverPort) + if (testServerJob?.isActive == true) { + Log.w(TAG, "Test server job is already running") + return + } + testServerJob = viewModelScope.launch { + try { + testServer.run(configuration.serverIP, configuration.serverPort) + success.postValue("Server success!") + } catch (e: Exception) { + Log.e(TAG, "Server error: ${e.message}", e) + error.postValue(e.message) + } finally { + testServerCompletion.postValue(true) + } + } } fun cancelTestServer() { - testServer.cancel() + Log.i(TAG, "Canceling test server job") + testServerJob?.cancel() + testServerJob = null } /** @@ -109,11 +111,27 @@ class MainViewModel(application: Application) : AndroidViewModel(application) { * Same as SRT examples/recvfile.cpp */ fun launchRecvFile() { - recvFile.launch(configuration.clientIP, configuration.clientPort) + if (recvFileJob?.isActive == true) { + Log.w(TAG, "Recv file job is already running") + return + } + recvFileJob = viewModelScope.launch { + try { + recvFile.run(configuration.clientIP, configuration.clientPort) + success.postValue("RecvFile success!") + } catch (e: Exception) { + Log.e(TAG, "RecvFile error: ${e.message}", e) + error.postValue(e.message) + } finally { + recvFileCompletion.postValue(true) + } + } } fun cancelRecvFile() { - recvFile.cancel() + Log.i(TAG, "Canceling recv file job") + recvFileJob?.cancel() + recvFileJob = null } /** @@ -123,10 +141,30 @@ class MainViewModel(application: Application) : AndroidViewModel(application) { * Same as SRT examples/sendfile.cpp */ fun launchSendFile() { - sendFile.launch(configuration.serverIP, configuration.serverPort) + if (sendFileJob?.isActive == true) { + Log.w(TAG, "Send file job is already running") + return + } + sendFileJob = viewModelScope.launch { + try { + sendFile.run(configuration.serverIP, configuration.serverPort) + success.postValue("SendFile success!") + } catch (e: Exception) { + Log.e(TAG, "SendFile error: ${e.message}", e) + error.postValue(e.message) + } finally { + sendFileCompletion.postValue(true) + } + } } fun cancelSendFile() { - sendFile.cancel() + Log.i(TAG, "Canceling send file job") + sendFileJob?.cancel() + sendFileJob = null + } + + companion object { + private val TAG = MainViewModel::class.simpleName } } \ No newline at end of file diff --git a/example/src/main/java/io/github/thibaultbee/srtdroid/example/Utils.kt b/example/src/main/java/io/github/thibaultbee/srtdroid/example/Utils.kt index 143a7d3e..ade637e5 100644 --- a/example/src/main/java/io/github/thibaultbee/srtdroid/example/Utils.kt +++ b/example/src/main/java/io/github/thibaultbee/srtdroid/example/Utils.kt @@ -37,10 +37,4 @@ object Utils { Error.clearLastError() return message } - - fun writeFile(file: File, text: String) { - FileOutputStream(file).use { - it.write(text.toByteArray()) - } - } } diff --git a/example/src/main/java/io/github/thibaultbee/srtdroid/example/tests/RecvFile.kt b/example/src/main/java/io/github/thibaultbee/srtdroid/example/tests/RecvFile.kt index fa215c61..cb0a99c0 100644 --- a/example/src/main/java/io/github/thibaultbee/srtdroid/example/tests/RecvFile.kt +++ b/example/src/main/java/io/github/thibaultbee/srtdroid/example/tests/RecvFile.kt @@ -1,56 +1,63 @@ package io.github.thibaultbee.srtdroid.example.tests import android.util.Log +import com.google.common.primitives.Ints +import com.google.common.primitives.Longs import io.github.thibaultbee.srtdroid.enums.SockOpt import io.github.thibaultbee.srtdroid.enums.Transtype import io.github.thibaultbee.srtdroid.example.Utils -import io.github.thibaultbee.srtdroid.models.Socket -import com.google.common.primitives.Ints -import com.google.common.primitives.Longs +import io.github.thibaultbee.srtdroid.ktx.CoroutineSocket +import io.github.thibaultbee.srtdroid.ktx.extensions.connect +import io.github.thibaultbee.srtdroid.ktx.extensions.recvFile +import io.github.thibaultbee.srtdroid.ktx.extensions.send +import kotlinx.coroutines.delay import java.io.File -import java.util.concurrent.ExecutorService class RecvFile( private val sendFileName: String, private val recvFileName: String, private val recvFileDir: File, - executorService: ExecutorService, - onSuccess: (String) -> Unit, - onError: (String) -> Unit -) : Test(executorService, onSuccess, onError) { +) : Test { companion object { private val TAG = RecvFile::class.simpleName } - override val testName: String = this::class.simpleName!! + override val name: String = this::class.simpleName!! - override fun launchImpl(ip: String, port: Int, socket: Socket) { + override suspend fun run(ip: String, port: Int) { Log.i(TAG, "Will get file $sendFileName from server") - - if (!socket.isValid) { - throw Exception("Invalid socket") - } - - socket.setSockFlag(SockOpt.TRANSTYPE, Transtype.FILE) - socket.connect(ip, port) - - // Request server file - socket.send(Ints.toByteArray(sendFileName.length).reversedArray()) - - socket.send(sendFileName) - - val fileSize = Longs.fromByteArray(socket.recv(Longs.BYTES).reversedArray()) - - // Where file will be written - val recvFile = File(recvFileDir, recvFileName) - if (fileSize != socket.recvFile(recvFile, 0, fileSize)) { - throw Exception("Failed to recv file: ${Utils.getErrorMessage()}") + val socket = CoroutineSocket() + + try { + socket.setSockFlag(SockOpt.TRANSTYPE, Transtype.FILE) + socket.connect(ip, port) + Log.i( + TAG, + "Is connected: ${socket.isConnected}" + ) + + // Request server file + socket.send(Ints.toByteArray(sendFileName.length).reversedArray()) + + socket.send(sendFileName) + + val array = socket.recv(Longs.BYTES) + val fileSize = Longs.fromByteArray(array.reversedArray()) + + // Where file will be written + val recvFile = File(recvFileDir, recvFileName) + Log.i(TAG, "Receiving file ${recvFile.path}") + if (fileSize != socket.recvFile(recvFile, 0, fileSize)) { + throw Exception("Failed to recv file: ${Utils.getErrorMessage()}") + } + + // If session is close too early, last msg will not be receive by server + delay(1000) + Log.i(TAG, "Received file $recvFile") + } catch (e: Exception) { + throw e + } finally { + socket.close() } - - // If session is close too early, last msg will not be receive by server - Thread.sleep(1000) - - successMsg = "Recv file is in $recvFile" } - } \ No newline at end of file diff --git a/example/src/main/java/io/github/thibaultbee/srtdroid/example/tests/SendFile.kt b/example/src/main/java/io/github/thibaultbee/srtdroid/example/tests/SendFile.kt index 41fbc1b6..de137a93 100644 --- a/example/src/main/java/io/github/thibaultbee/srtdroid/example/tests/SendFile.kt +++ b/example/src/main/java/io/github/thibaultbee/srtdroid/example/tests/SendFile.kt @@ -1,73 +1,78 @@ package io.github.thibaultbee.srtdroid.example.tests import android.util.Log -import io.github.thibaultbee.srtdroid.enums.SockOpt -import io.github.thibaultbee.srtdroid.enums.Transtype -import io.github.thibaultbee.srtdroid.example.Utils -import io.github.thibaultbee.srtdroid.models.Socket import com.google.common.primitives.Ints import com.google.common.primitives.Longs +import io.github.thibaultbee.srtdroid.enums.SockOpt +import io.github.thibaultbee.srtdroid.enums.Transtype +import io.github.thibaultbee.srtdroid.ktx.CoroutineSocket +import io.github.thibaultbee.srtdroid.ktx.extensions.bind +import io.github.thibaultbee.srtdroid.ktx.extensions.sendFile +import kotlinx.coroutines.delay import java.io.File -import java.util.concurrent.ExecutorService class SendFile( private val fileSendDir: File, - executorService: ExecutorService, - onSuccess: (String) -> Unit, - onError: (String) -> Unit -) : Test(executorService, onSuccess, onError) { +) : Test { companion object { private val TAG = SendFile::class.simpleName } - override val testName: String = this::class.simpleName!! + override val name: String = this::class.simpleName!! - override fun launchImpl(ip: String, port: Int, socket: Socket) { + override suspend fun run(ip: String, port: Int) { Log.i(TAG, "Will send requested file") + val socket = CoroutineSocket() - if (!socket.isValid) { - throw Exception("Invalid socket") - } + try { + socket.setSockFlag(SockOpt.TRANSTYPE, Transtype.FILE) - socket.setSockFlag(SockOpt.TRANSTYPE, Transtype.FILE) - socket.bind(ip, port) - socket.listen(10) + Log.i(TAG, "Will bind on $ip:$port") + socket.bind(ip, port) + socket.listen(10) - val peer = socket.accept() - val clientSocket = peer.first + Log.i(TAG, "Waiting for incoming socket on $ip:$port") + val peer = socket.accept() + val clientSocket = peer.first - // Get file name length - var array = clientSocket.recv(Ints.BYTES) - val fileNameLength = Ints.fromByteArray(array.reversedArray()) - when { - array.isNotEmpty() -> Log.i(TAG, "File name is $fileNameLength char long") - } + Log.i(TAG, "Get an incoming connection") + // Get file name length + var array = clientSocket.recv(Ints.BYTES) + val fileNameLength = Ints.fromByteArray(array.reversedArray()) + when { + array.isNotEmpty() -> Log.i(TAG, "File name is $fileNameLength char long") + } - // Get file name - array = clientSocket.recv(fileNameLength) - val fileName = String(array) - Log.i(TAG, "File name is $fileName") + // Get file name + array = clientSocket.recv(fileNameLength) + val fileName = String(array) + Log.i(TAG, "File name is $fileName") - val file = File("$fileSendDir/$fileName") - if (!file.exists()) { - Log.w(TAG, "File ${file.path} does not exist. Try to create it") - Utils.writeFile(file, "myServerFileContent. Hello Client! This is server.") - } - if (!file.exists()) { - throw Exception("Failed to get file ${file.path}") - } + val file = File("$fileSendDir/$fileName") + if (!file.exists()) { + Log.w(TAG, "File ${file.path} does not exist. Try to create it") + file.writeText("myServerFileContent. Hello Client! This is server.") + } + if (!file.exists()) { + throw Exception("Failed to get file ${file.path}") + } - // Send file size - clientSocket.send(Longs.toByteArray(file.length()).reversedArray()) + // Send file size + clientSocket.send(Longs.toByteArray(file.length()).reversedArray()) - // Send file - clientSocket.sendFile(file) + // Send file + Log.i(TAG, "Sending file ${file.path}") + clientSocket.sendFile(file) - // If session is close too early, last msg will not be receive by server - clientSocket.close() - Thread.sleep(1000) + // If session is close too early, last msg will not be receive by server + clientSocket.close() + delay(1000) - successMsg = "Sent file ${file.path}" + Log.i(TAG, "Sent file ${file.path}") + } catch (e: Exception) { + throw e + } finally { + socket.close() + } } - } \ No newline at end of file diff --git a/example/src/main/java/io/github/thibaultbee/srtdroid/example/tests/Test.kt b/example/src/main/java/io/github/thibaultbee/srtdroid/example/tests/Test.kt index 03d40b58..2cbd3f6c 100644 --- a/example/src/main/java/io/github/thibaultbee/srtdroid/example/tests/Test.kt +++ b/example/src/main/java/io/github/thibaultbee/srtdroid/example/tests/Test.kt @@ -1,33 +1,6 @@ package io.github.thibaultbee.srtdroid.example.tests -import io.github.thibaultbee.srtdroid.models.Socket -import java.util.concurrent.ExecutorService - -abstract class Test( - private val executor: ExecutorService, - protected val onSuccess: (String) -> Unit, - protected val onError: (String) -> Unit -) { - protected abstract val testName: String - private var socket: Socket? = null - protected var successMsg: String = "" - - fun launch(ip: String, port: Int) { - executor.execute { - try { - socket = Socket() - launchImpl(ip, port, socket!!) - socket?.close() - onSuccess("$testName: $successMsg") - } catch (e: Exception) { - onError("$testName: ${e.message}") - } - } - } - - abstract fun launchImpl(ip: String, port: Int, socket: Socket) - - fun cancel() { - socket?.close() - } +interface Test { + val name: String + suspend fun run(ip: String, port: Int) } diff --git a/example/src/main/java/io/github/thibaultbee/srtdroid/example/tests/TestClient.kt b/example/src/main/java/io/github/thibaultbee/srtdroid/example/tests/TestClient.kt index 5944dc15..e40754f0 100644 --- a/example/src/main/java/io/github/thibaultbee/srtdroid/example/tests/TestClient.kt +++ b/example/src/main/java/io/github/thibaultbee/srtdroid/example/tests/TestClient.kt @@ -2,38 +2,45 @@ package io.github.thibaultbee.srtdroid.example.tests import android.util.Log import io.github.thibaultbee.srtdroid.enums.SockOpt -import io.github.thibaultbee.srtdroid.models.Socket -import java.util.concurrent.ExecutorService - -class TestClient( - executorService: ExecutorService, - onSuccess: (String) -> Unit, - onError: (String) -> Unit -) : Test(executorService, onSuccess, onError) { +import io.github.thibaultbee.srtdroid.ktx.CoroutineSocket +import io.github.thibaultbee.srtdroid.ktx.extensions.connect +import io.github.thibaultbee.srtdroid.ktx.extensions.send +import kotlinx.coroutines.delay + +class TestClient : Test { + private val numOfMessages = 100 + companion object { private val TAG = TestClient::class.simpleName } - override val testName: String = this::class.simpleName!! + override val name: String = this::class.simpleName!! - override fun launchImpl(ip: String, port: Int, socket: Socket) { - Log.i(TAG, "Will send messages to the server") - val numOfMessages = 100 + override suspend fun run(ip: String, port: Int) { + val socket = CoroutineSocket() - if (!socket.isValid) { - throw Exception("Invalid socket") - } + try { + Log.i(TAG, "Will send $numOfMessages messages to the server") - socket.setSockFlag(SockOpt.SENDER, 1) - socket.connect(ip, port) + socket.setSockFlag(SockOpt.SENDER, 1) + socket.connect(ip, port) + Log.i( + TAG, + "Is connected: ${socket.isConnected}. Will send $numOfMessages messages to the server" + ) - repeat(numOfMessages) { - socket.send("This message should be sent to the other side") - } - // If session is close too early, last msg will not be receive by server - Thread.sleep(1000) + repeat(numOfMessages) { + socket.send("This message should be sent to the other side") + } + // If session is close too early, last msg will not be receive by server + delay(1000) - successMsg = "Sent $numOfMessages messages" + Log.i(TAG, "Sent $numOfMessages messages") + } catch (e: Exception) { + throw e + } finally { + socket.close() + } } } \ No newline at end of file diff --git a/example/src/main/java/io/github/thibaultbee/srtdroid/example/tests/TestServer.kt b/example/src/main/java/io/github/thibaultbee/srtdroid/example/tests/TestServer.kt index 9bce4146..491d477b 100644 --- a/example/src/main/java/io/github/thibaultbee/srtdroid/example/tests/TestServer.kt +++ b/example/src/main/java/io/github/thibaultbee/srtdroid/example/tests/TestServer.kt @@ -2,45 +2,50 @@ package io.github.thibaultbee.srtdroid.example.tests import android.util.Log import io.github.thibaultbee.srtdroid.enums.SockOpt -import io.github.thibaultbee.srtdroid.models.Socket -import java.util.concurrent.ExecutorService - -class TestServer( - executorService: ExecutorService, - onSuccess: (String) -> Unit, - onError: (String) -> Unit -) : Test(executorService, onSuccess, onError) { +import io.github.thibaultbee.srtdroid.ktx.CoroutineSocket +import io.github.thibaultbee.srtdroid.ktx.extensions.bind +import kotlinx.coroutines.delay + +class TestServer : Test { + private val numOfMessages = 100 + companion object { private val TAG = TestServer::class.simpleName } - - override val testName: String = this::class.simpleName!! - override fun launchImpl(ip: String, port: Int, socket: Socket) { - Log.i(TAG, "Waiting messages from the client") - val numOfMessages = 100 + override val name: String = this::class.simpleName!! - if (!socket.isValid) { - throw Exception("Invalid socket") - } + override suspend fun run(ip: String, port: Int) { + Log.i(TAG, "Waiting $numOfMessages messages from the client") + val socket = CoroutineSocket() - socket.setSockFlag(SockOpt.RCVSYN, true) - socket.bind(ip, port) - socket.listen(10) + try { + Log.i(TAG, "Will bind on $ip:$port") + socket.bind(ip, port) + socket.listen(10) - val peer = socket.accept() - val clientSocket = peer.first + Log.i(TAG, "Waiting for incoming socket on $ip:$port") + val peer = socket.accept() - repeat(numOfMessages) { - val message = clientSocket.recv(2048) - Log.i(TAG, "#$it >> Got msg of length ${message.size} << ${String(message)}") - } + Log.i(TAG, "Get an incoming connection") + val clientSocket = peer.first + + clientSocket.setSockFlag(SockOpt.RCVTIMEO, 3000) + repeat(numOfMessages) { + val message = clientSocket.recv(2048) + Log.i(TAG, "#$it >> Got msg of length ${message.size} << ${String(message)}") + } - // If session is close too early, last msg will not be receive by server - clientSocket.close() - Thread.sleep(1000) + // If session is close too early, last msg will not be receive by server + clientSocket.close() + delay(1000) - successMsg = "Received $numOfMessages messages (check logcat)" + Log.i(TAG, "Received $numOfMessages messages (check logcat)") + } catch (e: Exception) { + throw e + } finally { + socket.close() + } } } \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 2caa0444..395363a0 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,2 +1,3 @@ include ':srtdroid', ":example" rootProject.name='srtdroid' +include ':srtdroid-ktx' diff --git a/srtdroid-ktx/.gitignore b/srtdroid-ktx/.gitignore new file mode 100644 index 00000000..42afabfd --- /dev/null +++ b/srtdroid-ktx/.gitignore @@ -0,0 +1 @@ +/build \ No newline at end of file diff --git a/srtdroid-ktx/build.gradle b/srtdroid-ktx/build.gradle new file mode 100644 index 00000000..6d57d7dd --- /dev/null +++ b/srtdroid-ktx/build.gradle @@ -0,0 +1,41 @@ +plugins { + id 'com.android.library' + id 'org.jetbrains.kotlin.android' +} + +android { + namespace 'io.github.thibaultbee.srtdroid' + compileSdk 34 + + defaultConfig { + minSdk 19 + + testInstrumentationRunner "androidx.test.runner.AndroidJUnitRunner" + consumerProguardFiles "consumer-rules.pro" + } + + buildTypes { + release { + minifyEnabled false + proguardFiles getDefaultProguardFile('proguard-android-optimize.txt'), 'proguard-rules.pro' + } + } + compileOptions { + sourceCompatibility JavaVersion.VERSION_1_8 + targetCompatibility JavaVersion.VERSION_1_8 + } + kotlinOptions { + jvmTarget = '1.8' + } +} + +dependencies { + api project(':srtdroid') + + implementation 'androidx.core:core-ktx:1.12.0' + implementation 'androidx.appcompat:appcompat:1.6.1' + + testImplementation 'junit:junit:4.13.2' + androidTestImplementation 'androidx.test.ext:junit:1.1.5' + androidTestImplementation 'androidx.test.espresso:espresso-core:3.5.1' +} \ No newline at end of file diff --git a/srtdroid-ktx/consumer-rules.pro b/srtdroid-ktx/consumer-rules.pro new file mode 100644 index 00000000..e69de29b diff --git a/srtdroid-ktx/proguard-rules.pro b/srtdroid-ktx/proguard-rules.pro new file mode 100644 index 00000000..481bb434 --- /dev/null +++ b/srtdroid-ktx/proguard-rules.pro @@ -0,0 +1,21 @@ +# Add project specific ProGuard rules here. +# You can control the set of applied configuration files using the +# proguardFiles setting in build.gradle. +# +# For more details, see +# http://developer.android.com/guide/developing/tools/proguard.html + +# If your project uses WebView with JS, uncomment the following +# and specify the fully qualified class name to the JavaScript interface +# class: +#-keepclassmembers class fqcn.of.javascript.interface.for.webview { +# public *; +#} + +# Uncomment this to preserve the line number information for +# debugging stack traces. +#-keepattributes SourceFile,LineNumberTable + +# If you keep the line number information, uncomment this to +# hide the original source file name. +#-renamesourcefileattribute SourceFile \ No newline at end of file diff --git a/srtdroid-ktx/src/main/AndroidManifest.xml b/srtdroid-ktx/src/main/AndroidManifest.xml new file mode 100644 index 00000000..a5918e68 --- /dev/null +++ b/srtdroid-ktx/src/main/AndroidManifest.xml @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/srtdroid-ktx/src/main/java/io/github/thibaultbee/srtdroid/ktx/CoroutineSocket.kt b/srtdroid-ktx/src/main/java/io/github/thibaultbee/srtdroid/ktx/CoroutineSocket.kt new file mode 100644 index 00000000..569a04e4 --- /dev/null +++ b/srtdroid-ktx/src/main/java/io/github/thibaultbee/srtdroid/ktx/CoroutineSocket.kt @@ -0,0 +1,855 @@ +package io.github.thibaultbee.srtdroid.ktx + +import android.util.Log +import android.util.Pair +import io.github.thibaultbee.srtdroid.enums.EpollOpt +import io.github.thibaultbee.srtdroid.enums.ErrorType +import io.github.thibaultbee.srtdroid.enums.SockOpt +import io.github.thibaultbee.srtdroid.enums.SockStatus +import io.github.thibaultbee.srtdroid.models.Epoll +import io.github.thibaultbee.srtdroid.models.Error +import io.github.thibaultbee.srtdroid.models.MsgCtrl +import io.github.thibaultbee.srtdroid.models.Socket +import io.github.thibaultbee.srtdroid.models.Socket.ServerListener +import io.github.thibaultbee.srtdroid.models.Stats +import io.github.thibaultbee.srtdroid.models.rejectreason.InternalRejectReason +import io.github.thibaultbee.srtdroid.models.rejectreason.PredefinedRejectReason +import io.github.thibaultbee.srtdroid.models.rejectreason.RejectReason +import io.github.thibaultbee.srtdroid.models.rejectreason.UserDefinedRejectReason +import kotlinx.coroutines.CompletableJob +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.Job +import kotlinx.coroutines.cancelChildren +import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.channels.trySendBlocking +import kotlinx.coroutines.flow.callbackFlow +import kotlinx.coroutines.isActive +import kotlinx.coroutines.suspendCancellableCoroutine +import kotlinx.coroutines.withContext +import kotlinx.coroutines.withTimeout +import java.io.IOException +import java.net.BindException +import java.net.ConnectException +import java.net.InetAddress +import java.net.InetSocketAddress +import java.net.SocketException +import java.net.SocketTimeoutException +import java.nio.ByteBuffer +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.resumeWithException +import kotlin.math.min + +/** + * A coroutine-based SRT socket. + */ +class CoroutineSocket +private constructor( + private val socket: Socket +) : + CoroutineScope { + constructor() : this(Socket()) + + init { + socket.setSockFlag(SockOpt.RCVSYN, false) + socket.setSockFlag(SockOpt.SNDSYN, false) + + socket.clientListener = object : Socket.ClientListener { + override fun onConnectionLost( + ns: Socket, + error: ErrorType, + peerAddress: InetSocketAddress, + token: Int + ) { + if (hasBeenConnected) { + socketContext.completeExceptionally(ConnectException(error.toString())) + coroutineContext.cancelChildren() + } + } + } + } + + private var hasBeenConnected = false + + val socketContext: CompletableJob = Job() + + @OptIn(ExperimentalCoroutinesApi::class) + override val coroutineContext: CoroutineContext = Dispatchers.IO.limitedParallelism(1) + + /** + * Flow of incoming sockets. + * It is a hook before returning from accept. Reject an incoming connection by throwing an exception. + * Only for server sockets. + */ + val incomingSocket = callbackFlow { + val listener = object : ServerListener { + override fun onListen( + ns: Socket, + hsVersion: Int, + peerAddress: InetSocketAddress, + streamId: String + ): Int { + val channelResult = trySendBlocking(IncomingSocket(CoroutineSocket(), streamId)) + return if (channelResult.isSuccess) { + 0 + } else { + Log.e(TAG, "Rejected incoming socket") + -1 + } + } + } + socket.serverListener = listener + awaitClose { socket.serverListener = null } + } + + /** + * Check if the SRT socket is a valid SRT socket. + * + * @return true if the SRT socket is valid, otherwise false + */ + val isValid: Boolean + get() = socket.isValid + + /** + * Gets the current status of the socket. + * + * **See Also:** [srt_getsockstate](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_getsockstate) + * + * @return the current [SockStatus] + */ + val sockState: SockStatus + get() = socket.sockState + + /** + * Retrieves the remote [InetSocketAddress] to which the SRT socket is connected. + * + * **See Also:** [srt_getpeername](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_getpeername) + * + * @return the remote [InetSocketAddress] if SRT socket is connected and valid. Otherwise, it returns a null. + * @see [inetAddress] and [port] + * @throws [SocketException] if SRT socket is invalid or not connected + */ + val peerName: InetSocketAddress + get() = socket.peerName + + /** + * Retrieves the remote [InetAddress] to which the socket is connected. + * + * **See Also:** [srt_getpeername](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_getpeername) + * + * @return the remote [InetAddress] if SRT socket is connected and valid. Otherwise, it returns a null. + * @see [peerName] and [port] + * @throws [SocketException] if SRT socket is invalid or not connected + */ + val inetAddress: InetAddress + get() = socket.inetAddress + + /** + * Retrieves the port to which the SRT socket is connected. + * + * **See Also:** [srt_getpeername](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_getpeername) + * + * @return the remote port if SRT socket is connected and valid. Otherwise, it returns a 0. + * @see [peerName] and [inetAddress] + * @throws [SocketException] if SRT socket is invalid or not connected + */ + val port: Int + get() = socket.port + + /** + * Extracts the [InetSocketAddress] to which the socket was bound. + * + * **See Also:** [srt_getsockname](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_getsockname) + * + * @return if socket is bound and valid, it returns the local [InetSocketAddress]. Otherwise, it returns a null. + * @throws [SocketException] if SRT socket is invalid or not bound + * @see [localAddress] and [localPort] + */ + val sockName: InetSocketAddress + get() = socket.sockName + + /** + * Extracts the [InetAddress] to which the socket was bound. + * + * **See Also:** [srt_getsockname](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_getsockname) + * + * @return if socket is bound and valid, it returns the local [InetAddress]. Otherwise, it returns a null. + * @throws [SocketException] if SRT socket is invalid or not bound + * @see [sockName] and [localPort] + */ + val localAddress: InetAddress + get() = socket.localAddress + + /** + * Extracts the port to which the socket was bound. + * + * **See Also:** [srt_getsockname](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_getsockname) + * + * @return if socket is bound and valid, it returns the local port. Otherwise, it returns a 0. + * @throws [SocketException] if SRT socket is invalid or not bound + * @see [sockName] and [localPort] + */ + val localPort: Int + get() = socket.localPort + + /** + * Closes the socket or group and frees all used resources. + * + * **See Also:** [srt_close](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_close) + * + * @throws SocketException if close failed + */ + fun close() { + coroutineContext.cancelChildren() + socket.close() + socketContext.complete() + } + + /** + * Binds the socket to a local address. + * + * **See Also:** [srt_bind](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_bind) + * + * @param address the [InetSocketAddress] to bind to + * + * @throws BindException if bind has failed + */ + suspend fun bind(address: InetSocketAddress) = withContext(coroutineContext) { + socket.bind(address) + hasBeenConnected = true + } + + /** + * Connects a socket to a specified address and port. + * + * **See Also:** [srt_connect](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_connect) + * + * @param address the [InetSocketAddress] to connect to + * @throws ConnectException if connection has failed + */ + suspend fun connect(address: InetSocketAddress) { + execute(EpollOpt.OUT, onContinuation = { socket.connect(address) }) { + null + } + hasBeenConnected = true + } + + /** + * Performs a rendezvous connection. + * + * **See Also:** [srt_rendezvous](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_rendezvous) + * + * @param localAddress the local [InetSocketAddress] to bind to + * @param remoteAddress the remote [InetSocketAddress] to connect to + * @throws SocketException if rendezvous connection has failed + */ + suspend fun rendezVous( + localAddress: InetSocketAddress, + remoteAddress: InetSocketAddress + ) { + execute(EpollOpt.OUT, onContinuation = { socket.rendezVous(localAddress, remoteAddress) }) { + null + } + hasBeenConnected = true + } + + /** + * Sets up the listening state on a socket. + * + * **See Also:** [srt_listen](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_listen) + * + * @param backlog the number of sockets that may be allowed to wait until they are accepted + * @throws SocketException if listen failed + * @see [ServerListener.onListen] + */ + fun listen(backlog: Int) { + socket.listen(backlog) + } + + /** + * Accepts a pending connection. + * + * **See Also:** [srt_accept](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_accept) + * + * @return a pair containing the new Socket connection and the IP address and port specification of the remote device. + * @throws SocketException if returned SRT socket is not valid + */ + suspend fun accept(): Pair { + return execute(EpollOpt.IN) { + val pair = socket.accept() + Pair(CoroutineSocket(pair.first), pair.second) + } + } + + /** + * Gets the value of the given socket option. + * + * **See Also:** [srt_getsockflag](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_getsockflag) + * + * @param opt the [SockOpt] to get + * @return an object containing the [SockOpt] value. Type depends of the specified [opt]. + * @throws IOException if can't get [SockOpt] + * @see [setSockFlag] + */ + fun getSockFlag(opt: SockOpt): Any { + return socket.getSockFlag(opt) + } + + /** + * Sets the value of the given socket option. + * + * **See Also:** [srt_setsockflag](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_setsockflag) + * + * @param opt the [SockOpt] to set + * @param value the [SockOpt] value to set. Type depends of the specified [opt]. + * @throws IOException if can't set [SockOpt] + * @see [getSockFlag] + */ + fun setSockFlag(opt: SockOpt, value: Any) { + if ((opt == SockOpt.RCVSYN) || (opt == SockOpt.SNDSYN)) { + throw IllegalArgumentException("Options not supported") + } + socket.setSockFlag(opt, value) + } + + /** + * Sends a message to a remote party. + * + * It waits till it is possible to write on the socket. When this method is returned, it does + * not mean that the [msg] was sent. + * + * **See Also:** [srt_send](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_send) + * + * @param msg the [ByteBuffer] to send. It must be allocate with [ByteBuffer.allocateDirect]. It sends ByteBuffer from [ByteBuffer.position] to [ByteBuffer.limit]. + * @return the number of byte sent + * @throws SocketException if it has failed to send message + * @throws SocketTimeoutException if a timeout has been triggered + * @see [recv] + */ + suspend fun send(msg: ByteBuffer): Int { + val timeoutInMs = (socket.getSockFlag(SockOpt.SNDTIMEO) as Int).toLong() + + return execute(EpollOpt.OUT, timeoutInMs) { + socket.send(msg) + } + } + + /** + * Sends a message to a remote party. + * + * It waits till it is possible to write on the socket. When this method is returned, it does + * not mean that the [msg] was sent. + * + * **See Also:** [srt_sendmsg2](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_sendmsg2) + * + * @param msg the [ByteBuffer] to send. It must be allocate with [ByteBuffer.allocateDirect]. It sends ByteBuffer from [ByteBuffer.position] to [ByteBuffer.limit]. + * @param msgCtrl the [MsgCtrl] that contains extra parameter + * @return the number of byte sent + * @throws SocketException if it has failed to send message + * @throws SocketTimeoutException if a timeout has been triggered + * @see [recv] + */ + suspend fun send(msg: ByteBuffer, msgCtrl: MsgCtrl): Int { + val timeoutInMs = (socket.getSockFlag(SockOpt.SNDTIMEO) as Int).toLong() + + return execute(EpollOpt.OUT, timeoutInMs) { + socket.send(msg, msgCtrl) + } + } + + /** + * Sends a message to a remote party. + * + * It waits till it is possible to write on the socket. When this method is returned, it does + * not mean that the [msg] was sent. + * + * **See Also:** [srt_send](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_send) + * + * @param msg the [ByteArray] to send + * @param offset the offset of the [msg] + * @param size the size of the [msg] to send + * @return the number of byte sent + * @throws SocketException if it has failed to send message + * @throws SocketTimeoutException if a timeout has been triggered + * @see [recv] + */ + suspend fun send(msg: ByteArray, offset: Int = 0, size: Int = msg.size): Int { + val timeoutInMs = (socket.getSockFlag(SockOpt.SNDTIMEO) as Int).toLong() + + return execute(EpollOpt.OUT, timeoutInMs) { + socket.send(msg, offset, size) + } + } + + /** + * Sends a message to a remote party. + * + * It waits till it is possible to write on the socket. When this method is returned, it does + * not mean that the [msg] was sent. + * + * **See Also:** [srt_sendmsg2](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_sendmsg2) + * + * @param msg the [ByteArray] to send + * @param offset the offset of the [msg] + * @param size the size of the [msg] to send + * @param msgCtrl the [MsgCtrl] that contains extra parameter + * @return the number of byte sent + * @throws SocketException if it has failed to send message + * @throws SocketTimeoutException if a timeout has been triggered + * @see [recv] + */ + suspend fun send(msg: ByteArray, offset: Int, size: Int, msgCtrl: MsgCtrl): Int { + val timeoutInMs = (socket.getSockFlag(SockOpt.SNDTIMEO) as Int).toLong() + + return execute(EpollOpt.OUT, timeoutInMs) { + socket.send(msg, offset, size, msgCtrl) + } + } + + /** + * Received a message from a remote device + * + * It waits till it is possible to write on the socket. + * + * **See Also:** [srt_recv](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_recv) + * + * @param size Size of the expected message. + * @return the [ByteArray] that contains the received message. + * @throws SocketException if it has failed to send message + * @throws SocketTimeoutException if a timeout has been triggered + */ + suspend fun recv(size: Int): ByteArray { + val timeoutInMs = (socket.getSockFlag(SockOpt.RCVTIMEO) as Int).toLong() + + return execute(EpollOpt.IN, timeoutInMs) { + socket.recv(size) + } + } + + /** + * Received a message from a remote device + * + * **See Also:** [srt_recv](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_recv) + * + * @param buffer the [ByteArray] where received data are copied to. + * @param offset the offset in the specified [buffer]. + * @param byteCount the size of the specified [buffer]. + * @return the number of bytes received. + * @throws SocketException if it has failed to send message + * @throws SocketTimeoutException if a timeout has been triggered + */ + suspend fun recv( + buffer: ByteArray, + offset: Int = 0, + byteCount: Int = buffer.size + ): Int { + val timeoutInMs = (socket.getSockFlag(SockOpt.RCVTIMEO) as Int).toLong() + + return execute(EpollOpt.IN, timeoutInMs) { + socket.recv(buffer, offset, byteCount) + } + } + + /** + * Received a message from a remote device + * + * **See Also:** [srt_recvmsg2](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_recvmsg2) + * + * @param size Size of the expected message. + * @param msgCtrl the [MsgCtrl] that contains extra parameter + * @return the [ByteArray] that contains the received message. + * @throws SocketException if it has failed to send message + * @throws SocketTimeoutException if a timeout has been triggered + */ + suspend fun recv(size: Int, msgCtrl: MsgCtrl): ByteArray { + val timeoutInMs = (socket.getSockFlag(SockOpt.RCVTIMEO) as Int).toLong() + + return execute(EpollOpt.IN, timeoutInMs) { + socket.recv(size, msgCtrl) + } + } + + /** + * Received a message from a remote device + * + * **See Also:** [srt_recvmsg2](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_recvmsg2) + * + * @param buffer the [ByteArray] where received data are copied to. + * @param offset the offset in the specified [buffer]. + * @param byteCount the size of the specified [buffer]. + * @param msgCtrl the [MsgCtrl] that contains extra parameter + * @return the number of bytes received. + * @throws SocketException if it has failed to send message + * @throws SocketTimeoutException if a timeout has been triggered + */ + suspend fun recv( + buffer: ByteArray, + offset: Int = 0, + byteCount: Int = buffer.size, + msgCtrl: MsgCtrl + ): Int { + val timeoutInMs = (socket.getSockFlag(SockOpt.RCVTIMEO) as Int).toLong() + + return execute(EpollOpt.IN, timeoutInMs) { + socket.recv(buffer, offset, byteCount, msgCtrl) + } + } + + /** + * Sends a specified file. + * + * **See Also:** [srt_sendfile](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_sendfile) + * + * @param path the path of the file to send + * @param offset the offset used to read file from + * @param size the size of the file + * @param block the size of the single block to read at once before writing it to a file + * @return the size (>0) of the transmitted data of a file. + * @throws SocketException if it has failed to send message + * @throws SocketTimeoutException if a timeout has been triggered + * @see [recvFile] + */ + suspend fun sendFile(path: String, offset: Long = 0, size: Long, block: Int = 364000): Long { + var byteSent = 0L + while (byteSent < size) { + execute(EpollOpt.OUT) { + byteSent += socket.sendFile( + path, + offset + byteSent, + min(size - byteSent, block.toLong()), + block + ) + } + } + return byteSent + } + + /** + * Receives a file. File will be located at [path]. + * + * **See Also:** [srt_recvfile](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_recvfile) + * + * @param path the path where to write received data + * @param offset the offset used to write file + * @param size the size of the file + * @param block the size of the single block to read at once before writing it to a file + * @return the size (>0) of the received data of a file. + * @throws SocketException if it has failed to send message + * @throws SocketTimeoutException if a timeout has been triggered + * @see [sendFile] + */ + suspend fun recvFile(path: String, offset: Long = 0, size: Long, block: Int = 364000): Long { + var byteReceived = 0L + while (byteReceived < size) { + execute(EpollOpt.IN) { + byteReceived += socket.recvFile( + path, + offset + byteReceived, + min(size - byteReceived, block.toLong()), + block + ) + } + } + return byteReceived + } + + /** + * Set/get detailed reason for a failed connection attempt. + * + * @see [InternalRejectReason], [PredefinedRejectReason] and [UserDefinedRejectReason] + */ + var rejectReason: RejectReason + /** + * Get detailed reason for a failed connection attempt. + * + * **See Also:** [srt_getrejectreason](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_getrejectreason) + * + * @return the object describing the rejection reason. Could be either [InternalRejectReason], [PredefinedRejectReason] or [UserDefinedRejectReason] + */ + get() = socket.rejectReason + /** + * Set detailed reason for a failed connection attempt. You can not set [InternalRejectReason]. + * + * **See Also:** [srt_setrejectreason](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_setrejectreason) + * + * @param value the object describing the rejection reason. Could be either [InternalRejectReason], [PredefinedRejectReason] or [UserDefinedRejectReason] + * @throws [SocketException] if action has failed + */ + set(value) { + socket.rejectReason = value + } + + // Performance tracking + /** + * Reports the current statistics. + * + * **See Also:** [srt_bstats](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_bstats) + * + * @param clear true if the statistics should be cleared after retrieval + * @return the current [Stats] + */ + fun bstats(clear: Boolean) = socket.bstats(clear) + + /** + * Reports the current statistics. + * + * **See Also:** [srt_bistats](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_bistats) + * + * @param clear true if the statistics should be cleared after retrieval + * @param instantaneous true if the statistics should use instant data, not moving averages + * @return the current [Stats] + */ + fun bistats(clear: Boolean, instantaneous: Boolean) = socket.bistats(clear, instantaneous) + + // Time access + /** + * Gets the time when SRT socket was open to establish a connection. + * + * **See Also:** [srt_connection_time](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_connection_time) + * + * @return the connection time in microseconds + * @throws [SocketException] if SRT socket is not valid + */ + val connectionTime: Long + get() = socket.connectionTime + + // Android Socket like API + /** + * Sets/gets the value of the [SockOpt.RCVBUF] option for this SRT socket. + * + * **See Also:** [SRTO_RCVBUF](https://github.com/Haivision/srt/blob/master/docs/API/API-socket-options.md#SRTO_RCVBUF) + */ + var receiveBufferSize: Int + /** + * Gets the value of the [SockOpt.RCVBUF] option for this SRT socket. + * + * @return the receive buffer size in bytes + * @throws IOException if can't get [SockOpt] + */ + get() = socket.receiveBufferSize + /** + * Sets the value of the [SockOpt.RCVBUF] option for this SRT socket. + * + * @param value receive buffer size in bytes + * @throws IOException if can't set [SockOpt] + */ + set(value) { + socket.receiveBufferSize = value + } + + /** + * Sets/gets the value of the [SockOpt.SNDBUF] option for this SRT socket. + * + * **See Also:** [SRTO_SNDBUF](https://github.com/Haivision/srt/blob/master/docs/API/API-socket-options.md#SRTO_SNDBUF) + */ + var sendBufferSize: Int + /** + * Gets the value of the [SockOpt.SNDBUF] option for this SRT socket. + * + * @return the send buffer size in bytes + * @throws IOException if can't get [SockOpt] + */ + get() = socket.sendBufferSize + /** + * Sets the value of the [SockOpt.SNDBUF] option for this SRT socket. + * + * @param value send buffer size in bytes + * @throws IOException if can't set [SockOpt] + */ + set(value) { + socket.sendBufferSize = value + } + + /** + * Tests if [SockOpt.REUSEADDR] is enabled. + * + * **See Also:** [SRTO_REUSEADDR](https://github.com/Haivision/srt/blob/master/docs/API/API-socket-options.md#srto_reuseaddr) + */ + var reuseAddress: Boolean + /** + * Gets the value of the [SockOpt.REUSEADDR] option for this SRT socket. + * + * @return true if it allows the SRT socket to use the binding address used already by another SRT socket in the same application, otherwise false + * @throws IOException if can't get [SockOpt] + */ + get() = socket.reuseAddress + /** + * Sets the value of the [SockOpt.REUSEADDR] option for this SRT socket. + * + * @param value true if it allows the SRT socket to use the binding address used already by another SRT socket in the same application, otherwise false + * @throws IOException if can't set [SockOpt] + */ + set(value) { + socket.reuseAddress = value + } + + /** + * Returns setting for [SockOpt.LINGER]. + * + * **See Also:** [SRTO_LINGER](https://github.com/Haivision/srt/blob/master/docs/API/API-socket-options.md#srto_linger) + */ + var soLinger: Int + /** + * Gets the value of the [SockOpt.LINGER] option for this SRT socket. + * + * @return linger time on close in seconds + * @throws IOException if can't get [SockOpt] + */ + get() = socket.soLinger + /** + * Sets the value of the [SockOpt.LINGER] option for this SRT socket. + * + * @param value the linger time on close + * @throws IOException if can't set [SockOpt] + */ + set(value) { + socket.soLinger = value + } + + /** + * Tests if the SRT socket is bound. + * + * @return true if the SRT socket is bound, otherwise false + */ + val isBound: Boolean + get() = socket.isBound + + /** + * Tests if the SRT socket is closed. + * + * @return true if the SRT socket is closed, otherwise false + */ + val isClose: Boolean + get() = socket.isClose + + /** + * Tests if the SRT socket is connected. + * + * @return true if the SRT socket is connected, otherwise false + */ + val isConnected: Boolean + get() = socket.isConnected + + /** + * Get the size of the available data in the receive buffer. + * + * @return the size of the available data in the receive buffer + * @throws IOException if can't get [SockOpt] + */ + fun available(): Int = socket.available() + + override fun equals(other: Any?): Boolean { + return socket == other + } + + override fun hashCode(): Int { + return socket.hashCode() + } + + private suspend fun execute( + epollOpt: EpollOpt, + timeoutInMs: Long? = null, + onContinuation: () -> Unit = {}, + block: () -> T + ): T { + val epoll = Epoll() + epoll.addUSock(socket, listOf(EpollOpt.ERR, epollOpt)) + + try { + return withContext(coroutineContext) { + if (timeoutInMs == null) { + executeEpoll(epoll, onContinuation, block) + } else { + executeEpollWithTimeout(epoll, timeoutInMs, onContinuation, block) + } + } + } catch (e: Exception) { + throw e + } finally { + epoll.clearUSock() + epoll.release() + } + } + + private suspend fun executeEpollWithTimeout( + epoll: Epoll, + timeoutInMs: Long, + onContinuation: () -> Unit = {}, + block: () -> T + ): T { + return if (timeoutInMs >= 0) { + withTimeout(timeoutInMs) { + executeEpoll(epoll, onContinuation, block) + } + } else { + executeEpoll(epoll, onContinuation, block) + } + } + + private suspend fun executeEpoll( + epoll: Epoll, + onContinuation: () -> Unit = {}, + block: () -> T + ): T { + return suspendCancellableCoroutine { continuation -> + continuation.invokeOnCancellation { + epoll.clearUSock() + } + onContinuation() + while (isActive) { + val epollEvents = try { + epoll.uWait(POLLING_TIMEOUT_IN_MS) + } catch (e: Exception) { + continuation.resumeWithException(SocketException(Error.lastErrorMessage)) + return@suspendCancellableCoroutine + } + if (epollEvents.isEmpty()) { + continue + } + val socketEvents = epollEvents.filter { it.socket == socket } + if (socketEvents.isEmpty()) { + continue + } + epoll.addUSock(socket, null) // Unsubscribe from all events + + if (socketEvents.any { it.events.contains(EpollOpt.ERR) }) { + if (sockState == SockStatus.BROKEN) { + continuation.resumeWithException(SocketException("Socket is broken. Maybe due to timeout?")) + } else { + if (Error.lastError != ErrorType.SUCCESS) { + continuation.resumeWithException(SocketException(Error.lastErrorMessage)) + } else { + continuation.resumeWithException(SocketException("Epoll returned an unknown error")) + } + } + } else { + try { + if (socketEvents.any { + it.events.contains(EpollOpt.IN) || it.events.contains( + EpollOpt.OUT + ) + }) { + continuation.resumeWith(Result.success(block())) + } + } catch (e: Exception) { + continuation.resumeWithException(e) + } + } + return@suspendCancellableCoroutine + } + } + } + + companion object { + private const val TAG = "CoroutineSocket" + + private const val POLLING_TIMEOUT_IN_MS = 1000L + } + + /** + * Listening socket data class. + * Use to store the socket and the stream ID. + */ + data class IncomingSocket(val socket: CoroutineSocket, val streamId: String) +} \ No newline at end of file diff --git a/srtdroid-ktx/src/main/java/io/github/thibaultbee/srtdroid/ktx/extensions/CoroutineSocketExtensions.kt b/srtdroid-ktx/src/main/java/io/github/thibaultbee/srtdroid/ktx/extensions/CoroutineSocketExtensions.kt new file mode 100644 index 00000000..d83bbf1c --- /dev/null +++ b/srtdroid-ktx/src/main/java/io/github/thibaultbee/srtdroid/ktx/extensions/CoroutineSocketExtensions.kt @@ -0,0 +1,159 @@ +package io.github.thibaultbee.srtdroid.ktx.extensions + +import io.github.thibaultbee.srtdroid.enums.SockStatus +import io.github.thibaultbee.srtdroid.ktx.CoroutineSocket +import java.io.File +import java.net.BindException +import java.net.ConnectException +import java.net.InetAddress +import java.net.InetSocketAddress +import java.net.SocketException +import java.net.SocketTimeoutException + + +/** + * Tests if the SRT socket is connected. + * + * @return true if the SRT socket is connected, otherwise false + */ +val CoroutineSocket.isConnected: Boolean + get() = sockState == SockStatus.CONNECTED + + +/** + * Binds the socket to a local address. + * + * **See Also:** [srt_bind](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_bind) + * + * @param address the address to bind to + * @param port the port to bind to + * @throws BindException if bind has failed + */ +suspend fun CoroutineSocket.bind(address: String, port: Int) = + bind(InetSocketAddress(address, port)) + +/** + * Binds the socket to a local address. + * + * **See Also:** [srt_bind](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_bind) + * + * @param address the [InetAddress] to bind to + * @param port the port to bind to + * + * @throws BindException if bind has failed + */ +suspend fun CoroutineSocket.bind(address: InetAddress, port: Int) = + bind(InetSocketAddress(address, port)) + +/** + * Connects a socket to a specified address and port. + * + * **See Also:** [srt_connect](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_connect) + * + * @param address the address to connect to + * @param port the port to connect to + * @throws ConnectException if connection has failed + */ +suspend fun CoroutineSocket.connect(address: String, port: Int) = + connect(InetSocketAddress(address, port)) + +/** + * Connects a socket to a specified address and port. + * + * **See Also:** [srt_connect](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_connect) + * + * @param address the [InetAddress] to connect to + * @param port the port to connect to + * @throws ConnectException if connection has failed + */ +suspend fun CoroutineSocket.connect(address: InetAddress, port: Int) = + connect(InetSocketAddress(address, port)) + + +/** + * Performs a rendezvous connection. + * + * **See Also:** [srt_rendezvous](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_rendezvous) + * + * @param localAddress the local address to bind to + * @param remoteAddress the remote address to connect to + * @throws SocketException if rendezvous connection has failed + */ +suspend fun CoroutineSocket.rendezVous(localAddress: String, remoteAddress: String, port: Int) = + rendezVous( + InetSocketAddress(localAddress, port), + InetSocketAddress(remoteAddress, port) + ) + +/** + * Performs a rendezvous connection. + * + * **See Also:** [srt_rendezvous](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_rendezvous) + * + * @param localAddress the local [InetAddress] to bind to + * @param remoteAddress the remote [InetAddress] to connect to + * @throws SocketException if rendezvous connection has failed + */ +suspend fun CoroutineSocket.rendezVous( + localAddress: InetAddress, + remoteAddress: InetAddress, + port: Int +) = rendezVous( + InetSocketAddress(localAddress, port), + InetSocketAddress(remoteAddress, port) +) + +/** + * Sends a message to a remote party. + * + * **See Also:** [srt_send](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_send) + * + * @param msg the [String] to send + * @return the number of byte sent + * @throws SocketException if it has failed to send message + * @throws SocketTimeoutException if a timeout has been triggered + * @see [recv] + */ +suspend fun CoroutineSocket.send(msg: String) = send(msg.toByteArray()) + +/** + * Sends a specified file. + * + * **See Also:** [srt_sendfile](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_sendfile) + * + * @param file the [File] to send + * @param offset the offset used to read file from + * @param size the size of the file + * @param block the size of the single block to read at once before writing it to a file + * @return the size (>0) of the transmitted data of a file. + * @throws SocketException if it has failed to send message + * @throws SocketTimeoutException if a timeout has been triggered + * @see [recvFile] + */ +suspend fun CoroutineSocket.sendFile( + file: File, + offset: Long = 0, + size: Long = file.length(), + block: Int = 364000 +) = sendFile(file.path, offset, size, block) + +/** + * Receives a file. File is create in [file]. + * + * **See Also:** [srt_recvfile](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_recvfile) + * + * @param file the [File] where to write received data + * @param offset the offset used to write file + * @param size the size of the file + * @param block the size of the single block to read at once before writing it to a file + * @return the size (>0) of the received data of a file. + * @throws SocketException if it has failed to send message + * @throws SocketTimeoutException if a timeout has been triggered + * @see [sendFile] + */ +suspend fun CoroutineSocket.recvFile( + file: File, + offset: Long = 0, + size: Long, + block: Int = 7280000 +) = recvFile(file.path, offset, size, block) \ No newline at end of file From a3759db676da5227057addb9b7eb7782e9f5fea5 Mon Sep 17 00:00:00 2001 From: ThibaultBee <37510686+ThibaultBee@users.noreply.github.com> Date: Mon, 8 Jul 2024 16:58:53 +0200 Subject: [PATCH 2/2] feat(*): add SrtUrl support for CoroutineSocket --- .../srtdroid/ktx/CoroutineSocket.kt | 7 +- .../extensions/CoroutineSocketExtensions.kt | 89 +++++++++++++++++-- .../srtdroid/extensions/SocketExtensions.kt | 6 +- .../srtdroid/interfaces/ConfigurableSocket.kt | 11 +++ .../thibaultbee/srtdroid/models/Socket.kt | 7 +- .../thibaultbee/srtdroid/models/SrtUrl.kt | 19 +++- 6 files changed, 122 insertions(+), 17 deletions(-) create mode 100644 srtdroid/src/main/java/io/github/thibaultbee/srtdroid/interfaces/ConfigurableSocket.kt diff --git a/srtdroid-ktx/src/main/java/io/github/thibaultbee/srtdroid/ktx/CoroutineSocket.kt b/srtdroid-ktx/src/main/java/io/github/thibaultbee/srtdroid/ktx/CoroutineSocket.kt index 569a04e4..7ee4deec 100644 --- a/srtdroid-ktx/src/main/java/io/github/thibaultbee/srtdroid/ktx/CoroutineSocket.kt +++ b/srtdroid-ktx/src/main/java/io/github/thibaultbee/srtdroid/ktx/CoroutineSocket.kt @@ -6,6 +6,7 @@ import io.github.thibaultbee.srtdroid.enums.EpollOpt import io.github.thibaultbee.srtdroid.enums.ErrorType import io.github.thibaultbee.srtdroid.enums.SockOpt import io.github.thibaultbee.srtdroid.enums.SockStatus +import io.github.thibaultbee.srtdroid.interfaces.ConfigurableSocket import io.github.thibaultbee.srtdroid.models.Epoll import io.github.thibaultbee.srtdroid.models.Error import io.github.thibaultbee.srtdroid.models.MsgCtrl @@ -48,7 +49,7 @@ class CoroutineSocket private constructor( private val socket: Socket ) : - CoroutineScope { + ConfigurableSocket, CoroutineScope { constructor() : this(Socket()) init { @@ -292,7 +293,7 @@ private constructor( * @throws IOException if can't get [SockOpt] * @see [setSockFlag] */ - fun getSockFlag(opt: SockOpt): Any { + override fun getSockFlag(opt: SockOpt): Any { return socket.getSockFlag(opt) } @@ -306,7 +307,7 @@ private constructor( * @throws IOException if can't set [SockOpt] * @see [getSockFlag] */ - fun setSockFlag(opt: SockOpt, value: Any) { + override fun setSockFlag(opt: SockOpt, value: Any) { if ((opt == SockOpt.RCVSYN) || (opt == SockOpt.SNDSYN)) { throw IllegalArgumentException("Options not supported") } diff --git a/srtdroid-ktx/src/main/java/io/github/thibaultbee/srtdroid/ktx/extensions/CoroutineSocketExtensions.kt b/srtdroid-ktx/src/main/java/io/github/thibaultbee/srtdroid/ktx/extensions/CoroutineSocketExtensions.kt index d83bbf1c..8310dad5 100644 --- a/srtdroid-ktx/src/main/java/io/github/thibaultbee/srtdroid/ktx/extensions/CoroutineSocketExtensions.kt +++ b/srtdroid-ktx/src/main/java/io/github/thibaultbee/srtdroid/ktx/extensions/CoroutineSocketExtensions.kt @@ -1,7 +1,7 @@ package io.github.thibaultbee.srtdroid.ktx.extensions -import io.github.thibaultbee.srtdroid.enums.SockStatus import io.github.thibaultbee.srtdroid.ktx.CoroutineSocket +import io.github.thibaultbee.srtdroid.models.SrtUrl import java.io.File import java.net.BindException import java.net.ConnectException @@ -10,15 +10,36 @@ import java.net.InetSocketAddress import java.net.SocketException import java.net.SocketTimeoutException +/** + * Binds the socket to a local address. + * + * **See Also:** [srt_bind](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_bind) + * + * @param url the URL to bind to in FFmpeg format srt://hostname:port[?options] + * + * @throws BindException if bind has failed + */ +suspend fun CoroutineSocket.bind(url: String) = bind(SrtUrl(url)) /** - * Tests if the SRT socket is connected. + * Binds the socket to a local address. + * + * **See Also:** [srt_bind](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_bind) + * + * @param srtUrl the URL to bind to in FFmpeg format srt://hostname:port[?options] * - * @return true if the SRT socket is connected, otherwise false + * @throws BindException if bind has failed */ -val CoroutineSocket.isConnected: Boolean - get() = sockState == SockStatus.CONNECTED +suspend fun CoroutineSocket.bind(srtUrl: SrtUrl) { + if (srtUrl.mode != null) { + require(srtUrl.mode != SrtUrl.Mode.CALLER) { "Bind is only for `listener` or `rendezvous` mode but ${srtUrl.mode}" } + } + srtUrl.preApplyTo(this) + srtUrl.preBindApplyTo(this) + bind(srtUrl.hostname, srtUrl.port) + srtUrl.postApplyTo(this) +} /** * Binds the socket to a local address. @@ -45,6 +66,35 @@ suspend fun CoroutineSocket.bind(address: String, port: Int) = suspend fun CoroutineSocket.bind(address: InetAddress, port: Int) = bind(InetSocketAddress(address, port)) + +/** + * Connects a socket to an URL. + * + * **See Also:** [srt_connect](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_connect) + * + * @param url the URL to connect to in FFmpeg format srt://hostname:port[?options] + * @throws ConnectException if connection has failed + */ +suspend fun CoroutineSocket.connect(url: String) = connect(SrtUrl(url)) + +/** + * Connects a socket to an URL. + * + * **See Also:** [srt_connect](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_connect) + * + * @param srtUrl the URL to connect to in FFmpeg format srt://hostname:port[?options] + * @throws ConnectException if connection has failed + */ +suspend fun CoroutineSocket.connect(srtUrl: SrtUrl) { + if (srtUrl.mode != null) { + require(srtUrl.mode != SrtUrl.Mode.LISTENER) { "Connect is only for `caller` or `rendezvous` mode but ${srtUrl.mode}" } + } + + srtUrl.preApplyTo(this) + connect(srtUrl.hostname, srtUrl.port) + srtUrl.postApplyTo(this) +} + /** * Connects a socket to a specified address and port. * @@ -69,6 +119,35 @@ suspend fun CoroutineSocket.connect(address: String, port: Int) = suspend fun CoroutineSocket.connect(address: InetAddress, port: Int) = connect(InetSocketAddress(address, port)) +/** + * Performs a rendezvous connection. + * + * **See Also:** [srt_rendezvous](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_rendezvous) + * + * @param url the URL to rendezvous to in FFmpeg format srt://hostname:port[?options] + * @throws SocketException if rendezvous connection has failed + */ +suspend fun CoroutineSocket.rendezVous(url: String) = rendezVous(SrtUrl(url)) + +/** + * Performs a rendezvous connection. + * + * **See Also:** [srt_rendezvous](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_rendezvous) + * + * @param srtUrl the URL to rendezvous to in FFmpeg format srt://hostname:port[?options] + * @throws SocketException if rendezvous connection has failed + */ +suspend fun CoroutineSocket.rendezVous( + srtUrl: SrtUrl +) { + if (srtUrl.mode != null) { + require(srtUrl.mode == SrtUrl.Mode.RENDEZ_VOUS) { "Connect is only for `caller` or `rendezvous` mode but ${srtUrl.mode}" } + } + + srtUrl.preApplyTo(this) + rendezVous(srtUrl.hostname, srtUrl.hostname, srtUrl.port) + srtUrl.postApplyTo(this) +} /** * Performs a rendezvous connection. diff --git a/srtdroid/src/main/java/io/github/thibaultbee/srtdroid/extensions/SocketExtensions.kt b/srtdroid/src/main/java/io/github/thibaultbee/srtdroid/extensions/SocketExtensions.kt index e183025d..aa13e9f1 100644 --- a/srtdroid/src/main/java/io/github/thibaultbee/srtdroid/extensions/SocketExtensions.kt +++ b/srtdroid/src/main/java/io/github/thibaultbee/srtdroid/extensions/SocketExtensions.kt @@ -28,7 +28,7 @@ fun Socket.bind(url: String) = bind(SrtUrl(url)) */ fun Socket.bind(srtUrl: SrtUrl) { if (srtUrl.mode != null) { - require(srtUrl.mode != SrtUrl.Mode.CALLER) { "Bind is only for `listener` or `rendezvous` mode but ${srtUrl.mode.value}" } + require(srtUrl.mode != SrtUrl.Mode.CALLER) { "Bind is only for `listener` or `rendezvous` mode but ${srtUrl.mode}" } } srtUrl.preApplyTo(this) @@ -57,7 +57,7 @@ fun Socket.connect(url: String) = connect(SrtUrl(url)) */ fun Socket.connect(srtUrl: SrtUrl) { if (srtUrl.mode != null) { - require(srtUrl.mode != SrtUrl.Mode.LISTENER) { "Connect is only for `caller` or `rendezvous` mode but ${srtUrl.mode.value}" } + require(srtUrl.mode != SrtUrl.Mode.LISTENER) { "Connect is only for `caller` or `rendezvous` mode but ${srtUrl.mode}" } } srtUrl.preApplyTo(this) @@ -87,7 +87,7 @@ fun Socket.rendezVous( srtUrl: SrtUrl ) { if (srtUrl.mode != null) { - require(srtUrl.mode == SrtUrl.Mode.RENDEZ_VOUS) { "Connect is only for `caller` or `rendezvous` mode but ${srtUrl.mode.value}" } + require(srtUrl.mode == SrtUrl.Mode.RENDEZ_VOUS) { "Connect is only for `caller` or `rendezvous` mode but ${srtUrl.mode}" } } srtUrl.preApplyTo(this) diff --git a/srtdroid/src/main/java/io/github/thibaultbee/srtdroid/interfaces/ConfigurableSocket.kt b/srtdroid/src/main/java/io/github/thibaultbee/srtdroid/interfaces/ConfigurableSocket.kt new file mode 100644 index 00000000..f0888093 --- /dev/null +++ b/srtdroid/src/main/java/io/github/thibaultbee/srtdroid/interfaces/ConfigurableSocket.kt @@ -0,0 +1,11 @@ +package io.github.thibaultbee.srtdroid.interfaces + +import io.github.thibaultbee.srtdroid.enums.SockOpt + +/** + * A convenient interface to get and set socket options + */ +interface ConfigurableSocket { + fun getSockFlag(opt: SockOpt): Any + fun setSockFlag(opt: SockOpt, value: Any) +} \ No newline at end of file diff --git a/srtdroid/src/main/java/io/github/thibaultbee/srtdroid/models/Socket.kt b/srtdroid/src/main/java/io/github/thibaultbee/srtdroid/models/Socket.kt index d19ed876..a9132733 100644 --- a/srtdroid/src/main/java/io/github/thibaultbee/srtdroid/models/Socket.kt +++ b/srtdroid/src/main/java/io/github/thibaultbee/srtdroid/models/Socket.kt @@ -21,6 +21,7 @@ import io.github.thibaultbee.srtdroid.enums.ErrorType import io.github.thibaultbee.srtdroid.enums.RejectReasonCode import io.github.thibaultbee.srtdroid.enums.SockOpt import io.github.thibaultbee.srtdroid.enums.SockStatus +import io.github.thibaultbee.srtdroid.interfaces.ConfigurableSocket import io.github.thibaultbee.srtdroid.models.rejectreason.InternalRejectReason import io.github.thibaultbee.srtdroid.models.rejectreason.PredefinedRejectReason import io.github.thibaultbee.srtdroid.models.rejectreason.RejectReason @@ -45,7 +46,7 @@ import java.nio.ByteBuffer * Once it has been called, you must release Srt context with [Srt.cleanUp] when application leaves. */ class Socket -private constructor(private val srtsocket: Int) : Closeable { +private constructor(private val srtsocket: Int) : ConfigurableSocket, Closeable { companion object { @JvmStatic private external fun nativeCreateSocket(): Int @@ -412,7 +413,7 @@ private constructor(private val srtsocket: Int) : Closeable { * @throws IOException if can't get [SockOpt] * @see [setSockFlag] */ - fun getSockFlag(opt: SockOpt): Any { + override fun getSockFlag(opt: SockOpt): Any { return nativeGetSockFlag(opt) ?: throw IOException(Error.lastErrorMessage) } @@ -428,7 +429,7 @@ private constructor(private val srtsocket: Int) : Closeable { * @throws IOException if can't set [SockOpt] * @see [getSockFlag] */ - fun setSockFlag(opt: SockOpt, value: Any) { + override fun setSockFlag(opt: SockOpt, value: Any) { if (nativeSetSockFlag(opt, value) != 0) { throw IOException(Error.lastErrorMessage) } diff --git a/srtdroid/src/main/java/io/github/thibaultbee/srtdroid/models/SrtUrl.kt b/srtdroid/src/main/java/io/github/thibaultbee/srtdroid/models/SrtUrl.kt index da260936..30c34438 100644 --- a/srtdroid/src/main/java/io/github/thibaultbee/srtdroid/models/SrtUrl.kt +++ b/srtdroid/src/main/java/io/github/thibaultbee/srtdroid/models/SrtUrl.kt @@ -21,6 +21,7 @@ import io.github.thibaultbee.srtdroid.Srt import io.github.thibaultbee.srtdroid.enums.SockOpt import io.github.thibaultbee.srtdroid.enums.Transtype import io.github.thibaultbee.srtdroid.extensions.toBoolean +import io.github.thibaultbee.srtdroid.interfaces.ConfigurableSocket import java.security.InvalidParameterException /** @@ -197,7 +198,11 @@ data class SrtUrl( } } - internal fun preBindApplyTo(socket: Socket) { + /** + * Sets pre configuration for binding socket. + * Internal purpose only. + */ + fun preBindApplyTo(socket: ConfigurableSocket) { iptos?.let { socket.setSockFlag(SockOpt.IPTOS, it) } ipttl?.let { socket.setSockFlag(SockOpt.IPTTL, it) } maxSegmentSize?.let { socket.setSockFlag(SockOpt.MSS, it) } @@ -207,7 +212,11 @@ data class SrtUrl( recvBufferSize?.let { socket.setSockFlag(SockOpt.RCVBUF, it) } } - internal fun preApplyTo(socket: Socket) { + /** + * Sets pre configuration for socket. + * Internal purpose only. + */ + fun preApplyTo(socket: ConfigurableSocket) { connectTimeoutInMs?.let { socket.setSockFlag(SockOpt.CONNTIMEO, it) } flightFlagSize?.let { socket.setSockFlag(SockOpt.FC, it) } @@ -236,7 +245,11 @@ data class SrtUrl( enableTimestampBasedPacketDelivery?.let { socket.setSockFlag(SockOpt.TSBPDMODE, it) } } - internal fun postApplyTo(socket: Socket) { + /** + * Sets post configuration for socket. + * Internal purpose only. + */ + fun postApplyTo(socket: ConfigurableSocket) { inputBandwidth?.let { socket.setSockFlag(SockOpt.INPUTBW, it) } maxBandwidth?.let { socket.setSockFlag(SockOpt.MAXBW, it) } overheadBandwidth?.let { socket.setSockFlag(SockOpt.OHEADBW, it) }