Skip to content

Commit

Permalink
Merge pull request OpenDDS#4501 from jrw972/internal-content-filter
Browse files Browse the repository at this point in the history
Internal DDS cannot efficiently filter on instances
  • Loading branch information
jrw972 authored Mar 11, 2024
2 parents 052c08f + eb04de3 commit 415170b
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 25 deletions.
16 changes: 15 additions & 1 deletion dds/DCPS/InternalDataReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,18 @@ class InternalDataReader : public InternalEntity {
return listener_.lock();
}

void set_interesting_instances(const SampleSequence& instances)
{
ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
interesting_instances_ = instances;
}

const SampleSequence& get_interesting_instances() const
{
ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mutex_, interesting_instances_);
return interesting_instances_;
}

void read(SampleSequence& samples,
InternalSampleInfoSequence& infos,
CORBA::Long max_samples,
Expand Down Expand Up @@ -295,6 +307,8 @@ class InternalDataReader : public InternalEntity {
// destroyed.
Listener_wrch listener_;

SampleSequence interesting_instances_;

typedef OPENDDS_SET(InternalEntity_wrch) PublicationSet;

class Instance {
Expand Down Expand Up @@ -495,7 +509,7 @@ class InternalDataReader : public InternalEntity {
{
publication_set_.insert(publication_handle);

if (instance_state_ != DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE) {
if (instance_state_ == DDS::ALIVE_INSTANCE_STATE) {
instance_state_ = DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE;
disposed_expiration_date_ = SystemTimePoint::now().to_dds_time() + qos.reader_data_lifecycle.autopurge_disposed_samples_delay;
informed_of_not_alive_ = false;
Expand Down
119 changes: 95 additions & 24 deletions dds/DCPS/InternalDataWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class InternalDataWriter : public InternalEntity {
public:
typedef RcHandle<InternalDataReader<T> > InternalDataReader_rch;
typedef WeakRcHandle<InternalDataReader<T> > InternalDataReader_wrch;
typedef OPENDDS_VECTOR(T) SampleSequence;

explicit InternalDataWriter(const DDS::DataWriterQos& qos)
: qos_(qos)
Expand All @@ -57,28 +58,62 @@ class InternalDataWriter : public InternalEntity {
void add_reader(InternalDataReader_rch reader)
{
ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
readers_.insert(reader);

if (qos_.durability.kind == DDS::TRANSIENT_LOCAL_DURABILITY_QOS && reader->durable()) {
for (typename InstanceMap::iterator pos = instance_map_.begin(), limit = instance_map_.end();
pos != limit; ++pos) {
pos->second.add_reader(reader, static_rchandle_cast<InternalEntity>(rchandle_from(this)));
const SampleSequence& instances = reader->get_interesting_instances();

if (instances.empty()) {
readers_.insert(reader);

if (qos_.durability.kind == DDS::TRANSIENT_LOCAL_DURABILITY_QOS && reader->durable()) {
for (typename InstanceMap::iterator pos = instance_map_.begin(), limit = instance_map_.end();
pos != limit; ++pos) {
pos->second.add_reader(reader, static_rchandle_cast<InternalEntity>(rchandle_from(this)));
}
}
} else {
all_instance_readers_.insert(reader);
for (typename SampleSequence::const_iterator pos = instances.begin(), limit = instances.end();
pos != limit; ++ pos) {
instance_readers_[*pos].insert(reader);
if (qos_.durability.kind == DDS::TRANSIENT_LOCAL_DURABILITY_QOS && reader->durable()) {
typename InstanceMap::iterator pos2 = instance_map_.find(*pos);
if (pos2 != instance_map_.end()) {
pos2->second.add_reader(reader, static_rchandle_cast<InternalEntity>(rchandle_from(this)));
}
}
}
}
}

void remove_reader(InternalDataReader_rch reader)
{
ACE_GUARD(ACE_Thread_Mutex, g, mutex_);

if (readers_.erase(reader)) {
reader->remove_publication(static_rchandle_cast<InternalEntity>(rchandle_from(this)), qos_.writer_data_lifecycle.autodispose_unregistered_instances);
}

if (all_instance_readers_.erase(reader)) {
const SampleSequence& instances = reader->get_interesting_instances();
for (typename SampleSequence::const_iterator pos = instances.begin(), limit = instances.end();
pos != limit; ++pos) {
typename InstanceReaderSet::iterator pos2 = instance_readers_.find(*pos);
if (pos2 != instance_readers_.end()) {
if (pos2->second.erase(reader)) {
reader->remove_publication(static_rchandle_cast<InternalEntity>(rchandle_from(this)), qos_.writer_data_lifecycle.autodispose_unregistered_instances);
}
if (pos2->second.empty()) {
instance_readers_.erase(pos2);
}
}
}
}
}

bool has_reader(InternalDataReader_rch reader)
{
ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mutex_, false);
return readers_.count(reader);
return readers_.count(reader) + all_instance_readers_.count(reader);
}

InternalEntity_wrch publication_handle()
Expand All @@ -98,12 +133,12 @@ class InternalDataWriter : public InternalEntity {
p.first->second.write(sample, qos_);
}

for (typename ReaderSet::const_iterator pos = readers_.begin(), limit = readers_.end(); pos != limit; ++pos) {
InternalDataReader_rch reader = pos->lock();
if (reader) {
reader->write(static_rchandle_cast<InternalEntity>(rchandle_from(this)), sample);
}
typename InstanceReaderSet::const_iterator pos = instance_readers_.find(sample);
if (pos != instance_readers_.end()) {
write_set(sample, pos->second);
}

write_set(sample, readers_);
}

void dispose(const T& sample)
Expand All @@ -117,12 +152,12 @@ class InternalDataWriter : public InternalEntity {
}
}

for (typename ReaderSet::const_iterator pos = readers_.begin(), limit = readers_.end(); pos != limit; ++pos) {
InternalDataReader_rch reader = pos->lock();
if (reader) {
reader->dispose(static_rchandle_cast<InternalEntity>(rchandle_from(this)), sample);
}
typename InstanceReaderSet::const_iterator pos = instance_readers_.find(sample);
if (pos != instance_readers_.end()) {
dispose_set(sample, pos->second);
}

dispose_set(sample, readers_);
}

void unregister_instance(const T& sample)
Expand All @@ -133,15 +168,12 @@ class InternalDataWriter : public InternalEntity {
instance_map_.erase(sample);
}

for (typename ReaderSet::const_iterator pos = readers_.begin(), limit = readers_.end(); pos != limit; ++pos) {
InternalDataReader_rch reader = pos->lock();
if (reader) {
if (qos_.writer_data_lifecycle.autodispose_unregistered_instances) {
reader->dispose(static_rchandle_cast<InternalEntity>(rchandle_from(this)), sample);
}
reader->unregister_instance(static_rchandle_cast<InternalEntity>(rchandle_from(this)), sample);
}
typename InstanceReaderSet::const_iterator pos = instance_readers_.find(sample);
if (pos != instance_readers_.end()) {
unregister_instance_set(sample, pos->second);
}

unregister_instance_set(sample, readers_);
}
/// @}

Expand All @@ -150,6 +182,45 @@ class InternalDataWriter : public InternalEntity {

typedef OPENDDS_SET(InternalDataReader_wrch) ReaderSet;
ReaderSet readers_;
ReaderSet all_instance_readers_;
typedef OPENDDS_MAP_T(T, ReaderSet) InstanceReaderSet;
InstanceReaderSet instance_readers_;

void write_set(const T& sample,
const ReaderSet& readers)
{
for (typename ReaderSet::const_iterator pos = readers.begin(), limit = readers.end(); pos != limit; ++pos) {
InternalDataReader_rch reader = pos->lock();
if (reader) {
reader->write(static_rchandle_cast<InternalEntity>(rchandle_from(this)), sample);
}
}
}

void dispose_set(const T& sample,
const ReaderSet& readers)
{
for (typename ReaderSet::const_iterator pos = readers.begin(), limit = readers.end(); pos != limit; ++pos) {
InternalDataReader_rch reader = pos->lock();
if (reader) {
reader->dispose(static_rchandle_cast<InternalEntity>(rchandle_from(this)), sample);
}
}
}

void unregister_instance_set(const T& sample,
const ReaderSet& readers)
{
for (typename ReaderSet::const_iterator pos = readers.begin(), limit = readers.end(); pos != limit; ++pos) {
InternalDataReader_rch reader = pos->lock();
if (reader) {
if (qos_.writer_data_lifecycle.autodispose_unregistered_instances) {
reader->dispose(static_rchandle_cast<InternalEntity>(rchandle_from(this)), sample);
}
reader->unregister_instance(static_rchandle_cast<InternalEntity>(rchandle_from(this)), sample);
}
}
}

class SampleHolder {
public:
Expand Down
52 changes: 52 additions & 0 deletions tests/unit-tests/dds/DCPS/InternalDataWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,3 +207,55 @@ TEST(dds_DCPS_InternalDataWriter, dispose)
EXPECT_EQ(samples[0], sample);
EXPECT_EQ(SIW(infos[0]), SIW(make_sample_info(DDS::NOT_READ_SAMPLE_STATE, DDS::NEW_VIEW_STATE, DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE, 0, 0, 0, 0, 0, true)));
}

TEST(dds_DCPS_InternalDataWriter, add_instance_reader)
{
ReaderType::SampleSequence interesting_samples;
interesting_samples.push_back(Sample("a"));
interesting_samples.push_back(Sample("c"));

RcHandle<WriterType> writer = make_rch<WriterType>(DataWriterQosBuilder());
RcHandle<ReaderType> reader = make_rch<ReaderType>(DataReaderQosBuilder().reliability_reliable());
reader->set_interesting_instances(interesting_samples);
writer->add_reader(reader);

EXPECT_TRUE(writer->has_reader(reader));
writer->write(Sample("a"));
writer->write(Sample("b"));
writer->write(Sample("c"));

ReaderType::SampleSequence samples;
InternalSampleInfoSequence infos;
reader->take(samples, infos, DDS::LENGTH_UNLIMITED, DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE);

ASSERT_EQ(samples.size(), 2U);
ASSERT_EQ(infos.size(), 2U);

EXPECT_EQ(samples[0], Sample("a"));
EXPECT_EQ(SIW(infos[0]), SIW(make_sample_info(DDS::NOT_READ_SAMPLE_STATE, DDS::NEW_VIEW_STATE, DDS::ALIVE_INSTANCE_STATE, 0, 0, 0, 0, 0, true)));
EXPECT_EQ(samples[1], Sample("c"));
EXPECT_EQ(SIW(infos[1]), SIW(make_sample_info(DDS::NOT_READ_SAMPLE_STATE, DDS::NEW_VIEW_STATE, DDS::ALIVE_INSTANCE_STATE, 0, 0, 0, 0, 0, true)));

writer->dispose(Sample("a"));
writer->dispose(Sample("b"));

reader->take(samples, infos, DDS::LENGTH_UNLIMITED, DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE);

ASSERT_EQ(samples.size(), 1U);
ASSERT_EQ(infos.size(), 1U);

EXPECT_EQ(samples[0], Sample("a"));
EXPECT_EQ(SIW(infos[0]), SIW(make_sample_info(DDS::NOT_READ_SAMPLE_STATE, DDS::NOT_NEW_VIEW_STATE, DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE, 0, 0, 0, 0, 0, false)));

writer->unregister_instance(Sample("a"));
writer->unregister_instance(Sample("b"));
writer->unregister_instance(Sample("c"));

reader->take(samples, infos, DDS::LENGTH_UNLIMITED, DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE);

ASSERT_EQ(samples.size(), 1U);
ASSERT_EQ(infos.size(), 1U);

EXPECT_EQ(samples[0], Sample("c"));
EXPECT_EQ(SIW(infos[0]), SIW(make_sample_info(DDS::NOT_READ_SAMPLE_STATE, DDS::NOT_NEW_VIEW_STATE, DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE, 0, 0, 0, 0, 0, false)));
}

0 comments on commit 415170b

Please sign in to comment.