Rename DynamicBuffer → ScatteredBuffer.
serges147 committed Apr 23, 2024
1 parent 80d0ea7 commit e2cca06
Showing 4 changed files with 49 additions and 49 deletions.
48 changes: 24 additions & 24 deletions docs/design/readme.md
@@ -316,8 +316,8 @@ class IRunnable {
+run(now: TimePoint)
}
- %% DYNAMIC BUFFER
- class DynamicBuffer {
+ %% SCATTERED BUFFER
+ class ScatteredBuffer {
+copy(offset: std::size_t, destination: void*, size_bytes: std::size_t) std::size_t
}
@@ -334,12 +334,12 @@ TransferMetadata <|-- ServiceTransferMetadata
class MessageRxTransfer {
+meta: TransferMetadata
+publisher_node_id: cetl::optional~u16~
- +payload: DynamicBuffer
+ +payload: ScatteredBuffer
}
TransferMetadata o-- MessageRxTransfer
class ServiceRxTransfer {
+meta: ServiceTransferMetadata
- +payload: DynamicBuffer
+ +payload: ScatteredBuffer
}
ServiceTransferMetadata o-- ServiceRxTransfer
@@ -612,22 +612,22 @@ The `send` method can either accept the transfer for transmission in its entirety
The transfer timestamp given to the `send` method signifies the transmission deadline for the transport frames originating from this transfer; transport frames whose transmission could not be completed prior to the expiration of their deadline will be dropped (dequeued without transmission) by the transport. Such occurrences should normally be recorded for diagnostic purposes via the appropriate statistical counters, but this feature is currently outside of the scope of this proposal.
- Rx sessions operate like sampling ports, storing the single most recently received transfer internally as an instance of `DynamicBuffer` (see below). The buffer is passed over to the application upon the next call to `receive`. A sampling port is similar to a FIFO queue of depth 1 unit, so in the future this design can be trivially generalized to support variable-depth FIFO queues inside Rx session instances that are set to the depth of 1 unit by default. Should the application fail to collect the received transfer(s) before the FIFO queue is full, the least recently received transfer is dropped, and the new one is pushed to the opposite end of the queue. Such occurrences should normally be recorded for diagnostic purposes via the appropriate statistical counters, but this feature is currently outside of the scope of this proposal.
+ Rx sessions operate like sampling ports, storing the single most recently received transfer internally as an instance of `ScatteredBuffer` (see below). The buffer is passed over to the application upon the next call to `receive`. A sampling port is similar to a FIFO queue of depth 1 unit, so in the future this design can be trivially generalized to support variable-depth FIFO queues inside Rx session instances that are set to the depth of 1 unit by default. Should the application fail to collect the received transfer(s) before the FIFO queue is full, the least recently received transfer is dropped, and the new one is pushed to the opposite end of the queue. Such occurrences should normally be recorded for diagnostic purposes via the appropriate statistical counters, but this feature is currently outside of the scope of this proposal.
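
For illustration only (the class and method names below are hypothetical and not part of the proposal or of this commit), the depth-1 sampling-port behavior amounts to a single `optional` slot that is overwritten on arrival and cleared on `receive`:

```c++
#include <optional>
#include <utility>

/// Editorial sketch of the depth-1 "sampling port" semantics described above.
template <typename Transfer>
class SamplingPortSketch final
{
public:
    /// Invoked by the transport when a new transfer arrives; an uncollected
    /// previous transfer (if any) is overwritten, i.e. dropped.
    void push(Transfer&& transfer) { latest_ = std::move(transfer); }

    /// Invoked by the application; returns the stored transfer (if any) and
    /// leaves the port empty -- equivalent to a FIFO queue of depth 1.
    std::optional<Transfer> receive() { return std::exchange(latest_, std::nullopt); }

private:
    std::optional<Transfer> latest_;
};
```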
- #### `DynamicBuffer`
+ #### `ScatteredBuffer`
- Lizards operate on raw serialized binary blobs of data rather than high-level message representations. Transmission is performed by enqueueing serialized transfers into a private transmission queue managed by the lizard, which is easy to abstract from the application. Reception is a more complicated case because it requires a lizard to return memory to the application that is owned by the lizard, requiring the application to free it after use in a lizard-specific manner; further and more importantly, such memory may or may not be fragmented in a gather-scatter buffer. To hide the specifics of such memory management from the application, a new abstraction is introduced, represented by the class named `DynamicBuffer`.
+ Lizards operate on raw serialized binary blobs of data rather than high-level message representations. Transmission is performed by enqueueing serialized transfers into a private transmission queue managed by the lizard, which is easy to abstract from the application. Reception is a more complicated case because it requires a lizard to return memory to the application that is owned by the lizard, requiring the application to free it after use in a lizard-specific manner; further and more importantly, such memory may or may not be fragmented in a gather-scatter buffer. To hide the specifics of such memory management from the application, a new abstraction is introduced, represented by the class named `ScatteredBuffer`.
Examples of lizard-specific management of the returned memory can be found in
<https://github.com/OpenCyphal/libcanard/blob/73d0a9cae94307038344b0d3eac2fd6dac44e139/libcanard/canard.h#L335-L338> and
<https://github.com/OpenCyphal/libudpard/blob/417d92cf9779e28c7b96bebba41665874cc13ad0/libudpard/udpard.h#L285-L311>.
- The `DynamicBuffer` provides a uniform API for dealing with the Cyphal transfer payload returned by a lizard and also implements the movable/non-copyable RAII semantics for freeing the memory allocated for the buffer once the dynamic buffer instance is disposed of. The interface hides the gather-scatter nature of the buffer, providing a simplified linearized view. The definition of the class is approximately as follows:
+ The `ScatteredBuffer` provides a uniform API for dealing with the Cyphal transfer payload returned by a lizard and also implements the movable/non-copyable RAII semantics for freeing the memory allocated for the buffer once the scattered buffer instance is disposed of. The interface hides the gather-scatter nature of the buffer, providing a simplified linearized view. The definition of the class is approximately as follows:
```c++
/// The buffer is movable but not copyable because copying the contents of a buffer is considered wasteful.
/// The buffer behaves as if it's empty if the underlying implementation is moved away.
- class DynamicBuffer final
+ class ScatteredBuffer final
{
public:
static constexpr std::size_t ImplementationFootprint = sizeof(void*) * 8;
@@ -651,7 +651,7 @@ public:
/// Accepts a Lizard-specific implementation of Iface and moves it into the internal storage.
template<typename T, typename = std::enable_if_t<std::is_base_of<Iface, T>::value>>
- explicit DynamicBuffer(T&& source) : impl_(std::move(source)) {}
+ explicit ScatteredBuffer(T&& source) : impl_(std::move(source)) {}
/// Copies a fragment of the specified size at the specified offset out of the buffer.
/// The request is truncated to prevent out-of-range memory access.
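// [Editorial illustration added to this document; not part of the commit or of the original file.]
// Given the copy(offset, destination, size_bytes) -> std::size_t signature shown in the
// class diagram above, a single truncated read could look like this (`buffer` and `chunk`
// are hypothetical names):
//
//   std::array<cetl::byte, 64> chunk{};
//   const std::size_t copied = buffer.copy(0U, chunk.data(), chunk.size());
//   // `copied` <= chunk.size(); it is smaller if the payload ends before the chunk is full.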
@@ -791,7 +791,7 @@ class IRxSocket
public:
/// Payload is returned as a pointer to the heap. The buffer is allocated using the allocator given to the media
/// instance. We use heap allocation here because LibUDPard takes ownership of the payload and then transfers it
- /// to the upper layers without copying via DynamicBuffer.
+ /// to the upper layers without copying via ScatteredBuffer.
[[nodiscard]] virtual
std::expected<std::optional<std::tuple<std::size_t, cetl::unique_ptr<std::byte[]>>>,
std::variant<PlatformError, ArgumentError, MemoryError>>
@@ -916,7 +916,7 @@ public:
// If the message is of type cetl::VariableLengthArray<cetl::byte, Allocator>, where Allocator may be arbitrary,
// a specialized subscriber is constructed that does not deserialize messages but returns the serialized
// representation as-is after copying it into a new instance of the specified array type.
- // Note that we can't just deliver DynamicBuffer to the application because it is non-copyable.
+ // Note that we can't just deliver ScatteredBuffer to the application because it is non-copyable.
// See the specialization for usage details.
template <typename Message>
[[nodiscard]] std::expected<Subscriber<Message>, Error> makeSubscriber(const std::uint16_t subject_id);
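// [Editorial illustration added to this document; not part of the commit or of the original file.]
// A hypothetical call site; the message type, subject-ID, and the `presentation` object are
// placeholders used only for this sketch:
//
//   auto maybe_subscriber = presentation.makeSubscriber<uavcan::node::Heartbeat_1_0>(7509U);
//   if (maybe_subscriber)
//   {
//       // Poll the subscriber from the application's main loop to collect deserialized messages.
//   }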
@@ -931,7 +931,7 @@ public:
// state is not lost by creating a dummy client instance whose only purpose is to keep the counter alive.
//
// If the service is of type void, a specialized client is constructed that does not serialize requests nor
- // deserialize responses, but accepts raw pre-serialized requests and returns responses contained in DynamicBuffer
+ // deserialize responses, but accepts raw pre-serialized requests and returns responses contained in ScatteredBuffer
// as received from the transport layer. See the specialization for usage details.
template <typename Service>
[[nodiscard]] std::expected<Client<Service>, Error> makeClient(const std::uint16_t service_id,
@@ -1073,13 +1073,13 @@ protected:
explicit SubscriberBase(const cetl::shared_ptr<SubscriberImpl>& impl);
[[nodiscard]] virtual nunavut::support::SerializeResult doAccept(const Metadata& meta,
- const DynamicBuffer& data) noexcept = 0;
+ const ScatteredBuffer& data) noexcept = 0;
private:
/// This is invoked from SubscriberImpl when a new transfer for this subscription is received.
/// The concrete subscriber implements this by invoking the auto-generated deserialization function.
[[nodiscard]] nunavut::support::SerializeResult accept(const Metadata& meta,
- const DynamicBuffer& data) noexcept;
+ const ScatteredBuffer& data) noexcept;
cetl::shared_ptr<SubscriberImpl> impl_;
};
@@ -1105,7 +1105,7 @@ public:
private:
[[nodiscard]] nunavut::support::SerializeResult doAccept(const Metadata& meta,
- const DynamicBuffer& data) noexcept override
+ const ScatteredBuffer& data) noexcept override
{
Message msg;
if (const auto res = Message::deserialize(msg, data); !res)
Expand Down Expand Up @@ -1143,7 +1143,7 @@ public:
private:
[[nodiscard]] nunavut::support::SerializeResult doAccept(const Metadata& meta,
- const DynamicBuffer& data) noexcept override
+ const ScatteredBuffer& data) noexcept override
{
Message msg;
msg.resize(std::min(data.size(), msg.max_max_size())); // Excess will be truncated per Cyphal spec.
Expand All @@ -1161,11 +1161,11 @@ private:
};
```

- The message RX session object is managed by `SubscriberImpl`, which also implements `IRunnable`. The `run` method polls the RX session, and if there is a transfer available, it is consumed, and then the `accept` method of each living `SubscriberBase` is invoked sequentially. Each `Subscriber` deserializes its own copy of the message and stores it for consumption by the application in the FIFO queue. The `SubscriberImpl` then destroys the `DynamicBuffer` containing the received transfer.
+ The message RX session object is managed by `SubscriberImpl`, which also implements `IRunnable`. The `run` method polls the RX session, and if there is a transfer available, it is consumed, and then the `accept` method of each living `SubscriberBase` is invoked sequentially. Each `Subscriber` deserializes its own copy of the message and stores it for consumption by the application in the FIFO queue. The `SubscriberImpl` then destroys the `ScatteredBuffer` containing the received transfer.
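
A minimal sketch of the fan-out described above (an editorial illustration; member names such as `rx_session_` and `subscribers_` are hypothetical, only the behavior follows the text):

```c++
/// Editorial sketch; not part of the commit. Mirrors the paragraph above.
void SubscriberImpl::run(const TimePoint now)
{
    (void) now;
    if (auto transfer = rx_session_->receive())  // At most one pending transfer (depth-1 sampling port).
    {
        for (SubscriberBase* const subscriber : subscribers_)
        {
            // Each subscriber deserializes its own copy of the message; per-subscriber
            // results are ignored here for brevity.
            (void) subscriber->accept(transfer->metadata, transfer->payload);
        }
    }  // The received transfer and its ScatteredBuffer payload are destroyed here, releasing lizard-owned memory.
}
```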

It is easy to see that there is a certain redundancy involved, as each `Subscriber` performs deserialization independently of each other, resulting in duplicate work. This is avoidable but is somewhat convoluted, as the type of the message is not known to `SubscriberImpl`; one approach is to use the first instance of `Subscriber` to perform deserialization and then to deliver the deserialized message object to each of its siblings by value (unless we are comfortable using shared pointers here, which we are probably not). This may need to be revised in a later version; however, one should keep in mind that the work duplication only becomes a problem for large messages that are expensive to deserialize, and the application is arguably less likely to have more than one subscriber for such expensive messages.

- Another issue to be aware of is the deep copying inherent to the C++ deserialization code generated by Nunavut. This creates potentially significant issues for large messages (imagery, point clouds, radar samples) that can be avoided with more careful buffer memory management. For example, the Python codegen implemented in Nunavut heavily leverages shared pointers and aliasing for this purpose (resorting to NumPy-aliased arrays for large blobs) instead of the slow byte-by-byte copying implemented for C++. This approach is difficult to recreate directly in LibCyphal because the presentation layer receives transfers in `DynamicBuffer`, which is fragmented as it receives the data from the underlying media layer as a sequence of byte spans pointing directly into the memory allocated for the network frames (like UDP datagrams). As the MTU is typically small, large data blobs where it's worth worrying about zero-copy deserialization will invariably end up being fragmented; hence, whatever solution is chosen for the zero-copy deserialization needs to be able to accept scattered buffers and present them to the application with a contiguous API. This leads to another problem: many (all known to me) APIs designed for imagery and point cloud manipulation expect the imagery data to be arranged in contiguous memory chunks, which suggests that at least one deep copy will be needed *somewhere* along the stack. The current revision does it in `Subscriber::doAccept`, but there is no implication that this choice is optimal.
+ Another issue to be aware of is the deep copying inherent to the C++ deserialization code generated by Nunavut. This creates potentially significant issues for large messages (imagery, point clouds, radar samples) that can be avoided with more careful buffer memory management. For example, the Python codegen implemented in Nunavut heavily leverages shared pointers and aliasing for this purpose (resorting to NumPy-aliased arrays for large blobs) instead of the slow byte-by-byte copying implemented for C++. This approach is difficult to recreate directly in LibCyphal because the presentation layer receives transfers in `ScatteredBuffer`, which is fragmented as it receives the data from the underlying media layer as a sequence of byte spans pointing directly into the memory allocated for the network frames (like UDP datagrams). As the MTU is typically small, large data blobs where it's worth worrying about zero-copy deserialization will invariably end up being fragmented; hence, whatever solution is chosen for the zero-copy deserialization needs to be able to accept scattered buffers and present them to the application with a contiguous API. This leads to another problem: many (all known to me) APIs designed for imagery and point cloud manipulation expect the imagery data to be arranged in contiguous memory chunks, which suggests that at least one deep copy will be needed *somewhere* along the stack. The current revision does it in `Subscriber::doAccept`, but there is no implication that this choice is optimal.
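
For illustration (an editorial sketch, not part of the proposal), the single deep copy mentioned above could be performed by linearizing the `ScatteredBuffer` into a contiguous container via its `copy` method; `size()` is used the same way as in the `Subscriber<Message>` specialization above:

```c++
#include <cstddef>
#include <vector>

/// Editorial sketch; assumes only the copy() and size() members shown elsewhere in this document.
std::vector<std::byte> linearize(const ScatteredBuffer& buffer)
{
    std::vector<std::byte> contiguous(buffer.size());
    const std::size_t copied = buffer.copy(0U, contiguous.data(), contiguous.size());
    contiguous.resize(copied);  // Defensive: copy() truncates rather than overrunning.
    return contiguous;
}
```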

The proposal intentionally excludes statistical counters; such auxiliary features are to be retrofitted at a later stage.

@@ -1184,7 +1184,7 @@ public:
[[nodiscard]] TimePoint getRequestTimestamp() const noexcept;

protected:
- [[nodiscard]] std::optional<std::tuple<DynamicBuffer, ServiceTransferMetadata>> getRaw() noexcept;
+ [[nodiscard]] std::optional<std::tuple<ScatteredBuffer, ServiceTransferMetadata>> getRaw() noexcept;
};

template <typename Response>
@@ -1244,7 +1244,7 @@ public:
[[nodiscard]] std::expected<ResponsePromise<Service::Response>, Error> request(const Service::Request& req);
};

- /// The non-typed specialization that accepts the request as a raw blob and returns the response as a raw DynamicBuffer.
+ /// The non-typed specialization that accepts the request as a raw blob and returns the response as a raw ScatteredBuffer.
template <>
class Client<void> final : public ClientBase
{
@@ -1256,7 +1256,7 @@ public:
TooManyPendingRequestsError>;

/// Sends the request and returns a promise that will be materialized when the response is successfully received.
- [[nodiscard]] std::expected<ResponsePromise<DynamicBuffer>, Error> request(
+ [[nodiscard]] std::expected<ResponsePromise<ScatteredBuffer>, Error> request(
const std::span<const std::span<const cetl::byte>> req);
};
```
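
A hypothetical call site for the raw client (an editorial sketch; `raw_client`, `sendRawRequest`, and the payload bytes are placeholders): the pre-serialized request is passed as a span of byte-span fragments, so a single contiguous buffer is wrapped as a one-element fragment list.

```c++
#include <array>
#include <span>

// Editorial sketch; Client<void> and cetl::byte come from the design above.
void sendRawRequest(Client<void>& raw_client)
{
    const std::array<cetl::byte, 8> serialized_request{/* ... pre-serialized request payload ... */};
    const std::array<std::span<const cetl::byte>, 1> fragments{std::span<const cetl::byte>{serialized_request}};
    auto maybe_promise = raw_client.request(fragments);  // std::expected<ResponsePromise<ScatteredBuffer>, Error>
    (void) maybe_promise;                                 // The promise materializes when the response arrives.
}
```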
@@ -1307,7 +1307,7 @@ private:
std::optional<cetl::function<void(const RequestContext&)>> callback_;
};
- /// The non-typed specialization that presents incoming requests as a raw DynamicBuffer and accepts responses as raw blobs.
+ /// The non-typed specialization that presents incoming requests as a raw ScatteredBuffer and accepts responses as raw blobs.
template <>
class Server<void> final : public ServerBase
{
@@ -1316,7 +1316,7 @@ public:
{
using Continuation = cetl::function<std::expected<void, Error>(const std::span<const std::span<const cetl::byte>>)>;
- DynamicBuffer request;
+ ScatteredBuffer request;
Metadata meta;
Continuation continuation; ///< The continuation can be used at most once.
};
6 changes: 3 additions & 3 deletions include/libcyphal/transport/defines.hpp
@@ -6,7 +6,7 @@
#ifndef LIBCYPHAL_TRANSPORT_DEFINES_HPP_INCLUDED
#define LIBCYPHAL_TRANSPORT_DEFINES_HPP_INCLUDED

#include "dynamic_buffer.hpp"
#include "scattered_buffer.hpp"

namespace libcyphal
{
@@ -71,13 +71,13 @@ using PayloadFragments = cetl::span<const cetl::span<const cetl::byte>>;
struct MessageRxTransfer final
{
MessageTransferMetadata metadata;
- DynamicBuffer payload;
+ ScatteredBuffer payload;
};

struct ServiceRxTransfer final
{
ServiceTransferMetadata metadata;
- DynamicBuffer payload;
+ ScatteredBuffer payload;
};

/// @brief Defines maximum number of media interfaces that can be used in a Cyphal transport.
