diff --git a/CMakeLists.txt b/CMakeLists.txt index 6176147d..6d185aad 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -87,7 +87,7 @@ list(APPEND _deps "microcdr\;${_microcdr_version}") ############################################################################### set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} ${CMAKE_CURRENT_SOURCE_DIR}/cmake/modules) if(NOT UCLIENT_SUPERBUILD) - project(microxrcedds_client VERSION "2.1.0" LANGUAGES C) + project(microxrcedds_client VERSION "2.1.1" LANGUAGES C) else() project(uclient_superbuild NONE) include(${PROJECT_SOURCE_DIR}/cmake/SuperBuild.cmake) @@ -200,6 +200,8 @@ if(UCLIENT_PROFILE_SERIAL) list(APPEND _transport_src src/c/profile/transport/serial/serial_transport.c) if(UCLIENT_PLATFORM_POSIX) list(APPEND _transport_src src/c/profile/transport/serial/serial_transport_posix.c) + elseif(UCLIENT_PLATFORM_RTEMS_BSD_NET) + list(APPEND _transport_src src/c/profile/transport/serial/serial_transport_rtems_bsd_net.c) endif() endif() diff --git a/examples/ReplyAdder/main.c b/examples/ReplyAdder/main.c index 76693d39..b0ac79ac 100644 --- a/examples/ReplyAdder/main.c +++ b/examples/ReplyAdder/main.c @@ -151,8 +151,7 @@ int main( bool connected = true; while (connected) { - uint8_t read_data_status; - connected = uxr_run_session_until_all_status(&session, UXR_TIMEOUT_INF, &read_data_req, &read_data_status, 1); + connected = uxr_run_session_time(&session, 1000); } return 0; diff --git a/examples/SubscribeHelloWorld/main.c b/examples/SubscribeHelloWorld/main.c index 2e21f8c0..d535d77d 100644 --- a/examples/SubscribeHelloWorld/main.c +++ b/examples/SubscribeHelloWorld/main.c @@ -145,15 +145,13 @@ int main( 0 }; delivery_control.max_samples = UXR_MAX_SAMPLES_UNLIMITED; - uint16_t read_data_req = uxr_buffer_request_data(&session, reliable_out, datareader_id, reliable_in, - &delivery_control); + uxr_buffer_request_data(&session, reliable_out, datareader_id, reliable_in, &delivery_control); // Read topics bool connected = true; while (connected && count < max_topics) { - uint8_t read_data_status; - connected = uxr_run_session_until_all_status(&session, UXR_TIMEOUT_INF, &read_data_req, &read_data_status, 1); + connected = uxr_run_session_time(&session, 1000); } // Delete resources diff --git a/include/uxr/client/core/session/session.h b/include/uxr/client/core/session/session.h index 0bdb71a6..e93fee69 100644 --- a/include/uxr/client/core/session/session.h +++ b/include/uxr/client/core/session/session.h @@ -162,6 +162,11 @@ typedef struct uxrContinuousArgs size_t data_size; } uxrContinuousArgs; +typedef uint8_t pong_status_t; +#define NO_PONG_STATUS 0x00 +#define PONG_IN_SESSION_STATUS 0x01 +#define PONG_NO_SESSION_STATUS 0x02 + /** * @nosubgrouping */ @@ -193,7 +198,7 @@ typedef struct uxrSession void* on_reply_args; bool on_data_flag; - bool on_pong_flag; + pong_status_t on_pong_flag; uxrContinuousArgs continuous_args; #ifdef UCLIENT_PROFILE_MULTITHREAD diff --git a/include/uxr/client/profile/transport/can/can_transport.h b/include/uxr/client/profile/transport/can/can_transport.h index 4a8fd21b..3bc36af0 100644 --- a/include/uxr/client/profile/transport/can/can_transport.h +++ b/include/uxr/client/profile/transport/can/can_transport.h @@ -29,7 +29,7 @@ extern "C" #include #include -#define UXR_CONFIG_CAN_TRANSPORT_MTU 64 +#define UXR_CONFIG_CAN_TRANSPORT_MTU 63 typedef struct uxrCANTransport { diff --git a/include/uxr/client/profile/transport/serial/serial_transport_rtems_bsd_net.h b/include/uxr/client/profile/transport/serial/serial_transport_rtems_bsd_net.h new file mode 100644 index 00000000..86e6e6c8 --- /dev/null +++ b/include/uxr/client/profile/transport/serial/serial_transport_rtems_bsd_net.h @@ -0,0 +1,35 @@ +// Copyright 2018 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef UXR_CLIENT_PROFILE_TRANSPORT_SERIAL_SERIALTRANSPORTRTEMS_H_ +#define UXR_CLIENT_PROFILE_TRANSPORT_SERIAL_SERIALTRANSPORTRTEMS_H_ + +#ifdef __cplusplus +extern "C" +{ +#endif // ifdef __cplusplus + +#include "sys/select.h" + +typedef struct uxrSerialPlatform +{ + struct fd_set select_fd; + int fd; +} uxrSerialPlatform; + +#ifdef __cplusplus +} +#endif // ifdef __cplusplus + +#endif // UXR_CLIENT_PROFILE_TRANSPORT_SERIAL_SERIALTRANSPORTPOSIX_H_ diff --git a/include/uxr/client/transport.h b/include/uxr/client/transport.h index 95003228..e235f5b4 100644 --- a/include/uxr/client/transport.h +++ b/include/uxr/client/transport.h @@ -46,6 +46,8 @@ #ifdef UCLIENT_PROFILE_SERIAL #if defined(UCLIENT_PLATFORM_POSIX) #include +#elif defined(UCLIENT_PLATFORM_RTEMS_BSD_NET) +#include #endif // if defined(UCLIENT_EXTERNAL_SERIAL) #include #endif //UCLIENT_PROFILE_SERIAL diff --git a/src/c/core/session/session.c b/src/c/core/session/session.c index 153b4f23..6c26c6d8 100644 --- a/src/c/core/session/session.c +++ b/src/c/core/session/session.c @@ -125,7 +125,7 @@ static bool run_session_until_sync( uxrSession* session, int timeout); -bool uxr_acknack_pong( +pong_status_t uxr_acknack_pong( ucdrBuffer* buffer); //================================================================== @@ -232,6 +232,12 @@ bool uxr_create_session_retries( bool received = wait_session_status(session, create_session_buffer, ucdr_buffer_length(&ub), (size_t) retries); bool created = received && UXR_STATUS_OK == session->info.last_requested_status; + + if (created) + { + uxr_reset_stream_storage(&session->streams); + } + return created; } @@ -392,12 +398,16 @@ bool uxr_run_session_until_confirm_delivery( { UXR_LOCK_SESSION(session); + int64_t start_timestamp = uxr_millis(); + int remaining_time = timeout_ms; + uxr_flash_output_streams(session); - bool timeout = false; - while (!uxr_output_streams_confirmed(&session->streams) && !timeout) + while (remaining_time >= 0 && !uxr_output_streams_confirmed(&session->streams)) { - timeout = !listen_message_reliably(session, timeout_ms); + listen_message_reliably(session, remaining_time); + + remaining_time = timeout_ms - (int)(uxr_millis() - start_timestamp); } bool ret = uxr_output_streams_confirmed(&session->streams); @@ -478,18 +488,22 @@ bool uxr_run_session_until_one_status( session->status_list = status_list; session->request_status_list_size = list_size; - bool timeout = false; bool status_confirmed = false; + + int64_t start_timestamp = uxr_millis(); + int remaining_time = timeout_ms; + do { - timeout = !listen_message_reliably(session, timeout_ms); + listen_message_reliably(session, timeout_ms); + remaining_time = timeout_ms - (int)(uxr_millis() - start_timestamp); for (unsigned i = 0; i < list_size && !status_confirmed; ++i) { status_confirmed = status_list[i] != UXR_STATUS_NONE || request_list[i] == UXR_INVALID_REQUEST_ID; //CHECK: better give an error? an assert? } } - while (!timeout && !status_confirmed); + while (remaining_time > 0 && !status_confirmed); session->request_status_list_size = 0; UXR_UNLOCK_SESSION(session); @@ -612,14 +626,14 @@ void uxr_flash_output_streams( //================================================================== // PRIVATE //================================================================== -bool uxr_acknack_pong( +pong_status_t uxr_acknack_pong( ucdrBuffer* buffer) { bool success = false; bool ret = false; - bool must_be_read = ucdr_buffer_remaining(buffer) > SUBHEADER_SIZE; + bool active_session = false; - if (must_be_read) + if (ucdr_buffer_remaining(buffer) > SUBHEADER_SIZE) { uint8_t id = 0; uint8_t flags = 0; @@ -632,6 +646,8 @@ bool uxr_acknack_pong( INFO_Payload info_payload; success &= uxr_deserialize_BaseObjectReply(buffer, &info_payload.base); + active_session = info_payload.base.result.implementation_status; + success &= ucdr_deserialize_bool(buffer, &info_payload.object_info.optional_config); if (info_payload.object_info.optional_config) @@ -653,7 +669,7 @@ bool uxr_acknack_pong( } } - return ret; + return ret ? (active_session ? PONG_IN_SESSION_STATUS : PONG_NO_SESSION_STATUS) : NO_PONG_STATUS; } bool uxr_run_session_until_pong( @@ -665,11 +681,11 @@ bool uxr_run_session_until_pong( uxr_flash_output_streams(session); - session->on_pong_flag = false; + session->on_pong_flag = NO_PONG_STATUS; do { listen_message_reliably(session, remaining_time); - if (session->on_pong_flag) + if (NO_PONG_STATUS != session->on_pong_flag) { break; } @@ -677,7 +693,7 @@ bool uxr_run_session_until_pong( } while (remaining_time > 0); - bool ret = session->on_pong_flag; + bool ret = PONG_IN_SESSION_STATUS == session->on_pong_flag; return ret; } @@ -861,9 +877,9 @@ void read_message( uxrStreamId id = uxr_stream_id_from_raw(stream_id_raw, UXR_INPUT_STREAM); read_stream(session, ub, id, seq_num); } - else if (uxr_acknack_pong(ub)) + else { - session->on_pong_flag = true; + session->on_pong_flag = uxr_acknack_pong(ub); } } @@ -1203,11 +1219,15 @@ bool run_session_until_sync( uxrSession* session, int timeout) { + int64_t start_timestamp = uxr_millis(); + int remaining_time = timeout; session->synchronized = false; - bool timeout_exceeded = false; - while (!timeout_exceeded && !session->synchronized) + + do { - timeout_exceeded = !listen_message_reliably(session, timeout); - } + listen_message_reliably(session, remaining_time); + remaining_time = timeout - (int)(uxr_millis() - start_timestamp); + } while (remaining_time > 0 && !session->synchronized); + return session->synchronized; -} +} \ No newline at end of file diff --git a/src/c/core/session/stream/output_reliable_stream.c b/src/c/core/session/stream/output_reliable_stream.c index 1e7b706a..b3ce287d 100644 --- a/src/c/core/session/stream/output_reliable_stream.c +++ b/src/c/core/session/stream/output_reliable_stream.c @@ -62,6 +62,9 @@ bool uxr_prepare_reliable_buffer_to_write( uint8_t* buffer = uxr_get_reliable_buffer(&stream->base, seq_num); size_t buffer_size = uxr_get_reliable_buffer_size(&stream->base, seq_num); + // Aligment required for inserting an XRCE subheader + buffer_size += ucdr_alignment(buffer_size, 4); + /* Check if the message fit in the current buffer */ if (buffer_size + length <= buffer_capacity) { @@ -139,6 +142,7 @@ bool uxr_prepare_reliable_buffer_to_write( fragment_size = available_block_size; } + // Prepare last fragment ucdr_init_buffer_origin_offset( &temp_ub, uxr_get_reliable_buffer(&stream->base, seq_num), @@ -150,6 +154,7 @@ bool uxr_prepare_reliable_buffer_to_write( uxr_set_reliable_buffer_size(&stream->base, seq_num, stream->offset + (size_t)(SUBHEADER_SIZE) + last_fragment_size); + // Prepare user buffer ucdr_init_buffer( ub, buffer + buffer_size + SUBHEADER_SIZE, diff --git a/src/c/core/session/write_access.c b/src/c/core/session/write_access.c index edc7e3a0..35525c34 100644 --- a/src/c/core/session/write_access.c +++ b/src/c/core/session/write_access.c @@ -177,7 +177,7 @@ bool on_full_output_buffer_fragmented( 0u, uxr_get_reliable_buffer_size(&stream->base, stream->last_written)); - if (local_args->data_size <= buffer_capacity) + if ((local_args->data_size + SUBHEADER_SIZE + WRITE_DATA_PAYLOAD_SIZE) <= buffer_capacity) { uxr_buffer_submessage_header(&temp_ub, SUBMESSAGE_ID_FRAGMENT, (uint16_t) local_args->data_size, FLAG_LAST_FRAGMENT); @@ -220,17 +220,25 @@ uint16_t uxr_prepare_output_stream_fragmented( if (stream_id.type == UXR_BEST_EFFORT_STREAM || stream == NULL) { - return rv; + UXR_UNLOCK_STREAM_ID(session, stream_id); + return UXR_INVALID_REQUEST_ID; } size_t remaining_blocks = get_available_free_slots(stream); if (0 == remaining_blocks) { - if (!flush_callback(session, flush_callback_args) || - 0 == (remaining_blocks = get_available_free_slots(stream))) + UXR_UNLOCK_STREAM_ID(session, stream_id); + if (!flush_callback(session, flush_callback_args)) { - return rv; + return UXR_INVALID_REQUEST_ID; + } + UXR_LOCK_STREAM_ID(session, stream_id); + remaining_blocks = get_available_free_slots(stream); + if (0 == remaining_blocks) + { + UXR_UNLOCK_STREAM_ID(session, stream_id); + return UXR_INVALID_REQUEST_ID; } } @@ -283,15 +291,23 @@ uint16_t uxr_prepare_output_stream_fragmented( WRITE_DATA_Payload_Data payload; rv = uxr_init_base_object_request(&session->info, entity_id, &payload.base); - (void) uxr_serialize_WRITE_DATA_Payload_Data(ub, &payload); - ucdr_init_buffer(ub, ub->iterator, (size_t)(ub->final - ub->iterator)); + if (rv == UXR_INVALID_REQUEST_ID) + { + UXR_UNLOCK_STREAM_ID(session, stream_id); + } + else + { + (void) uxr_serialize_WRITE_DATA_Payload_Data(ub, &payload); + + ucdr_init_buffer(ub, ub->iterator, (size_t)(ub->final - ub->iterator)); - session->continuous_args.stream_id = stream_id; - session->continuous_args.data_size = user_required_space; - session->continuous_args.flush_callback = flush_callback; - session->continuous_args.flush_callback_args = flush_callback_args; - ucdr_set_on_full_buffer_callback(ub, on_full_output_buffer_fragmented, session); + session->continuous_args.stream_id = stream_id; + session->continuous_args.data_size = user_required_space; + session->continuous_args.flush_callback = flush_callback; + session->continuous_args.flush_callback_args = flush_callback_args; + ucdr_set_on_full_buffer_callback(ub, on_full_output_buffer_fragmented, session); + } return rv; } diff --git a/src/c/profile/transport/can/can_transport_posix.c b/src/c/profile/transport/can/can_transport_posix.c index 65a17755..17caa1f8 100644 --- a/src/c/profile/transport/can/can_transport_posix.c +++ b/src/c/profile/transport/can/can_transport_posix.c @@ -65,7 +65,7 @@ size_t uxr_write_can_data_platform( uint8_t* errcode) { struct canfd_frame frame = { - .can_id = platform->can_id, .len = (uint8_t)len + .can_id = platform->can_id, .len = (uint8_t) (len + 1) }; struct pollfd poll_fd_write_ = { .fd = platform->poll_fd.fd, .events = POLLOUT @@ -76,12 +76,14 @@ size_t uxr_write_can_data_platform( if (0 < poll_rv) { - memcpy(&frame.data[0], buf, len); - ssize_t bytes_sent = write(poll_fd_write_.fd, &frame, CANFD_MTU); + memcpy(&frame.data[1], buf, len); + frame.data[0] = (uint8_t) len; + + ssize_t bytes_sent = write(poll_fd_write_.fd, &frame, sizeof(struct canfd_frame)); if (-1 != bytes_sent) { - rv = frame.len; + rv = len; *errcode = 0; } else @@ -115,12 +117,12 @@ size_t uxr_read_can_data_platform( if (0 < poll_rv) { - ssize_t bytes_received = read(platform->poll_fd.fd, &frame, CANFD_MTU); + ssize_t bytes_received = read(platform->poll_fd.fd, &frame, sizeof(struct canfd_frame)); - if (-1 != bytes_received) + if (-1 != bytes_received && frame.data[0] < CANFD_MTU) { - memcpy(buf, &frame.data[0], frame.len); - rv = (size_t)frame.len; + memcpy(buf, &frame.data[1], frame.data[0]); + rv = (size_t) frame.data[0]; *errcode = 0; } else diff --git a/src/c/profile/transport/serial/serial_transport_rtems_bsd_net.c b/src/c/profile/transport/serial/serial_transport_rtems_bsd_net.c new file mode 100644 index 00000000..aa4445a3 --- /dev/null +++ b/src/c/profile/transport/serial/serial_transport_rtems_bsd_net.c @@ -0,0 +1,92 @@ +#include +#include + +#include +#include + +bool uxr_init_serial_platform( + void* args, + int fd, + uint8_t remote_addr, + uint8_t local_addr) +{ + (void) remote_addr; + (void) local_addr; + + struct uxrSerialPlatform* platform = (struct uxrSerialPlatform*) args; + + platform->fd = fd; + + /* Poll setup. */ + #pragma GCC diagnostic push + #pragma GCC diagnostic ignored "-Wsign-conversion" + FD_ZERO(&platform->select_fd); + FD_SET(platform->fd, &platform->select_fd); + #pragma GCC diagnostic pop + return true; +} + +bool uxr_close_serial_platform( + void* args) +{ + struct uxrSerialPlatform* platform = (struct uxrSerialPlatform*) args; + return (-1 == platform->fd) ? true : (0 == close(platform->fd)); +} + +size_t uxr_write_serial_data_platform( + void* args, + const uint8_t* buf, + size_t len, + uint8_t* errcode) +{ + size_t rv = 0; + struct uxrSerialPlatform* platform = (struct uxrSerialPlatform*) args; + + ssize_t bytes_written = write(platform->fd, (void*)buf, (size_t)len); + if (-1 != bytes_written) + { + rv = (size_t)bytes_written; + *errcode = 0; + } + else + { + *errcode = 1; + } + return rv; +} + +size_t uxr_read_serial_data_platform( + void* args, + uint8_t* buf, + size_t len, + int timeout, + uint8_t* errcode) +{ + size_t rv = 0; + struct uxrSerialPlatform* platform = (struct uxrSerialPlatform*) args; + + struct timeval tv; + tv.tv_sec = timeout / 1000; + tv.tv_usec = (timeout % 1000) * 1000; + + fd_set fds = platform->select_fd; + int32_t poll_rv = select(platform->fd + 1, &fds, NULL, NULL, &tv); + if (0 < poll_rv) + { + ssize_t bytes_read = read(platform->fd, buf, len); + if (-1 != bytes_read) + { + rv = (size_t)bytes_read; + *errcode = 0; + } + else + { + *errcode = 1; + } + } + else + { + *errcode = (0 == poll_rv) ? 0 : 1; + } + return rv; +} \ No newline at end of file diff --git a/src/c/profile/transport/stream_framing/stream_framing_protocol.c b/src/c/profile/transport/stream_framing/stream_framing_protocol.c index 61610fa3..c7f5f249 100644 --- a/src/c/profile/transport/stream_framing/stream_framing_protocol.c +++ b/src/c/profile/transport/stream_framing/stream_framing_protocol.c @@ -166,10 +166,10 @@ size_t uxr_write_framed_msg( /* Write payload. */ uint8_t octet = 0; - uint16_t written_len = 0; + size_t written_len = 0; uint16_t crc = 0; bool cond = true; - while (written_len < len && cond) + while ((written_len < len) && cond) { octet = *(buf + written_len); if (uxr_add_next_octet(framing_io, octet)) diff --git a/src/c/util/ping.c b/src/c/util/ping.c index 3bb89995..33719924 100644 --- a/src/c/util/ping.c +++ b/src/c/util/ping.c @@ -13,7 +13,7 @@ bool serialize_get_info_message( ucdrBuffer* ub); -bool uxr_acknack_pong( +pong_status_t uxr_acknack_pong( ucdrBuffer* buffer); bool listen_info_message( @@ -153,7 +153,7 @@ bool listen_info_message( uint8_t stream_id_raw; uxrSeqNum seq_num; uxr_read_session_header(&session_info_fake, &ub, &stream_id_raw, &seq_num); - success &= uxr_acknack_pong(&ub); + success &= NO_PONG_STATUS != uxr_acknack_pong(&ub); } return success; diff --git a/test/unitary/session/streams/OutputReliableStream.cpp b/test/unitary/session/streams/OutputReliableStream.cpp index d2ad85ad..da05eab4 100644 --- a/test/unitary/session/streams/OutputReliableStream.cpp +++ b/test/unitary/session/streams/OutputReliableStream.cpp @@ -231,7 +231,7 @@ TEST_F(OutputReliableStreamTest, WriteMultipleFragmentsAndCheckSubHeaders) } // Writing two fragmented message, 3 slots should be used - size_t first_message_size = 24; // 1.5 * MAX_FRAGMENT_SIZE; + size_t first_message_size = 24; // 1.5 * MAX_FRAGMENT_SIZE; bool available_to_write = uxr_prepare_reliable_buffer_to_write(&stream, first_message_size, &ub); ASSERT_TRUE(available_to_write); available_to_write = uxr_prepare_reliable_buffer_to_write(&stream, first_message_size, &ub);