Skip to content

Commit

Permalink
rename dds test and join metadata-server into test-metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
maloel committed Mar 24, 2024
1 parent 0af8861 commit 7b9f23a
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 72 deletions.
54 changes: 0 additions & 54 deletions unit-tests/dds/metadata-server.py

This file was deleted.

File renamed without changes.
79 changes: 61 additions & 18 deletions unit-tests/dds/test-metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,73 @@
# latter and we have a problem where the broadcaster does not work.
#test:donotrun:linux


import pyrealdds as dds
from rspy import log, test
from time import sleep
from rspy.timer import Timer
import d435i

dds.debug( log.is_debug_on(), 'C ' )
log.nested = 'C '

import d435i
with test.remote.fork( nested_indent=' S' ) as remote:
if remote is None: # we're the server fork

dds.debug( log.is_debug_on(), log.nested )

participant = dds.participant()
participant.init( 123, "server" )

# set up a server device with a single color stream
device_server = dds.device_server( participant, d435i.device_info.topic_root )

color_stream = dds.color_stream_server( 'Color', 'RGB Camera' )
color_stream.enable_metadata() # not there in d435i by default
color_stream.init_profiles( d435i.color_stream_profiles(), 0 )
color_stream.init_options( [] )
color_stream.set_intrinsics( d435i.color_stream_intrinsics() )

def on_control( server, id, control, reply ):
# the control has already been output to debug by the calling code, as will the reply
return True # otherwise the control will be flagged as error

device_server.on_control( on_control )
device_server.init( [color_stream], [], {} )


def broadcast():
global device_server
device_server.broadcast( d435i.device_info )


def new_image( width, height, bpp, timestamp_as_ns = None ):
i = dds.message.image()
i.width = width
i.height = height
i.data = bytearray( width * height * bpp )
if timestamp_as_ns is not None:
i.timestamp = dds.time.from_ns( timestamp_as_ns )
return i

participant = dds.participant()
participant.init( 123, "client" )

def publish_image( img, timestamp ):
img.timestamp = timestamp
color_stream.publish_image( img )

# run the server in another process: GHA has some issue with it in the same process...
import os.path
cwd = os.path.dirname(os.path.realpath(__file__))
remote_script = os.path.join( cwd, 'metadata-server.py' )
with test.remote( remote_script, nested_indent=" S" ) as remote:
remote.wait_until_ready()

# From here down, we're in "interactive" mode (see test-metadata.py)
# ...
raise StopIteration()


###############################################################################################################
# The client
#

from time import sleep
from rspy.timer import Timer

dds.debug( log.is_debug_on(), 'C ' )
log.nested = 'C '

participant = dds.participant()
participant.init( 123, "client" )

# set up the client device and keep all its streams - this is connected directly and we can get notifications on it!
device_direct = dds.device( participant, d435i.device_info )
Expand Down Expand Up @@ -235,7 +279,6 @@ def __exit__( self, type, value, traceback ):
#
with test.closure( "Metadata without a stream name is ignored" ):
pass
#
#############################################################################################
#
test.print_results_and_exit()


test.print_results()

0 comments on commit 7b9f23a

Please sign in to comment.