diff --git a/third-party/realdds/scripts/topic-send.py b/third-party/realdds/scripts/topic-send.py index 6f9fb991e7..cbd988de71 100644 --- a/third-party/realdds/scripts/topic-send.py +++ b/third-party/realdds/scripts/topic-send.py @@ -78,7 +78,7 @@ def e( *a, **kw ): i( f'Writing {blob} on {topic_path} ...' ) start = dds.now() blob.write_to( writer ) - # We must wait for acks, since we use a flow controller and write_to() will return before we've + # We must wait for acks, since we use a flow controller and write_to() will return before we've # actually finished the send if not writer.wait_for_acks( dds.time( 5. ) ): # seconds e( 'Timeout waiting for ack' ) @@ -126,7 +126,7 @@ def e( *a, **kw ): e( 'Timeout waiting for ack' ) sys.exit( 1 ) i( f'Acknowledged' ) - # NOTE: if we don't wait for acks there's no guarrantee that the message is received; even if + # NOTE: if we don't wait for acks there's no guarrantee that the message is received; even if # all the packets are sent, they may need resending (reliable) but if we exit they won't be... i( f'After {dds.timestr( dds.now(), start )}' ) diff --git a/third-party/realdds/scripts/topic-sink.py b/third-party/realdds/scripts/topic-sink.py index c3e5a46ac5..f66de33644 100644 --- a/third-party/realdds/scripts/topic-sink.py +++ b/third-party/realdds/scripts/topic-sink.py @@ -6,18 +6,26 @@ args = ArgumentParser() args.add_argument( '--debug', action='store_true', help='enable debug mode' ) args.add_argument( '--quiet', action='store_true', help='no output' ) -args.add_argument( '--topic', metavar='', help='the topic on which to listen' ) -args.add_argument( '--blob', action='store_true', help='when set, listen for blobs' ) -args.add_argument( '--image', action='store_true', help='when set, listen for images' ) +args.add_argument( '--flexible', metavar='', action='append', help='the flexible (reliable) topic on which to listen' ) +args.add_argument( '--flexible-be', metavar='', action='append', help='the flexible (best-effort) topic on which to listen' ) +args.add_argument( '--blob', metavar='', action='append', help='the blob topic on which to listen' ) +args.add_argument( '--image', metavar='', action='append', help='the image topic on which to listen' ) def domain_arg(x): t = int(x) if t <= 0 or t > 232: raise ArgumentError( f'--domain should be [0-232]' ) return t args.add_argument( '--domain', metavar='<0-232>', type=domain_arg, default=-1, help='DDS domain to use (default=0)' ) +def time_arg(x): + t = float(x) + if t < 0.: + raise ValueError( f'--time should be >=0' ) + return t +args.add_argument( '--wait', metavar='', type=time_arg, default=5., help='seconds to wait for writers (default 5; 0=disable)' ) +args.add_argument( '--time', metavar='', type=time_arg, default=5., help='runtime before stopping, in seconds (default 0=forever)' ) +args.add_argument( '--not-ready', action='store_true', help='start output immediately, without waiting for all topics' ) args = args.parse_args() - if args.quiet: def i( *a, **kw ): pass @@ -29,11 +37,9 @@ def e( *a, **kw ): import sys -if not args.topic: - e( '--topic is required' ) +if not args.flexible and not args.flexible_be and not args.blob and not args.image: + e( '--flexible, --flexible-be, --blob, or --image required' ) sys.exit( 1 ) -topic_path = args.topic - import pyrealdds as dds import time @@ -45,63 +51,114 @@ def e( *a, **kw ): participant = dds.participant() participant.init( dds.load_rs_settings( settings ), args.domain ) -if args.blob: +readers = [] +ready = args.not_ready + +# BLOB +def on_blob_available( reader ): + if not ready: + return + got_something = False + while True: + sample = dds.message.sample_info() + msg = dds.message.blob.take_next( reader, sample ) + if not msg: + if not got_something: + raise RuntimeError( "expected message not received!" ) + break + i( f'{msg}', ) + got_something = True +for topic_path in args.blob or []: reader = dds.topic_reader( dds.message.blob.create_topic( participant, topic_path )) - def on_data_available( reader ): - got_something = False - while True: - sample = dds.message.sample_info() - msg = dds.message.blob.take_next( reader, sample ) - if not msg: - if not got_something: - raise RuntimeError( "expected message not received!" ) - break - i( f'-----> {msg}', ) - got_something = True - reader.on_data_available( on_data_available ) + reader.on_data_available( on_blob_available ) reader.run( dds.topic_reader.qos() ) + i( f'Waiting for {topic_path}' ) + if args.wait and not reader.wait_for_writers( dds.time( args.wait ) ): + e( f'Timeout waiting for writers on {topic_path}' ) + sys.exit( 1 ) + readers.append( reader ) -elif args.image: +# IMAGE +def on_image_available( reader ): + if not ready: + return + got_something = False + while True: + sample = dds.message.sample_info() + msg = dds.message.image.take_next( reader, sample ) + if not msg: + if not got_something: + raise RuntimeError( "expected message not received!" ) + break + i( f'{msg}', ) + got_something = True +for topic_path in args.image or []: reader = dds.topic_reader( dds.message.image.create_topic( participant, topic_path )) - def on_data_available( reader ): - got_something = False - while True: - sample = dds.message.sample_info() - msg = dds.message.image.take_next( reader, sample ) - if not msg: - if not got_something: - raise RuntimeError( "expected message not received!" ) - break - i( f'-----> {msg}', ) - got_something = True - reader.on_data_available( on_data_available ) + reader.on_data_available( on_image_available ) reader.run( dds.topic_reader.qos( dds.reliability.best_effort, dds.durability.volatile ) ) + i( f'Waiting for {topic_path}' ) + if args.wait and not reader.wait_for_writers( dds.time( args.wait ) ): + e( f'Timeout waiting for writers on {topic_path}' ) + sys.exit( 1 ) + readers.append( reader ) -else: +# FLEXIBLE +def on_flexible_available( reader ): + if not ready: + return + got_something = False + while True: + sample = dds.message.sample_info() + msg = dds.message.flexible.take_next( reader, sample ) + if not msg: + if not got_something: + raise RuntimeError( "expected message not received!" ) + break + i( f'{json.dumps( msg.json_data(), indent=4 )}', ) + got_something = True +for topic_path in args.flexible_be or []: reader = dds.topic_reader( dds.message.flexible.create_topic( participant, topic_path )) import json - def on_data_available( reader ): - got_something = False - while True: - sample = dds.message.sample_info() - msg = dds.message.flexible.take_next( reader, sample ) - if not msg: - if not got_something: - raise RuntimeError( "expected message not received!" ) - break - i( f'-----> {json.dumps( msg.json_data(), indent=4 )}', ) - got_something = True - reader.on_data_available( on_data_available ) + reader.on_data_available( on_flexible_available ) + reader.run( dds.topic_reader.qos( dds.reliability.best_effort, dds.durability.volatile ) ) + i( f'Waiting for {topic_path}' ) + if args.wait and not reader.wait_for_writers( dds.time( args.wait ) ): + e( f'Timeout waiting for writers on {topic_path}' ) + sys.exit( 1 ) + readers.append( reader ) +for topic_path in args.flexible or []: + reader = dds.topic_reader( dds.message.flexible.create_topic( participant, topic_path )) + import json + reader.on_data_available( on_flexible_available ) reader.run( dds.topic_reader.qos() ) + i( f'Waiting for {topic_path}' ) + if args.wait and not reader.wait_for_writers( dds.time( args.wait ) ): + e( f'Timeout waiting for writers on {topic_path}' ) + sys.exit( 1 ) + readers.append( reader ) # Keep waiting until the user breaks... +def stop(): + global readers, ready + ready = False + for reader in readers: + reader.stop() # must stop or we get hangs! + del readers + sys.exit( 0 ) + import signal def handler(signum, frame): #res = input("Ctrl-c was pressed. Do you really want to exit? y/n ") #if res == 'y': - sys.exit( 0 ) -signal.signal( signal.SIGINT, handler ) -i( 'Press ctrl+C to break...' ) -while True: - time.sleep(1) + stop() +signal.signal( signal.SIGINT, lambda signum, frame: stop() ) + +ready = True +if args.time: + time.sleep( args.time ) + stop() +else: + i( 'Press ctrl+C to break...' ) + while True: + time.sleep(1)