diff --git a/include/zenoh-pico/transport/multicast/lease.h b/include/zenoh-pico/transport/multicast/lease.h index 38c51e494..a7896a3f5 100644 --- a/include/zenoh-pico/transport/multicast/lease.h +++ b/include/zenoh-pico/transport/multicast/lease.h @@ -19,8 +19,8 @@ int8_t _zp_multicast_send_join(_z_transport_multicast_t *ztm); int8_t _zp_multicast_send_keep_alive(_z_transport_multicast_t *ztm); -int8_t _zp_multicast_start_lease_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task); -int8_t _zp_multicast_stop_lease_task(_z_transport_t *zt); +int8_t _zp_multicast_start_lease_task(_z_transport_multicast_t *ztm, _z_task_attr_t *attr, _z_task_t *task); +int8_t _zp_multicast_stop_lease_task(_z_transport_multicast_t *ztm); void *_zp_multicast_lease_task(void *ztm_arg); // The argument is void* to avoid incompatible pointer types in tasks #endif /* ZENOH_PICO_MULTICAST_LEASE_H */ diff --git a/include/zenoh-pico/transport/raweth/lease.h b/include/zenoh-pico/transport/raweth/lease.h deleted file mode 100644 index 3a60ee852..000000000 --- a/include/zenoh-pico/transport/raweth/lease.h +++ /dev/null @@ -1,26 +0,0 @@ -// -// Copyright (c) 2022 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -#ifndef ZENOH_PICO_RAWETH_LEASE_H -#define ZENOH_PICO_RAWETH_LEASE_H - -#include "zenoh-pico/transport/transport.h" - -int8_t _zp_raweth_send_join(_z_transport_multicast_t *ztm); -int8_t _zp_raweth_send_keep_alive(_z_transport_multicast_t *ztm); -int8_t _zp_raweth_start_lease_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task); -int8_t _zp_raweth_stop_lease_task(_z_transport_t *zt); -void *_zp_raweth_lease_task(void *ztm_arg); // The argument is void* to avoid incompatible pointer types in tasks - -#endif /* ZENOH_PICO_RAWETH_LEASE_H */ diff --git a/include/zenoh-pico/transport/transport.h b/include/zenoh-pico/transport/transport.h index bb9ad72c2..2c2132786 100644 --- a/include/zenoh-pico/transport/transport.h +++ b/include/zenoh-pico/transport/transport.h @@ -53,6 +53,11 @@ _Z_LIST_DEFINE(_z_transport_peer_entry, _z_transport_peer_entry_t) _z_transport_peer_entry_list_t *_z_transport_peer_entry_list_insert(_z_transport_peer_entry_list_t *root, _z_transport_peer_entry_t *entry); +// Forward declaration to be used in _zp_f_send_tmsg* +typedef struct _z_transport_multicast_t _z_transport_multicast_t; +// Send function prototype +typedef int8_t (*_zp_f_send_tmsg)(_z_transport_multicast_t *self, const _z_transport_message_t *t_msg); + typedef struct { // Session associated to the transport @@ -93,7 +98,7 @@ typedef struct { volatile _Bool _transmitted; } _z_transport_unicast_t; -typedef struct { +typedef struct _z_transport_multicast_t { // Session associated to the transport void *_session; @@ -121,6 +126,9 @@ typedef struct { // Known valid peers _z_transport_peer_entry_list_t *_peers; + // T message send function + _zp_f_send_tmsg _send_f; + #if Z_FEATURE_MULTI_THREAD == 1 _z_task_t *_read_task; _z_task_t *_lease_task; diff --git a/src/net/session.c b/src/net/session.c index f5dcbc341..b9c4c947d 100644 --- a/src/net/session.c +++ b/src/net/session.c @@ -29,8 +29,8 @@ #include "zenoh-pico/transport/multicast.h" #include "zenoh-pico/transport/multicast/lease.h" #include "zenoh-pico/transport/multicast/read.h" -#include "zenoh-pico/transport/raweth/lease.h" #include "zenoh-pico/transport/raweth/read.h" +#include "zenoh-pico/transport/transport.h" #include "zenoh-pico/transport/unicast.h" #include "zenoh-pico/transport/unicast/lease.h" #include "zenoh-pico/transport/unicast/read.h" @@ -221,10 +221,10 @@ int8_t _zp_start_lease_task(_z_session_t *zn, _z_task_attr_t *attr) { ret = _zp_unicast_start_lease_task(&zn->_tp, attr, task); break; case _Z_TRANSPORT_MULTICAST_TYPE: - ret = _zp_multicast_start_lease_task(&zn->_tp, attr, task); + ret = _zp_multicast_start_lease_task(&zn->_tp._transport._multicast, attr, task); break; case _Z_TRANSPORT_RAWETH_TYPE: - ret = _zp_raweth_start_lease_task(&zn->_tp, attr, task); + ret = _zp_multicast_start_lease_task(&zn->_tp._transport._raweth, attr, task); break; default: ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; @@ -265,10 +265,10 @@ int8_t _zp_stop_lease_task(_z_session_t *zn) { ret = _zp_unicast_stop_lease_task(&zn->_tp); break; case _Z_TRANSPORT_MULTICAST_TYPE: - ret = _zp_multicast_stop_lease_task(&zn->_tp); + ret = _zp_multicast_stop_lease_task(&zn->_tp._transport._multicast); break; case _Z_TRANSPORT_RAWETH_TYPE: - ret = _zp_raweth_stop_lease_task(&zn->_tp); + ret = _zp_multicast_stop_lease_task(&zn->_tp._transport._raweth); break; default: ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; diff --git a/src/transport/common/lease.c b/src/transport/common/lease.c index d37f56f14..8b8cc1078 100644 --- a/src/transport/common/lease.c +++ b/src/transport/common/lease.c @@ -17,7 +17,6 @@ #include #include "zenoh-pico/transport/multicast/lease.h" -#include "zenoh-pico/transport/raweth/lease.h" #include "zenoh-pico/transport/unicast/lease.h" int8_t _z_send_keep_alive(_z_transport_t *zt) { @@ -30,7 +29,7 @@ int8_t _z_send_keep_alive(_z_transport_t *zt) { ret = _zp_multicast_send_keep_alive(&zt->_transport._multicast); break; case _Z_TRANSPORT_RAWETH_TYPE: - ret = _zp_raweth_send_keep_alive(&zt->_transport._raweth); + ret = _zp_multicast_send_keep_alive(&zt->_transport._raweth); break; default: ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; @@ -47,7 +46,7 @@ int8_t _z_send_join(_z_transport_t *zt) { ret = _zp_multicast_send_join(&zt->_transport._multicast); break; case _Z_TRANSPORT_RAWETH_TYPE: - ret = _zp_raweth_send_join(&zt->_transport._raweth); + ret = _zp_multicast_send_join(&zt->_transport._raweth); break; default: (void)zt; @@ -55,4 +54,4 @@ int8_t _z_send_join(_z_transport_t *zt) { break; } return ret; -} +} \ No newline at end of file diff --git a/src/transport/multicast/lease.c b/src/transport/multicast/lease.c index dc652992c..4a56a9c39 100644 --- a/src/transport/multicast/lease.c +++ b/src/transport/multicast/lease.c @@ -19,11 +19,9 @@ #include "zenoh-pico/config.h" #include "zenoh-pico/session/utils.h" #include "zenoh-pico/transport/common/lease.h" -#include "zenoh-pico/transport/multicast/transport.h" -#include "zenoh-pico/transport/multicast/tx.h" #include "zenoh-pico/utils/logging.h" -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 +#if Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1 static _z_zint_t _z_get_minimum_lease(_z_transport_peer_entry_list_t *peers, _z_zint_t local_lease) { _z_zint_t ret = local_lease; @@ -68,34 +66,30 @@ int8_t _zp_multicast_send_join(_z_transport_multicast_t *ztm) { _z_id_t zid = ((_z_session_t *)ztm->_session)->_local_zid; _z_transport_message_t jsm = _z_t_msg_make_join(Z_WHATAMI_PEER, Z_TRANSPORT_LEASE, zid, next_sn); - return _z_multicast_send_t_msg(ztm, &jsm); + return ztm->_send_f(ztm, &jsm); } int8_t _zp_multicast_send_keep_alive(_z_transport_multicast_t *ztm) { - int8_t ret = _Z_RES_OK; - _z_transport_message_t t_msg = _z_t_msg_make_keep_alive(); - ret = _z_multicast_send_t_msg(ztm, &t_msg); - - return ret; + return ztm->_send_f(ztm, &t_msg); } -int8_t _zp_multicast_start_lease_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { +int8_t _zp_multicast_start_lease_task(_z_transport_multicast_t *ztm, _z_task_attr_t *attr, _z_task_t *task) { // Init memory (void)memset(task, 0, sizeof(_z_task_t)); // Attach task - zt->_transport._multicast._lease_task = task; - zt->_transport._multicast._lease_task_running = true; + ztm->_lease_task = task; + ztm->_lease_task_running = true; // Init task - if (_z_task_init(task, attr, _zp_multicast_lease_task, &zt->_transport._multicast) != _Z_RES_OK) { - zt->_transport._multicast._lease_task_running = false; + if (_z_task_init(task, attr, _zp_multicast_lease_task, ztm) != _Z_RES_OK) { + ztm->_lease_task_running = false; return _Z_ERR_SYSTEM_TASK_FAILED; } return _Z_RES_OK; } -int8_t _zp_multicast_stop_lease_task(_z_transport_t *zt) { - zt->_transport._multicast._lease_task_running = false; +int8_t _zp_multicast_stop_lease_task(_z_transport_multicast_t *ztm) { + ztm->_lease_task_running = false; return _Z_RES_OK; } @@ -205,15 +199,15 @@ int8_t _zp_multicast_send_keep_alive(_z_transport_multicast_t *ztm) { return _Z_ERR_TRANSPORT_NOT_AVAILABLE; } -int8_t _zp_multicast_start_lease_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { - _ZP_UNUSED(zt); +int8_t _zp_multicast_start_lease_task(_z_transport_multicast_t *ztm, _z_task_attr_t *attr, _z_task_t *task) { + _ZP_UNUSED(ztm); _ZP_UNUSED(attr); _ZP_UNUSED(task); return _Z_ERR_TRANSPORT_NOT_AVAILABLE; } -int8_t _zp_multicast_stop_lease_task(_z_transport_t *zt) { - _ZP_UNUSED(zt); +int8_t _zp_multicast_stop_lease_task(_z_transport_multicast_t *ztm) { + _ZP_UNUSED(ztm); return _Z_ERR_TRANSPORT_NOT_AVAILABLE; } @@ -221,4 +215,4 @@ void *_zp_multicast_lease_task(void *ztm_arg) { _ZP_UNUSED(ztm_arg); return NULL; } -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 +#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1 diff --git a/src/transport/multicast/transport.c b/src/transport/multicast/transport.c index 51286de6b..8df2f7907 100644 --- a/src/transport/multicast/transport.c +++ b/src/transport/multicast/transport.c @@ -35,6 +35,7 @@ int8_t _z_multicast_transport_create(_z_transport_t *zt, _z_link_t *zl, int8_t ret = _Z_RES_OK; zt->_type = _Z_TRANSPORT_MULTICAST_TYPE; + zt->_transport._multicast._send_f = _z_multicast_send_t_msg; #if Z_FEATURE_MULTI_THREAD == 1 // Initialize the mutexes diff --git a/src/transport/raweth/lease.c b/src/transport/raweth/lease.c deleted file mode 100644 index f3fc827c6..000000000 --- a/src/transport/raweth/lease.c +++ /dev/null @@ -1,224 +0,0 @@ -// -// Copyright (c) 2022 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -#include "zenoh-pico/transport/raweth/lease.h" - -#include - -#include "zenoh-pico/config.h" -#include "zenoh-pico/session/utils.h" -#include "zenoh-pico/transport/common/lease.h" -#include "zenoh-pico/transport/raweth/transport.h" -#include "zenoh-pico/transport/raweth/tx.h" -#include "zenoh-pico/utils/logging.h" - -#if Z_FEATURE_RAWETH_TRANSPORT == 1 - -static _z_zint_t _z_get_minimum_lease(_z_transport_peer_entry_list_t *peers, _z_zint_t local_lease) { - _z_zint_t ret = local_lease; - - _z_transport_peer_entry_list_t *it = peers; - while (it != NULL) { - _z_transport_peer_entry_t *val = _z_transport_peer_entry_list_head(it); - _z_zint_t lease = val->_lease; - if (lease < ret) { - ret = lease; - } - - it = _z_transport_peer_entry_list_tail(it); - } - - return ret; -} - -static _z_zint_t _z_get_next_lease(_z_transport_peer_entry_list_t *peers) { - _z_zint_t ret = SIZE_MAX; - - _z_transport_peer_entry_list_t *it = peers; - while (it != NULL) { - _z_transport_peer_entry_t *val = _z_transport_peer_entry_list_head(it); - _z_zint_t next_lease = val->_next_lease; - if (next_lease < ret) { - ret = next_lease; - } - - it = _z_transport_peer_entry_list_tail(it); - } - - return ret; -} - -int8_t _zp_raweth_send_join(_z_transport_multicast_t *ztm) { - _z_conduit_sn_list_t next_sn; - next_sn._is_qos = false; - next_sn._val._plain._best_effort = ztm->_sn_tx_best_effort; - next_sn._val._plain._reliable = ztm->_sn_tx_reliable; - - _z_id_t zid = ((_z_session_t *)ztm->_session)->_local_zid; - _z_transport_message_t jsm = _z_t_msg_make_join(Z_WHATAMI_PEER, Z_TRANSPORT_LEASE, zid, next_sn); - - return _z_raweth_send_t_msg(ztm, &jsm); -} - -int8_t _zp_raweth_send_keep_alive(_z_transport_multicast_t *ztm) { - int8_t ret = _Z_RES_OK; - - _z_transport_message_t t_msg = _z_t_msg_make_keep_alive(); - ret = _z_raweth_send_t_msg(ztm, &t_msg); - - return ret; -} - -int8_t _zp_raweth_start_lease_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { - // Init memory - (void)memset(task, 0, sizeof(_z_task_t)); - // Attach task - zt->_transport._raweth._lease_task = task; - zt->_transport._raweth._lease_task_running = true; - // Init task - if (_z_task_init(task, attr, _zp_raweth_lease_task, &zt->_transport._raweth) != _Z_RES_OK) { - zt->_transport._raweth._lease_task_running = false; - return _Z_ERR_SYSTEM_TASK_FAILED; - } - return _Z_RES_OK; -} - -int8_t _zp_raweth_stop_lease_task(_z_transport_t *zt) { - zt->_transport._raweth._lease_task_running = false; - return _Z_RES_OK; -} - -void *_zp_raweth_lease_task(void *ztm_arg) { -#if Z_FEATURE_MULTI_THREAD == 1 - _z_transport_multicast_t *ztm = (_z_transport_multicast_t *)ztm_arg; - ztm->_transmitted = false; - - // From all peers, get the next lease time (minimum) - _z_zint_t next_lease = _z_get_minimum_lease(ztm->_peers, ztm->_lease); - _z_zint_t next_keep_alive = (_z_zint_t)(next_lease / Z_TRANSPORT_LEASE_EXPIRE_FACTOR); - _z_zint_t next_join = Z_JOIN_INTERVAL; - - _z_transport_peer_entry_list_t *it = NULL; - while (ztm->_lease_task_running == true) { - _z_mutex_lock(&ztm->_mutex_peer); - - if (next_lease <= 0) { - it = ztm->_peers; - while (it != NULL) { - _z_transport_peer_entry_t *entry = _z_transport_peer_entry_list_head(it); - if (entry->_received == true) { - // Reset the lease parameters - entry->_received = false; - entry->_next_lease = entry->_lease; - it = _z_transport_peer_entry_list_tail(it); - } else { - _Z_INFO("Remove peer from know list because it has expired after %zums\n", entry->_lease); - ztm->_peers = - _z_transport_peer_entry_list_drop_filter(ztm->_peers, _z_transport_peer_entry_eq, entry); - it = ztm->_peers; - } - } - } - - if (next_join <= 0) { - _zp_raweth_send_join(ztm); - ztm->_transmitted = true; - - // Reset the join parameters - next_join = Z_JOIN_INTERVAL; - } - - if (next_keep_alive <= 0) { - // Check if need to send a keep alive - if (ztm->_transmitted == false) { - if (_zp_raweth_send_keep_alive(ztm) < 0) { - // TODO: Handle retransmission or error - } - } - - // Reset the keep alive parameters - ztm->_transmitted = false; - next_keep_alive = - (_z_zint_t)(_z_get_minimum_lease(ztm->_peers, ztm->_lease) / Z_TRANSPORT_LEASE_EXPIRE_FACTOR); - } - - // Compute the target interval to sleep - _z_zint_t interval; - if (next_lease > 0) { - interval = next_lease; - if (next_keep_alive < interval) { - interval = next_keep_alive; - } - if (next_join < interval) { - interval = next_join; - } - } else { - interval = next_keep_alive; - if (next_join < interval) { - interval = next_join; - } - } - - _z_mutex_unlock(&ztm->_mutex_peer); - - // The keep alive and lease intervals are expressed in milliseconds - z_sleep_ms(interval); - - // Decrement all intervals - _z_mutex_lock(&ztm->_mutex_peer); - - it = ztm->_peers; - while (it != NULL) { - _z_transport_peer_entry_t *entry = _z_transport_peer_entry_list_head(it); - entry->_next_lease = entry->_next_lease - interval; - it = _z_transport_peer_entry_list_tail(it); - } - next_lease = _z_get_next_lease(ztm->_peers); - next_keep_alive = next_keep_alive - interval; - next_join = next_join - interval; - - _z_mutex_unlock(&ztm->_mutex_peer); - } -#endif // Z_FEATURE_MULTI_THREAD == 1 - - return 0; -} -#else -int8_t _zp_raweth_send_join(_z_transport_multicast_t *ztm) { - _ZP_UNUSED(ztm); - return _Z_ERR_TRANSPORT_NOT_AVAILABLE; -} - -int8_t _zp_raweth_send_keep_alive(_z_transport_multicast_t *ztm) { - _ZP_UNUSED(ztm); - return _Z_ERR_TRANSPORT_NOT_AVAILABLE; -} - -int8_t _zp_raweth_start_lease_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { - _ZP_UNUSED(zt); - _ZP_UNUSED(attr); - _ZP_UNUSED(task); - return _Z_ERR_TRANSPORT_NOT_AVAILABLE; -} - -int8_t _zp_raweth_stop_lease_task(_z_transport_t *zt) { - _ZP_UNUSED(zt); - return _Z_ERR_TRANSPORT_NOT_AVAILABLE; -} - -void *_zp_raweth_lease_task(void *ztm_arg) { - _ZP_UNUSED(ztm_arg); - return NULL; -} -#endif // Z_FEATURE_RAWETH_TRANSPORT == 1 diff --git a/src/transport/raweth/transport.c b/src/transport/raweth/transport.c index 7dced540d..ddc0d7b8b 100644 --- a/src/transport/raweth/transport.c +++ b/src/transport/raweth/transport.c @@ -34,6 +34,7 @@ int8_t _z_raweth_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transpor int8_t ret = _Z_RES_OK; zt->_type = _Z_TRANSPORT_RAWETH_TYPE; + zt->_transport._raweth._send_f = _z_raweth_send_t_msg; #if Z_FEATURE_MULTI_THREAD == 1 // Initialize the mutexes