Skip to content

Commit

Permalink
Deferred message filter/feed activation for race-free setups (#132)
Browse files Browse the repository at this point in the history
Signed-off-by: Michel Hidalgo <[email protected]>
  • Loading branch information
mhidalgo-bdai authored Nov 25, 2024
1 parent e4d5c80 commit 5aa699a
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 56 deletions.
48 changes: 32 additions & 16 deletions bdai_ros2_wrappers/bdai_ros2_wrappers/feeds.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def __init__(
self._link.registerCallback(
lambda *msgs: self._tape.write(msgs if len(msgs) > 1 else msgs[0]),
)
node.context.on_shutdown(self._tape.close)
node.context.on_shutdown(self.close)

@property
def link(self) -> Filter:
Expand Down Expand Up @@ -178,10 +178,17 @@ def stream(
timeout_sec=timeout_sec,
)

def close(self) -> None:
"""Closes the message feed."""
def start(self) -> None:
"""Start the message feed."""
self._link.start()

def stop(self) -> None:
"""Stop the message feed."""
self._link.stop()
self._tape.close()

close = stop


class AdaptedMessageFeed(MessageFeed[MessageT]):
"""A message feed decorator to simplify adapter patterns."""
Expand All @@ -190,6 +197,8 @@ def __init__(
self,
feed: MessageFeed,
fn: Callable[..., MessageT],
*,
autostart: bool = True,
**kwargs: Any,
) -> None:
"""Initializes the message feed.
Expand All @@ -199,19 +208,20 @@ def __init__(
fn: message adapting callable.
kwargs: all other keyword arguments are forwarded
for `MessageFeed` initialization.
autostart: whether to start feeding messages immediately or not.
"""
super().__init__(Adapter(feed.link, fn), **kwargs)
super().__init__(Adapter(feed.link, fn, autostart=autostart), **kwargs)
self._feed = feed

@property
def feed(self) -> MessageFeed:
"""Gets the upstream message feed."""
return self._feed

def close(self) -> None:
"""Closes this message feed and the upstream one as well."""
self._feed.close()
super().close()
def stop(self) -> None:
"""Stop this message feed and the upstream one as well."""
self._feed.stop()
super().stop()


class FramedMessageFeed(MessageFeed[MessageT]):
Expand All @@ -226,6 +236,7 @@ def __init__(
tf_buffer: Optional[tf2_ros.Buffer] = None,
history_length: Optional[int] = None,
node: Optional[Node] = None,
autostart: bool = True,
) -> None:
"""Initializes the message feed.
Expand All @@ -238,6 +249,7 @@ def __init__(
history_length: optional historic data size, defaults to 1.
node: optional node for the underlying native subscription, defaults to
the current process node.
autostart: whether to start feeding messages immediately or not.
"""
if node is None:
node = scope.ensure_node()
Expand All @@ -251,6 +263,7 @@ def __init__(
tf_buffer,
tolerance_sec,
node.get_logger(),
autostart=autostart,
),
history_length=history_length,
node=node,
Expand All @@ -262,10 +275,10 @@ def feed(self) -> MessageFeed[MessageT]:
"""Gets the upstream message feed."""
return self._feed

def close(self) -> None:
"""Closes this message feed and the upstream one as well."""
self._feed.close()
super().close()
def stop(self) -> None:
"""Stop this message feed and the upstream one as well."""
self._feed.stop()
super().stop()


class SynchronizedMessageFeed(MessageFeed):
Expand All @@ -279,6 +292,7 @@ def __init__(
allow_headerless: bool = False,
history_length: Optional[int] = None,
node: Optional[Node] = None,
autostart: bool = True,
) -> None:
"""Initializes the message feed.
Expand All @@ -291,13 +305,15 @@ def __init__(
history_length: optional historic data size, defaults to 1.
node: optional node for the underlying native subscription, defaults to
the current process node.
autostart: whether to start feeding messages immediately or not.
"""
super().__init__(
ApproximateTimeSynchronizer(
[f.link for f in feeds],
queue_size,
delay,
allow_headerless=allow_headerless,
autostart=autostart,
),
history_length=history_length,
node=node,
Expand All @@ -309,8 +325,8 @@ def feeds(self) -> Iterable[MessageFeed]:
"""Gets all aggregated message feeds."""
return self._feeds

def close(self) -> None:
"""Closes this message feed and all upstream ones as well."""
def stop(self) -> None:
"""Stop this message feed and all upstream ones as well."""
for feed in self._feeds:
feed.close()
super().close()
feed.stop()
super().stop()
Loading

0 comments on commit 5aa699a

Please sign in to comment.