Skip to content

Commit

Permalink
topic-sink now listens to multiple topics
Browse files Browse the repository at this point in the history
  • Loading branch information
maloel committed May 12, 2024
1 parent d2d0d90 commit 752c363
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 54 deletions.
4 changes: 2 additions & 2 deletions third-party/realdds/scripts/topic-send.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def e( *a, **kw ):
i( f'Writing {blob} on {topic_path} ...' )
start = dds.now()
blob.write_to( writer )
# We must wait for acks, since we use a flow controller and write_to() will return before we've
# We must wait for acks, since we use a flow controller and write_to() will return before we've
# actually finished the send
if not writer.wait_for_acks( dds.time( 5. ) ): # seconds
e( 'Timeout waiting for ack' )
Expand Down Expand Up @@ -126,7 +126,7 @@ def e( *a, **kw ):
e( 'Timeout waiting for ack' )
sys.exit( 1 )
i( f'Acknowledged' )
# NOTE: if we don't wait for acks there's no guarrantee that the message is received; even if
# NOTE: if we don't wait for acks there's no guarrantee that the message is received; even if
# all the packets are sent, they may need resending (reliable) but if we exit they won't be...

i( f'After {dds.timestr( dds.now(), start )}' )
Expand Down
161 changes: 109 additions & 52 deletions third-party/realdds/scripts/topic-sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,26 @@
args = ArgumentParser()
args.add_argument( '--debug', action='store_true', help='enable debug mode' )
args.add_argument( '--quiet', action='store_true', help='no output' )
args.add_argument( '--topic', metavar='<path>', help='the topic on which to listen' )
args.add_argument( '--blob', action='store_true', help='when set, listen for blobs' )
args.add_argument( '--image', action='store_true', help='when set, listen for images' )
args.add_argument( '--flexible', metavar='<path>', action='append', help='the flexible (reliable) topic on which to listen' )
args.add_argument( '--flexible-be', metavar='<path>', action='append', help='the flexible (best-effort) topic on which to listen' )
args.add_argument( '--blob', metavar='<path>', action='append', help='the blob topic on which to listen' )
args.add_argument( '--image', metavar='<path>', action='append', help='the image topic on which to listen' )
def domain_arg(x):
t = int(x)
if t <= 0 or t > 232:
raise ArgumentError( f'--domain should be [0-232]' )
return t
args.add_argument( '--domain', metavar='<0-232>', type=domain_arg, default=-1, help='DDS domain to use (default=0)' )
def time_arg(x):
t = float(x)
if t < 0.:
raise ValueError( f'--time should be >=0' )
return t
args.add_argument( '--wait', metavar='<seconds>', type=time_arg, default=5., help='seconds to wait for writers (default 5; 0=disable)' )
args.add_argument( '--time', metavar='<seconds>', type=time_arg, default=5., help='runtime before stopping, in seconds (default 0=forever)' )
args.add_argument( '--not-ready', action='store_true', help='start output immediately, without waiting for all topics' )
args = args.parse_args()


if args.quiet:
def i( *a, **kw ):
pass
Expand All @@ -29,11 +37,9 @@ def e( *a, **kw ):

import sys

if not args.topic:
e( '--topic is required' )
if not args.flexible and not args.flexible_be and not args.blob and not args.image:
e( '--flexible, --flexible-be, --blob, or --image required' )
sys.exit( 1 )
topic_path = args.topic


import pyrealdds as dds
import time
Expand All @@ -45,63 +51,114 @@ def e( *a, **kw ):
participant = dds.participant()
participant.init( dds.load_rs_settings( settings ), args.domain )

if args.blob:
readers = []
ready = args.not_ready

# BLOB
def on_blob_available( reader ):
if not ready:
return
got_something = False
while True:
sample = dds.message.sample_info()
msg = dds.message.blob.take_next( reader, sample )
if not msg:
if not got_something:
raise RuntimeError( "expected message not received!" )
break
i( f'{msg}', )
got_something = True
for topic_path in args.blob or []:
reader = dds.topic_reader( dds.message.blob.create_topic( participant, topic_path ))
def on_data_available( reader ):
got_something = False
while True:
sample = dds.message.sample_info()
msg = dds.message.blob.take_next( reader, sample )
if not msg:
if not got_something:
raise RuntimeError( "expected message not received!" )
break
i( f'-----> {msg}', )
got_something = True
reader.on_data_available( on_data_available )
reader.on_data_available( on_blob_available )
reader.run( dds.topic_reader.qos() )
i( f'Waiting for {topic_path}' )
if args.wait and not reader.wait_for_writers( dds.time( args.wait ) ):
e( f'Timeout waiting for writers on {topic_path}' )
sys.exit( 1 )
readers.append( reader )

elif args.image:
# IMAGE
def on_image_available( reader ):
if not ready:
return
got_something = False
while True:
sample = dds.message.sample_info()
msg = dds.message.image.take_next( reader, sample )
if not msg:
if not got_something:
raise RuntimeError( "expected message not received!" )
break
i( f'{msg}', )
got_something = True
for topic_path in args.image or []:
reader = dds.topic_reader( dds.message.image.create_topic( participant, topic_path ))
def on_data_available( reader ):
got_something = False
while True:
sample = dds.message.sample_info()
msg = dds.message.image.take_next( reader, sample )
if not msg:
if not got_something:
raise RuntimeError( "expected message not received!" )
break
i( f'-----> {msg}', )
got_something = True
reader.on_data_available( on_data_available )
reader.on_data_available( on_image_available )
reader.run( dds.topic_reader.qos( dds.reliability.best_effort, dds.durability.volatile ) )
i( f'Waiting for {topic_path}' )
if args.wait and not reader.wait_for_writers( dds.time( args.wait ) ):
e( f'Timeout waiting for writers on {topic_path}' )
sys.exit( 1 )
readers.append( reader )

else:
# FLEXIBLE
def on_flexible_available( reader ):
if not ready:
return
got_something = False
while True:
sample = dds.message.sample_info()
msg = dds.message.flexible.take_next( reader, sample )
if not msg:
if not got_something:
raise RuntimeError( "expected message not received!" )
break
i( f'{json.dumps( msg.json_data(), indent=4 )}', )
got_something = True
for topic_path in args.flexible_be or []:
reader = dds.topic_reader( dds.message.flexible.create_topic( participant, topic_path ))
import json
def on_data_available( reader ):
got_something = False
while True:
sample = dds.message.sample_info()
msg = dds.message.flexible.take_next( reader, sample )
if not msg:
if not got_something:
raise RuntimeError( "expected message not received!" )
break
i( f'-----> {json.dumps( msg.json_data(), indent=4 )}', )
got_something = True
reader.on_data_available( on_data_available )
reader.on_data_available( on_flexible_available )
reader.run( dds.topic_reader.qos( dds.reliability.best_effort, dds.durability.volatile ) )
i( f'Waiting for {topic_path}' )
if args.wait and not reader.wait_for_writers( dds.time( args.wait ) ):
e( f'Timeout waiting for writers on {topic_path}' )
sys.exit( 1 )
readers.append( reader )
for topic_path in args.flexible or []:
reader = dds.topic_reader( dds.message.flexible.create_topic( participant, topic_path ))
import json
reader.on_data_available( on_flexible_available )
reader.run( dds.topic_reader.qos() )
i( f'Waiting for {topic_path}' )
if args.wait and not reader.wait_for_writers( dds.time( args.wait ) ):
e( f'Timeout waiting for writers on {topic_path}' )
sys.exit( 1 )
readers.append( reader )

# Keep waiting until the user breaks...
def stop():
global readers, ready
ready = False
for reader in readers:
reader.stop() # must stop or we get hangs!
del readers
sys.exit( 0 )

import signal
def handler(signum, frame):
#res = input("Ctrl-c was pressed. Do you really want to exit? y/n ")
#if res == 'y':
sys.exit( 0 )
signal.signal( signal.SIGINT, handler )
i( 'Press ctrl+C to break...' )
while True:
time.sleep(1)
stop()
signal.signal( signal.SIGINT, lambda signum, frame: stop() )

ready = True
if args.time:
time.sleep( args.time )
stop()
else:
i( 'Press ctrl+C to break...' )
while True:
time.sleep(1)

0 comments on commit 752c363

Please sign in to comment.