diff --git a/dds/DCPS/InternalDataReader.h b/dds/DCPS/InternalDataReader.h index 3f499e642b0..db078975a32 100644 --- a/dds/DCPS/InternalDataReader.h +++ b/dds/DCPS/InternalDataReader.h @@ -197,6 +197,18 @@ class InternalDataReader : public InternalEntity { return listener_.lock(); } + void set_interesting_instances(const SampleSequence& instances) + { + ACE_GUARD(ACE_Thread_Mutex, g, mutex_); + interesting_instances_ = instances; + } + + const SampleSequence& get_interesting_instances() const + { + ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mutex_, interesting_instances_); + return interesting_instances_; + } + void read(SampleSequence& samples, InternalSampleInfoSequence& infos, CORBA::Long max_samples, @@ -295,6 +307,8 @@ class InternalDataReader : public InternalEntity { // destroyed. Listener_wrch listener_; + SampleSequence interesting_instances_; + typedef OPENDDS_SET(InternalEntity_wrch) PublicationSet; class Instance { @@ -495,7 +509,7 @@ class InternalDataReader : public InternalEntity { { publication_set_.insert(publication_handle); - if (instance_state_ != DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE) { + if (instance_state_ == DDS::ALIVE_INSTANCE_STATE) { instance_state_ = DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE; disposed_expiration_date_ = SystemTimePoint::now().to_dds_time() + qos.reader_data_lifecycle.autopurge_disposed_samples_delay; informed_of_not_alive_ = false; diff --git a/dds/DCPS/InternalDataWriter.h b/dds/DCPS/InternalDataWriter.h index 129218f839f..9574fb1d065 100644 --- a/dds/DCPS/InternalDataWriter.h +++ b/dds/DCPS/InternalDataWriter.h @@ -47,6 +47,7 @@ class InternalDataWriter : public InternalEntity { public: typedef RcHandle > InternalDataReader_rch; typedef WeakRcHandle > InternalDataReader_wrch; + typedef OPENDDS_VECTOR(T) SampleSequence; explicit InternalDataWriter(const DDS::DataWriterQos& qos) : qos_(qos) @@ -57,12 +58,29 @@ class InternalDataWriter : public InternalEntity { void add_reader(InternalDataReader_rch reader) { ACE_GUARD(ACE_Thread_Mutex, g, mutex_); - readers_.insert(reader); - if (qos_.durability.kind == DDS::TRANSIENT_LOCAL_DURABILITY_QOS && reader->durable()) { - for (typename InstanceMap::iterator pos = instance_map_.begin(), limit = instance_map_.end(); - pos != limit; ++pos) { - pos->second.add_reader(reader, static_rchandle_cast(rchandle_from(this))); + const SampleSequence& instances = reader->get_interesting_instances(); + + if (instances.empty()) { + readers_.insert(reader); + + if (qos_.durability.kind == DDS::TRANSIENT_LOCAL_DURABILITY_QOS && reader->durable()) { + for (typename InstanceMap::iterator pos = instance_map_.begin(), limit = instance_map_.end(); + pos != limit; ++pos) { + pos->second.add_reader(reader, static_rchandle_cast(rchandle_from(this))); + } + } + } else { + all_instance_readers_.insert(reader); + for (typename SampleSequence::const_iterator pos = instances.begin(), limit = instances.end(); + pos != limit; ++ pos) { + instance_readers_[*pos].insert(reader); + if (qos_.durability.kind == DDS::TRANSIENT_LOCAL_DURABILITY_QOS && reader->durable()) { + typename InstanceMap::iterator pos2 = instance_map_.find(*pos); + if (pos2 != instance_map_.end()) { + pos2->second.add_reader(reader, static_rchandle_cast(rchandle_from(this))); + } + } } } } @@ -70,15 +88,32 @@ class InternalDataWriter : public InternalEntity { void remove_reader(InternalDataReader_rch reader) { ACE_GUARD(ACE_Thread_Mutex, g, mutex_); + if (readers_.erase(reader)) { reader->remove_publication(static_rchandle_cast(rchandle_from(this)), qos_.writer_data_lifecycle.autodispose_unregistered_instances); } + + if (all_instance_readers_.erase(reader)) { + const SampleSequence& instances = reader->get_interesting_instances(); + for (typename SampleSequence::const_iterator pos = instances.begin(), limit = instances.end(); + pos != limit; ++pos) { + typename InstanceReaderSet::iterator pos2 = instance_readers_.find(*pos); + if (pos2 != instance_readers_.end()) { + if (pos2->second.erase(reader)) { + reader->remove_publication(static_rchandle_cast(rchandle_from(this)), qos_.writer_data_lifecycle.autodispose_unregistered_instances); + } + if (pos2->second.empty()) { + instance_readers_.erase(pos2); + } + } + } + } } bool has_reader(InternalDataReader_rch reader) { ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mutex_, false); - return readers_.count(reader); + return readers_.count(reader) + all_instance_readers_.count(reader); } InternalEntity_wrch publication_handle() @@ -98,12 +133,12 @@ class InternalDataWriter : public InternalEntity { p.first->second.write(sample, qos_); } - for (typename ReaderSet::const_iterator pos = readers_.begin(), limit = readers_.end(); pos != limit; ++pos) { - InternalDataReader_rch reader = pos->lock(); - if (reader) { - reader->write(static_rchandle_cast(rchandle_from(this)), sample); - } + typename InstanceReaderSet::const_iterator pos = instance_readers_.find(sample); + if (pos != instance_readers_.end()) { + write_set(sample, pos->second); } + + write_set(sample, readers_); } void dispose(const T& sample) @@ -117,12 +152,12 @@ class InternalDataWriter : public InternalEntity { } } - for (typename ReaderSet::const_iterator pos = readers_.begin(), limit = readers_.end(); pos != limit; ++pos) { - InternalDataReader_rch reader = pos->lock(); - if (reader) { - reader->dispose(static_rchandle_cast(rchandle_from(this)), sample); - } + typename InstanceReaderSet::const_iterator pos = instance_readers_.find(sample); + if (pos != instance_readers_.end()) { + dispose_set(sample, pos->second); } + + dispose_set(sample, readers_); } void unregister_instance(const T& sample) @@ -133,15 +168,12 @@ class InternalDataWriter : public InternalEntity { instance_map_.erase(sample); } - for (typename ReaderSet::const_iterator pos = readers_.begin(), limit = readers_.end(); pos != limit; ++pos) { - InternalDataReader_rch reader = pos->lock(); - if (reader) { - if (qos_.writer_data_lifecycle.autodispose_unregistered_instances) { - reader->dispose(static_rchandle_cast(rchandle_from(this)), sample); - } - reader->unregister_instance(static_rchandle_cast(rchandle_from(this)), sample); - } + typename InstanceReaderSet::const_iterator pos = instance_readers_.find(sample); + if (pos != instance_readers_.end()) { + unregister_instance_set(sample, pos->second); } + + unregister_instance_set(sample, readers_); } /// @} @@ -150,6 +182,45 @@ class InternalDataWriter : public InternalEntity { typedef OPENDDS_SET(InternalDataReader_wrch) ReaderSet; ReaderSet readers_; + ReaderSet all_instance_readers_; + typedef OPENDDS_MAP_T(T, ReaderSet) InstanceReaderSet; + InstanceReaderSet instance_readers_; + + void write_set(const T& sample, + const ReaderSet& readers) + { + for (typename ReaderSet::const_iterator pos = readers.begin(), limit = readers.end(); pos != limit; ++pos) { + InternalDataReader_rch reader = pos->lock(); + if (reader) { + reader->write(static_rchandle_cast(rchandle_from(this)), sample); + } + } + } + + void dispose_set(const T& sample, + const ReaderSet& readers) + { + for (typename ReaderSet::const_iterator pos = readers.begin(), limit = readers.end(); pos != limit; ++pos) { + InternalDataReader_rch reader = pos->lock(); + if (reader) { + reader->dispose(static_rchandle_cast(rchandle_from(this)), sample); + } + } + } + + void unregister_instance_set(const T& sample, + const ReaderSet& readers) + { + for (typename ReaderSet::const_iterator pos = readers.begin(), limit = readers.end(); pos != limit; ++pos) { + InternalDataReader_rch reader = pos->lock(); + if (reader) { + if (qos_.writer_data_lifecycle.autodispose_unregistered_instances) { + reader->dispose(static_rchandle_cast(rchandle_from(this)), sample); + } + reader->unregister_instance(static_rchandle_cast(rchandle_from(this)), sample); + } + } + } class SampleHolder { public: diff --git a/tests/unit-tests/dds/DCPS/InternalDataWriter.cpp b/tests/unit-tests/dds/DCPS/InternalDataWriter.cpp index 6cd29850286..887106903dc 100644 --- a/tests/unit-tests/dds/DCPS/InternalDataWriter.cpp +++ b/tests/unit-tests/dds/DCPS/InternalDataWriter.cpp @@ -207,3 +207,55 @@ TEST(dds_DCPS_InternalDataWriter, dispose) EXPECT_EQ(samples[0], sample); EXPECT_EQ(SIW(infos[0]), SIW(make_sample_info(DDS::NOT_READ_SAMPLE_STATE, DDS::NEW_VIEW_STATE, DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE, 0, 0, 0, 0, 0, true))); } + +TEST(dds_DCPS_InternalDataWriter, add_instance_reader) +{ + ReaderType::SampleSequence interesting_samples; + interesting_samples.push_back(Sample("a")); + interesting_samples.push_back(Sample("c")); + + RcHandle writer = make_rch(DataWriterQosBuilder()); + RcHandle reader = make_rch(DataReaderQosBuilder().reliability_reliable()); + reader->set_interesting_instances(interesting_samples); + writer->add_reader(reader); + + EXPECT_TRUE(writer->has_reader(reader)); + writer->write(Sample("a")); + writer->write(Sample("b")); + writer->write(Sample("c")); + + ReaderType::SampleSequence samples; + InternalSampleInfoSequence infos; + reader->take(samples, infos, DDS::LENGTH_UNLIMITED, DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE); + + ASSERT_EQ(samples.size(), 2U); + ASSERT_EQ(infos.size(), 2U); + + EXPECT_EQ(samples[0], Sample("a")); + EXPECT_EQ(SIW(infos[0]), SIW(make_sample_info(DDS::NOT_READ_SAMPLE_STATE, DDS::NEW_VIEW_STATE, DDS::ALIVE_INSTANCE_STATE, 0, 0, 0, 0, 0, true))); + EXPECT_EQ(samples[1], Sample("c")); + EXPECT_EQ(SIW(infos[1]), SIW(make_sample_info(DDS::NOT_READ_SAMPLE_STATE, DDS::NEW_VIEW_STATE, DDS::ALIVE_INSTANCE_STATE, 0, 0, 0, 0, 0, true))); + + writer->dispose(Sample("a")); + writer->dispose(Sample("b")); + + reader->take(samples, infos, DDS::LENGTH_UNLIMITED, DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE); + + ASSERT_EQ(samples.size(), 1U); + ASSERT_EQ(infos.size(), 1U); + + EXPECT_EQ(samples[0], Sample("a")); + EXPECT_EQ(SIW(infos[0]), SIW(make_sample_info(DDS::NOT_READ_SAMPLE_STATE, DDS::NOT_NEW_VIEW_STATE, DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE, 0, 0, 0, 0, 0, false))); + + writer->unregister_instance(Sample("a")); + writer->unregister_instance(Sample("b")); + writer->unregister_instance(Sample("c")); + + reader->take(samples, infos, DDS::LENGTH_UNLIMITED, DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE); + + ASSERT_EQ(samples.size(), 1U); + ASSERT_EQ(infos.size(), 1U); + + EXPECT_EQ(samples[0], Sample("c")); + EXPECT_EQ(SIW(infos[0]), SIW(make_sample_info(DDS::NOT_READ_SAMPLE_STATE, DDS::NOT_NEW_VIEW_STATE, DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE, 0, 0, 0, 0, 0, false))); +}