Skip to content

Commit

Permalink
feat: support start/stop fragment marker
Browse files Browse the repository at this point in the history
  • Loading branch information
wyfo committed Dec 4, 2024
1 parent b63ffa3 commit 63fbb17
Show file tree
Hide file tree
Showing 14 changed files with 257 additions and 28 deletions.
24 changes: 20 additions & 4 deletions include/zenoh-pico/protocol/definitions/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,16 @@ extern "C" {
// Z Extensions if Z==1 then Zenoh extensions are present
#define _Z_FLAG_T_CLOSE_S 0x20 // 1 << 5

/*=============================*/
/* Patch */
/*=============================*/
/// Used to negotiate the patch version of the protocol
/// if not present (or 0), then protocol as released with 1.0.0
/// if >= 1, then fragmentation start/stop marker
#define _Z_NO_PATCH 0x00
#define _Z_CURRENT_PATCH 0x01
#define _Z_PATCH_HAS_FRAGMENT_START_STOP(patch) (patch >= 1)

/*=============================*/
/* Transport Messages */
/*=============================*/
Expand Down Expand Up @@ -235,6 +245,9 @@ typedef struct {
uint8_t _req_id_res;
uint8_t _seq_num_res;
uint8_t _version;
#if Z_FEATURE_FRAGMENTATION == 1
uint8_t _patch;
#endif
} _z_t_msg_join_t;
void _z_t_msg_join_clear(_z_t_msg_join_t *msg);

Expand Down Expand Up @@ -315,6 +328,9 @@ typedef struct {
uint8_t _req_id_res;
uint8_t _seq_num_res;
uint8_t _version;
#if Z_FEATURE_FRAGMENTATION == 1
uint8_t _patch;
#endif
} _z_t_msg_init_t;
void _z_t_msg_init_clear(_z_t_msg_init_t *msg);

Expand Down Expand Up @@ -478,11 +494,11 @@ void _z_t_msg_frame_clear(_z_t_msg_frame_t *msg);
typedef struct {
_z_slice_t _payload;
_z_zint_t _sn;
bool start;
bool stop;
} _z_t_msg_fragment_t;
void _z_t_msg_fragment_clear(_z_t_msg_fragment_t *msg);

#define _Z_FRAGMENT_HEADER_SIZE 12

/*------------------ Transport Message ------------------*/
typedef union {
_z_t_msg_join_t _join;
Expand Down Expand Up @@ -514,9 +530,9 @@ _z_transport_message_t _z_t_msg_make_keep_alive(void);
_z_transport_message_t _z_t_msg_make_frame(_z_zint_t sn, _z_network_message_vec_t messages,
z_reliability_t reliability);
_z_transport_message_t _z_t_msg_make_frame_header(_z_zint_t sn, z_reliability_t reliability);
_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last);
_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last, bool start, bool stop);
_z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t messages, z_reliability_t reliability,
bool is_last);
bool is_last, bool start, bool stop);

/*------------------ Copy ------------------*/
void _z_t_msg_copy(_z_transport_message_t *clone, _z_transport_message_t *msg);
Expand Down
7 changes: 6 additions & 1 deletion include/zenoh-pico/protocol/ext.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ extern "C" {
/*=============================*/
/* Extension IDs */
/*=============================*/
// #define _Z_MSG_EXT_ID_FOO 0x00 // Hex(ENC|M|ID)
#define _Z_MSG_EXT_ID_JOIN_QOS (0x01 | _Z_MSG_EXT_FLAG_M | _Z_MSG_EXT_ENC_ZBUF)
#define _Z_MSG_EXT_ID_JOIN_PATCH (0x07 | _Z_MSG_EXT_ENC_ZINT)
#define _Z_MSG_EXT_ID_INIT_PATCH (0x07 | _Z_MSG_EXT_ENC_ZINT)
#define _Z_MSG_EXT_ID_FRAGMENT_START (0x02 | _Z_MSG_EXT_ENC_UNIT)
#define _Z_MSG_EXT_ID_FRAGMENT_STOP (0x03 | _Z_MSG_EXT_ENC_UNIT)

/*=============================*/
/* Extension Encodings */
Expand All @@ -58,6 +62,7 @@ extern "C" {
#define _Z_MSG_EXT_FLAG_M 0x10
#define _Z_MSG_EXT_IS_MANDATORY(h) ((h & _Z_MSG_EXT_FLAG_M) != 0)
#define _Z_MSG_EXT_FLAG_Z 0x80
#define _Z_MSG_EXT_MORE(more) (more ? _Z_MSG_EXT_FLAG_Z : 0)

typedef struct {
uint8_t __dummy; // Just to avoid empty structures that might cause undefined behavior
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/transport/common/tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability);
void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability);
/*This function is unsafe because it operates in potentially concurrent
data.*Make sure that the following mutexes are locked before calling this function : *-ztu->mutex_tx */
z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn);
z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn, bool start);

/*------------------ Transmission and Reception helpers ------------------*/
z_result_t _z_send_t_msg(_z_transport_t *zt, const _z_transport_message_t *t_msg);
Expand Down
13 changes: 13 additions & 0 deletions include/zenoh-pico/transport/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ typedef struct {

uint16_t _peer_id;
volatile bool _received;

#if Z_FEATURE_FRAGMENTATION == 1
// Patch
uint8_t _patch;
#endif
} _z_transport_peer_entry_t;

size_t _z_transport_peer_entry_size(const _z_transport_peer_entry_t *src);
Expand Down Expand Up @@ -108,6 +113,11 @@ typedef struct {

volatile bool _received;
volatile bool _transmitted;

#if Z_FEATURE_FRAGMENTATION == 1
// Patch
uint8_t _patch;
#endif
} _z_transport_unicast_t;

typedef struct _z_transport_multicast_t {
Expand Down Expand Up @@ -175,6 +185,9 @@ typedef struct {
uint8_t _req_id_res;
uint8_t _seq_num_res;
bool _is_qos;
#if Z_FEATURE_FRAGMENTATION == 1
uint8_t _patch;
#endif
} _z_transport_unicast_establish_param_t;

typedef struct {
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/transport/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ _z_zint_t _z_sn_max(uint8_t bits);
_z_zint_t _z_sn_half(_z_zint_t sn);
_z_zint_t _z_sn_modulo_mask(uint8_t bits);
bool _z_sn_precedes(const _z_zint_t sn_resolution, const _z_zint_t sn_left, const _z_zint_t sn_right);
bool _z_sn_consecutive(const _z_zint_t sn_resolution, const _z_zint_t sn_left, const _z_zint_t sn_right);
_z_zint_t _z_sn_increment(const _z_zint_t sn_resolution, const _z_zint_t sn);
_z_zint_t _z_sn_decrement(const _z_zint_t sn_resolution, const _z_zint_t sn);

Expand Down
59 changes: 52 additions & 7 deletions src/protocol/codec/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,14 @@ z_result_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t
}
_Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_next_sn._val._plain._reliable));
_Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_next_sn._val._plain._best_effort));
#if Z_FEATURE_FRAGMENTATION == 1
bool has_patch = msg->_patch != _Z_NO_PATCH;
#else
bool has_patch = false;
#endif
if (msg->_next_sn._is_qos) {
if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) {
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZBUF | _Z_MSG_EXT_FLAG_M | 1));
if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) {
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_QOS | _Z_MSG_EXT_MORE(has_patch)));
size_t len = 0;
for (uint8_t i = 0; (i < Z_PRIORITIES_NUM) && (ret == _Z_RES_OK); i++) {
len += _z_zint_len(msg->_next_sn._val._qos[i]._reliable) +
Expand All @@ -82,21 +87,35 @@ z_result_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t
ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED;
}
}
#if Z_FEATURE_FRAGMENTATION == 1
if (has_patch) {
if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) {
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_PATCH));
_Z_RETURN_IF_ERR(_z_zint64_encode(wbf, msg->_patch));
} else {
_Z_DEBUG("Attempted to serialize Patch extension, but the header extension flag was unset");
ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED;
}
}
#endif

return ret;
}

z_result_t _z_join_decode_ext(_z_msg_ext_t *extension, void *ctx) {
z_result_t ret = _Z_RES_OK;
_z_t_msg_join_t *msg = (_z_t_msg_join_t *)ctx;
if (_Z_EXT_FULL_ID(extension->_header) ==
(_Z_MSG_EXT_ENC_ZBUF | _Z_MSG_EXT_FLAG_M | 1)) { // QOS: (enc=zbuf)(mandatory=true)(id=1)
if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_JOIN_QOS) {
msg->_next_sn._is_qos = true;
_z_zbuf_t zbf = _z_slice_as_zbuf(extension->_body._zbuf._val);
for (int i = 0; (ret == _Z_RES_OK) && (i < Z_PRIORITIES_NUM); ++i) {
ret |= _z_zsize_decode(&msg->_next_sn._val._qos[i]._reliable, &zbf);
ret |= _z_zsize_decode(&msg->_next_sn._val._qos[i]._best_effort, &zbf);
}
#if Z_FEATURE_FRAGMENTATION == 1
} else if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_JOIN_PATCH) {
msg->_patch = (uint8_t)extension->_body._zint._val;
#endif
} else if (_Z_MSG_EXT_IS_MANDATORY(extension->_header)) {
ret = _Z_ERR_MESSAGE_EXTENSION_MANDATORY_AND_UNKNOWN;
}
Expand Down Expand Up @@ -147,7 +166,7 @@ z_result_t _z_join_decode(_z_t_msg_join_t *msg, _z_zbuf_t *zbf, uint8_t header)
ret |= _z_zsize_decode(&msg->_next_sn._val._plain._reliable, zbf);
ret |= _z_zsize_decode(&msg->_next_sn._val._plain._best_effort, zbf);
}
if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_T_Z)) {
if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) {
ret |= _z_msg_ext_decode_iter(zbf, _z_join_decode_ext, msg);
}

Expand Down Expand Up @@ -180,6 +199,32 @@ z_result_t _z_init_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_init_t
_Z_RETURN_IF_ERR(_z_slice_encode(wbf, &msg->_cookie))
}

#if Z_FEATURE_FRAGMENTATION == 1
if (msg->_patch != _Z_CURRENT_PATCH) {
if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) {
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_PATCH));
_Z_RETURN_IF_ERR(_z_zint64_encode(wbf, msg->_patch));
} else {
_Z_DEBUG("Attempted to serialize Patch extension, but the header extension flag was unset");
ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED;
}
}
#endif

return ret;
}

z_result_t _z_init_decode_ext(_z_msg_ext_t *extension, void *ctx) {
z_result_t ret = _Z_RES_OK;
_z_t_msg_init_t *msg = (_z_t_msg_init_t *)ctx;
if (false) {
#if Z_FEATURE_FRAGMENTATION == 1
} else if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_INIT_PATCH) {
msg->_patch = (uint8_t)extension->_body._zint._val;
#endif
} else if (_Z_MSG_EXT_IS_MANDATORY(extension->_header)) {
ret = _Z_ERR_MESSAGE_EXTENSION_MANDATORY_AND_UNKNOWN;
}
return ret;
}

Expand Down Expand Up @@ -222,8 +267,8 @@ z_result_t _z_init_decode(_z_t_msg_init_t *msg, _z_zbuf_t *zbf, uint8_t header)
msg->_cookie = _z_slice_empty();
}

if ((ret == _Z_RES_OK) && (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true)) {
ret |= _z_msg_ext_skip_non_mandatories(zbf, 0x01);
if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) {
ret |= _z_msg_ext_decode_iter(zbf, _z_init_decode_ext, msg);
}

return ret;
Expand Down
49 changes: 45 additions & 4 deletions src/protocol/definitions/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ _z_transport_message_t _z_t_msg_make_join(z_whatami_t whatami, _z_zint_t lease,
msg._body._join._batch_size = Z_BATCH_MULTICAST_SIZE;
msg._body._join._next_sn = next_sn;
msg._body._join._zid = zid;
#if Z_FEATURE_FRAGMENTATION == 1
msg._body._join._patch = _Z_CURRENT_PATCH;
#endif

if ((lease % 1000) == 0) {
_Z_SET_FLAG(msg._header, _Z_FLAG_T_JOIN_T);
Expand All @@ -112,7 +115,12 @@ _z_transport_message_t _z_t_msg_make_join(z_whatami_t whatami, _z_zint_t lease,
_Z_SET_FLAG(msg._header, _Z_FLAG_T_JOIN_S);
}

if (next_sn._is_qos) {
#if Z_FEATURE_FRAGMENTATION == 1
bool has_patch = msg._body._join._patch != _Z_NO_PATCH;
#else
bool has_patch = false;
#endif
if (next_sn._is_qos == true || has_patch == true) {
_Z_SET_FLAG(msg._header, _Z_FLAG_T_Z);
}

Expand All @@ -131,13 +139,25 @@ _z_transport_message_t _z_t_msg_make_init_syn(z_whatami_t whatami, _z_id_t zid)
msg._body._init._req_id_res = Z_REQ_RESOLUTION;
msg._body._init._batch_size = Z_BATCH_UNICAST_SIZE;
_z_slice_reset(&msg._body._init._cookie);
#if Z_FEATURE_FRAGMENTATION == 1
msg._body._init._patch = _Z_CURRENT_PATCH;
#endif

if ((msg._body._init._batch_size != _Z_DEFAULT_UNICAST_BATCH_SIZE) ||
(msg._body._init._seq_num_res != _Z_DEFAULT_RESOLUTION_SIZE) ||
(msg._body._init._req_id_res != _Z_DEFAULT_RESOLUTION_SIZE)) {
_Z_SET_FLAG(msg._header, _Z_FLAG_T_INIT_S);
}

#if Z_FEATURE_FRAGMENTATION == 1
bool has_patch = msg._body._join._patch != _Z_NO_PATCH;
#else
bool has_patch = false;
#endif
if (has_patch == true) {
_Z_SET_FLAG(msg._header, _Z_FLAG_T_Z);
}

return msg;
}

Expand All @@ -153,13 +173,25 @@ _z_transport_message_t _z_t_msg_make_init_ack(z_whatami_t whatami, _z_id_t zid,
msg._body._init._req_id_res = Z_REQ_RESOLUTION;
msg._body._init._batch_size = Z_BATCH_UNICAST_SIZE;
msg._body._init._cookie = cookie;
#if Z_FEATURE_FRAGMENTATION == 1
msg._body._init._patch = _Z_CURRENT_PATCH;
#endif

if ((msg._body._init._batch_size != _Z_DEFAULT_UNICAST_BATCH_SIZE) ||
(msg._body._init._seq_num_res != _Z_DEFAULT_RESOLUTION_SIZE) ||
(msg._body._init._req_id_res != _Z_DEFAULT_RESOLUTION_SIZE)) {
_Z_SET_FLAG(msg._header, _Z_FLAG_T_INIT_S);
}

#if Z_FEATURE_FRAGMENTATION == 1
bool has_patch = msg._body._join._patch != _Z_NO_PATCH;
#else
bool has_patch = false;
#endif
if (has_patch == true) {
_Z_SET_FLAG(msg._header, _Z_FLAG_T_Z);
}

return msg;
}

Expand Down Expand Up @@ -247,11 +279,11 @@ _z_transport_message_t _z_t_msg_make_frame_header(_z_zint_t sn, z_reliability_t
}

/*------------------ Fragment Message ------------------*/
_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last) {
return _z_t_msg_make_fragment(sn, _z_slice_empty(), reliability, is_last);
_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last, bool start, bool stop) {
return _z_t_msg_make_fragment(sn, _z_slice_empty(), reliability, is_last, start, stop);
}
_z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t payload, z_reliability_t reliability,
bool is_last) {
bool is_last, bool start, bool stop) {
_z_transport_message_t msg;
msg._header = _Z_MID_T_FRAGMENT;
if (is_last == false) {
Expand All @@ -263,12 +295,20 @@ _z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t payload,

msg._body._fragment._sn = sn;
msg._body._fragment._payload = payload;
if (start == true || stop == true) {
_Z_SET_FLAG(msg._header, _Z_FLAG_T_Z);
}
msg._body._fragment.start = start;
msg._body._fragment.stop = stop;

return msg;
}

void _z_t_msg_copy_fragment(_z_t_msg_fragment_t *clone, _z_t_msg_fragment_t *msg) {
clone->_payload = msg->_payload;
_z_slice_copy(&clone->_payload, &msg->_payload);
clone->start = msg->start;
clone->stop = msg->stop;
}

void _z_t_msg_copy_join(_z_t_msg_join_t *clone, _z_t_msg_join_t *msg) {
Expand All @@ -279,6 +319,7 @@ void _z_t_msg_copy_join(_z_t_msg_join_t *clone, _z_t_msg_join_t *msg) {
clone->_req_id_res = msg->_req_id_res;
clone->_batch_size = msg->_batch_size;
clone->_next_sn = msg->_next_sn;
clone->_patch = msg->_patch;
memcpy(clone->_zid.id, msg->_zid.id, 16);
}

Expand Down
4 changes: 2 additions & 2 deletions src/transport/common/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ z_result_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t
return ret;
}

z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn) {
z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn, bool start) {
z_result_t ret = _Z_RES_OK;

// Assume first that this is not the final fragment
Expand All @@ -144,7 +144,7 @@ z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z
size_t w_pos = _z_wbuf_get_wpos(dst); // Mark the buffer for the writing operation

_z_transport_message_t f_hdr =
_z_t_msg_make_fragment_header(sn, reliability == Z_RELIABILITY_RELIABLE, is_final);
_z_t_msg_make_fragment_header(sn, reliability == Z_RELIABILITY_RELIABLE, is_final, start, false);
ret = _z_transport_message_encode(dst, &f_hdr); // Encode the frame header
if (ret == _Z_RES_OK) {
size_t space_left = _z_wbuf_space_left(dst);
Expand Down
Loading

0 comments on commit 63fbb17

Please sign in to comment.