Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for QoS in sample. #46

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/src/main/kotlin/io.zenoh/ZPut.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.options.*
import io.zenoh.keyexpr.intoKeyExpr
import io.zenoh.prelude.SampleKind
import io.zenoh.publication.CongestionControl
import io.zenoh.publication.Priority
import io.zenoh.prelude.CongestionControl
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why moving sample kind and congestion control under prelude? I wonder because here for instance it's under publication, but it's true that it also appears under prelude (here). So what's the criteria, @p-avital may give us more insight

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was mostly done to avoid circular dependencies between modules (yes, I know that kotlin/java can handle it). Also these types are really not something proper to publication only since they appear in other contexts (especially after this pr)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack.

import io.zenoh.prelude.Priority

class ZPut(private val emptyArgs: Boolean) : CliktCommand(
help = "Zenoh Put example"
Expand Down
4 changes: 2 additions & 2 deletions zenoh-jni/src/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ pub(crate) fn decode_priority(priority: jint) -> Result<Priority> {

pub(crate) fn decode_congestion_control(congestion_control: jint) -> Result<CongestionControl> {
match congestion_control {
0 => Ok(CongestionControl::Block),
1 => Ok(CongestionControl::Drop),
1 => Ok(CongestionControl::Block),
0 => Ok(CongestionControl::Drop),
_value => Err(Error::Session(format!(
"Unknown congestion control '{_value}'."
))),
Expand Down
4 changes: 3 additions & 1 deletion zenoh-jni/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::{mem, ops::Deref, sync::Arc};

use jni::{
objects::{GlobalRef, JByteArray, JClass, JPrimitiveArray, JValue},
sys::{jboolean, jint, jlong},
sys::{jboolean, jbyte, jint, jlong},
JNIEnv,
};
use zenoh::{
Expand Down Expand Up @@ -71,6 +71,7 @@ pub(crate) unsafe extern "C" fn Java_io_zenoh_jni_JNIQuery_replySuccessViaJNI(
sample_kind: jint,
timestamp_enabled: jboolean,
timestamp_ntp_64: jlong,
qos: jbyte,
attachment: JByteArray,
) {
let key_expr = Arc::from_raw(key_expr_ptr);
Expand All @@ -84,6 +85,7 @@ pub(crate) unsafe extern "C" fn Java_io_zenoh_jni_JNIQuery_replySuccessViaJNI(
sample_kind,
timestamp_enabled,
timestamp_ntp_64,
qos,
) {
Ok(sample) => sample,
Err(err) => {
Expand Down
5 changes: 3 additions & 2 deletions zenoh-jni/src/reply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use zenoh::{
value::Value,
};

use crate::errors::Result;
use crate::{errors::Error, utils::attachment_to_vec};
use crate::{errors::Result, sample::qos_into_jbyte};

pub(crate) fn on_reply(
mut env: JNIEnv,
Expand Down Expand Up @@ -74,7 +74,7 @@ fn on_reply_success(
let result = match env.call_method(
callback_global_ref,
"run",
"(Ljava/lang/String;ZJ[BIIJZ[B)V",
"(Ljava/lang/String;ZJ[BIIJZB[B)V",
&[
JValue::from(&zenoh_id),
JValue::from(true),
Expand All @@ -84,6 +84,7 @@ fn on_reply_success(
JValue::from(kind),
JValue::from(timestamp as i64),
JValue::from(is_valid),
JValue::from(qos_into_jbyte(sample.qos)),
JValue::from(&attachment_bytes),
],
) {
Expand Down
56 changes: 53 additions & 3 deletions zenoh-jni/src/sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,18 @@ use crate::{
value::decode_value,
};
use jni::{
objects::JByteArray,
sys::{jboolean, jint, jlong},
objects::{JByteArray, JClass},
sys::{jboolean, jbyte, jint, jlong},
JNIEnv,
};
use uhlc::{Timestamp, ID, NTP64};
use zenoh::{
prelude::{KeyExpr, SampleKind},
sample::Sample,
sample::{QoS, Sample},
};

/// Attempts to reconstruct a Zenoh [Sample] from the Java/Kotlin fields specified.
#[allow(clippy::too_many_arguments)]
pub(crate) fn decode_sample(
env: &mut JNIEnv,
key_expr: KeyExpr<'static>,
Expand All @@ -36,6 +37,7 @@ pub(crate) fn decode_sample(
sample_kind: jint,
timestamp_enabled: jboolean,
timestamp_ntp_64: jlong,
qos: jbyte,
) -> Result<Sample> {
let value = decode_value(env, payload, encoding)?;
let mut sample = Sample::new(key_expr, value);
Expand All @@ -45,6 +47,7 @@ pub(crate) fn decode_sample(
} else {
None
};
sample.qos = qos_from_jbyte(qos);
Ok(sample)
}

Expand All @@ -58,3 +61,50 @@ pub(crate) fn decode_sample_kind(sample_kind: jint) -> Result<SampleKind> {
))),
}
}

pub fn qos_from_jbyte(qos: jbyte) -> QoS {
unsafe { std::mem::transmute::<jbyte, QoS>(qos) }
}

pub fn qos_into_jbyte(qos: QoS) -> jbyte {
unsafe { std::mem::transmute::<QoS, jbyte>(qos) }
}

#[no_mangle]
#[allow(non_snake_case)]
pub extern "C" fn Java_io_zenoh_jni_JNIQoS_getPriorityViaJNI(
_env: JNIEnv,
_class: JClass,
qos: jbyte,
) -> jint {
qos_from_jbyte(qos).priority() as jint
}

#[no_mangle]
#[allow(non_snake_case)]
pub extern "C" fn Java_io_zenoh_jni_JNIQoS_getCongestionControlViaJNI(
_env: JNIEnv,
_class: JClass,
qos: jbyte,
) -> jint {
qos_from_jbyte(qos).congestion_control() as jint
}

#[no_mangle]
#[allow(non_snake_case)]
pub extern "C" fn Java_io_zenoh_jni_JNIQoS_getExpressdViaJNI(
_env: JNIEnv,
_class: JClass,
qos: jbyte,
) -> jboolean {
qos_from_jbyte(qos).express() as jboolean
}

#[no_mangle]
#[allow(non_snake_case)]
pub extern "C" fn Java_io_zenoh_jni_JNIQoS_00024Companion_getDefaultQoSViaJNI(
_env: JNIEnv,
_class: JClass,
) -> jbyte {
qos_into_jbyte(QoS::default())
}
8 changes: 6 additions & 2 deletions zenoh-jni/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@ use jni::{
use zenoh::prelude::r#sync::*;
use zenoh::subscriber::Subscriber;

use crate::utils::{get_callback_global_ref, get_java_vm, load_on_close};
use crate::{
errors::{Error, Result},
utils::attachment_to_vec,
};
use crate::{
sample::qos_into_jbyte,
utils::{get_callback_global_ref, get_java_vm, load_on_close},
};

/// Frees the memory associated with a Zenoh subscriber raw pointer via JNI.
///
Expand Down Expand Up @@ -134,14 +137,15 @@ pub(crate) unsafe fn declare_subscriber(
match env.call_method(
&callback_global_ref,
"run",
"(J[BIIJZ[B)V",
"(J[BIIJZB[B)V",
&[
JValue::from(key_expr_ptr as jlong),
JValue::from(&byte_array),
JValue::from(encoding),
JValue::from(kind),
JValue::from(timestamp as i64),
JValue::from(is_valid),
JValue::from(qos_into_jbyte(sample.qos)),
JValue::from(&attachment_bytes),
],
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
package io.zenoh.jni

import io.zenoh.*
import io.zenoh.publication.CongestionControl
import io.zenoh.publication.Priority
import io.zenoh.prelude.CongestionControl
import io.zenoh.prelude.Priority
import io.zenoh.sample.Attachment
import io.zenoh.value.Value

Expand Down Expand Up @@ -64,7 +64,7 @@ internal class JNIPublisher(private val ptr: Long) {
* @return A [Result] with the status of the operation.
*/
fun setCongestionControl(congestionControl: CongestionControl): Result<Unit> = runCatching {
setCongestionControlViaJNI(congestionControl.ordinal, ptr)
setCongestionControlViaJNI(congestionControl.value, ptr)
}

/**
Expand Down
43 changes: 43 additions & 0 deletions zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQoS.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

package io.zenoh.jni

import io.zenoh.prelude.CongestionControl;
import io.zenoh.prelude.Priority;

internal class JNIQoS internal constructor(internal val qos: Byte) {

internal constructor(): this(getDefaultQoSViaJNI())

fun getExpress(): Boolean {
return getExpressViaJNI(qos)
}

fun getCongestionControl(): CongestionControl {
return CongestionControl.fromInt(getCongestionControlViaJNI(qos))
}

fun getPriority(): Priority {
return Priority.fromInt(getPriorityViaJNI(qos))
}

companion object {
private external fun getDefaultQoSViaJNI(): Byte
}

private external fun getPriorityViaJNI(_qos: Byte): Int
private external fun getCongestionControlViaJNI(_qos: Byte): Int
private external fun getExpressViaJNI(_qos:Byte): Boolean
}
2 changes: 2 additions & 0 deletions zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQuery.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ internal class JNIQuery(private val ptr: Long) {
sample.kind.ordinal,
timestampEnabled,
if (timestampEnabled) sample.timestamp!!.ntpValue() else 0,
sample.qos.jniQoS.qos,
sample.attachment?.let { encodeAttachment(it) },
)
}
Expand All @@ -57,6 +58,7 @@ internal class JNIQuery(private val ptr: Long) {
sampleKind: Int,
timestampEnabled: Boolean,
timestampNtp64: Long,
qos: Byte,
attachment: ByteArray?,
)

Expand Down
11 changes: 7 additions & 4 deletions zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import io.zenoh.jni.callbacks.JNISubscriberCallback
import io.zenoh.keyexpr.KeyExpr
import io.zenoh.prelude.Encoding
import io.zenoh.prelude.SampleKind
import io.zenoh.prelude.QoS
import io.zenoh.publication.Publisher
import io.zenoh.publication.Put
import io.zenoh.query.*
Expand Down Expand Up @@ -61,7 +62,7 @@ internal class JNISession {
val publisherRawPtr = declarePublisherViaJNI(
builder.keyExpr.jniKeyExpr!!.ptr,
sessionPtr.get(),
builder.congestionControl.ordinal,
builder.congestionControl.value,
builder.priority.value,
)
Publisher(
Expand All @@ -76,14 +77,15 @@ internal class JNISession {
keyExpr: KeyExpr, callback: Callback<Sample>, onClose: () -> Unit, receiver: R?, reliability: Reliability
): Result<Subscriber<R>> = runCatching {
val subCallback =
JNISubscriberCallback { keyExprPtr, payload, encoding, kind, timestampNTP64, timestampIsValid, attachmentBytes ->
JNISubscriberCallback { keyExprPtr, payload, encoding, kind, timestampNTP64, timestampIsValid, qos, attachmentBytes ->
val timestamp = if (timestampIsValid) TimeStamp(timestampNTP64) else null
val attachment = attachmentBytes.takeIf { it.isNotEmpty() }?.let { decodeAttachment(it) }
val sample = Sample(
KeyExpr(JNIKeyExpr(keyExprPtr)),
Value(payload, Encoding(KnownEncoding.fromInt(encoding))),
SampleKind.fromInt(kind),
timestamp,
QoS(qos),
attachment
)
callback.run(sample)
Expand Down Expand Up @@ -124,7 +126,7 @@ internal class JNISession {
attachment: Attachment?
): Result<R?> = runCatching {
val getCallback =
JNIGetCallback { replierId: String, success: Boolean, keyExprPtr: Long, payload: ByteArray, encoding: Int, kind: Int, timestampNTP64: Long, timestampIsValid: Boolean, attachmentBytes: ByteArray ->
JNIGetCallback { replierId: String, success: Boolean, keyExprPtr: Long, payload: ByteArray, encoding: Int, kind: Int, timestampNTP64: Long, timestampIsValid: Boolean, qos: Byte, attachmentBytes: ByteArray ->
if (success) {
val timestamp = if (timestampIsValid) TimeStamp(timestampNTP64) else null
val decodedAttachment = attachmentBytes.takeIf { it.isNotEmpty() }?.let { decodeAttachment(it) }
Expand All @@ -133,6 +135,7 @@ internal class JNISession {
Value(payload, Encoding(KnownEncoding.fromInt(encoding))),
SampleKind.fromInt(kind),
timestamp,
QoS(qos),
decodedAttachment
)
val reply = Reply.Success(replierId, sample)
Expand Down Expand Up @@ -192,7 +195,7 @@ internal class JNISession {
sessionPtr.get(),
put.value.payload,
put.value.encoding.knownEncoding.ordinal,
put.congestionControl.ordinal,
put.congestionControl.value,
put.priority.value,
put.kind.ordinal,
put.attachment?.let { encodeAttachment(it) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ internal fun interface JNIGetCallback {
kind: Int,
timestampNTP64: Long,
timestampIsValid: Boolean,
qos: Byte,
attachment: ByteArray,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ internal fun interface JNISubscriberCallback {
kind: Int,
timestampNTP64: Long,
timestampIsValid: Boolean,
qos: Byte,
attachment: ByteArray,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,23 @@
// ZettaScale Zenoh Team, <[email protected]>
//

package io.zenoh.publication
package io.zenoh.prelude

/** The congestion control to be applied when routing the data. */
enum class CongestionControl {
enum class CongestionControl (val value: Int) {

/**
* Allows the message to be dropped if all buffers are full.
*/
DROP(0),

/**
* Prevents the message from being dropped at all cost.
* In the face of heavy congestion on a part of the network, this could result in your publisher node blocking.
*/
BLOCK,
BLOCK(1);

/**
* Allows the message to be dropped if all buffers are full.
*/
DROP;
companion object {
fun fromInt(value: Int) = entries.first { it.value == value }
}
}
Loading
Loading