Skip to content

Commit

Permalink
2
Browse files Browse the repository at this point in the history
  • Loading branch information
maloel committed Jun 20, 2023
1 parent c5bfad6 commit e7c55a4
Show file tree
Hide file tree
Showing 6 changed files with 1,880 additions and 6 deletions.
8 changes: 4 additions & 4 deletions examples/test_dds/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ include( ${CMAKE_SOURCE_DIR}/CMake/opengl_config.cmake )


add_executable( ${PROJECT_NAME}
Device.cxx
Host.cxx
Udds.cxx
UddsPubSubTypes.cxx
Device.cpp
Host.cpp
Udds.cpp
UddsPubSubTypes.cpp
HostEvent.cpp
test_dds.cpp
../../third-party/imgui/imgui.cpp
Expand Down
351 changes: 351 additions & 0 deletions examples/test_dds/Device.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,351 @@
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/*!
* @file UddsPublisher.cpp
* This file contains the implementation of the publisher functions.
*
* This file was generated by the tool fastcdrgen.
*/


#include "Device.h"
#include "UddsPubSubTypes.h"

#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/publisher/Publisher.hpp>
#include <fastdds/dds/publisher/qos/PublisherQos.hpp>
#include <fastdds/dds/publisher/DataWriter.hpp>
#include <fastdds/dds/publisher/qos/DataWriterQos.hpp>

#include <thread>
#include <chrono>

using namespace eprosima::fastdds::dds;

Device::Device()
: participant_(nullptr)
, publisher_(nullptr)
, rgb_topic_(nullptr)
, depth_topic_(nullptr)
, cmd_topic_(nullptr)
, devinfo_topic_(nullptr)
, rgb_writer_(nullptr)
, depth_writer_(nullptr)
, cmd_reader_(nullptr)
, devinfo_writer_(nullptr)
, blob_type_(new BlobPubSubType())
, headerblob1_type_(new HeaderBlob1PubSubType())
{
writer_listener_.owner = this;
reader_listener_.owner = this;
}

Device::~Device()
{
if (rgb_writer_ != nullptr)
{
publisher_->delete_datawriter(rgb_writer_);
}
if (depth_writer_ != nullptr)
{
publisher_->delete_datawriter(depth_writer_);
}
if (cmd_reader_ != nullptr)
{
subscriber_->delete_datareader(cmd_reader_);
}
if (rgb_topic_ != nullptr)
{
participant_->delete_topic(rgb_topic_);
}
if (depth_topic_ != nullptr)
{
participant_->delete_topic(depth_topic_);
}
if (cmd_topic_ != nullptr)
{
participant_->delete_topic(cmd_topic_);
}
if (subscriber_ != nullptr)
{
participant_->delete_subscriber(subscriber_);
}
if (publisher_ != nullptr)
{
participant_->delete_publisher(publisher_);
}
DomainParticipantFactory::get_instance()->delete_participant(participant_);
}

#define WIDTH 1280
#define HEIGHT 720

void rgb_thread(Device& device)
{
std::cout << "RGB thread started." << std::endl;
HeaderBlob1 st;
st.header().resize(sizeof(ImageHeader));
st.data().resize(WIDTH * HEIGHT * 2);
ImageHeader* pHeader = reinterpret_cast<ImageHeader*>(st.header().data());
unsigned char* pData = reinterpret_cast<unsigned char*>(st.data().data());

for (int i = 0; i < st.data().size(); i++)
pData[i] = i;
pHeader->index = 0;

while (1)
{
if (!device.rgb_stream_)
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
continue;
}

std::cout << "Send RGB frame." << std::endl;
pHeader->index++;
pHeader->timestamp = std::clock();
device.rgb_writer_->write(&st);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}

void depth_thread(Device& device)
{
std::cout << "Depth thread started." << std::endl;
HeaderBlob1 st;
st.header().resize(sizeof(ImageHeader));
st.data().resize(WIDTH * HEIGHT * 2);
ImageHeader* pHeader = reinterpret_cast<ImageHeader*>(st.header().data());
unsigned char* pData = reinterpret_cast<unsigned char*>(st.data().data());

for (int i = 0; i < st.data().size(); i++)
pData[i] = i;
pHeader->index = 0;

while (1)
{
if (!device.depth_stream_)
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
continue;
}

std::cout << "Send depth frame." << std::endl;
pHeader->index++;
pHeader->timestamp = std::clock();
device.depth_writer_->write(&st);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}

void devinfo_thread(Device& device)
{
std::cout << "Devinfo thread started." << std::endl;

Blob st;

auto& data = st.data();
char* info = "{\"deviceInfo\":{\"SN\":\"1234\"}}";

data.resize(strlen(info) + 1);
memcpy(data.data(), info, strlen(info) + 1);

while (1)
{
device.devinfo_writer_->write(&st);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}

bool Device::init()
{
/* Initialize data_ here */

//CREATE THE PARTICIPANT
DomainParticipantQos pqos;
pqos.name("Device");
participant_ = DomainParticipantFactory::get_instance()->create_participant(0, pqos);
if (participant_ == nullptr)
{
return false;
}

//REGISTER THE TYPE
blob_type_.register_type(participant_);
headerblob1_type_.register_type(participant_);

//CREATE THE PUBLISHER
publisher_ = participant_->create_publisher(PUBLISHER_QOS_DEFAULT, nullptr);
if (publisher_ == nullptr)
{
return false;
}

subscriber_ = participant_->create_subscriber(SUBSCRIBER_QOS_DEFAULT, nullptr);
if (subscriber_ == nullptr)
{
return false;
}

//CREATE THE TOPIC
rgb_topic_ = participant_->create_topic(
"rs/1234/rgb",
headerblob1_type_.get_type_name(),
TOPIC_QOS_DEFAULT);
if (rgb_topic_ == nullptr)
{
return false;
}

depth_topic_ = participant_->create_topic(
"rs/1234/depth",
headerblob1_type_.get_type_name(),
TOPIC_QOS_DEFAULT);
if (depth_topic_ == nullptr)
{
return false;
}

cmd_topic_ = participant_->create_topic(
"rs/1234/cmd",
blob_type_.get_type_name(),
TOPIC_QOS_DEFAULT);
if (cmd_topic_ == nullptr)
{
return false;
}

devinfo_topic_ = participant_->create_topic(
"rs/device_info",
blob_type_.get_type_name(),
TOPIC_QOS_DEFAULT);
if (devinfo_topic_ == nullptr)
{
return false;
}
// CREATE THE WRITER
DataWriterQos wqos = DATAWRITER_QOS_DEFAULT;
wqos.reliability().kind = RELIABLE_RELIABILITY_QOS;
rgb_writer_ = publisher_->create_datawriter(rgb_topic_, wqos, &writer_listener_);
if (rgb_writer_ == nullptr)
{
return false;
}

depth_writer_ = publisher_->create_datawriter(depth_topic_, wqos, &writer_listener_);
if (rgb_writer_ == nullptr)
{
return false;
}

devinfo_writer_ = publisher_->create_datawriter(devinfo_topic_, wqos, &writer_listener_);
if (devinfo_writer_ == nullptr)
{
return false;
}

DataReaderQos rqos = DATAREADER_QOS_DEFAULT;
rqos.reliability().kind = RELIABLE_RELIABILITY_QOS;
cmd_reader_ = subscriber_->create_datareader(cmd_topic_, rqos, &reader_listener_);
if (cmd_reader_ == nullptr)
{
return false;
}

std::thread trgb(rgb_thread, std::ref(*this));
std::this_thread::sleep_for(std::chrono::milliseconds(250));
std::thread tdepth(depth_thread, std::ref(*this));
std::this_thread::sleep_for(std::chrono::milliseconds(250));
std::thread tdevinfo(devinfo_thread, std::ref(*this));

while (1)
std::this_thread::sleep_for(std::chrono::seconds(1));

return true;
}

void Device::WriterListener::on_publication_matched(
eprosima::fastdds::dds::DataWriter* writer,
const eprosima::fastdds::dds::PublicationMatchedStatus& info)
{
if (info.current_count_change == 1)
{
matched = info.total_count;
std::cout << (writer == owner->rgb_writer_ ? "RGB" : "Depth") << " publisher matched." << std::endl;
}
else if (info.current_count_change == -1)
{
matched = info.total_count;
std::cout << (writer == owner->rgb_writer_ ? "RGB" : "Depth") << " publisher unmatched." << std::endl;
}
else
{
std::cout << info.current_count_change
<< " is not a valid value for PublicationMatchedStatus current count change" << std::endl;
}
}

void Device::ReaderListener::on_data_available(eprosima::fastdds::dds::DataReader* reader)
{
// Take data
Blob st;
SampleInfo info;

if (reader->take_next_sample(&st, &info) == ReturnCode_t::RETCODE_OK)
{
if (info.valid_data)
{
// Print your structure data here.
++samples;
char *cmd = reinterpret_cast<char*>(st.data().data());
std::cout << "Received cmd " << cmd << std::endl;
if (strcmp(cmd, "rgb_on") == 0)
{
owner->rgb_stream_ = true;
}
else if (strcmp(cmd, "rgb_off") == 0)
{
owner->rgb_stream_ = false;
}
else if (strcmp(cmd, "depth_on") == 0)
{
owner->depth_stream_ = true;
}
else if (strcmp(cmd, "depth_off") == 0)
{
owner->depth_stream_ = false;
}
}
}
}

void Device::ReaderListener::on_subscription_matched(eprosima::fastdds::dds::DataReader* reader, const eprosima::fastdds::dds::SubscriptionMatchedStatus& info)
{
if (info.current_count_change == 1)
{
matched = info.total_count;
std::cout << "Subscription matched." << std::endl;
}
else if (info.current_count_change == -1)
{
matched = info.total_count;
std::cout << "Subscription unmatched." << std::endl;
}
else
{
std::cout << info.current_count_change
<< " is not a valid value for SubscriptionMatchedStatus current count change" << std::endl;
}
}
Loading

0 comments on commit e7c55a4

Please sign in to comment.