Skip to content

Commit

Permalink
Update batching to stable. (#850)
Browse files Browse the repository at this point in the history
* feat: return is express val when encoding network message

* feat: send batch immediately in some cases

* feat: remove batch api from unstable

* fix: misplaced raweth argument

* Revert "feat: return is express val when encoding network message"

This reverts commit dc20d6c.

* feat: get is express status directly in tx

* doc: update batch documentation

* fix: remove redundant break in switch

Co-authored-by: Alexander Bushnev <[email protected]>

---------

Co-authored-by: Alexander Bushnev <[email protected]>
  • Loading branch information
jean-roland and sashacmc authored Jan 7, 2025
1 parent 860099f commit 2514041
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 21 deletions.
6 changes: 2 additions & 4 deletions include/zenoh-pico/api/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -2077,12 +2077,11 @@ z_result_t z_declare_background_subscriber(const z_loaned_session_t *zs, const z
const z_loaned_keyexpr_t *z_subscriber_keyexpr(const z_loaned_subscriber_t *subscriber);
#endif

#ifdef Z_FEATURE_UNSTABLE_API
#if Z_FEATURE_BATCHING == 1
/**
* Activate the batching mechanism, any message that would have been sent on the network by a subsequent api call (e.g
* z_put, z_get) will be instead stored until the batch is full, flushed with :c:func:`zp_batch_flush` or batching is
* stopped with :c:func:`zp_batch_stop`.
* z_put, z_get) will be instead stored until either: the batch is full, flushed with :c:func:`zp_batch_flush`, batching
* is stopped with :c:func:`zp_batch_stop`, a message needs to be sent immediately.
*
* Parameters:
* zs: Pointer to a :c:type:`z_loaned_session_t` that will start batching messages.
Expand Down Expand Up @@ -2114,7 +2113,6 @@ z_result_t zp_batch_flush(const z_loaned_session_t *zs);
*/
z_result_t zp_batch_stop(const z_loaned_session_t *zs);
#endif
#endif

/************* Multi Thread Tasks helpers **************/
/**
Expand Down
2 changes: 2 additions & 0 deletions include/zenoh-pico/protocol/definitions/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ extern "C" {

typedef _z_qos_t _z_n_qos_t;

#define _Z_N_QOS_IS_EXPRESS_FLAG (1 << 4)

static inline _z_qos_t _z_n_qos_create(bool express, z_congestion_control_t congestion_control, z_priority_t priority) {
_z_n_qos_t ret;
bool nodrop = congestion_control == Z_CONGESTION_CONTROL_DROP ? 0 : 1;
Expand Down
2 changes: 0 additions & 2 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1490,7 +1490,6 @@ const z_loaned_keyexpr_t *z_subscriber_keyexpr(const z_loaned_subscriber_t *sub)
}
#endif

#ifdef Z_FEATURE_UNSTABLE_API
#if Z_FEATURE_BATCHING == 1
z_result_t zp_batch_start(const z_loaned_session_t *zs) {
if (_Z_RC_IS_NULL(zs)) {
Expand Down Expand Up @@ -1519,7 +1518,6 @@ z_result_t zp_batch_stop(const z_loaned_session_t *zs) {
return _z_send_n_batch(session, Z_CONGESTION_CONTROL_DEFAULT);
}
#endif
#endif

/**************** Tasks ****************/
void zp_task_read_options_default(zp_task_read_options_t *options) {
Expand Down
60 changes: 45 additions & 15 deletions src/transport/common/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,20 @@

/*------------------ Transmission helper ------------------*/

static bool _z_transport_tx_get_express_status(const _z_network_message_t *msg) {
switch (msg->_tag) {
case _Z_N_DECLARE:
return _Z_HAS_FLAG(msg->_body._declare._ext_qos._val, _Z_N_QOS_IS_EXPRESS_FLAG);
case _Z_N_PUSH:
return _Z_HAS_FLAG(msg->_body._push._qos._val, _Z_N_QOS_IS_EXPRESS_FLAG);
case _Z_N_REQUEST:
return _Z_HAS_FLAG(msg->_body._request._ext_qos._val, _Z_N_QOS_IS_EXPRESS_FLAG);
case _Z_N_RESPONSE:
return _Z_HAS_FLAG(msg->_body._response._ext_qos._val, _Z_N_QOS_IS_EXPRESS_FLAG);
default:
return false;
}
}
static _z_zint_t _z_transport_tx_get_sn(_z_transport_common_t *ztc, z_reliability_t reliability) {
_z_zint_t sn;
if (reliability == Z_RELIABILITY_RELIABLE) {
Expand Down Expand Up @@ -139,13 +153,19 @@ static z_result_t _z_transport_tx_batch_overflow(_z_transport_common_t *ztc, con
_z_transport_message_t t_msg = _z_t_msg_make_frame_header(sn, reliability);
_Z_RETURN_IF_ERR(_z_transport_message_encode(&ztc->_wbuf, &t_msg));
// Retry encode
bool is_express = _z_transport_tx_get_express_status(n_msg);
z_result_t ret = _z_network_message_encode(&ztc->_wbuf, n_msg);
if (ret != _Z_RES_OK) {
// Message still doesn't fit in buffer, send as fragments
return _z_transport_tx_send_fragment(ztc, n_msg, reliability, sn);
} else {
// Increment batch
ztc->_batch_count++;
if (is_express) {
// Send immediately
return _z_transport_tx_flush_buffer(ztc);
} else {
// Increment batch
ztc->_batch_count++;
}
}
return _Z_RES_OK;
#else
Expand Down Expand Up @@ -180,10 +200,16 @@ static z_result_t _z_transport_tx_send_n_msg_inner(_z_transport_common_t *ztc, c
}
// Try encoding the network message
size_t prev_wpos = _z_transport_tx_save_wpos(&ztc->_wbuf);
bool is_express = _z_transport_tx_get_express_status(n_msg);
z_result_t ret = _z_network_message_encode(&ztc->_wbuf, n_msg);
if (ret == _Z_RES_OK) {
// Flush buffer or increase batch
return _z_transport_tx_flush_or_incr_batch(ztc);
if (is_express) {
// Send immediately
return _z_transport_tx_flush_buffer(ztc);
} else {
// Flush buffer or increase batch
return _z_transport_tx_flush_or_incr_batch(ztc);
}
} else if (!batch_has_data) {
// Message doesn't fit in buffer, send as fragments
return _z_transport_tx_send_fragment(ztc, n_msg, reliability, sn);
Expand All @@ -193,22 +219,26 @@ static z_result_t _z_transport_tx_send_n_msg_inner(_z_transport_common_t *ztc, c
}
}

static z_result_t _z_transport_tx_send_t_msg_inner(_z_transport_common_t *ztc, const _z_transport_message_t *t_msg) {
// Send batch if needed
bool batch_has_data = _z_transport_tx_batch_has_data(ztc);
if (batch_has_data) {
_Z_RETURN_IF_ERR(_z_transport_tx_flush_buffer(ztc));
}
// Encode transport message
__unsafe_z_prepare_wbuf(&ztc->_wbuf, ztc->_link._cap._flow);
_Z_RETURN_IF_ERR(_z_transport_message_encode(&ztc->_wbuf, t_msg));
// Send message
return _z_transport_tx_flush_buffer(ztc);
}

z_result_t _z_transport_tx_send_t_msg(_z_transport_common_t *ztc, const _z_transport_message_t *t_msg) {
z_result_t ret = _Z_RES_OK;
_Z_DEBUG("Send session message");
_z_transport_tx_mutex_lock(ztc, true);

// Encode transport message
__unsafe_z_prepare_wbuf(&ztc->_wbuf, ztc->_link._cap._flow);
ret = _z_transport_message_encode(&ztc->_wbuf, t_msg);
if (ret == _Z_RES_OK) {
// Send message
__unsafe_z_finalize_wbuf(&ztc->_wbuf, ztc->_link._cap._flow);
ret = _z_link_send_wbuf(&ztc->_link, &ztc->_wbuf);
if (ret == _Z_RES_OK) {
ztc->_transmitted = true; // Tell session we transmitted data
}
}
ret = _z_transport_tx_send_t_msg_inner(ztc, t_msg);

_z_transport_tx_mutex_unlock(ztc);
return ret;
}
Expand Down

0 comments on commit 2514041

Please sign in to comment.