Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Develop zerocopy #1053

Merged
merged 32 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
4a1f818
Added SharedMemory header
TheMutta Jun 17, 2024
c36ab3b
Finalized SharedMemory class
TheMutta Jun 18, 2024
cc44859
Added differentiation between SharedMemory and VectorMemory, added he…
TheMutta Jun 19, 2024
557abfd
Replaced -1 fd with 0 fd
TheMutta Jun 19, 2024
185186d
This reverts commit 557abfd0477a5217cec6cd095477e91d526dd0d4.
TheMutta Jun 19, 2024
c4ff206
Made fd and mapping in SharedMemory protected instead of private
TheMutta Jun 24, 2024
ad66010
XLinkStream read helper functions for handling FDs
TheMutta Jun 24, 2024
3c1d976
Added SharedMemory support with FD in Buffer data structure
TheMutta Jun 25, 2024
cf35d8c
Added FD support in ImgFrame data structure
TheMutta Jun 25, 2024
a5016b6
Parsing fd in StreamMessageParser
TheMutta Jun 25, 2024
5019cca
Updated XLink version.
TheMutta Jun 26, 2024
fd710a6
Closing FDs when destructing SharedMemory
TheMutta Jun 27, 2024
1effc78
Added size parameters to stream write
TheMutta Jun 27, 2024
88e86f8
Passing FDs by value
TheMutta Jun 28, 2024
0d4b57d
Resolved conflict by reverting size changes.
TheMutta Jun 28, 2024
979573a
Added new max throughput example to test the efficency of the transfe…
TheMutta Jul 1, 2024
242d6ec
Added new example in CMakeLists.txt
TheMutta Jul 1, 2024
316c073
Disabled openssh that caused issues when cross compiling to RVC4
TheMutta Jul 1, 2024
74441bd
Added X_LINK_TCP_IP_OR_LOCAL_SHDMEM
TheMutta Jul 4, 2024
587f9a2
Changes to removed unused constructor to SharedMemory and changes to …
TheMutta Jul 5, 2024
21e4649
Removed MemoryKinds
TheMutta Jul 5, 2024
bc17b50
Re-added empty constructor.
TheMutta Jul 5, 2024
d8b81e8
Added tcpshd enviroment variable
TheMutta Jul 8, 2024
2efe453
Setting back connection in deviceInfo from connectionHandler
TheMutta Jul 10, 2024
cb9ac60
Added memfd_create constructors.
TheMutta Jul 10, 2024
9c4f05c
Readded kinds.
TheMutta Jul 11, 2024
950dbd4
Added SharedMemory Fd writing in data queue
TheMutta Jul 11, 2024
48f664d
Updated XLink
TheMutta Jul 12, 2024
26fb39c
Clangformat
TheMutta Jul 12, 2024
f99c0d8
Removed example file
TheMutta Jul 12, 2024
4560bb3
reset OpenSSL compile flag
TheMutta Jul 12, 2024
1a249f3
Merge branch 'v3_develop' into v3_develop_zerocopy
TheMutta Jul 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ The following environment variables can be set to alter default behavior of the
| DEPTHAI_SEARCH_TIMEOUT | Specifies timeout in milliseconds for device searching in blocking functions. |
| DEPTHAI_CONNECT_TIMEOUT | Specifies timeout in milliseconds for establishing a connection to a given device. |
| DEPTHAI_BOOTUP_TIMEOUT | Specifies timeout in milliseconds for waiting the device to boot after sending the binary. |
| DEPTHAI_PROTOCOL | Restricts default search to the specified protocol. Options: `any`, `usb`, `tcpip`. |
| DEPTHAI_PROTOCOL | Restricts default search to the specified protocol. Options: `any`, `usb`, `tcpip`, `tcpshd`. |
| DEPTHAI_PLATFORM | Restricts default search to the specified platform. Options: `any`, `rvc2`, `rvc3`, `rvc4`. |
| DEPTHAI_DEVICE_MXID_LIST | Restricts default search to the specified MXIDs. Accepts comma separated list of MXIDs. Lists filter results in an "AND" manner and not "OR" |
| DEPTHAI_DEVICE_ID_LIST | Alias to MXID list. Lists filter results in an "AND" manner and not "OR" |
Expand Down
6 changes: 3 additions & 3 deletions cmake/Hunter/config.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ hunter_config(

hunter_config(
XLink
VERSION "luxonis-2021.4.3-develop"
URL "https://github.com/luxonis/XLink/archive/061709357b06a74510b5b9e3e2832d9d80efd3ff.tar.gz"
SHA1 "affa1bdd7487cf70c9caf40365844967ae761ea3"
VERSION "luxonis-develop-server"
URL "https://github.com/luxonis/XLink/archive/585a38fe4707e5f023de277135d8cb6ff9c4e0c4.tar.gz"
SHA1 "d82827dec8b6f2702f4b31d8186fd70265cd0ca4"
CMAKE_ARGS
XLINK_ENABLE_LIBUSB=${DEPTHAI_ENABLE_LIBUSB}
)
Expand Down
2 changes: 1 addition & 1 deletion examples/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
add_subdirectory(cpp)
# add_subdirectory(python)
# add_subdirectory(python)
1 change: 1 addition & 0 deletions include/depthai/pipeline/datatype/ADatatype.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "depthai/utility/Memory.hpp"
#include "depthai/utility/Serialization.hpp"
#include "depthai/utility/VectorMemory.hpp"

namespace dai {

/// Abstract message
Expand Down
3 changes: 3 additions & 0 deletions include/depthai/pipeline/datatype/Buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ class Buffer : public ADatatype {
public:
Buffer() = default;
Buffer(size_t size);
Buffer(long fd);
Buffer(long fd, size_t size);
virtual ~Buffer() = default;

virtual void serialize(std::vector<std::uint8_t>& metadata, DatatypeEnum& datatype) const {
Expand All @@ -35,6 +37,7 @@ class Buffer : public ADatatype {
* @param data Copies data to internal buffer
*/
void setData(const std::vector<std::uint8_t>& data);
void setData(const long fd);

/**
* @param data Moves data to internal buffer
Expand Down
2 changes: 2 additions & 0 deletions include/depthai/pipeline/datatype/ImgFrame.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ class ImgFrame : public Buffer {
* Timestamp is set to now
*/
ImgFrame();
ImgFrame(long fd);
ImgFrame(size_t size);
ImgFrame(long fd, size_t size);
virtual ~ImgFrame() = default;

ImgTransformations transformations;
Expand Down
13 changes: 13 additions & 0 deletions include/depthai/utility/Memory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ holder(std::move(holder)) {}
};
*/

enum MemoryKinds {
MEMORY_KIND_VECTOR,
MEMORY_KIND_SHARED,
MEMORY_KINDS_COUNT,
};

// memory as interface
class Memory {
public:
Expand All @@ -120,6 +126,13 @@ class Memory {
std::size_t getSize() const {
return getData().size();
};

MemoryKinds getKind() {
return kind;
}

protected:
MemoryKinds kind;
};

} // namespace dai
125 changes: 125 additions & 0 deletions include/depthai/utility/SharedMemory.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
#pragma once

// std
#include <cstdint>
#include <cstring>
#include <functional>
#include <iostream>
#ifdef __unix__
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/un.h>
#include <unistd.h>
#endif

// project
#include "depthai/utility/Memory.hpp"

namespace dai {

// memory as interface
class SharedMemory : public Memory {
protected:
long fd = -1;
void* mapping;
void mapFd() {
if(fd < 0) {
/* Error handling here */
}

mapping = mmap(NULL, getSize(), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if(mapping == NULL) {
/* Error handling here */
}
}
void unmapFd() {
if(mapping == NULL) {
return;
}

munmap(mapping, getSize());
}

public:
SharedMemory() {
kind = MemoryKinds::MEMORY_KIND_SHARED;
fd = -1;
}

SharedMemory(long argFd) : fd(argFd) {
kind = MemoryKinds::MEMORY_KIND_SHARED;
mapFd();
}

SharedMemory(long argFd, std::size_t size) : fd(argFd) {
kind = MemoryKinds::MEMORY_KIND_SHARED;
setSize(size);
mapFd();
}

SharedMemory(const char* name) {
kind = MemoryKinds::MEMORY_KIND_SHARED;
fd = memfd_create(name, 0);
mapFd();
}

SharedMemory(const char* name, std::size_t size) {
kind = MemoryKinds::MEMORY_KIND_SHARED;
fd = memfd_create(name, 0);

setSize(size);
mapFd();
}

~SharedMemory() {
unmapFd();
close(fd);
}

SharedMemory& operator=(long argFd) {
unmapFd();
fd = argFd;
mapFd();

return *this;
}

span<std::uint8_t> getData() override {
if(mapping == NULL) {
mapFd();
}

return {(uint8_t*)mapping, getSize()};
}
span<const std::uint8_t> getData() const override {
return {(const uint8_t*)mapping, getSize()};
}
std::size_t getMaxSize() const override {
struct stat fileStats;
fstat(fd, &fileStats);

return fileStats.st_size;
}
std::size_t getOffset() const override {
return ftell(fdopen(fd, "r"));
}
void setSize(std::size_t size) override {
if(mapping != NULL) {
unmapFd();
}

ftruncate(fd, size);
mapFd();
}

std::size_t getSize() const {
return getMaxSize();
}

long getFd() const {
return fd;
}
};

} // namespace dai
12 changes: 9 additions & 3 deletions include/depthai/utility/VectorMemory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,15 @@ namespace dai {
class VectorMemory : public std::vector<std::uint8_t>, public Memory {
public:
// using std::vector<std::uint8_t>::vector;
VectorMemory() = default;
VectorMemory(const std::vector<std::uint8_t>& d) : vector(std::move(d)) {}
VectorMemory(std::vector<std::uint8_t>&& d) : vector(std::move(d)) {}
VectorMemory() {
kind = MemoryKinds::MEMORY_KIND_VECTOR;
}
VectorMemory(const std::vector<std::uint8_t>& d) : vector(std::move(d)) {
kind = MemoryKinds::MEMORY_KIND_VECTOR;
}
VectorMemory(std::vector<std::uint8_t>&& d) : vector(std::move(d)) {
kind = MemoryKinds::MEMORY_KIND_VECTOR;
}
VectorMemory& operator=(std::vector<std::uint8_t>&& d) {
std::vector<std::uint8_t>::operator=(std::move(d));
return *this;
Expand Down
6 changes: 5 additions & 1 deletion include/depthai/xlink/XLinkStream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace dai {

class StreamPacketDesc : public streamPacketDesc_t {
public:
StreamPacketDesc() noexcept : streamPacketDesc_t{nullptr, 0, {}, {}} {};
StreamPacketDesc() noexcept : streamPacketDesc_t{nullptr, 0, -1, {}, {}} {};
StreamPacketDesc(const StreamPacketDesc&) = delete;
StreamPacketDesc(StreamPacketDesc&& other) noexcept;
StreamPacketDesc& operator=(const StreamPacketDesc&) = delete;
Expand Down Expand Up @@ -89,11 +89,15 @@ class XLinkStream {
// Blocking
void write(span<const uint8_t> data, span<const uint8_t> data2);
void write(span<const uint8_t> data);
void write(long fd);
void write(long fd, span<const uint8_t> data);
void write(const void* data, std::size_t size);
std::vector<std::uint8_t> read();
std::vector<std::uint8_t> read(XLinkTimespec& timestampReceived);
void read(std::vector<std::uint8_t>& data);
void read(std::vector<std::uint8_t>& data, long& fd);
void read(std::vector<std::uint8_t>& data, XLinkTimespec& timestampReceived);
void read(std::vector<std::uint8_t>& data, long& fd, XLinkTimespec& timestampReceived);
// split write helper
void writeSplit(const void* data, std::size_t size, std::size_t split);
void writeSplit(const std::vector<uint8_t>& data, std::size_t split);
Expand Down
16 changes: 14 additions & 2 deletions src/device/DataQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "depthai/pipeline/datatype/ADatatype.hpp"
#include "depthai/pipeline/datatype/DatatypeEnum.hpp"
#include "depthai/pipeline/datatype/MessageGroup.hpp"
#include "depthai/utility/SharedMemory.hpp"
#include "depthai/xlink/XLinkStream.hpp"
#include "pipeline/datatype/MessageGroup.hpp"
#include "pipeline/datatype/StreamMessageParser.hpp"
Expand Down Expand Up @@ -205,8 +206,14 @@ DataInputQueue::DataInputQueue(

// Blocking
auto t1 = steady_clock::now();

if(outgoing->data->getSize() > 0) {
stream.write(outgoing->data->getData(), metadata);
if(outgoing->data->getKind() == dai::MemoryKinds::MEMORY_KIND_VECTOR) {
stream.write(outgoing->data->getData(), metadata);
} else if(outgoing->data->getKind() == dai::MemoryKinds::MEMORY_KIND_VECTOR) {
std::shared_ptr<SharedMemory> memory = std::dynamic_pointer_cast<SharedMemory>(outgoing->data);
stream.write(memory->getFd(), metadata);
}
} else {
stream.write(metadata);
}
Expand All @@ -227,7 +234,12 @@ DataInputQueue::DataInputQueue(
logger::trace("Sending part of a group message: {}", msg.first);
auto metadata = StreamMessageParser::serializeMetadata(msg.second);
if(msg.second->data->getSize() > 0) {
stream.write(msg.second->data->getData(), metadata);
if(msg.second->data->getKind() == dai::MemoryKinds::MEMORY_KIND_VECTOR) {
stream.write(msg.second->data->getData(), metadata);
} else if(msg.second->data->getKind() == dai::MemoryKinds::MEMORY_KIND_VECTOR) {
std::shared_ptr<SharedMemory> memory = std::dynamic_pointer_cast<SharedMemory>(msg.second->data);
stream.write(memory->getFd(), metadata);
}
} else {
stream.write(metadata);
}
Expand Down
15 changes: 15 additions & 0 deletions src/pipeline/datatype/Buffer.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "depthai/pipeline/datatype/Buffer.hpp"

#include "depthai/utility/SharedMemory.hpp"
#include "depthai/utility/VectorMemory.hpp"

namespace dai {
Expand All @@ -9,6 +10,16 @@ Buffer::Buffer(size_t size) : Buffer() {
data = mem;
}

Buffer::Buffer(long fd) : Buffer() {
auto mem = std::make_shared<SharedMemory>(fd);
data = mem;
}

Buffer::Buffer(long fd, size_t size) : Buffer() {
auto mem = std::make_shared<SharedMemory>(fd, size);
data = mem;
}

span<uint8_t> Buffer::getData() {
return data->getData();
}
Expand All @@ -27,6 +38,10 @@ void Buffer::setData(const std::vector<std::uint8_t>& d) {
}
}

void Buffer::setData(const long fd) {
data = std::make_shared<SharedMemory>(fd);
}

void Buffer::setData(std::vector<std::uint8_t>&& d) {
// allocate new holder
data = std::make_shared<VectorMemory>(std::move(d));
Expand Down
11 changes: 11 additions & 0 deletions src/pipeline/datatype/ImgFrame.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#define _USE_MATH_DEFINES
#include "depthai/pipeline/datatype/ImgFrame.hpp"

#include "depthai/utility/SharedMemory.hpp"
#include "spdlog/fmt/fmt.h"
#include "spdlog/spdlog.h"
namespace dai {
Expand All @@ -16,6 +17,16 @@ ImgFrame::ImgFrame(size_t size) : ImgFrame() {
data = mem;
}

ImgFrame::ImgFrame(long fd) : ImgFrame() {
auto mem = std::make_shared<SharedMemory>(fd);
data = mem;
}

ImgFrame::ImgFrame(long fd, size_t size) : ImgFrame() {
auto mem = std::make_shared<SharedMemory>(fd, size);
data = mem;
}

std::chrono::time_point<std::chrono::steady_clock, std::chrono::steady_clock::duration> ImgFrame::getTimestamp(CameraExposureOffset offset) const {
auto ts = getTimestamp();
auto expTime = getExposureTime();
Expand Down
Loading
Loading