diff --git a/pyopendds/DataWriter.py b/pyopendds/DataWriter.py index a6beee8..ba8ff61 100644 --- a/pyopendds/DataWriter.py +++ b/pyopendds/DataWriter.py @@ -1,2 +1,29 @@ +from __future__ import annotations + +from .Topic import Topic +from .constants import StatusKind +from .util import TimeDurationType, normalize_time_duration + +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from .Publisher import Publisher + + class DataWriter: - pass + + def __init__(self, publisher: Publisher, topic: Topic, qos=None, listener=None): + self.topic = topic + self.qos = qos + self.listener = listener + self.publisher = publisher + publisher.writers.append(self) + + from _pyopendds import create_datawriter + create_datawriter(self, publisher, topic) + + def wait_for(self, status: StatusKind, timeout: TimeDurationType): + from _pyopendds import datareader_wait_for + return datareader_wait_for(self, status, *normalize_time_duration(timeout)) + + def write(self, sample): + return self.topic._ts_package.write(self, sample) diff --git a/pyopendds/Publisher.py b/pyopendds/Publisher.py index a292b85..8c19715 100644 --- a/pyopendds/Publisher.py +++ b/pyopendds/Publisher.py @@ -1,5 +1,6 @@ from __future__ import annotations +from .DataWriter import DataWriter from .Topic import Topic from typing import TYPE_CHECKING @@ -18,5 +19,5 @@ def __init__(self, participant: DomainParticipant, qos=None, listener=None): from _pyopendds import create_publisher create_publisher(self, participant) - def create_datawriter(self, topic: Topic, qos=None, listener=None): - pass + def create_datawriter(self, topic: Topic, qos=None, listener=None) -> DataWriter: + return DataWriter(self, topic, qos, listener) diff --git a/pyopendds/dev/include/pyopendds/user.hpp b/pyopendds/dev/include/pyopendds/user.hpp index aa7c7fd..06b1a1a 100644 --- a/pyopendds/dev/include/pyopendds/user.hpp +++ b/pyopendds/dev/include/pyopendds/user.hpp @@ -30,7 +30,7 @@ class IntegerType { static PyObject* get_python_class() { - return PyLong_Type; + return PyLong_FromLong(0); } static void cpp_to_python(const T& cpp, PyObject*& py) @@ -56,13 +56,17 @@ class IntegerType { "Integer Value is Out of Range for IDL Type", PyExc_ValueError); } if (value == -1 && PyErr_Occurred()) throw Exception(); - cpp = value; + cpp = T(value); } + }; typedef ::CORBA::Long i32; template<> class Type: public IntegerType {}; +typedef ::CORBA::Short i16; +template<> class Type: public IntegerType {}; + // TODO: Put Other Integer Types Here const char* string_data(const std::string& cpp) @@ -90,7 +94,7 @@ class StringType { public: static PyObject* get_python_class() { - return PyUnicode_Type; + return PyUnicode_FromString(""); } static void cpp_to_python(const T& cpp, PyObject*& py, const char* encoding) @@ -101,9 +105,16 @@ class StringType { py = o; } - static void python_to_cpp(PyObject* py, T& cpp) + static void python_to_cpp(PyObject* py, T& cpp, const char* encoding) { - // TODO: Encode or Throw Unicode Error + PyObject* repr = PyObject_Str(py); + if (!repr) throw Exception(); + PyObject* str = PyUnicode_AsEncodedString(repr, encoding, NULL); + if (!str) throw Exception(); + const char *bytes = PyBytes_AS_STRING(str); + cpp = T(bytes); + Py_XDECREF(repr); + Py_XDECREF(str); } }; @@ -127,6 +138,7 @@ class TopicTypeBase { virtual const char* type_name() = 0; virtual void register_type(PyObject* pyparticipant) = 0; virtual PyObject* take_next_sample(PyObject* pyreader) = 0; + virtual PyObject* write(PyObject* pywriter, PyObject* pysample) = 0; typedef std::shared_ptr Ptr; typedef std::map TopicTypes; @@ -215,7 +227,7 @@ class TopicType : public TopicTypeBase { DDS::WaitSet_var ws = new DDS::WaitSet; ws->attach_condition(read_condition); DDS::ConditionSeq active; - const DDS::Duration_t max_wait_time = {10, 0}; + const DDS::Duration_t max_wait_time = {60, 0}; if (Errors::check_rc(ws->wait(active, max_wait_time))) { throw Exception(); } @@ -223,7 +235,7 @@ class TopicType : public TopicTypeBase { reader_impl->delete_readcondition(read_condition); IdlType sample; - DDS::SampleInfo info; + DDS::SampleInfo info; if (Errors::check_rc(reader_impl->take_next_sample(sample, info))) { throw Exception(); } @@ -234,6 +246,34 @@ class TopicType : public TopicTypeBase { return rv; } + PyObject* write(PyObject* pywriter, PyObject* pysample) + { + DDS::DataWriter* writer = get_capsule(pywriter); + if (!writer) throw Exception(); + + DataWriter* writer_impl = DataWriter::_narrow(writer); + if (!writer_impl) { + throw Exception( + "Could not narrow writer implementation", Errors::PyOpenDDS_Error()); + } + + IdlType rv; + Type::python_to_cpp(pysample, rv); + + DDS::ReturnCode_t rc = writer_impl->write(rv, DDS::HANDLE_NIL); + if (Errors::check_rc(rc)) { + throw Exception(); + } + // Wait for samples to be acknowledged + DDS::Duration_t timeout = { 30, 0 }; + if (writer_impl->wait_for_acknowledgments(timeout) != DDS::RETCODE_OK) { + throw Exception( + "wait_for_acknowledgments error : ", Errors::PyOpenDDS_Error()); + } + + return pysample; + } + PyObject* get_python_class() { return Type::get_python_class(); diff --git a/pyopendds/dev/itl2py/CppOutput.py b/pyopendds/dev/itl2py/CppOutput.py index b922eb0..5a71e25 100644 --- a/pyopendds/dev/itl2py/CppOutput.py +++ b/pyopendds/dev/itl2py/CppOutput.py @@ -1,6 +1,6 @@ from jinja2 import Environment -from .ast import PrimitiveType, StructType, EnumType +from .ast import PrimitiveType, StructType, EnumType, SequenceType from .Output import Output @@ -13,6 +13,8 @@ def cpp_type_name(type_node): return type_node.kind.name elif isinstance(type_node, (StructType, EnumType)): return cpp_name(type_node.name.parts) + elif isinstance(type_node, (SequenceType)): + return cpp_name(type_node.name.parts); else: raise NotImplementedError @@ -44,33 +46,85 @@ def visit_struct(self, struct_type): struct_to_lines = [ 'Ref field_value;', ] - struct_from_lines = [] + struct_from_lines = [ + 'Ref field_value;', + ] for field_name, field_node in struct_type.fields.items(): to_lines = [] from_lines = [] pyopendds_type = '' is_string = isinstance(field_node.type_node, PrimitiveType) and \ field_node.type_node.is_string() + is_sequence = isinstance(field_node.type_node, SequenceType) + + if is_sequence: + to_lines = [ + 'Ref field_elem;', + 'field_value = PyList_New(0);', + 'for (int i = 0; i < cpp.{field_name}.length(); i++) {{', + ' {pyopendds_type} elem = cpp.{field_name}[i];', + ' field_elem = nullptr;', + ' Type<{pyopendds_type}>::cpp_to_python(elem', + ' #ifdef CPP11_IDL', + ' ()', + ' #endif', + ' , *field_elem' + (', "{default_encoding}"' if is_string else '') + ');', + ' PyList_Append(*field_value, *field_elem);', + '}}' + ] + else: + to_lines = [ + 'Type<{pyopendds_type}>::cpp_to_python(cpp.{field_name}', + '#ifdef CPP11_IDL', + ' ()', + '#endif', + ' , *field_value' + + (', "{default_encoding}"' if is_string else '') + ');', + ] - to_lines = [ - 'Type<{pyopendds_type}>::cpp_to_python(cpp.{field_name}', - '#ifdef CPP11_IDL', - ' ()', - '#endif', - ' , *field_value' - + (', "{default_encoding}"' if is_string else '') + ');', + from_lines = [ + 'if (PyObject_HasAttrString(py, "{field_name}")) {{', + ' *field_value = PyObject_GetAttrString(py, "{field_name}");', + '}}', + 'if (!field_value) {{', + ' throw Exception();', + '}}' ] pyopendds_type = cpp_type_name(field_node.type_node) if to_lines: to_lines.extend([ - 'if (!field_value || PyObject_SetAttrString(' + 'if (!field_value || PyObject_SetAttrString(', 'py, "{field_name}", *field_value)) {{', ' throw Exception();', '}}' ]) + if from_lines: + if is_sequence: + from_lines.extend([ + 'cpp.{field_name}.length(PyList_Size(*field_value));', + 'for (int i = 0; i < PyList_Size(*field_value); i++) {{', + ' ::ContTrajSegment elem = cpp.{field_name}[i];', + ' Type<{pyopendds_type}>::python_to_cpp(PyList_GetItem(*field_value, i), elem', + '#ifdef CPP11_IDL', + ' ()', + '#endif', + ' ' + (', "{default_encoding}"' if is_string else '') + ');', + ' cpp.{field_name}[i] = elem;', + '}}' + ]) + else: + from_lines.extend([ + 'Type<{pyopendds_type}>::python_to_cpp(*field_value, cpp.{field_name}', + '#ifdef CPP11_IDL', + ' ()', + '#endif', + ' ' + + (', "{default_encoding}"' if is_string else '') + ');' + ]) + def line_process(lines): return [''] + [ s.format( @@ -107,9 +161,6 @@ def visit_enum(self, enum_type): 'args = PyTuple_Pack(1, PyLong_FromLong(static_cast(cpp)));', ]), 'to_lines': '', - 'from_lines': '\n'.join([ - '', - '// left unimplemented' - ]), + 'from_lines': '', 'is_topic_type': False, }) diff --git a/pyopendds/dev/itl2py/PythonOutput.py b/pyopendds/dev/itl2py/PythonOutput.py index 42920d3..ae624a2 100644 --- a/pyopendds/dev/itl2py/PythonOutput.py +++ b/pyopendds/dev/itl2py/PythonOutput.py @@ -1,4 +1,4 @@ -from .ast import PrimitiveType, StructType, EnumType +from .ast import PrimitiveType, StructType, EnumType, SequenceType from .Output import Output @@ -72,6 +72,8 @@ def get_python_default_value_string(self, field_type): return type_name + '()' elif isinstance(field_type, EnumType): return type_name + '.' + field_type.default_member + elif isinstance(field_type, SequenceType): + return 'field(default_factory=list)' else: raise NotImplementedError(repr(field_type) + " is not supported") @@ -98,4 +100,4 @@ def visit_enum(self, enum_type): dict(name=name, value=value) for name, value in enum_type.members.items() ], ), - )) + )) \ No newline at end of file diff --git a/pyopendds/dev/itl2py/ast.py b/pyopendds/dev/itl2py/ast.py index 28a59a4..1e201cc 100644 --- a/pyopendds/dev/itl2py/ast.py +++ b/pyopendds/dev/itl2py/ast.py @@ -215,6 +215,9 @@ def __repr__(self): return self.repr_template(repr(self.base_type) + ("max " + str(self.max_count) if self.max_count else "no max")) + def repr_name(self): + if self.name: + return '::' + self.name.join('::') + '::_tao_seq_' + self.base_type + '_' class NodeVisitor: diff --git a/pyopendds/dev/itl2py/itl.py b/pyopendds/dev/itl2py/itl.py index d447ee3..f248211 100644 --- a/pyopendds/dev/itl2py/itl.py +++ b/pyopendds/dev/itl2py/itl.py @@ -74,7 +74,7 @@ def parse_string(details): def parse_sequence(types, details): - base_type = parse_type(types, details["type"]) + base_type = parse_type(types, list(types)[0]) sequence_max_count = details.get("capacity", None) array_dimensions = details.get("size", None) if array_dimensions is not None: @@ -86,9 +86,16 @@ def parse_sequence(types, details): def parse_record(types, details): struct_type = StructType() for field_dict in details['fields']: - struct_type.add_field( - field_dict['name'], parse_type(types, field_dict['type']), - field_dict.get('optional', False)) + if 'sequence' in field_dict['type']: + sequence = parse_sequence(types, {'type': field_dict['type'], 'capacity': 1, 'size': None}) + sequence.set_name(itl_name=sequence.base_type.name.itl_name) + struct_type.add_field( + field_dict['name'], sequence, + field_dict.get('optional', False)) + else: + struct_type.add_field( + field_dict['name'], parse_type(types, field_dict['type']), + field_dict.get('optional', False)) return struct_type @@ -132,6 +139,8 @@ def parse_type(types, details): if details_type is str: if details in types: return types[details] + elif 'sequence' in details : + return parse_sequence(types, {'type':types, 'capacity': 1, 'size': None}) else: raise ValueError("Invalid Type: " + details) elif details_type is dict: diff --git a/pyopendds/dev/itl2py/templates/user.cpp b/pyopendds/dev/itl2py/templates/user.cpp index e57c9ca..fa5e882 100644 --- a/pyopendds/dev/itl2py/templates/user.cpp +++ b/pyopendds/dev/itl2py/templates/user.cpp @@ -59,7 +59,21 @@ class Type { static void python_to_cpp(PyObject* py, /*{{ type.cpp_name }}*/& cpp) { PyObject* cls = get_python_class(); - /*{{ type.from_lines | indent(4) }}*/ + /*{% if type.to_replace %}*/ + cpp = static_cast(PyLong_AsLong(py)); + /*{% else %}*/ + if (py) { + + if (PyObject_IsInstance(py, cls) != 1) { + throw Exception("Not a {{ type.py_name }}", PyExc_TypeError); + } + } else { + PyObject* args; + /*{{ type.new_lines | indent(6) }}*/ + py = PyObject_CallObject(cls, args); + } + /*{% if type.from_lines %}*//*{{ type.from_lines | indent(4) }}*//*{% endif %}*/ + /*{% endif %}*/ } }; @@ -119,9 +133,30 @@ PyObject* pytake_next_sample(PyObject* self, PyObject* args) } } +PyObject* pywrite(PyObject* self, PyObject* args) +{ + Ref pywriter; + Ref pysample; + if (!PyArg_ParseTuple(args, "OO", &*pywriter, &*pysample)) return nullptr; + pywriter++; + + // Try to Get Reading Type and Do write + Ref pytopic = PyObject_GetAttrString(*pywriter, "topic"); + if (!pytopic) return nullptr; + Ref pytype = PyObject_GetAttrString(*pytopic, "type"); + if (!pytype) return nullptr; + + try { + return TopicTypeBase::find(*pytype)->write(*pywriter, *pysample); + } catch (const Exception& e) { + return e.set(); + } +} + PyMethodDef /*{{ native_package_name }}*/_Methods[] = { {"register_type", pyregister_type, METH_VARARGS, ""}, {"type_name", pytype_name, METH_VARARGS, ""}, + {"write", pywrite, METH_VARARGS, ""}, {"take_next_sample", pytake_next_sample, METH_VARARGS, ""}, {nullptr, nullptr, 0, nullptr} }; diff --git a/pyopendds/dev/itl2py/templates/user.py b/pyopendds/dev/itl2py/templates/user.py index 63aaf37..0d4391d 100644 --- a/pyopendds/dev/itl2py/templates/user.py +++ b/pyopendds/dev/itl2py/templates/user.py @@ -1,5 +1,6 @@ {% if has_struct -%} from dataclasses import dataclass as _pyopendds_struct +from dataclasses import field {%- endif %} {% if has_enum -%} from enum import IntFlag as _pyopendds_enum diff --git a/pyopendds/ext/_pyopendds.cpp b/pyopendds/ext/_pyopendds.cpp index 0f16aaa..34b7785 100644 --- a/pyopendds/ext/_pyopendds.cpp +++ b/pyopendds/ext/_pyopendds.cpp @@ -366,6 +366,59 @@ PyObject* create_datareader(PyObject* self, PyObject* args) Py_RETURN_NONE; } +/** + * Callback for Python to Call when the DataWriter Capsule is Deleted + */ +void delete_datawriter_var(PyObject* writer_capsule) +{ + if (PyCapsule_CheckExact(writer_capsule)) { + DDS::DataWriter_var writer = static_cast( + PyCapsule_GetPointer(writer_capsule, nullptr)); + writer = nullptr; + } +} + +/** + * create_datawriter(datawriter: DataWriter, publisher: Publisher, topic: Topic) -> None + */ +PyObject* create_datawriter(PyObject* self, PyObject* args) +{ + Ref pydatawriter; + Ref pypublisher; + Ref pytopic; + if (!PyArg_ParseTuple(args, "OOO", + &*pydatawriter, &*pypublisher, &*pytopic)) { + return nullptr; + } + pydatawriter++; + pypublisher++; + pytopic++; + + // Get Publisher + DDS::Publisher* publisher = get_capsule(*pypublisher); + if (!publisher) return nullptr; + + // Get Topic + DDS::Topic* topic = get_capsule(*pytopic); + if (!topic) return nullptr; + + // Create DataWriter + DDS::DataWriter* datawriter = publisher->create_datawriter( + topic, DATAWRITER_QOS_DEFAULT, nullptr, + OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (!datawriter) { + PyErr_SetString(Errors::PyOpenDDS_Error(), "Failed to Create DataWriter"); + return nullptr; + } + + // Attach OpenDDS DataWriter to DataWriter Python Object + if (set_capsule(*pydatawriter, datawriter, delete_datawriter_var)) { + return nullptr; + } + + Py_RETURN_NONE; +} + /** * datareader_wait_for( * datareader: DataReader, status: StatusKind, @@ -400,6 +453,40 @@ PyObject* datareader_wait_for(PyObject* self, PyObject* args) Py_RETURN_NONE; } +/** + * datawriter_wait_for( + * datawriter: DataWriter, status: StatusKind, + * seconds: int, nanoseconds: int) -> None + */ +PyObject* datawriter_wait_for(PyObject* self, PyObject* args) +{ + Ref pydatawriter; + unsigned status; + int seconds; + unsigned nanoseconds; + if (!PyArg_ParseTuple(args, "OIiI", + &*pydatawriter, &status, &seconds, &nanoseconds)) { + return nullptr; + } + pydatawriter++; + + // Get DataWriter + DDS::DataWriter* writer = get_capsule(*pydatawriter); + if (!writer) return nullptr; + + // Wait + DDS::StatusCondition_var condition = writer->get_statuscondition(); + condition->set_enabled_statuses(status); + DDS::WaitSet_var waitset = new DDS::WaitSet; + if (!waitset) return PyErr_NoMemory(); + waitset->attach_condition(condition); + DDS::ConditionSeq active; + DDS::Duration_t max_duration = {seconds, nanoseconds}; + if (Errors::check_rc(waitset->wait(active, max_duration))) return nullptr; + + Py_RETURN_NONE; +} + /// Documentation for Internal Python Objects const char* internal_docstr = "Internal to PyOpenDDS, not for use directly!"; @@ -414,7 +501,9 @@ PyMethodDef pyopendds_Methods[] = { {"create_publisher", create_publisher, METH_VARARGS, internal_docstr}, {"create_topic", create_topic, METH_VARARGS, internal_docstr}, {"create_datareader", create_datareader, METH_VARARGS, internal_docstr}, + {"create_datawriter", create_datawriter, METH_VARARGS, internal_docstr}, {"datareader_wait_for", datareader_wait_for, METH_VARARGS, internal_docstr}, + {"datawriter_wait_for", datawriter_wait_for, METH_VARARGS, internal_docstr}, {nullptr, nullptr, 0, nullptr} }; diff --git a/pyopendds/init_opendds.py b/pyopendds/init_opendds.py index 8c537ad..ecf1bec 100644 --- a/pyopendds/init_opendds.py +++ b/pyopendds/init_opendds.py @@ -1,6 +1,7 @@ '''Manage the initialization of OpenDDS and related functionality. ''' +import sys def init_opendds(*args, default_rtps=True, @@ -19,7 +20,7 @@ def init_opendds(*args, verbose). It is printed to stdout. ''' - args = list(args) + args = list(sys.argv[1:]) if opendds_debug_level > 0: if not (1 <= opendds_debug_level <= 10): diff --git a/tests/basic_test/CMakeLists.txt b/tests/basic_test/CMakeLists.txt index 4b8b942..a5a8bb9 100644 --- a/tests/basic_test/CMakeLists.txt +++ b/tests/basic_test/CMakeLists.txt @@ -23,3 +23,10 @@ if(${CPP11_IDL}) set_target_properties(publisher PROPERTIES COMPILE_DEFINITIONS "CPP11_IDL") endif() + +add_executable(subscriber subscriber.cpp DataReaderListenerImpl.cpp) +target_link_libraries(subscriber OpenDDS::OpenDDS basic_idl) +if(${CPP11_IDL}) + set_target_properties(subscriber PROPERTIES + COMPILE_DEFINITIONS "CPP11_IDL") +endif() diff --git a/tests/basic_test/DataReaderListenerImpl.cpp b/tests/basic_test/DataReaderListenerImpl.cpp new file mode 100644 index 0000000..a164871 --- /dev/null +++ b/tests/basic_test/DataReaderListenerImpl.cpp @@ -0,0 +1,93 @@ +/* + * + * + * Distributed under the OpenDDS License. + * See: http://www.opendds.org/license.html + */ + +#include +#include + +#include "DataReaderListenerImpl.h" +#include "basicTypeSupportC.h" +#include "basicTypeSupportImpl.h" + +#include + +void +DataReaderListenerImpl::on_requested_deadline_missed( + DDS::DataReader_ptr /*reader*/, + const DDS::RequestedDeadlineMissedStatus& /*status*/) +{ +} + +void +DataReaderListenerImpl::on_requested_incompatible_qos( + DDS::DataReader_ptr /*reader*/, + const DDS::RequestedIncompatibleQosStatus& /*status*/) +{ +} + +void +DataReaderListenerImpl::on_sample_rejected( + DDS::DataReader_ptr /*reader*/, + const DDS::SampleRejectedStatus& /*status*/) +{ +} + +void +DataReaderListenerImpl::on_liveliness_changed( + DDS::DataReader_ptr /*reader*/, + const DDS::LivelinessChangedStatus& /*status*/) +{ +} + +void +DataReaderListenerImpl::on_data_available(DDS::DataReader_ptr reader) +{ + basic::ReadingDataReader_var reader_i = + basic::ReadingDataReader::_narrow(reader); + + if (!reader_i) { + ACE_ERROR((LM_ERROR, + ACE_TEXT("ERROR: %N:%l: on_data_available() -") + ACE_TEXT(" _narrow failed!\n"))); + ACE_OS::exit(1); + } + + basic::Reading sample; + DDS::SampleInfo info; + + DDS::ReturnCode_t error = reader_i->take_next_sample(sample, info); + + if (error == DDS::RETCODE_OK) { + std::cout << "SampleInfo.sample_rank = " << info.sample_rank << std::endl; + std::cout << "SampleInfo.instance_state = " << info.instance_state << std::endl; + + if (info.valid_data) { + std::cout << "Message: kind = " << sample.kind << std::endl + << " value = " << sample.value << std::endl + << " where = " << sample.where << std::endl; + + } + + } else { + ACE_ERROR((LM_ERROR, + ACE_TEXT("ERROR: %N:%l: on_data_available() -") + ACE_TEXT(" take_next_sample failed!\n"))); + } +} + +void +DataReaderListenerImpl::on_subscription_matched( + DDS::DataReader_ptr /*reader*/, + const DDS::SubscriptionMatchedStatus& /*status*/) +{ +} + +void +DataReaderListenerImpl::on_sample_lost( + DDS::DataReader_ptr /*reader*/, + const DDS::SampleLostStatus& /*status*/) +{ +} \ No newline at end of file diff --git a/tests/basic_test/DataReaderListenerImpl.h b/tests/basic_test/DataReaderListenerImpl.h new file mode 100644 index 0000000..79955a5 --- /dev/null +++ b/tests/basic_test/DataReaderListenerImpl.h @@ -0,0 +1,48 @@ +/* + * + * + * Distributed under the OpenDDS License. + * See: http://www.opendds.org/license.html + */ + +#ifndef DATAREADER_LISTENER_IMPL_H +#define DATAREADER_LISTENER_IMPL_H + +#include + +#include +#include +#include + +class DataReaderListenerImpl + : public virtual OpenDDS::DCPS::LocalObject { +public: + virtual void on_requested_deadline_missed( + DDS::DataReader_ptr reader, + const DDS::RequestedDeadlineMissedStatus& status); + + virtual void on_requested_incompatible_qos( + DDS::DataReader_ptr reader, + const DDS::RequestedIncompatibleQosStatus& status); + + virtual void on_sample_rejected( + DDS::DataReader_ptr reader, + const DDS::SampleRejectedStatus& status); + + virtual void on_liveliness_changed( + DDS::DataReader_ptr reader, + const DDS::LivelinessChangedStatus& status); + + virtual void on_data_available( + DDS::DataReader_ptr reader); + + virtual void on_subscription_matched( + DDS::DataReader_ptr reader, + const DDS::SubscriptionMatchedStatus& status); + + virtual void on_sample_lost( + DDS::DataReader_ptr reader, + const DDS::SampleLostStatus& status); +}; + +#endif /* DATAREADER_LISTENER_IMPL_H */ \ No newline at end of file diff --git a/tests/basic_test/publisher.py b/tests/basic_test/publisher.py new file mode 100644 index 0000000..4bdb919 --- /dev/null +++ b/tests/basic_test/publisher.py @@ -0,0 +1,34 @@ +import sys +import time +from datetime import timedelta + +from pyopendds import \ + init_opendds, DomainParticipant, StatusKind, PyOpenDDS_Error +from pybasic.basic import Reading, ReadingKind + +if __name__ == "__main__": + try: + # Initialize OpenDDS and Create DDS Entities + init_opendds(opendds_debug_level=1) + domain = DomainParticipant(34) + topic = domain.create_topic('Readings', Reading) + publisher = domain.create_publisher() + writer = publisher.create_datawriter(topic) + + # Wait for Subscriber to Connect + print('Waiting for Subscriber...') + writer.wait_for(StatusKind.PUBLICATION_MATCHED, timedelta(seconds=60)) + print('Found subscriber!') + + sample = Reading() + sample.kind = ReadingKind.acceleration + sample.value = 123 + sample.where = "somewhere" + + time.sleep(1) + # Read and Print Sample + writer.write(sample) + print('Done!') + + except PyOpenDDS_Error as e: + sys.exit(e) diff --git a/tests/basic_test/run_test.sh b/tests/basic_test/run_test.sh index 1591bda..a527fe2 100644 --- a/tests/basic_test/run_test.sh +++ b/tests/basic_test/run_test.sh @@ -5,20 +5,45 @@ sub=$! cd $dir ./publisher -DCPSConfigFile ../rtps.ini & pub=$! +cd - exit_status=0 wait $pub pub_status=$? if [ $pub_status -ne 0 ] then - echo "Publisher exited with status $pub_status" 1>&2 + echo "Cpp publisher exited with status $pub_status" 1>&2 exit_status=1 fi wait $sub sub_status=$? if [ $sub_status -ne 0 ] then - echo "Subscriber exited with status $sub_status" 1>&2 + echo "Python subscriber exited with status $sub_status" 1>&2 + exit_status=1 +fi + +cd $dir +./subscriber -DCPSConfigFile ../rtps.ini & +sub=$! +cd - + +LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$dir" python3 publisher.py & +pub=$! + +exit_status=0 +wait $pub +pub_status=$? +if [ $pub_status -ne 0 ] +then + echo "Python publisher exited with status $pub_status" 1>&2 + exit_status=1 +fi +wait $sub +sub_status=$? +if [ $sub_status -ne 0 ] +then + echo "Cpp subscriber exited with status $sub_status" 1>&2 exit_status=1 fi exit $exit_status diff --git a/tests/basic_test/subscriber.cpp b/tests/basic_test/subscriber.cpp new file mode 100644 index 0000000..36789d4 --- /dev/null +++ b/tests/basic_test/subscriber.cpp @@ -0,0 +1,130 @@ +#include + +#include +#include +#include +#include +#include +#include + +#include "DataReaderListenerImpl.h" +#include "basicTypeSupportImpl.h" + +#include + +using OpenDDS::DCPS::retcode_to_string; + +int main(int argc, char* argv[]) { + + try { + // Init OpenDDS + TheServiceParticipant->default_configuration_file("rtps.ini"); + DDS::DomainParticipantFactory_var opendds = + TheParticipantFactoryWithArgs(argc, argv); + + DDS::DomainParticipantQos part_qos; + opendds->get_default_participant_qos(part_qos); + DDS::DomainParticipant_var participant = opendds->create_participant( + 34, part_qos, 0, OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (!participant) { + std::cerr << "Error: Failed to create participant" << std::endl; + return 1; + } + + basic::ReadingTypeSupport_var ts = new basic::ReadingTypeSupportImpl(); + DDS::ReturnCode_t rc = ts->register_type(participant.in(), ""); + if (rc != DDS::RETCODE_OK) { + std::cerr + << "Error: Failed to register type: " + << retcode_to_string(rc) << std::endl; + return 1; + } + + CORBA::String_var type_name = ts->get_type_name(); + DDS::Topic_var topic = participant->create_topic( + "Readings", type_name.in(), TOPIC_QOS_DEFAULT, 0, + OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (!topic) { + std::cerr << "Error: Failed to create topic" << std::endl; + return 1; + } + + DDS::Subscriber_var subscriber = participant->create_subscriber( + SUBSCRIBER_QOS_DEFAULT, 0, + OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (!subscriber) { + std::cerr << "Error: Failed to create subscriber" << std::endl; + return 1; + } + + DDS::DataReaderListener_var listener(new DataReaderListenerImpl); + DDS::DataReaderQos qos; + subscriber->get_default_datareader_qos(qos); + DDS::DataReader_var reader = subscriber->create_datareader( + topic.in(), qos, listener, + OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (!reader) { + std::cerr << "Error: Failed to create reader" << std::endl; + return 1; + } + basic::ReadingDataReader_var reader_i = + basic::ReadingDataReader::_narrow(reader); + + if (!reader_i) { + ACE_ERROR_RETURN((LM_ERROR, + ACE_TEXT("ERROR: %N:%l: main() -") + ACE_TEXT(" _narrow failed!\n")), + 1); + } + + // Wait for Subscriber + std::cout << "Wating for Subscriber..." << std::endl; + DDS::StatusCondition_var sc = reader->get_statuscondition(); + sc->set_enabled_statuses(DDS::SUBSCRIPTION_MATCHED_STATUS); + DDS::WaitSet_var ws = new DDS::WaitSet; + ws->attach_condition(sc); + const DDS::Duration_t max_wait = {10, 0}; + DDS::SubscriptionMatchedStatus status = {0, 0, 0, 0, 0}; + while (status.current_count < 1) { + DDS::ConditionSeq active; + if (ws->wait(active, max_wait) != DDS::RETCODE_OK) { + std::cerr << "Error: Timedout waiting for subscriber" << std::endl; + return 1; + } + if (reader->get_subscription_matched_status(status) != DDS::RETCODE_OK) { + std::cerr << "Error: Failed to get pub matched status" << std::endl; + return 1; + } + } + ws->detach_condition(sc); + std::cout << "Found Publisher..." << std::endl; + + DDS::SubscriptionMatchedStatus matches; + if (reader->get_subscription_matched_status(matches) != DDS::RETCODE_OK) { + ACE_ERROR_RETURN((LM_ERROR, + ACE_TEXT("ERROR: %N:%l: main() -") + ACE_TEXT(" get_subscription_matched_status failed!\n")), + 1); + } + + DDS::ConditionSeq conditions; + DDS::Duration_t timeout = { 60, 0 }; + if (ws->wait(conditions, timeout) != DDS::RETCODE_OK) { + ACE_ERROR_RETURN((LM_ERROR, + ACE_TEXT("ERROR: %N:%l: main() -") + ACE_TEXT(" wait failed!\n")), + 1); + } + + // Cleanup + participant->delete_contained_entities(); + opendds->delete_participant(participant.in()); + TheServiceParticipant->shutdown(); + + } catch (const CORBA::Exception& e) { + e._tao_print_exception("Exception caught in main():"); + return 1; + } + + return 0; +}