diff --git a/examples/src/main/kotlin/io.zenoh/ZPut.kt b/examples/src/main/kotlin/io.zenoh/ZPut.kt index 447839aff..18af771bb 100644 --- a/examples/src/main/kotlin/io.zenoh/ZPut.kt +++ b/examples/src/main/kotlin/io.zenoh/ZPut.kt @@ -18,8 +18,8 @@ import com.github.ajalt.clikt.core.CliktCommand import com.github.ajalt.clikt.parameters.options.* import io.zenoh.keyexpr.intoKeyExpr import io.zenoh.prelude.SampleKind -import io.zenoh.publication.CongestionControl -import io.zenoh.publication.Priority +import io.zenoh.prelude.CongestionControl +import io.zenoh.prelude.Priority class ZPut(private val emptyArgs: Boolean) : CliktCommand( help = "Zenoh Put example" diff --git a/zenoh-jni/src/put.rs b/zenoh-jni/src/put.rs index e0cbbe14a..180771beb 100644 --- a/zenoh-jni/src/put.rs +++ b/zenoh-jni/src/put.rs @@ -104,8 +104,8 @@ pub(crate) fn decode_priority(priority: jint) -> Result { pub(crate) fn decode_congestion_control(congestion_control: jint) -> Result { match congestion_control { - 0 => Ok(CongestionControl::Block), - 1 => Ok(CongestionControl::Drop), + 1 => Ok(CongestionControl::Block), + 0 => Ok(CongestionControl::Drop), _value => Err(Error::Session(format!( "Unknown congestion control '{_value}'." ))), diff --git a/zenoh-jni/src/query.rs b/zenoh-jni/src/query.rs index 3af7122af..c3125af0c 100644 --- a/zenoh-jni/src/query.rs +++ b/zenoh-jni/src/query.rs @@ -16,7 +16,7 @@ use std::{mem, ops::Deref, sync::Arc}; use jni::{ objects::{GlobalRef, JByteArray, JClass, JPrimitiveArray, JValue}, - sys::{jboolean, jint, jlong}, + sys::{jboolean, jbyte, jint, jlong}, JNIEnv, }; use zenoh::{ @@ -71,6 +71,7 @@ pub(crate) unsafe extern "C" fn Java_io_zenoh_jni_JNIQuery_replySuccessViaJNI( sample_kind: jint, timestamp_enabled: jboolean, timestamp_ntp_64: jlong, + qos: jbyte, attachment: JByteArray, ) { let key_expr = Arc::from_raw(key_expr_ptr); @@ -84,6 +85,7 @@ pub(crate) unsafe extern "C" fn Java_io_zenoh_jni_JNIQuery_replySuccessViaJNI( sample_kind, timestamp_enabled, timestamp_ntp_64, + qos, ) { Ok(sample) => sample, Err(err) => { diff --git a/zenoh-jni/src/reply.rs b/zenoh-jni/src/reply.rs index ad5a0238e..40383eaaf 100644 --- a/zenoh-jni/src/reply.rs +++ b/zenoh-jni/src/reply.rs @@ -26,8 +26,8 @@ use zenoh::{ value::Value, }; -use crate::errors::Result; use crate::{errors::Error, utils::attachment_to_vec}; +use crate::{errors::Result, sample::qos_into_jbyte}; pub(crate) fn on_reply( mut env: JNIEnv, @@ -74,7 +74,7 @@ fn on_reply_success( let result = match env.call_method( callback_global_ref, "run", - "(Ljava/lang/String;ZJ[BIIJZ[B)V", + "(Ljava/lang/String;ZJ[BIIJZB[B)V", &[ JValue::from(&zenoh_id), JValue::from(true), @@ -84,6 +84,7 @@ fn on_reply_success( JValue::from(kind), JValue::from(timestamp as i64), JValue::from(is_valid), + JValue::from(qos_into_jbyte(sample.qos)), JValue::from(&attachment_bytes), ], ) { diff --git a/zenoh-jni/src/sample.rs b/zenoh-jni/src/sample.rs index 1d8dd23fb..43495c19a 100644 --- a/zenoh-jni/src/sample.rs +++ b/zenoh-jni/src/sample.rs @@ -17,17 +17,18 @@ use crate::{ value::decode_value, }; use jni::{ - objects::JByteArray, - sys::{jboolean, jint, jlong}, + objects::{JByteArray, JClass}, + sys::{jboolean, jbyte, jint, jlong}, JNIEnv, }; use uhlc::{Timestamp, ID, NTP64}; use zenoh::{ prelude::{KeyExpr, SampleKind}, - sample::Sample, + sample::{QoS, Sample}, }; /// Attempts to reconstruct a Zenoh [Sample] from the Java/Kotlin fields specified. +#[allow(clippy::too_many_arguments)] pub(crate) fn decode_sample( env: &mut JNIEnv, key_expr: KeyExpr<'static>, @@ -36,6 +37,7 @@ pub(crate) fn decode_sample( sample_kind: jint, timestamp_enabled: jboolean, timestamp_ntp_64: jlong, + qos: jbyte, ) -> Result { let value = decode_value(env, payload, encoding)?; let mut sample = Sample::new(key_expr, value); @@ -45,6 +47,7 @@ pub(crate) fn decode_sample( } else { None }; + sample.qos = qos_from_jbyte(qos); Ok(sample) } @@ -58,3 +61,50 @@ pub(crate) fn decode_sample_kind(sample_kind: jint) -> Result { ))), } } + +pub fn qos_from_jbyte(qos: jbyte) -> QoS { + unsafe { std::mem::transmute::(qos) } +} + +pub fn qos_into_jbyte(qos: QoS) -> jbyte { + unsafe { std::mem::transmute::(qos) } +} + +#[no_mangle] +#[allow(non_snake_case)] +pub extern "C" fn Java_io_zenoh_jni_JNIQoS_getPriorityViaJNI( + _env: JNIEnv, + _class: JClass, + qos: jbyte, +) -> jint { + qos_from_jbyte(qos).priority() as jint +} + +#[no_mangle] +#[allow(non_snake_case)] +pub extern "C" fn Java_io_zenoh_jni_JNIQoS_getCongestionControlViaJNI( + _env: JNIEnv, + _class: JClass, + qos: jbyte, +) -> jint { + qos_from_jbyte(qos).congestion_control() as jint +} + +#[no_mangle] +#[allow(non_snake_case)] +pub extern "C" fn Java_io_zenoh_jni_JNIQoS_getExpressdViaJNI( + _env: JNIEnv, + _class: JClass, + qos: jbyte, +) -> jboolean { + qos_from_jbyte(qos).express() as jboolean +} + +#[no_mangle] +#[allow(non_snake_case)] +pub extern "C" fn Java_io_zenoh_jni_JNIQoS_00024Companion_getDefaultQoSViaJNI( + _env: JNIEnv, + _class: JClass, +) -> jbyte { + qos_into_jbyte(QoS::default()) +} diff --git a/zenoh-jni/src/subscriber.rs b/zenoh-jni/src/subscriber.rs index 169e55455..2d77bdc47 100644 --- a/zenoh-jni/src/subscriber.rs +++ b/zenoh-jni/src/subscriber.rs @@ -22,11 +22,14 @@ use jni::{ use zenoh::prelude::r#sync::*; use zenoh::subscriber::Subscriber; -use crate::utils::{get_callback_global_ref, get_java_vm, load_on_close}; use crate::{ errors::{Error, Result}, utils::attachment_to_vec, }; +use crate::{ + sample::qos_into_jbyte, + utils::{get_callback_global_ref, get_java_vm, load_on_close}, +}; /// Frees the memory associated with a Zenoh subscriber raw pointer via JNI. /// @@ -134,7 +137,7 @@ pub(crate) unsafe fn declare_subscriber( match env.call_method( &callback_global_ref, "run", - "(J[BIIJZ[B)V", + "(J[BIIJZB[B)V", &[ JValue::from(key_expr_ptr as jlong), JValue::from(&byte_array), @@ -142,6 +145,7 @@ pub(crate) unsafe fn declare_subscriber( JValue::from(kind), JValue::from(timestamp as i64), JValue::from(is_valid), + JValue::from(qos_into_jbyte(sample.qos)), JValue::from(&attachment_bytes), ], ) { diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIPublisher.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIPublisher.kt index 16e375460..451954ad5 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIPublisher.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIPublisher.kt @@ -15,8 +15,8 @@ package io.zenoh.jni import io.zenoh.* -import io.zenoh.publication.CongestionControl -import io.zenoh.publication.Priority +import io.zenoh.prelude.CongestionControl +import io.zenoh.prelude.Priority import io.zenoh.sample.Attachment import io.zenoh.value.Value @@ -64,7 +64,7 @@ internal class JNIPublisher(private val ptr: Long) { * @return A [Result] with the status of the operation. */ fun setCongestionControl(congestionControl: CongestionControl): Result = runCatching { - setCongestionControlViaJNI(congestionControl.ordinal, ptr) + setCongestionControlViaJNI(congestionControl.value, ptr) } /** diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQoS.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQoS.kt new file mode 100644 index 000000000..102ce4460 --- /dev/null +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQoS.kt @@ -0,0 +1,43 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +package io.zenoh.jni + +import io.zenoh.prelude.CongestionControl; +import io.zenoh.prelude.Priority; + +internal class JNIQoS internal constructor(internal val qos: Byte) { + + internal constructor(): this(getDefaultQoSViaJNI()) + + fun getExpress(): Boolean { + return getExpressViaJNI(qos) + } + + fun getCongestionControl(): CongestionControl { + return CongestionControl.fromInt(getCongestionControlViaJNI(qos)) + } + + fun getPriority(): Priority { + return Priority.fromInt(getPriorityViaJNI(qos)) + } + + companion object { + private external fun getDefaultQoSViaJNI(): Byte + } + + private external fun getPriorityViaJNI(_qos: Byte): Int + private external fun getCongestionControlViaJNI(_qos: Byte): Int + private external fun getExpressViaJNI(_qos:Byte): Boolean +} diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQuery.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQuery.kt index 9ce202020..f0943584a 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQuery.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQuery.kt @@ -36,6 +36,7 @@ internal class JNIQuery(private val ptr: Long) { sample.kind.ordinal, timestampEnabled, if (timestampEnabled) sample.timestamp!!.ntpValue() else 0, + sample.qos.jniQoS.qos, sample.attachment?.let { encodeAttachment(it) }, ) } @@ -57,6 +58,7 @@ internal class JNIQuery(private val ptr: Long) { sampleKind: Int, timestampEnabled: Boolean, timestampNtp64: Long, + qos: Byte, attachment: ByteArray?, ) diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt index 96a81f162..20602a10a 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt @@ -24,6 +24,7 @@ import io.zenoh.jni.callbacks.JNISubscriberCallback import io.zenoh.keyexpr.KeyExpr import io.zenoh.prelude.Encoding import io.zenoh.prelude.SampleKind +import io.zenoh.prelude.QoS import io.zenoh.publication.Publisher import io.zenoh.publication.Put import io.zenoh.query.* @@ -61,7 +62,7 @@ internal class JNISession { val publisherRawPtr = declarePublisherViaJNI( builder.keyExpr.jniKeyExpr!!.ptr, sessionPtr.get(), - builder.congestionControl.ordinal, + builder.congestionControl.value, builder.priority.value, ) Publisher( @@ -76,7 +77,7 @@ internal class JNISession { keyExpr: KeyExpr, callback: Callback, onClose: () -> Unit, receiver: R?, reliability: Reliability ): Result> = runCatching { val subCallback = - JNISubscriberCallback { keyExprPtr, payload, encoding, kind, timestampNTP64, timestampIsValid, attachmentBytes -> + JNISubscriberCallback { keyExprPtr, payload, encoding, kind, timestampNTP64, timestampIsValid, qos, attachmentBytes -> val timestamp = if (timestampIsValid) TimeStamp(timestampNTP64) else null val attachment = attachmentBytes.takeIf { it.isNotEmpty() }?.let { decodeAttachment(it) } val sample = Sample( @@ -84,6 +85,7 @@ internal class JNISession { Value(payload, Encoding(KnownEncoding.fromInt(encoding))), SampleKind.fromInt(kind), timestamp, + QoS(qos), attachment ) callback.run(sample) @@ -124,7 +126,7 @@ internal class JNISession { attachment: Attachment? ): Result = runCatching { val getCallback = - JNIGetCallback { replierId: String, success: Boolean, keyExprPtr: Long, payload: ByteArray, encoding: Int, kind: Int, timestampNTP64: Long, timestampIsValid: Boolean, attachmentBytes: ByteArray -> + JNIGetCallback { replierId: String, success: Boolean, keyExprPtr: Long, payload: ByteArray, encoding: Int, kind: Int, timestampNTP64: Long, timestampIsValid: Boolean, qos: Byte, attachmentBytes: ByteArray -> if (success) { val timestamp = if (timestampIsValid) TimeStamp(timestampNTP64) else null val decodedAttachment = attachmentBytes.takeIf { it.isNotEmpty() }?.let { decodeAttachment(it) } @@ -133,6 +135,7 @@ internal class JNISession { Value(payload, Encoding(KnownEncoding.fromInt(encoding))), SampleKind.fromInt(kind), timestamp, + QoS(qos), decodedAttachment ) val reply = Reply.Success(replierId, sample) @@ -192,7 +195,7 @@ internal class JNISession { sessionPtr.get(), put.value.payload, put.value.encoding.knownEncoding.ordinal, - put.congestionControl.ordinal, + put.congestionControl.value, put.priority.value, put.kind.ordinal, put.attachment?.let { encodeAttachment(it) } diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNIGetCallback.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNIGetCallback.kt index 621eaf980..72ce911e7 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNIGetCallback.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNIGetCallback.kt @@ -25,6 +25,7 @@ internal fun interface JNIGetCallback { kind: Int, timestampNTP64: Long, timestampIsValid: Boolean, + qos: Byte, attachment: ByteArray, ) } diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNISubscriberCallback.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNISubscriberCallback.kt index 750888eb1..5d57641e3 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNISubscriberCallback.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNISubscriberCallback.kt @@ -22,6 +22,7 @@ internal fun interface JNISubscriberCallback { kind: Int, timestampNTP64: Long, timestampIsValid: Boolean, + qos: Byte, attachment: ByteArray, ) } diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/CongestionControl.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/CongestionControl.kt similarity index 79% rename from zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/CongestionControl.kt rename to zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/CongestionControl.kt index 3662e5428..26ab9fcae 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/CongestionControl.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/CongestionControl.kt @@ -12,19 +12,23 @@ // ZettaScale Zenoh Team, // -package io.zenoh.publication +package io.zenoh.prelude /** The congestion control to be applied when routing the data. */ -enum class CongestionControl { +enum class CongestionControl (val value: Int) { + + /** + * Allows the message to be dropped if all buffers are full. + */ + DROP(0), /** * Prevents the message from being dropped at all cost. * In the face of heavy congestion on a part of the network, this could result in your publisher node blocking. */ - BLOCK, + BLOCK(1); - /** - * Allows the message to be dropped if all buffers are full. - */ - DROP; + companion object { + fun fromInt(value: Int) = entries.first { it.value == value } + } } diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Priority.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/Priority.kt similarity index 87% rename from zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Priority.kt rename to zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/Priority.kt index a8821fcaa..820c871f8 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Priority.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/Priority.kt @@ -12,7 +12,7 @@ // ZettaScale Zenoh Team, // -package io.zenoh.publication +package io.zenoh.prelude /** * The Priority of Zenoh messages. @@ -30,5 +30,9 @@ enum class Priority(val value: Int) { DATA(5), DATA_LOW(6), BACKGROUND(7); + + companion object { + fun fromInt(value: Int) = entries.first { it.value == value } + } } diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/QoS.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/QoS.kt new file mode 100644 index 000000000..ae0de0297 --- /dev/null +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/QoS.kt @@ -0,0 +1,50 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +package io.zenoh.prelude +import io.zenoh.jni.JNIQoS; + +import io.zenoh.prelude.CongestionControl +import io.zenoh.prelude.Priority +/** + * Quality of service settings used to send zenoh message. + */ +class QoS internal constructor(internal val jniQoS: JNIQoS) { + internal constructor(qos: Byte): this(JNIQoS(qos)) + + /** + * Returns priority of the message. + */ + fun priority(): Priority = jniQoS.getPriority() + + /** + * Returns congestion control setting of the message. + */ + fun congestionControl(): CongestionControl = jniQoS.getCongestionControl() + + /** + * Returns express flag. If it is true, the message is not batched to reduce the latency. + */ + fun express(): Boolean = jniQoS.getExpress() + + companion object { + + /** + * Returns default QoS settings. + */ + fun default(): QoS { + return QoS(JNIQoS()) + } + } +} diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Delete.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Delete.kt index 22e34e339..65386ada6 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Delete.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Delete.kt @@ -15,6 +15,8 @@ package io.zenoh.publication import io.zenoh.prelude.SampleKind +import io.zenoh.prelude.CongestionControl +import io.zenoh.prelude.Priority import io.zenoh.Session import io.zenoh.value.Value import io.zenoh.keyexpr.KeyExpr diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Publisher.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Publisher.kt index 81d57383c..791233c68 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Publisher.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Publisher.kt @@ -18,6 +18,9 @@ import io.zenoh.* import io.zenoh.exceptions.SessionException import io.zenoh.jni.JNIPublisher import io.zenoh.keyexpr.KeyExpr +import io.zenoh.prelude.SampleKind +import io.zenoh.prelude.Priority +import io.zenoh.prelude.CongestionControl import io.zenoh.sample.Attachment import io.zenoh.value.Value diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Put.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Put.kt index f0a62c229..a54c9f5bd 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Put.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Put.kt @@ -19,6 +19,8 @@ import io.zenoh.Session import io.zenoh.keyexpr.KeyExpr import io.zenoh.prelude.Encoding import io.zenoh.prelude.SampleKind +import io.zenoh.prelude.CongestionControl +import io.zenoh.prelude.Priority import io.zenoh.sample.Attachment import io.zenoh.value.Value diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/query/Reply.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/query/Reply.kt index b13e9bfbf..97c5c7a21 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/query/Reply.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/query/Reply.kt @@ -18,6 +18,7 @@ import io.zenoh.Resolvable import io.zenoh.ZenohType import io.zenoh.sample.Sample import io.zenoh.prelude.SampleKind +import io.zenoh.prelude.QoS import io.zenoh.value.Value import io.zenoh.keyexpr.KeyExpr import io.zenoh.sample.Attachment @@ -130,7 +131,7 @@ abstract class Reply private constructor(val replierId: String) : ZenohType { * Constructs the reply sample with the provided parameters and triggers the reply to the query. */ override fun res(): Result { - val sample = Sample(keyExpr, value, kind, timeStamp, attachment) + val sample = Sample(keyExpr, value, kind, timeStamp, QoS.default(), attachment) return query.reply(Success("", sample)).res() } } diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/sample/Sample.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/sample/Sample.kt index 5762d4cd5..1eec3a22b 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/sample/Sample.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/sample/Sample.kt @@ -16,6 +16,7 @@ package io.zenoh.sample import io.zenoh.ZenohType import io.zenoh.prelude.SampleKind +import io.zenoh.prelude.QoS import io.zenoh.keyexpr.KeyExpr import io.zenoh.value.Value import org.apache.commons.net.ntp.TimeStamp @@ -30,6 +31,7 @@ import org.apache.commons.net.ntp.TimeStamp * @property value The [Value] of the sample. * @property kind The [SampleKind] of the sample. * @property timestamp Optional [TimeStamp]. + * @property qos The Quality of Service settings used to deliver the sample. * @property attachment Optional [Attachment]. */ class Sample( @@ -37,6 +39,7 @@ class Sample( val value: Value, val kind: SampleKind, val timestamp: TimeStamp?, + val qos: QoS, val attachment: Attachment? = null ): ZenohType { override fun toString(): String { diff --git a/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/GetTest.kt b/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/GetTest.kt index 8b7588055..6de31aa15 100644 --- a/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/GetTest.kt +++ b/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/GetTest.kt @@ -18,6 +18,7 @@ import io.zenoh.handlers.Handler import io.zenoh.keyexpr.KeyExpr import io.zenoh.keyexpr.intoKeyExpr import io.zenoh.prelude.SampleKind +import io.zenoh.prelude.QoS import io.zenoh.query.Reply import io.zenoh.queryable.Queryable import io.zenoh.selector.Selector diff --git a/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/PublisherTest.kt b/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/PublisherTest.kt index 24adaed9c..5e43909b2 100644 --- a/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/PublisherTest.kt +++ b/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/PublisherTest.kt @@ -19,6 +19,7 @@ import io.zenoh.prelude.KnownEncoding import io.zenoh.keyexpr.intoKeyExpr import io.zenoh.prelude.Encoding import io.zenoh.prelude.SampleKind +import io.zenoh.prelude.QoS import io.zenoh.publication.Publisher import io.zenoh.sample.Sample import io.zenoh.subscriber.Subscriber diff --git a/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/QueryableTest.kt b/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/QueryableTest.kt index 20c9d81af..d4f4ecad2 100644 --- a/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/QueryableTest.kt +++ b/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/QueryableTest.kt @@ -18,6 +18,7 @@ import io.zenoh.handlers.Handler import io.zenoh.keyexpr.KeyExpr import io.zenoh.keyexpr.intoKeyExpr import io.zenoh.prelude.SampleKind +import io.zenoh.prelude.QoS import io.zenoh.query.Reply import io.zenoh.queryable.Query import io.zenoh.sample.Sample @@ -58,7 +59,7 @@ class QueryableTest { @Test fun queryable_runsWithCallback() = runBlocking { val sample = Sample( - testKeyExpr, Value(testPayload), SampleKind.PUT, TimeStamp(Date.from(Instant.now())) + testKeyExpr, Value(testPayload), SampleKind.PUT, TimeStamp(Date.from(Instant.now())), QoS.default() ) val queryable = session.declareQueryable(testKeyExpr).with { query -> query.reply(testKeyExpr).success(sample.value).withTimeStamp(sample.timestamp!!).res() @@ -161,7 +162,7 @@ private class QueryHandler : Handler { val payload = "Hello queryable $counter!" counter++ val sample = Sample( - query.keyExpr, Value(payload), SampleKind.PUT, TimeStamp(Date.from(Instant.now())) + query.keyExpr, Value(payload), SampleKind.PUT, TimeStamp(Date.from(Instant.now())), QoS.default() ) performedReplies.add(sample) query.reply(query.keyExpr).success(sample.value).withTimeStamp(sample.timestamp!!).res() diff --git a/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/SubscriberTest.kt b/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/SubscriberTest.kt index 143727f17..dc8bf9631 100644 --- a/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/SubscriberTest.kt +++ b/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/SubscriberTest.kt @@ -21,6 +21,8 @@ import io.zenoh.keyexpr.intoKeyExpr import io.zenoh.prelude.Encoding import io.zenoh.sample.Sample import io.zenoh.value.Value +import io.zenoh.prelude.CongestionControl +import io.zenoh.prelude.Priority import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.runBlocking @@ -32,6 +34,9 @@ import kotlin.test.* class SubscriberTest { companion object { + val TEST_PRIORITY = Priority.DATA_HIGH; + val TEST_CONGESTION_CONTROL = CongestionControl.BLOCK; + val testValues = arrayListOf( Value("Test 1".encodeToByteArray(), Encoding(KnownEncoding.TEXT_PLAIN)), Value("Test 2".encodeToByteArray(), Encoding(KnownEncoding.TEXT_JSON)), @@ -60,11 +65,18 @@ class SubscriberTest { val subscriber = session.declareSubscriber(testKeyExpr).with { sample -> receivedSamples.add(sample) }.res().getOrThrow() - testValues.forEach { value -> session.put(testKeyExpr, value).res() } + testValues.forEach { value -> + session.put(testKeyExpr, value) + .priority(TEST_PRIORITY) + .congestionControl(TEST_CONGESTION_CONTROL) + .res() + } assertEquals(receivedSamples.size, testValues.size) receivedSamples.zip(testValues).forEach { (sample, value) -> assertEquals(sample.value, value) + assertEquals(sample.qos.priority(), TEST_PRIORITY) + assertEquals(sample.qos.congestionControl(), TEST_CONGESTION_CONTROL) } subscriber.close() @@ -75,11 +87,18 @@ class SubscriberTest { val handler = QueueHandler() val subscriber = session.declareSubscriber(testKeyExpr).with(handler).res().getOrThrow() - testValues.forEach { value -> session.put(testKeyExpr, value).res() } + testValues.forEach { value -> + session.put(testKeyExpr, value) + .priority(TEST_PRIORITY) + .congestionControl(TEST_CONGESTION_CONTROL) + .res() + } assertEquals(handler.queue.size, testValues.size) handler.queue.zip(testValues).forEach { (sample, value) -> assertEquals(sample.value, value) + assertEquals(sample.qos.priority(), TEST_PRIORITY) + assertEquals(sample.qos.congestionControl(), TEST_CONGESTION_CONTROL) } subscriber.close()