Develop zerocopy #1053

Merged
merged 32 commits into from
Jul 12, 2024
Changes from 19 commits
4a1f818
Added SharedMemory header
TheMutta Jun 17, 2024
c36ab3b
Finalized SharedMemory class
TheMutta Jun 18, 2024
cc44859
Added differentiation between SharedMemory and VectorMemory, added he…
TheMutta Jun 19, 2024
557abfd
Replaced -1 fd with 0 fd
TheMutta Jun 19, 2024
185186d
This reverts commit 557abfd0477a5217cec6cd095477e91d526dd0d4.
TheMutta Jun 19, 2024
c4ff206
Made fd and mapping in SharedMemory protected instead of private
TheMutta Jun 24, 2024
ad66010
XLinkStream read helper functions for handling FDs
TheMutta Jun 24, 2024
3c1d976
Added SharedMemory support with FD in Buffer data structure
TheMutta Jun 25, 2024
cf35d8c
Added FD support in ImgFrame data structure
TheMutta Jun 25, 2024
a5016b6
Parsing fd in StreamMessageParser
TheMutta Jun 25, 2024
5019cca
Updated XLink version.
TheMutta Jun 26, 2024
fd710a6
Closing FDs when destructing SharedMemory
TheMutta Jun 27, 2024
1effc78
Added size parameters to stream write
TheMutta Jun 27, 2024
88e86f8
Passing FDs by value
TheMutta Jun 28, 2024
0d4b57d
Resolved conflict by reverting size changes.
TheMutta Jun 28, 2024
979573a
Added new max throughput example to test the efficency of the transfe…
TheMutta Jul 1, 2024
242d6ec
Added new example in CMakeLists.txt
TheMutta Jul 1, 2024
316c073
Disabled openssh that caused issues when cross compiling to RVC4
TheMutta Jul 1, 2024
74441bd
Added X_LINK_TCP_IP_OR_LOCAL_SHDMEM
TheMutta Jul 4, 2024
587f9a2
Changes to removed unused constructor to SharedMemory and changes to …
TheMutta Jul 5, 2024
21e4649
Removed MemoryKinds
TheMutta Jul 5, 2024
bc17b50
Re-added empty constructor.
TheMutta Jul 5, 2024
d8b81e8
Added tcpshd enviroment variable
TheMutta Jul 8, 2024
2efe453
Setting back connection in deviceInfo from connectionHandler
TheMutta Jul 10, 2024
cb9ac60
Added memfd_create constructors.
TheMutta Jul 10, 2024
9c4f05c
Readded kinds.
TheMutta Jul 11, 2024
950dbd4
Added SharedMemory Fd writing in data queue
TheMutta Jul 11, 2024
48f664d
Updated XLink
TheMutta Jul 12, 2024
26fb39c
Clangformat
TheMutta Jul 12, 2024
f99c0d8
Removed example file
TheMutta Jul 12, 2024
4560bb3
reset OpenSSL compile flag
TheMutta Jul 12, 2024
1a249f3
Merge branch 'v3_develop' into v3_develop_zerocopy
TheMutta Jul 12, 2024
2 changes: 1 addition & 1 deletion CMakeLists.txt
@@ -15,7 +15,7 @@ if(WIN32)
set(DEPTHAI_CURL_USE_OPENSSL OFF)
else()
set(DEPTHAI_CURL_USE_SCHANNEL OFF)
set(DEPTHAI_CURL_USE_OPENSSL ON)
Collaborator

I think this can be removed.

set(DEPTHAI_CURL_USE_OPENSSL OFF)
endif()

# Set type to canonicalize relative paths for user-provided toolchain
6 changes: 3 additions & 3 deletions cmake/Hunter/config.cmake
@@ -11,9 +11,9 @@ hunter_config(

hunter_config(
XLink
- VERSION "luxonis-2021.4.3-develop"
- URL "https://github.com/luxonis/XLink/archive/061709357b06a74510b5b9e3e2832d9d80efd3ff.tar.gz"
- SHA1 "affa1bdd7487cf70c9caf40365844967ae761ea3"
+ VERSION "luxonis-local-shdmem-develop"
+ URL "https://github.com/luxonis/XLink/archive/7fb600ab881e6863e856c69c076ea4b4c32e8787.tar.gz"
+ SHA1 "44b07ff4b7d624a7435b7c25d246336f8b4b480a"
CMAKE_ARGS
XLINK_ENABLE_LIBUSB=${DEPTHAI_ENABLE_LIBUSB}
)
2 changes: 2 additions & 0 deletions examples/CMakeLists.txt
@@ -422,6 +422,8 @@ dai_add_example(host_pipeline host_side/host_pipeline.cpp ON OFF)
dai_add_example(host_pipeline_opencv host_side/host_pipeline_opencv.cpp ON OFF)
dai_add_example(host_pipeline_synced_node host_side/host_pipeline_synced_node.cpp ON OFF)
dai_add_example(host_only_camera host_side/host_only_camera.cpp OFF OFF)
dai_add_example(host_only_image host_side/host_only_image.cpp OFF OFF)

#
# # TMP
# dai_add_example(tmp_preview tmp/tmp_preview.cpp OFF OFF)
71 changes: 71 additions & 0 deletions examples/host_side/host_only_image.cpp
Collaborator

I think we can remove this one before the merge.

@@ -0,0 +1,71 @@
#include <depthai/depthai.hpp>
#include <opencv2/highgui.hpp>
#include "depthai/pipeline/ThreadedHostNode.hpp"
#include "depthai/pipeline/node/ImageManip.hpp"

class TestSource : public dai::NodeCRTP<dai::node::ThreadedHostNode, TestSource> {
public:
    Output output = dai::Node::Output{*this, {}};

    void run() override {
        int64_t seqNum = 0;

        // Generate a cv::Mat frame with a gradient
        cv::Mat frame(768, 1920, CV_8UC3);
        for(int i = 0; i < frame.rows; i++) {
            for(int j = 0; j < frame.cols; j++) {
                frame.at<cv::Vec3b>(i, j) = cv::Vec3b(i % 256, j % 256, (i + j) % 256);
            }
        }
        if(frame.empty()) {
            throw std::runtime_error("Couldn't capture frame");
        }

        cv::Mat frameResized;
        cv::resize(frame, frameResized, cv::Size(1920, 768));

        while(isRunning()) {
            auto imgFrame = std::make_shared<dai::ImgFrame>();
            imgFrame->setFrame(frameResized);
            imgFrame->setSequenceNum(seqNum++);
            imgFrame->setType(dai::ImgFrame::Type::NV12);
            imgFrame->setWidth(frameResized.cols);
            imgFrame->setHeight(frameResized.rows);
            imgFrame->setTimestamp(std::chrono::steady_clock::now());
            output.send(imgFrame);
        }
    }
};

int main() {
    // Create pipeline
    dai::Pipeline pipeline(true);
    auto camRgb = pipeline.create<TestSource>();
    auto manip = pipeline.create<dai::node::ImageManip>();
    camRgb->output.link(manip->inputImage);
    manip->initialConfig.setResize(1920, 768);
    manip->setMaxOutputFrameSize(1920 * 768 * 3);
    auto queue = manip->out.createOutputQueue();
    pipeline.start();

    std::cout << "Pipeline running" << std::endl;
    std::size_t framesRecieved = 0;
    std::size_t totalTime = 0;
    while(pipeline.isRunning()) {
        std::chrono::steady_clock::time_point begin = std::chrono::steady_clock::now();

        auto img = queue->get<dai::ImgFrame>();
        auto cvFrame = img->getCvFrame();

        std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now();
        ++framesRecieved;
        totalTime += std::chrono::duration_cast<std::chrono::microseconds>(end - begin).count();

        if (framesRecieved % 1024 == 0) {
            std::cout << "Mean time: " << totalTime / framesRecieved << "µs" << std::endl;
        }
    }

    pipeline.wait();
    return 0;
}
2 changes: 2 additions & 0 deletions include/depthai/pipeline/datatype/ADatatype.hpp
@@ -6,6 +6,8 @@
#include "depthai/utility/Memory.hpp"
#include "depthai/utility/Serialization.hpp"
#include "depthai/utility/VectorMemory.hpp"
#include "depthai/utility/SharedMemory.hpp"
Collaborator

Not needed?

Author

Yes, removed


namespace dai {

/// Abstract message
3 changes: 3 additions & 0 deletions include/depthai/pipeline/datatype/Buffer.hpp
@@ -17,6 +17,7 @@ class Buffer : public ADatatype {
public:
Buffer() = default;
Buffer(size_t size);
Buffer(long fd);
virtual ~Buffer() = default;

virtual void serialize(std::vector<std::uint8_t>& metadata, DatatypeEnum& datatype) const {
@@ -35,11 +36,13 @@ class Buffer : public ADatatype {
* @param data Copies data to internal buffer
*/
void setData(const std::vector<std::uint8_t>& data);
void setData(const long fd);

/**
* @param data Moves data to internal buffer
*/
void setData(std::vector<std::uint8_t>&& data);

Collaborator

Run clangformat target


/**
* Retrieves timestamp related to dai::Clock::now()
1 change: 1 addition & 0 deletions include/depthai/pipeline/datatype/ImgFrame.hpp
@@ -70,6 +70,7 @@ class ImgFrame : public Buffer {
* Timestamp is set to now
*/
ImgFrame();
ImgFrame(long fd);
ImgFrame(size_t size);
virtual ~ImgFrame() = default;

13 changes: 13 additions & 0 deletions include/depthai/utility/Memory.hpp
@@ -107,6 +107,12 @@ holder(std::move(holder)) {}
};
*/

enum MemoryKinds {
MEMORY_KIND_VECTOR_MEMORY,
MEMORY_KIND_SHARED_MEMORY,
MEMORY_KIND_COUNT,
};

// memory as interface
class Memory {
public:
@@ -120,6 +126,13 @@ class Memory {
std::size_t getSize() const {
return getData().size();
};

MemoryKinds getKind() const {
return kind;
};

protected:
MemoryKinds kind;
};

} // namespace dai
93 changes: 93 additions & 0 deletions include/depthai/utility/SharedMemory.hpp
@@ -0,0 +1,93 @@
#pragma once

// std
#include <cstdint>
#include <cstring>
#include <functional>
#include <iostream>
#ifdef __unix__
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/un.h>
#include <unistd.h>
#endif

// project
#include "depthai/utility/Memory.hpp"

namespace dai {

// memory as interface
class SharedMemory : public Memory {
protected:
    long fd = -1;
    void* mapping;
    void mapFd() {
        mapping = mmap(NULL, getSize(), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
        if (mapping == NULL) {
            /* Error handling here */
        }
    }
    void unmapFd() {
        if (mapping == NULL) {
            return;
        }

        munmap(mapping, getSize());
    }
public:
    SharedMemory() {
        kind = MemoryKinds::MEMORY_KIND_SHARED_MEMORY;
        fd = -1;
    }
Collaborator

Does it make sense to allow this?

I think it would make more sense to only allow to pass in a size and allocate the shared memory.

Author

Yeah, probably useless; I removed it.

> I think it would make more sense to only allow to pass in a size and allocate the shared memory.

Well, yes, that makes sense. So we would support both passing the FD in externally and automatically allocating a shared memory buffer.

Also, I think it would be worth looking into memfd_create(), since the memory is released automatically once all references to it are closed, which would be better than shm_open, which must be unlinked.

Collaborator

Please look into memfd_create() in that case, so that we can also send data zero-copy in the other direction.

Author

It is pretty nice. It's like shm_open(), but better. You can do sealing and it has no need for unlinking.
The only issue would be name generation, which is the same for shm_open().
I'd wager we could use a static class variable to create a string like "shared_memory_" and then add the number of instances created. I think that should last us up until 2^64 instances :)

Author

Oh btw, that constructor is actually needed because of the GstBufferMemory constructor in DepthAI device, as we set the FD in the constructor itself.

Collaborator

If we set it in the constructor, can we just initialize shared memory in the initializer list?

GstBufferMemory(int fd): SharedMemory(fd) {}...

Collaborator

Oh I see what you mean (checked the source).

Let's leave it then.
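
For reference, the memfd_create() idea discussed in this thread can be sketched roughly as follows. This is a minimal, Linux-only illustration and not the code that was merged: `createSharedBuffer` and `roundTripThroughMemfd` are hypothetical names, and the `shared_memory_<n>` naming simply follows the suggestion above. It assumes glibc 2.27 or newer, which provides the `memfd_create` wrapper.

```cpp
#include <sys/mman.h>  // memfd_create, mmap, munmap (Linux, glibc >= 2.27)
#include <unistd.h>    // ftruncate, close

#include <atomic>
#include <cassert>  // assert, used in the usage note
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical helper (not from the PR): creates an anonymous shared-memory
// file of `size` bytes and returns its descriptor, or -1 on failure.
inline int createSharedBuffer(std::size_t size) {
    // Instance counter used only to build a readable name
    static std::atomic<std::uint64_t> counter{0};
    const std::string name = "shared_memory_" + std::to_string(counter++);

    int fd = memfd_create(name.c_str(), MFD_CLOEXEC);
    if(fd < 0) return -1;
    if(ftruncate(fd, static_cast<off_t>(size)) != 0) {
        close(fd);
        return -1;
    }
    return fd;
}

// Writes a byte through one mapping and reads it back through a second one,
// which is what a zero-copy hand-over of the descriptor relies on.
inline bool roundTripThroughMemfd() {
    constexpr std::size_t size = 4096;
    int fd = createSharedBuffer(size);
    if(fd < 0) return false;

    void* writer = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    void* reader = mmap(nullptr, size, PROT_READ, MAP_SHARED, fd, 0);
    bool ok = writer != MAP_FAILED && reader != MAP_FAILED;
    if(ok) {
        static_cast<std::uint8_t*>(writer)[0] = 0x5A;
        ok = static_cast<const std::uint8_t*>(reader)[0] == 0x5A;
    }

    if(writer != MAP_FAILED) munmap(writer, size);
    if(reader != MAP_FAILED) munmap(reader, size);
    close(fd);  // no unlink step is needed, unlike shm_open
    return ok;
}
```

Calling `assert(roundTripThroughMemfd());` should pass on a kernel with memfd support. Per the memfd_create man page, the name is only a debugging aid and several files may share the same name, so the counter is a convenience rather than a requirement.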

    SharedMemory(long argFd) : fd(argFd) {
        kind = MemoryKinds::MEMORY_KIND_SHARED_MEMORY;
        mapFd();
    }

    ~SharedMemory() {
        unmapFd();
        close(fd);
    }

    SharedMemory& operator=(long argFd) {
        unmapFd();
        fd = argFd;
        mapFd();

        return *this;
    }

    span<std::uint8_t> getData() override {
        if (mapping == NULL) {
            mapFd();
        }
        return {(uint8_t*)mapping, getSize()};
    }
    span<const std::uint8_t> getData() const override {
        return {(const uint8_t*)mapping, getSize()};
    }
    std::size_t getMaxSize() const override {
        struct stat fileStats;
        fstat(fd, &fileStats);

        return fileStats.st_size;
    }
    std::size_t getOffset() const override {
        return ftell(fdopen(fd, "r"));
    }
    void setSize(std::size_t size) override {
        ftruncate(fd, size);
    }
Collaborator

What if size is bigger than current size?

Author

Well, technically ftruncate extends the file, but how that interacts with the various kinds of shared memory depends on the implementation.

If it's for example memory created with shm_open, it will just expand it. If it's a DMA buffer, it depends on the implementation. For Gst, it seems to be the same as shm_open.
This behavior should be overridden by implementations if it's different.

However, we probably need to unmap and map the FD so that we can actually have a mapping of the correct size.

Collaborator

Makes sense, I thought truncate only downsizes.
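
The resize-then-remap step mentioned in this thread could look roughly like the sketch below. It is an illustration under stated assumptions, not the PR's `setSize`: `Mapping`, `resizeAndRemap` and `growKeepsData` are hypothetical names, and the descriptor comes from `memfd_create`, so the behaviour of DMA or Gst buffers is not covered.

```cpp
#include <sys/mman.h>  // memfd_create, mmap, munmap (Linux)
#include <sys/stat.h>  // fstat
#include <unistd.h>    // ftruncate, close

#include <cassert>  // assert, used in the usage note
#include <cstddef>
#include <cstdint>

// Hypothetical holder for a mapping and the length it was created with
struct Mapping {
    void* addr = nullptr;
    std::size_t size = 0;
};

// Resizes the file behind `fd`, then replaces the old mapping with one that
// covers the new size. Returns false if any system call fails.
inline bool resizeAndRemap(int fd, Mapping& m, std::size_t newSize) {
    if(ftruncate(fd, static_cast<off_t>(newSize)) != 0) return false;
    if(m.addr != nullptr) munmap(m.addr, m.size);  // old length, not the new one
    m = Mapping{};
    if(newSize == 0) return true;  // mmap rejects a zero length
    void* p = mmap(nullptr, newSize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    if(p == MAP_FAILED) return false;
    m.addr = p;
    m.size = newSize;
    return true;
}

// Grows a memfd from 4096 to 8192 bytes and checks that the old data survives
// and that the extended region reads as zero.
inline bool growKeepsData() {
    int fd = memfd_create("resize_demo", MFD_CLOEXEC);
    if(fd < 0) return false;

    Mapping m;
    bool ok = resizeAndRemap(fd, m, 4096);
    if(ok) {
        static_cast<std::uint8_t*>(m.addr)[0] = 7;
        ok = resizeAndRemap(fd, m, 8192);
    }
    if(ok) {
        const auto* bytes = static_cast<const std::uint8_t*>(m.addr);
        struct stat st {};
        ok = fstat(fd, &st) == 0 && st.st_size == 8192 && bytes[0] == 7 && bytes[8191] == 0;
    }

    if(m.addr != nullptr) munmap(m.addr, m.size);
    close(fd);
    return ok;
}
```

Note that the old mapping has to be released with the length it was created with, which is why the sketch stores the size next to the pointer instead of asking the file for its current size.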


    std::size_t getSize() const {
        return getMaxSize();
    }

    long getFd() const {
        return fd;
    }
};

} // namespace dai
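
A few details of the class above may be worth a second look: `mmap` reports failure with `MAP_FAILED` rather than `NULL`, `mapping` has no initial value when the default constructor is used, and `getOffset()` creates a `FILE` stream with `fdopen` that is never closed. The sketch below shows one way a descriptor-owning mapping could handle those points. `FdMapping` is a hypothetical class written for illustration; it does not derive from `dai::Memory` and is not the merged implementation.

```cpp
#include <sys/mman.h>  // mmap, munmap, MAP_FAILED
#include <sys/stat.h>  // fstat
#include <unistd.h>    // close, lseek

#include <cassert>  // assert, used in the usage note
#include <cerrno>
#include <cstddef>
#include <cstdint>
#include <system_error>

// Hypothetical RAII owner of a file descriptor and its shared mapping.
// If the constructor throws, the descriptor stays open and remains the
// caller's responsibility.
class FdMapping {
public:
    explicit FdMapping(int fd) : fd_(fd) {
        struct stat st {};
        if(fstat(fd_, &st) != 0) fail("fstat");
        size_ = static_cast<std::size_t>(st.st_size);
        if(size_ == 0) return;  // nothing to map; mmap rejects a zero length
        void* p = mmap(nullptr, size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0);
        if(p == MAP_FAILED) fail("mmap");  // failure is MAP_FAILED, not NULL
        addr_ = p;
    }
    FdMapping(const FdMapping&) = delete;  // a copy would close the fd twice
    FdMapping& operator=(const FdMapping&) = delete;
    ~FdMapping() {
        if(addr_ != nullptr) munmap(addr_, size_);
        if(fd_ >= 0) close(fd_);
    }

    std::uint8_t* data() { return static_cast<std::uint8_t*>(addr_); }
    std::size_t size() const { return size_; }
    // Current file offset, queried without creating a FILE stream
    off_t offset() const { return lseek(fd_, 0, SEEK_CUR); }

private:
    [[noreturn]] static void fail(const char* what) {
        throw std::system_error(errno, std::generic_category(), what);
    }
    int fd_ = -1;
    void* addr_ = nullptr;
    std::size_t size_ = 0;
};
```

With a descriptor `fd` that refers to a non-empty shared-memory file, `FdMapping m(fd);` maps it for the lifetime of `m`, and `assert(m.size() > 0);` holds afterwards. Passing an invalid descriptor such as `-1` throws `std::system_error` instead of silently leaving a bad mapping behind.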
6 changes: 3 additions & 3 deletions include/depthai/utility/VectorMemory.hpp
@@ -14,9 +14,9 @@ namespace dai {
class VectorMemory : public std::vector<std::uint8_t>, public Memory {
public:
// using std::vector<std::uint8_t>::vector;
- VectorMemory() = default;
- VectorMemory(const std::vector<std::uint8_t>& d) : vector(std::move(d)) {}
- VectorMemory(std::vector<std::uint8_t>&& d) : vector(std::move(d)) {}
+ VectorMemory() { kind = MemoryKinds::MEMORY_KIND_VECTOR_MEMORY; }
+ VectorMemory(const std::vector<std::uint8_t>& d) : vector(std::move(d)) { kind = MemoryKinds::MEMORY_KIND_VECTOR_MEMORY; }
+ VectorMemory(std::vector<std::uint8_t>&& d) : vector(std::move(d)) { kind = MemoryKinds::MEMORY_KIND_VECTOR_MEMORY; }
Collaborator

We can remove the kind I think and use casts instead.

Author

I think the kind is important, as for example in XLinkOut in DepthAI device, we need it to discern how to output. Unless we want to understand which it is by using sizeof(), which is imperfect, this system works better.

Collaborator

Can we use if(std::dynamic_pointer_cast<SharedMemory>(data))?

Author

I didn't think about that...
That should work yes. I've tested it on a separate program and it works. I'm not sure about overhead, but it should be comparable anyway.

Author

Seems like it doesn't work. I've reinstated the kind variables and they work now.
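
For readers unfamiliar with the cast-based check proposed in this thread, here is the idiom in isolation. The types in namespace `sketch` are simplified stand-ins, not the real `dai::Memory` hierarchy, and `fdOrMinusOne` is a hypothetical helper. The thread reports that the approach did not work in the actual code base, for reasons it does not state, so this only shows what the idiom looks like when the base class is polymorphic and RTTI is enabled.

```cpp
#include <cassert>  // assert, used in the usage note
#include <cstdint>
#include <memory>
#include <vector>

namespace sketch {

// Stand-in base class; the virtual destructor makes it polymorphic, which
// dynamic_pointer_cast requires.
struct Memory {
    virtual ~Memory() = default;
};

struct VectorMemory : Memory {
    std::vector<std::uint8_t> bytes;
};

struct SharedMemory : Memory {
    explicit SharedMemory(long fd) : fd(fd) {}
    long fd;
};

// Returns the descriptor if `mem` points to a SharedMemory, otherwise -1.
inline long fdOrMinusOne(const std::shared_ptr<Memory>& mem) {
    if(auto shared = std::dynamic_pointer_cast<SharedMemory>(mem)) {
        return shared->fd;
    }
    return -1;
}

}  // namespace sketch
```

For example, `assert(sketch::fdOrMinusOne(std::make_shared<sketch::SharedMemory>(5)) == 5);` passes, while a `sketch::VectorMemory` or a null pointer yields `-1`. An explicit `kind` field, as kept in this PR, avoids the dependency on RTTI at the cost of having to set the field in every constructor.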

VectorMemory& operator=(std::vector<std::uint8_t>&& d) {
std::vector<std::uint8_t>::operator=(std::move(d));
return *this;
5 changes: 4 additions & 1 deletion include/depthai/xlink/XLinkStream.hpp
@@ -28,7 +28,7 @@ namespace dai {

class StreamPacketDesc : public streamPacketDesc_t {
public:
- StreamPacketDesc() noexcept : streamPacketDesc_t{nullptr, 0, {}, {}} {};
+ StreamPacketDesc() noexcept : streamPacketDesc_t{nullptr, 0, -1, {}, {}} {};
StreamPacketDesc(const StreamPacketDesc&) = delete;
StreamPacketDesc(StreamPacketDesc&& other) noexcept;
StreamPacketDesc& operator=(const StreamPacketDesc&) = delete;
@@ -89,11 +89,14 @@ class XLinkStream {
// Blocking
void write(span<const uint8_t> data, span<const uint8_t> data2);
void write(span<const uint8_t> data);
void write(long fd);
void write(const void* data, std::size_t size);
std::vector<std::uint8_t> read();
std::vector<std::uint8_t> read(XLinkTimespec& timestampReceived);
void read(std::vector<std::uint8_t>& data);
void read(std::vector<std::uint8_t>& data, long& fd);
void read(std::vector<std::uint8_t>& data, XLinkTimespec& timestampReceived);
void read(std::vector<std::uint8_t>& data, long& fd, XLinkTimespec& timestampReceived);
// split write helper
void writeSplit(const void* data, std::size_t size, std::size_t split);
void writeSplit(const std::vector<uint8_t>& data, std::size_t split);
10 changes: 10 additions & 0 deletions src/pipeline/datatype/Buffer.cpp
@@ -9,6 +9,12 @@ Buffer::Buffer(size_t size) : Buffer() {
data = mem;
}

Buffer::Buffer(long fd) : Buffer() {
auto mem = std::make_shared<SharedMemory>(fd);
data = mem;
}


span<uint8_t> Buffer::getData() {
return data->getData();
}
@@ -27,6 +33,10 @@ void Buffer::setData(const std::vector<std::uint8_t>& d) {
}
}

void Buffer::setData(const long fd) {
data = std::make_shared<SharedMemory>(fd);
}

void Buffer::setData(std::vector<std::uint8_t>&& d) {
// allocate new holder
data = std::make_shared<VectorMemory>(std::move(d));
6 changes: 6 additions & 0 deletions src/pipeline/datatype/ImgFrame.cpp
@@ -16,6 +16,12 @@ ImgFrame::ImgFrame(size_t size) : ImgFrame() {
data = mem;
}

ImgFrame::ImgFrame(long fd) : ImgFrame() {
auto mem = std::make_shared<SharedMemory>(fd);
data = mem;

}

std::chrono::time_point<std::chrono::steady_clock, std::chrono::steady_clock::duration> ImgFrame::getTimestamp(CameraExposureOffset offset) const {
auto ts = getTimestamp();
auto expTime = getExposureTime();