Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add set_topic_filter & matched_pub, sub & status getters & statistics #43

Merged
merged 2 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions clayer/pysertype.c
Original file line number Diff line number Diff line change
Expand Up @@ -1670,6 +1670,138 @@ ddspy_take_endpoint(PyObject *self, PyObject *args)
/* end builtin topic */


static PyObject *
ddspy_get_matched_subscription_data(PyObject *self, PyObject *args)
{
dds_entity_t writer;
dds_instance_handle_t handle;
dds_builtintopic_endpoint_t* endpoint = NULL;

PyObject* endpoint_constructor;
PyObject* cqos_to_qos;
(void)self;

if (!PyArg_ParseTuple(args, "iKOO", &writer, &handle, &endpoint_constructor, &cqos_to_qos))
return NULL;

endpoint = dds_get_matched_subscription_data(writer, handle);

if (endpoint == NULL) {
Py_INCREF(Py_None);
return Py_None;
}

PyObject* qos_p, *qos;

if (endpoint->qos != NULL) {
qos_p = PyLong_FromVoidPtr(endpoint->qos);
if (PyErr_Occurred()) {
PyErr_Clear();
PyErr_SetString(PyExc_Exception, "VoidPtr errored.");
return NULL;
}
qos = PyObject_CallFunction(cqos_to_qos, "O", qos_p);
if (PyErr_Occurred()) {
PyErr_Clear();
PyErr_SetString(PyExc_Exception, "Callfunc cqos errored.");
return NULL;
}
} else {
Py_INCREF(Py_None);
Py_INCREF(Py_None);
qos_p = Py_None;
qos = Py_None;
}

PyObject* item = PyObject_CallFunction( \
endpoint_constructor, "y#y#Ks#s#O", \
endpoint->key.v, (Py_ssize_t) 16, \
endpoint->participant_key.v, (Py_ssize_t) 16, \
endpoint->participant_instance_handle,
endpoint->topic_name, endpoint->topic_name == NULL ? 0 : strlen(endpoint->topic_name),
endpoint->type_name, endpoint->type_name == NULL ? 0 : strlen(endpoint->type_name),
qos
);
if (PyErr_Occurred()) {
PyErr_Clear();
PyErr_SetString(PyExc_Exception, "Callfunc endpoint constructor errored.");
return NULL;
}
Py_DECREF(qos_p);
Py_DECREF(qos);

dds_builtintopic_free_endpoint(endpoint);

return item;
}


static PyObject *
ddspy_get_matched_publication_data(PyObject *self, PyObject *args)
{
dds_entity_t reader;
dds_instance_handle_t handle;
dds_builtintopic_endpoint_t* endpoint = NULL;

PyObject* endpoint_constructor;
PyObject* cqos_to_qos;
(void)self;

if (!PyArg_ParseTuple(args, "iKOO", &reader, &handle, &endpoint_constructor, &cqos_to_qos))
return NULL;

endpoint = dds_get_matched_publication_data(reader, handle);

if (endpoint == NULL) {
Py_INCREF(Py_None);
return Py_None;
}

PyObject* qos_p, *qos;

if (endpoint->qos != NULL) {
qos_p = PyLong_FromVoidPtr(endpoint->qos);
if (PyErr_Occurred()) {
PyErr_Clear();
PyErr_SetString(PyExc_Exception, "VoidPtr errored.");
return NULL;
}
qos = PyObject_CallFunction(cqos_to_qos, "O", qos_p);
if (PyErr_Occurred()) {
PyErr_Clear();
PyErr_SetString(PyExc_Exception, "Callfunc cqos errored.");
return NULL;
}
} else {
Py_INCREF(Py_None);
Py_INCREF(Py_None);
qos_p = Py_None;
qos = Py_None;
}

PyObject* item = PyObject_CallFunction( \
endpoint_constructor, "y#y#Ks#s#O", \
endpoint->key.v, (Py_ssize_t) 16, \
endpoint->participant_key.v, (Py_ssize_t) 16, \
endpoint->participant_instance_handle,
endpoint->topic_name, endpoint->topic_name == NULL ? 0 : strlen(endpoint->topic_name),
endpoint->type_name, endpoint->type_name == NULL ? 0 : strlen(endpoint->type_name),
qos
);
if (PyErr_Occurred()) {
PyErr_Clear();
PyErr_SetString(PyExc_Exception, "Callfunc endpoint constructor errored.");
return NULL;
}
Py_DECREF(qos_p);
Py_DECREF(qos);

dds_builtintopic_free_endpoint(endpoint);

return item;
}


char ddspy_docs[] = "DDSPY module";

PyMethodDef ddspy_funcs[] = {
Expand Down Expand Up @@ -1777,6 +1909,14 @@ PyMethodDef ddspy_funcs[] = {
(PyCFunction)ddspy_take_endpoint,
METH_VARARGS,
ddspy_docs},
{ "ddspy_get_matched_subscription_data",
(PyCFunction)ddspy_get_matched_subscription_data,
METH_VARARGS,
ddspy_docs},
{ "ddspy_get_matched_publication_data",
(PyCFunction)ddspy_get_matched_publication_data,
METH_VARARGS,
ddspy_docs},
{ NULL}
};

Expand Down
93 changes: 91 additions & 2 deletions cyclonedds/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
import ctypes as ct
from weakref import WeakValueDictionary
from typing import Any, Callable, Dict, Optional, List, TYPE_CHECKING
from datetime import datetime, time, timedelta

from .internal import c_call, c_callable, dds_infinity, dds_c_t, DDS
from .internal import c_call, c_callable, dds_infinity, dds_c_t, DDS, stat_keyvalue, stat_kind
from .qos import Qos, Policy, _CQos


Expand Down Expand Up @@ -1529,6 +1530,7 @@ async def wait_async(self, timeout: Optional[int] = None) -> int:
with concurrent.futures.ThreadPoolExecutor() as pool:
return await loop.run_in_executor(pool, self.wait, timeout)


@c_call("dds_create_waitset")
def _create_waitset(self, domain_participant: dds_c_t.entity) -> dds_c_t.entity:
pass
Expand Down Expand Up @@ -1556,6 +1558,93 @@ def _waitset_set_trigger(self, waitset: dds_c_t.entity, value: ct.c_bool) -> dds
pass


class Statistics(DDS):
"""Statistics object for entity.

Attributes
----------
entity: Entity
The handle of entity to which this set of statistics applies.
opaque: int
The internal data.
time: datatime
Time stamp of lastest call to `Statistics(entity).refresh()` in nanoseconds since epoch.
count: int
Number of key-value pairs.
data: dict
Data.

Examples
--------
>>> Statistics(datawriter)
>>> Statistics(datawriter).refresh()
"""

entity: Entity
opaque: int
time: datetime
count: int
data: Dict[str, int]

def __init__(self, entity: Entity):
self.entity = entity
self._c_statistics = self._create_statistics(entity._ref)
if not self._c_statistics:
raise DDSException(DDSException.DDS_RETCODE_ERROR, msg="Could not initialize statistics.")
self._c_statistics = ct.cast(
self._c_statistics, ct.POINTER(dds_c_t.stat_factory(self._c_statistics[0].count))
)
self._update()

def __del__(self):
self._delete_statistics(ct.cast(self._c_statistics, ct.POINTER(dds_c_t.statistics)))

def _update(self):
self.data = {}
self.opaque = self._c_statistics[0].opaque
self.time = self._c_statistics[0].time
self.count = self._c_statistics[0].count
self.kv = self._c_statistics[0].kv

for i in range(self.count):
name = self.kv[i].name.decode('utf8') # ct.c_char_p
value = None
if self.kv[i].kind == stat_kind.DDS_STAT_KIND_UINT32:
value = self.kv[i].u.u32
elif self.kv[i].kind == stat_kind.DDS_STAT_KIND_UINT64:
value = self.kv[i].u.u64
elif self.kv[i].kind == stat_kind.DDS_STAT_KIND_LENGTHTIME:
value = self.kv[i].u.lengthtime
self.data[name] = value

def refresh(self):
"""Update a previously created statistics structure with current values.

Only the time stamp and the values (and "opaque") may change.
The set of keys and the types of the values do not change.
"""
self._refresh_statistics(ct.cast(self._c_statistics, ct.POINTER(dds_c_t.statistics)))
self._c_statistics = ct.cast(self._c_statistics, ct.POINTER(dds_c_t.stat_factory(self._c_statistics[0].count)))
self._update()

def __str__(self):
return f"Statistics({self.entity}, opaque={self.opaque}, time={self.time}, data={self.data})"

@c_call("dds_create_statistics")
def _create_statistics(self, entity: dds_c_t.entity) -> ct.POINTER(dds_c_t.statistics):
pass

@c_call("dds_refresh_statistics")
def _refresh_statistics(self, stat: ct.POINTER(dds_c_t.statistics)) -> dds_c_t.returnv:
pass

@c_call("dds_delete_statistics")
def _delete_statistics(self, stat: ct.POINTER(dds_c_t.statistics)) -> None:
pass

__repr__ = __str__


__all__ = ["DDSException", "Entity", "Qos", "Policy", "Listener", "DDSStatus", "ViewState",
"InstanceState", "SampleState", "ReadCondition", "QueryCondition", "GuardCondition",
"WaitSet"]
"WaitSet", "Statistics"]
41 changes: 41 additions & 0 deletions cyclonedds/internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from ctypes.util import find_library
from functools import wraps
from dataclasses import dataclass
from enum import IntEnum


class CycloneDDSLoaderException(Exception):
Expand Down Expand Up @@ -235,6 +236,26 @@ class InvalidSample:
sample_info: SampleInfo


class stat_kind(IntEnum):
DDS_STAT_KIND_UINT32 = 0
DDS_STAT_KIND_UINT64 = 1
DDS_STAT_KIND_LENGTHTIME = 2


class stat_value(ct.Union):
_fields_ = [
('u32', ct.c_uint32),
('u64', ct.c_uint64),
('lengthtime', ct.c_uint64)
]

class stat_keyvalue(ct.Structure):
_fields_ = [
('name', ct.c_char_p),
('kind', ct.c_int),
('u', stat_value)
]

class dds_c_t: # noqa N801
entity = ct.c_int32
time = ct.c_int64
Expand Down Expand Up @@ -345,6 +366,26 @@ class sample_buffer(ct.Structure): # noqa N801
('len', ct.c_size_t)
]

class statistics(ct.Structure):
_fields_ = [
('entity', ct.c_int32),
('opaque', ct.c_uint64),
('time', ct.c_int64),
('count', ct.c_size_t),
('kv', ct.c_void_p)
]

def stat_factory(n_kv: int) -> ct.Structure:
class vstatistics(ct.Structure):
_fields_ = [
('entity', ct.c_int32),
('opaque', ct.c_uint64),
('time', ct.c_int64),
('count', ct.c_size_t),
('kv', stat_keyvalue * n_kv)
]
return vstatistics


import cyclonedds._clayer as _clayer

Expand Down
Loading